최근 자바병렬프로그래밍 책으로 스터디를 하고 있는데, 아래 문구를 이해하기가 쉽지 않았다.

ConcurrentHashMap 클래스는 독점적으로 사용할 수 있는 락이 없기 때문에, Vector 클래스를 대상으로 살펴봤던 것처럼 클라이언트 측 락 기법을 활용할 수 없다.

활용할 수 없다... 왜? 왜 안되는 걸까? 궁금증을 해소하기 이것저것 알아보기 시작했다. stackoverflow에 비슷한 질문이 있는 거 보면 나만의 궁금증은 아닌 것 같다.


우선 Vector 클래스를 알 필요가 있다. VectorArrayList를 비교하곤 하는데 주요 차이점은 잘 알다시피 동기화 처리 유무이다. 그래서 Vector는 멀티 스레드에서 안전하고 ArrayList는 그렇지 않다. 하지만 synchronized 처리로 인해 VectorArrayList보다 느리기 때문에 보통은 ArrayList를 사용하고 Vector는 많이 사용되지 않는다.


Vector 클래스를 살펴보면 주요 메서드에 synchronized 키워드가 있는 것을 볼 수 있다. synchronized 키워드가 메서드에 있다면, 메서드가 있는 클래스 객체를 기준으로 락이 지정된다. 다시 말하면 같은 Vector 객체에서 synchronized가 지정된 메서드가 실행되고 있다면 같은 객체 다른 synchronized 메서드 실행은 대기를 하게된다. 그렇기 때문에 아래와 같이 클라이언트 락을 지정하더라도 문제가 발생하지 않는다. (Vector의 add 메서드에는 synchronized가 지정되어 있다.)
Vector<Integer> vector = new Vector<>();

public void addSync(Integer element) {
	synchronized (vector) {
		vector.add(element);
	}
}

public void add(Integer element) {
	vector.add(element);
}


하지만 ConcurrentHashMapVector처럼 메서드에 synchronized가 지정된 방식이 아니라 내부적으로 여러 개의 세그먼트로 두고 각각 별도 락을 지정하는 방식으로 처리를 하는데, 이런 방법을 lock striping이라고 한다. 이는 메서드에 지정한 방식보다 동시성이 더 좋다. 이런 차이점으로 인해 ConcurrentHashMap은 아래와 같이 클라이언트 락을 지정하더라도 동시성을 보장받을 수 없다.

private Map<Integer, Integer> hashMap = new ConcurrentHashMap<>();

public void putSync(Integer key) {
	synchronized (hashMap) {
		hashMap.put(key, Integer.MIN_VALUE);
	}
}

public void put(Integer key) {
	hashMap.put(key, Integer.MIN_VALUE);
}


알고 보면 당연하고 간단한 이유인데 한참을 고민하고 나서야 이해를 할 수 있었다. Vector는 메서드에 synchronized 키워드가 지정되어 있고, 메서드에 synchronized가 지정되면 어떻게 동작하는지 알고 있었다면 쉽게 이해했을 것이다.

'Dev > Java' 카테고리의 다른 글

자바 메모리 누수 확인  (0) 2017.04.14
멀티 스레드에서 synchronized가 필요한 경우  (0) 2017.04.13
CompletableFuture에 관해서  (0) 2017.02.27
Collector 인터페이스  (0) 2017.02.14

출근길에 책을 하나 읽고 있는데 자바 메모리 누수 얘기가 나왔다. 그동안 자바는 가비지 컬렉터가 있으니 막연히 메모리 누수가 발생할 거라는 생각을 하지 않았다. 결론부터 말하면 발생할 수 있다. 이에 대한 이해가 필요한 것 같아 직접 테스트를 통해서 확인하고 싶은 마음이 생겼다. 책에 있는 예제를 참고하여 코딩해봤다.

public class MemoryLeakSample {

	private static final SimpleStack<String> SIMPLE_STACK = new SimpleStack<>();

	public static class SimpleStack<T> {
		private final List<T> stack;
		private int pointer = 0;

		public SimpleStack() {
			this.stack = new ArrayList<>();
			pointer = 0;
		}

		public void push(T element) {
			stack.add(pointer++, element);
		}

		public T pop() {
			if (pointer > 0) {
				return stack.get(--pointer);
			}
			return null;
		}
	}
}


