회사 후배가 질문했다. 멀티 스레드 환경에서 List 형태의 데이터를 처리하고 싶은데  List에는 중복된 데이터가 존재할 수 있다고 했다. 문제는 어떤 스레드에서 처리 중인 데이터가 다른 스레드에서 연속해서 처리되면 안 된다고 했다. 약간 고민되긴 했지만 synchronized 키워드가 떠올랐다. 사실 Java를 처음 입문했을 때  동시성이 떨어진다는 이유로 웹 환경에서는 synchronized를 잘 사용하지 않는다고 얘기를 들어서 처음부터 나의 관심 밖의 키워드였다. 하지만 후배가 질문한 내용을 보면 멀티 스레드 환경에서 동시성 제어를 통해서 충분히 해결할 수 있다.

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 1, 2);

ExecutorService es = Executors.newFixedThreadPool(7);

ExecutionMeasurer.measure(() -> {
	List<CompletableFuture<Void>> futures = list.stream()
			.map(num -> CompletableFuture.runAsync(() -> {
				synchronized (num) {
					ExecutionMeasurer.delay(1000);
				}
			}, es))
			.collect(Collectors.toList());

	futures.forEach(CompletableFuture::join);
});

es.shutdown();


list에는 총 7개의 요소가 있고 각각의 요소를 처리하는데 1,000ms가 걸린다고 가정해 보자. 총 7개의 스레드로 처리를 하면 몇 초가 걸릴까? 중복된 요소 1, 2가 존재하기 때문에 7개의 스레드로 실행하더라도  synchronized에 의해서 대략 2초 정도의 시간이 소요된다. 주어진 자원을 100% 다 사용하지 못했지만 최소한의 lock으로 중복 처리 없이 동시성을 높였다고 할 수 있다. synchronized 키워드는 평소가 자주 사용하지는 않지만 상황에 따라서는 유용하게 사용할 수도 있다.

위 예제는 단일 JVM 구조에서 가볍게 실행될 경우이고 분산 환경에서는 좀 더 다른 고민이 필요할 것이다. 데이터 분포는 어떠한지 분배는 어떻게 처리할지 또는 lock의 단위를 어떻게 할지 고민해봐야 한다.

'Dev > Java' 카테고리의 다른 글

ConcurrentHashMap은 Client lock이 안된다.  (0) 2017.06.15
자바 메모리 누수 확인  (0) 2017.04.14
CompletableFuture에 관해서  (0) 2017.02.27
Collector 인터페이스  (0) 2017.02.14
지인이 말했다. 기존 레거시 코드에서 MySQL 동시성 문제를 해결하기 위해 SELECT ~ FOR UPDATE를 사용하고 있으며 많은 곳에서 무분별하게 사용된 것으로 보아 다른 고민 없이 copy&paste로 개발된 것 같다고 했다. 평소 SELECT ~ FOR UPDATE에 대한 지식이나 별다른 고민을 해보지 않았기 때문에 이번 기회에 알아보기로 했다.

DB에서는 동시성 제어를 위해 2가지 방식이 존재한다.
  • 비관적 동시성 제어(Pessimistic Concurrency Control) 
  • 낙관적 동시성 제어(Optimistic Concurrency Control)

비관적 동시성 제어는 앞에 말한 대로 SELECT ~ FOR UPDATE를 통해서 설정할 수 있다. 데이터를 읽는 시점에 해당 레코드에 lock을 설정해서 조회/갱신이 완료될 때까지 유지하게 되는데 이 과정 중 동시성 저하가 발생할 수 있다. 비관적 동시성 제어로 게시물을 select 하고 조회 수를 update 하는 기능이 있다고 가정해보자.
이 게시물에 트래픽이 증가하게 되면 어떤 일이 발생하게 될까? 모든 요청이 해당 게시물 레코드에 잠금이 설정되어 각각의 요청들은 페이지를 조회하기 위해 많은 시간이 소요될 것이다. 비관적 동시성 제어는 엄격하게 데이터 잘못 갱신되는 것을 막을 필요가 있는 곳에 제한적으로 사용하는 것이 좋을 것으로 보인다.
@Transactional
public void read(int no) {
	Sample sample = jdbcTemplate.queryForObject(
			"select * from sample where no = ? for update ",
			new Object[] { no },
			new BeanPropertyRowMapper<>(Sample.class));

	delay(500);

	jdbcTemplate.update(
			"update sample set count = ? where no = ?",
			sample.getCount() + 1, sample.getNo());
}


위 샘플 코드를 아래와 같이 멀티 스레드로 실행하면 SELECT ~ FOR UPDATE까지는 동시에 접근이 되지만 비관적 동시성으로 인해 결국 각각의 요청이 0.5초 이상 기다렸다가 순차적으로 실행이 되는 것을 확인할 수 있다.

@Test
public void run() throws Exception {
	ExecutorService executorService = Executors.newFixedThreadPool(10);

	List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10)
			.mapToObj(i -> CompletableFuture.runAsync(() ->
					sampleService.read(1), executorService))
			.collect(toList());

	futures.forEach(CompletableFuture::join);
}


낙관적 동시성 제어는 사용자들이 동시에 같은 데이터를 변경하지 않을 것이라고 가정하고 작업을 처리한다. 데이터를 select할 때 별다른 lock을 설정하지 않고 데이터를 수정하는 시점에 where 절을 통해서 데이터 변경 여부를 확인한다. select의 where 문에 맞춰 update의 where 문을 설정을 해야 하는 번거로움이 있다. update를 실패할 경우 이후 아무 작업을 하지 않거나 적절한 exception을 발생시키는 작업이 필요할 것이다. 아래 read() 메서드를 멀티 쓰레드로 동시에 실행하게 되면 select를 하고 0.5를 기다리게 되기 때문에 업데이트를 성공한 첫번째 요청을 제외하고 나머지는 select한 정보가 이미 변경되었기 때문에 exception이 발생할 것이다.
@Transactional
public void read(int no) {
	Sample sample = jdbcTemplate.queryForObject(
			"select * from sample where no = ?",
			new Object[] { no },
			new BeanPropertyRowMapper<>(Sample.class));

	delay(500);

	int affectedRow = jdbcTemplate.update(
			"update sample set count = ? where no = ? and count = ?",
			sample.getCount() + 1, sample.getNo(), sample.getCount());

	if (affectedRow == 0) {
		throw new RuntimeException("이미 업데이트가 완료 되었습니다.");
	}
}


그동안은 동시성에 대한 고민 없이 습관적으로 낙관적 동시성 제어를 사용하고 있었다. 정상 동작을 하지 못했을 때에 대한 처리가 번거로울 수 있고 요청이 몰릴 경우 다수의 exception이 발생할 수 있다는 점은 문제가 될 수 있을 것이다.(exception이 비즈니스 흐름에 더 자연스러울 수도 있다.)
 
동시성 문제는 모듈을 좀 더 세분화하거나 MQ 또는 비동기 처리 등으로 극복할 수 있을 것 같은데 무엇보다 자신에 환경에 맞게 더 효율적인 방법을 직접 찾아야 한다. 비즈니스를 충분히 이해하고 적절한 비관적/낙관적 동시성 제어를 선택해서 처리하도록 하자.


+ Recent posts