SimpleStack 클래스는 데이터를 저장하고 삭제를 할 수 있는 아주 심플한 스택 클래스이다. 테스트를 해보면 정상적으로 동작하는 것을 확인할 수 있다. 문제는 20번째 줄에 있는 stack.get()이다. List에서 remove 하는 것이 아니라 get()을 통해서 처리하고 있다.

pointer의 위치를 변경 했기 때문에 다시 push()를 하면 새로운 element로 해당 pointer에 add()를 하기 때문에 동작 자체는 이상 없다. 하지만 해당 객체의 참조 정보는 여전히 stack에 남아 있게 된다. 참조 정보가 남아 있다는 것은 GC의 대상이 아니라는 의미이다. SIMPLE_STACK이 GC 대상이 되지 않는 한 해당 정보는 사라지지 않는다. (static final인 SIMPLE_STACK이 애플리케이션이 종료되기 전에 GC 대상이 될 일은 없다.) 

MemoryLeakSamplemain() 메서드를 추가하여 실제 메모리 누수가 발생하는지 간단하게 테스트를 해보자. 코드를 간단하게 설명하면 SIMPLE_STACK에 문자열을 push() 하고 바로 pop()을 호출한다. 등록/삭제를 100,000번 반복하는 것이다. 이 과정을 다시 100,000번 반복한다. Assert.isTrue를 통해서 실행 자체에 대한 이상 없음을 확인할 수 있다.

public static void main(String[] args) {
	IntStream.rangeClosed(0, 99999)
			.forEach(i -> IntStream.rangeClosed(0, 99999)
					.forEach(j -> {
						String value = "테스트 데이터입니다. 넘버: " + i + j;
						SIMPLE_STACK.push(value);
						Assert.isTrue(Objects.equals(value, SIMPLE_STACK.pop()), "ERROR");
					}));
}


문제는 SIMPLE_STACK에서 remove 처리되지 못한 문자열의 참조 정보들은 GC 대상이 되지 못한 상태로 여전히 남아 있게 된다. 실제로 그런지 결과를 확인하기 위해 JVM option을 -Xms128m -Xmx128m로 지정하고 VisualVM으로 모니터링을 해봤다. 아래 그래프를 통해서 보듯이 일정 시간 이후에 OOM이 발생하면서 애플리케이션이 종료하게 된다.


Heap Dump를 통해서 String 객체가 압도적으로 많이 생성될 걸 볼 수 있는데 모두 SIMPLE_STACK에 있음을 확인할 수 있다.


이제 stack.get()을 stack.remove()로 변경하고 다시 테스트를 진행해보자. 아래 그래프 보듯이 OOM 발생 없이 정상적으로 실행되는 것 을 알 수 있다.


궁금해서 ArrayListremove() 메서드를 보니 명시적으로 null을 지정한다. (clear to let GC to its work) 그렇다고 개발할 때 모든 경우에 null을 명시적으로 지정해야 GC 대상이 되는건 아니다. 오해하지 말자.


그동안 자바 메모리 누수에 대해 생각해 본 적이 없었는데 이번 기회에 직접 테스트를 통해서 결과를 확인하니 이해하는데 많은 도움이 되는 것 같다.

'Dev > Java' 카테고리의 다른 글

ConcurrentHashMap은 Client lock이 안된다.  (0) 2017.06.15
멀티 스레드에서 synchronized가 필요한 경우  (0) 2017.04.13
CompletableFuture에 관해서  (0) 2017.02.27
Collector 인터페이스  (0) 2017.02.14

회사 후배가 질문했다. 멀티 스레드 환경에서 List 형태의 데이터를 처리하고 싶은데  List에는 중복된 데이터가 존재할 수 있다고 했다. 문제는 어떤 스레드에서 처리 중인 데이터가 다른 스레드에서 연속해서 처리되면 안 된다고 했다. 약간 고민되긴 했지만 synchronized 키워드가 떠올랐다. 사실 Java를 처음 입문했을 때  동시성이 떨어진다는 이유로 웹 환경에서는 synchronized를 잘 사용하지 않는다고 얘기를 들어서 처음부터 나의 관심 밖의 키워드였다. 하지만 후배가 질문한 내용을 보면 멀티 스레드 환경에서 동시성 제어를 통해서 충분히 해결할 수 있다.

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 1, 2);

ExecutorService es = Executors.newFixedThreadPool(7);

ExecutionMeasurer.measure(() -> {
	List<CompletableFuture<Void>> futures = list.stream()
			.map(num -> CompletableFuture.runAsync(() -> {
				synchronized (num) {
					ExecutionMeasurer.delay(1000);
				}
			}, es))
			.collect(Collectors.toList());

	futures.forEach(CompletableFuture::join);
});

es.shutdown();


list에는 총 7개의 요소가 있고 각각의 요소를 처리하는데 1,000ms가 걸린다고 가정해 보자. 총 7개의 스레드로 처리를 하면 몇 초가 걸릴까? 중복된 요소 1, 2가 존재하기 때문에 7개의 스레드로 실행하더라도  synchronized에 의해서 대략 2초 정도의 시간이 소요된다. 주어진 자원을 100% 다 사용하지 못했지만 최소한의 lock으로 중복 처리 없이 동시성을 높였다고 할 수 있다. synchronized 키워드는 평소가 자주 사용하지는 않지만 상황에 따라서는 유용하게 사용할 수도 있다.

위 예제는 단일 JVM 구조에서 가볍게 실행될 경우이고 분산 환경에서는 좀 더 다른 고민이 필요할 것이다. 데이터 분포는 어떠한지 분배는 어떻게 처리할지 또는 lock의 단위를 어떻게 할지 고민해봐야 한다.

'Dev > Java' 카테고리의 다른 글

ConcurrentHashMap은 Client lock이 안된다.  (0) 2017.06.15
자바 메모리 누수 확인  (0) 2017.04.14
CompletableFuture에 관해서  (0) 2017.02.27
Collector 인터페이스  (0) 2017.02.14

CompletableFuture에 대해서 알아보기 전에 먼저 Java 5부터 사용할 수 있었던 Future 인터페이스를 확인해 보겠습니다. Future를 이용하면 호출한 스레드가 결과를 기다리는 동안 다른 작업을 수행할 수 있습니다. 

아래 소스코드를 기준으로 예를 들면 치킨배달이 완료(10초)되는 동안 그냥 기다리는 것이 아니라 TV시청을 한 이후에 배달된 치킨을 먹을 수 있습니다. TV시청 시간이 10초 보다 짧다면 배달이 완료되는 나머지 시간만큼은 그냥 대기를 하게 되고 배달시간(10초) 보다 더 많은 시간을 보게 되면 바로 먹을 수 있습니다. 배달사고로 인해 무한정 기다리게 되는 경우가 발생할 수도 있기 때문에 최대 기다릴 수 있는 타임아웃 시간(15초)도 지정했습니다.

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
	Thread.sleep(10000);
	return "치킨배달 완료";
});

doWatchingTv();

try {
	String result = future.get(15, TimeUnit.SECONDS);
	System.out.println("result: " + result);
	executor.shutdown();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
	e.printStackTrace();
}


이제 CompletableFuture를 활용할 수 있도록 코드를 변경해 보겠습니다. 먼저 DeliveryStore라는 클래스를 만들어 배달 주문 받을 수 있도록 하겠습니다. 주문 이후 배달완료까지 3초 소요되도록 변경했습니다. 이때 시간 지연이 발생하므로 CompletableFuturesupplyAsync()으로 처리 했습니다. supplyAsync()의 두 번째 인수를 지정하지 않으면 commonPool를 사용하게 됩니다. 물론 직접 생성해서 지정할 수도 있습니다.

@Data
@AllArgsConstructor
public static class DeliveryStore {
	private String name;

	public Future<String> orderAsync(String item) {
		return CompletableFuture.supplyAsync(() -> {
			int seconds = 3000;
			delay(seconds);
			return "[" + name + "] " + item + " 배달 완료!! (배달시간: " + seconds + "msecs)";
		});
	}

	public String order(String item) {
		int seconds = 3000;
		delay(3000);
		return "[" + name + "] " + item + " 배달 완료!! (배달시간: " + seconds + "msecs)";
	}

	private void delay(int seconds) {
		try {
			Thread.sleep(seconds);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}


이제 새로만든 DeliveryStore 클래스를 이용해서 주문을 해보겠습니다.

public static void main(String[] args) {
	DeliveryStore chicken = new DeliveryStore("치킨");
	Future<String> future = chicken.orderAsync("후라이드");

	doWatchingTv();

	try {
		String result = future.get(8, TimeUnit.SECONDS);
		System.out.println("결과: " + result);
	} catch (ExecutionException | InterruptedException | TimeoutException e) {
		e.printStackTrace();
	}
}


배달 정보가 잘 출력 되는 것을 확인할 수 있습니다.

결과: [치킨] 후라이드 배달 완료!! (배달시간: 3000msecs)


이번에는 비동기 주문(orderAsync())이 아닌 동기 주문(order())을 호출해서 여러 DeliveryStore에 주문을 해보겠습니다. 아래와 같이 세개의 DeliveryStore에 "세트1호"를 주문하면 모두 완료되려면 얼만큼의 시간이 소요될까요? 3초씩 총 9초가 소요됩니다.

List deliveryStores = Arrays.asList(
		new DeliveryStore("치킨"),
		new DeliveryStore("피자"),
		new DeliveryStore("분식"));

deliveryStores.stream()
		.map(ds -> ds.order("세트1호"))
		.forEach(System.out::println);


총 배달소요 시간을 줄이기 위해 스트림을 병렬 스트림으로 이용해 보겠습니다. 이번에는 총 3초가 소요되었습니다. (테스트 환경의 코어 수에 따라 결과는 다를 수 있습니다.)

deliveryStores.parallelStream()
		.map(ds -> ds.order("세트1호"))
		.forEach(System.out::println);


DeliveryStore 수가 훨씬 많다고 가정하면 결과는 어떻게 될까요? commonPool의 제한된 쓰레드풀이 아닌 필요한 작업량을 고려해서 쓰레드풀을 만들 수 있어야 합니다. Executor를 통해서 설정해보겠습니다.

List deliveryStores = Arrays.asList(
		new DeliveryStore("치킨"),
		new DeliveryStore("피자"),
		new DeliveryStore("일식"),
		new DeliveryStore("중식"),
		new DeliveryStore("토스트"),
		new DeliveryStore("김밥"),
		new DeliveryStore("케익"),
		new DeliveryStore("과일"),
		new DeliveryStore("분식"));

// 최대 스레드 30개
ExecutorService excutor = Executors.newFixedThreadPool(Math.min(deliveryStores.size(), 30), r -> {
	Thread t = new Thread(r);
	t.setDaemon(true);  // 데몬스레드 사용(프로그램종료 방해 방지)
	return t;
});

List<CompletableFuture<String>> futures = deliveryStores.stream()
		.map(ds -> CompletableFuture.supplyAsync(() -> ds.order("세트1호"), excutor))
		.collect(toList());

List<String> result = futures.stream()
		.map(CompletableFuture::join)
		.collect(toList());


일반 스레드가 실행 중이면 자바프로그램이 종료되지 않는데 만약 어떤 작업을 한없이 기다리게 되면 문제가 발생할 수 있기 때문에 자바 프로그램이 종료 될 때 같이 강제로 실행이 종료 될 수 있도록 데몬 스레드로 지정합니다. 

CompletableFuture.supplyAsync() 호출 이후에 CompletableFuture.join을 바로 호출하지 않고 스트림을 분리한 이유는 함께 연속해서 호출하게 되면 모든 DeliveryStore가 동기적, 순차적으로 실행되게 되므로 분리하여 호출해야 합니다.  실행 결과를 대략 3초정도 소요됩니다. commonPool을 사용했다면 더 많은 시간이 소요됐을 것입니다. 그렇다고 무한정 스레드 수를 늘릴 수는 없습니다. 너무 많으면 오히려 서버가 크래시 될 수 있으므로 서버의 CPU 코어 수, CPU 활용 비율, 대기시간/처리시간의 비율을 고려하여 적정한 수를 지정해야 합니다.

'Dev > Java' 카테고리의 다른 글

ConcurrentHashMap은 Client lock이 안된다.  (0) 2017.06.15
자바 메모리 누수 확인  (0) 2017.04.14
멀티 스레드에서 synchronized가 필요한 경우  (0) 2017.04.13
Collector 인터페이스  (0) 2017.02.14

Java8에서 Collector 인터페이스를 직접 구현할 일은 많지 않습니다. 보통은 Collectors 클래스에서 미리 구현한 static 메서드를 사용합니다. Collectors라는 이름을 통해서도 어떤 역할을 하는지 충분히 유추할 수 있습니다.

public static void main(String[] args) {
	List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 1, 4);
	
	List<Integer> evens = list.stream()
			.filter(i -> i % 2 == 0)
			.collect(Collectors.toList());
	
	Map<Integer, List<Integer>> grouping = list.stream()
			.collect(Collectors.groupingBy(Integer::valueOf));
	
	Integer sum = list.stream()
			.collect(Collectors.reducing(0, Integer::sum));
	
	List<String> titles = Arrays.asList("Apple", "Banana", "cherry", "lemon");
	
	String collect = titles.stream().collect(Collectors.joining(", "));
}


Stream의 최종 연산중에 하나인 collect()는 두 가지 타입이 있는데 Collectors를 이용할 경우 인자를 Collector 인터페이스를 받는 메서드를 사용합니다.  Collector 인터페이스를 한번 확인해 보겠습니다. 제네릭 타입도 3개나 되고 구현 대상 메서드도 5개나 되는 것이 뭔가 복잡한 느낌을 줍니다. 하나씩 살펴보겠습니다.

public interface Collector<T, A, R> {
	Supplier<A> supplier();
	BiConsumer<A, T> accumulator();
	Function<A, R> finisher();
	BinaryOperator<A> combiner();
	Set characteristics();
}


  • supplier()는 빈 결과로 이루어진 Supplier를 반환해야 합니다. collect 과정에서 비어(empty) 있는 누적자 인스턴스를 만드는 파라미터가 없는 함수입니다. 
  • accumulator()은 리듀싱 연산을 수행할 함수를 반환해야 합니다. 
  • finisher()는 스트림 탐색을 끝내고 누적자 객체를 최종 결과로 반환하면서 누적 과정을 끝낼 때 호출할 함수를 반환해야 합니다.
  • combiner()는 병렬로 처리할 때 누적자가 이 결과를 어떻게 처리할지를 정의합니다. 
  • characteristics()는 스트림을 병렬로 리듀스 할 것인지 한다면 어떤 최적화를 선택해야 할지 힌트를 제공합니다. 
글로 적으니 너무 이해하기 힘드네요. Collectors.toList()의 소스를 확인해보는 것이 좋을 것 같습니다.
public static  Collector> toList() {
	return new CollectorImpl<>((Supplier>) ArrayList::new, List::add,
			(left, right) -> { left.addAll(right); return left; }, CH_ID);
}


  • 첫 번째 인자인 SupplierArrayList::new로 생성자 레퍼런스를 전달했습니다.(단순히 ArrayList의 생성자만 전달하면 Supplier로 처리가 되지 않기 때문에 캐스팅 한 것을 확인할 수 있습니다.) 
  • 두 번째 인자는 BiConsumer로는 List::add가 사용되어 있습니다. Supplier를 통해서 생성된 누적자 인스턴스에 add() 메서드를 통해서 리듀싱 연산을 수행하게 됩니다.
  • 세 번째 인자는 BinaryOperator로 병렬로 처리할 때 서로 다른 서브 파트를 어떻게 누적시킬지를 결정하게 되는데 addAll()을 통해서 리스트를 추가하게 됩니다.
  • 네 번째 인자인 CH_ID는 미리 정의된 EnumSet의 이름으로 Characteristics.IDENTITY_FINISH가 지정되어 있습니다.
눈에 띄는 것은 finisher()를 정의하지 않았는데 이럴 경우 supplier()에서 생성한 객체를 그대로 반환하게 됩니다.

Characteristics enum의 종류와 설명

  • Characteristics.UNORDERED리듀싱 결과는 스트림 요소의 방문 순서나 누적 순서에 영향을 받지 않는다.
  • Characteristics.CONCURRENT - 다중 스레드에서 accumulator 함수를 동시에 호출할 수 있으며 이 컬렉터는 스트림의 병렬 리듀싱을 수행할 수 있다. 컬렉터의 플래그에 UNORDERED를 함께 설정하지 않았다면 데이터 소스가 정렬되어 있지 않은(즉, 집합처럼 요소의 순서가 문의한) 상황에서만 병렬 리듀싱을 수행할 수 있다.
  • Characteristics.IDENTITY_FINISH - finisher 메서드가 반환하는 함수는 단순히 identity를 적용할 뿐이므로 이를 생략할 수 있다. 따라서 리듀싱 과정의 최종 결과로 누적자를 객체로 바로 사용할 수 있다. 또한 누적자 A를 결과로 R로 안전하게 형 변환할 수 있다.


+ Recent posts