자바병렬프로그래밍_Chapter05_구성단위

자바 병렬 프로그래밍. Chapter 05. 구성 단위

5.1 동기화된 컬렉션 클래스

5.1.1 동기화된 컬렉션의 문제점

  • 동기화된 컬렉션을 사용하면 따로 락이나 동기화 기법을 사용하지 않는다 해도 이런 대부분의 기능이 모두 스레드 안전하다.
  • 하지만 여러 스레드가 해당 컬렉션 하나를 놓고 동시에 그 내용을 변경하려한다면 컬렉션 클래스가 상식적으로 올바른 방법으로 동작하지 않을 수 있다.
public static void Object getLast(Vector list) {
  int lastIndex = list.size() - 1;
  return list.get(lastIndex);
}

public static void deleteLast(Vector list) {
  int lastIndex = list.size() - 1;
  list.remove(lastIndex);
}
  • 오동작 할 수 있는 예외
  • 그럼에도 불구하고 Vector 자체 스레드 안전성에는 문제가 없다.
  • 위의 예제는 컬렉션을 락으로 사용하면 해결 가능하다.
public static void Object getLast(Vector list) {
  synchronized(list) {
    int lastIndex = list.size() - 1;
    return list.get(lastIndex);
  }
}

public static void deleteLast(Vector list) {
  synchronized(list) {
    int lastIndex = list.size() - 1;
    list.remove(lastIndex);
  }
}
  • List로 동기화한 코드
for (int i = 0; i < vector.size(); i++)
  doSomething(vector.get(i));
  • ArrayIndexOutBoundsException이 발생할 수 있는 반복문
synchronized (vector) {
  for (int i = 0; i < vector.size(); i++)
    doSomething(vector.get(i));
}
  • 스레드 안전성은 확보할수 있다.
  • 하지만 for문을 도는 동안에 다른 스레드들이 vector에 접근하지 못한다.
  • 여러 스레드가 동작하는 병렬 프로그램의 큰 장점은 잃어버린다.

5.1.2 Iterator와 ConcurrentModificationException

  • Collection에 들어가 있는 값을 차례로 반복해서 읽어내는 가장 표준적인 방법은 Iterator를 사용하는 것이다.
  • 하지만 Iterator 또한 같은 시점에 다른 스레드가 컬렉션 클래스 내부의 값을 변경하는 작업을 처리하지 못하게 만들어져 있다.
  • 대신 즉시 멈춤 fail-fast 방식을 사용한다. 반복 도중 내부의 값 변경이 포착되면 ConcurrentModificationException 예외를 발생시키고 멈추는 방식이다.
List<Widget> widgetList = Collection.synchronizedList(new ArrayList<Widjet>>());
......
// ConcurrentModificationException이 발생할수 있다.
for (Widget w : widgetList)
  doSomething(w);
  • for-each 반복문은 javac로 컴파일할때 내부적으로 Iterator를 사용해 hashNext나 next 메소드를 호출하는 방식으로 바뀐다.
  • 따라서 반복문을 실행할때 ConccurentModificationException을 방지할 방법은 반복문 전체를 동기화 하는 방법 밖에 없다.

but 반복문을 실행하는 코드 전체를 동기화시키는 방법이 그다지 훌륭하지 못한 방법이라고 주장하는 이유를 살펴볼 필요가 있다.

  • 컬렉션에 엄청나게 많은 수의 값이 들어 있거나 값마다 반복하면서 실행해야 하는 작업이 시간이 많이 소모되는 작업 일수 있다.

  • 이 경우 컬렉션 클래스 내부의 값을 사용하고자 하는 스레드가 상당히 오랜 시간 대기 상태에서 기다려야 할 수 있다.

  • 또한 반복문에서 락을 잡고 있는 상황에서 다른 락을 확보해야 하는 상황이 벌어지면 데드락 deadlock이 발생할 수 있다.

  • 소모상태 starvation이나 데드락의 위험이 있는 상태에서 컬렉션 클래스를 오랫동안 막아두면 애플리케이션 전체의 확장성을 해칠 수 있다.

  • 반복문에서 락을 오래 잡고 있으면 있을수록 락을 확보하고자 하는 스레드가 대기 상태에 많이 쌓일 수 있고, 대기 상태에 스레드가 적체되면 될수록 CPU 사용량이 급격하게 증가할 가능성이 높다.

    • 편법 - 반복문을 실행하는 동안 컬렉션 클래스에 들어있는 내용에 락을 걸어둔것과 같은 효과를 내려면 clone 메소드로 복사한 사본을 대상으로 반복문을 사용하면 된다
    • 물론 최소한 복사 clone을 하는 도중에는 동기화를 사용해야 한다.

5.1.3. 숨겨진 Iterator

클래스 내부에서 필요한 변수를 모두 캡슐화하면 그 상태를 보존하기가 훨씬 편리한 것처럼, 동기화 기법을 클래스 내부에 캡슐화하면 동기화 정책을 적용하기가 쉽다.
@AntiPattern
public class HiddenIterator {
  @GuardedBy("this")
  pirvate final Set<Integer> set = new HashSet<Integer>();

  public synchronized void add(Integer i) { set.add(i); }
  public synchronized void remove(Integer i) { set.remove(i); }

  public void addTenThings () {
    Random r = new Random();
    for(int i = 0; i < 10; i++)
      add(r.nextInt());
    System.out.println("DEBUG: added ten elements to " + set);
  }
}
  • AntiPattern : 문자열 연결 연산 내부에 iterator가 숨겨져 있는 상황.

  • 위 예제 뿐만 아니라 toString 메소드, hashCode, equals, containsAll, removeAll, retainAll 등의 메소드도 내부적으로 Iterator를 사용한다. 이러한 모든 메소드에서 ConcurrentModificationException이 발생할 가능성이 있다.

5.2 병렬 컬렉션

  • 동기화된 컬렉션 클래스는 내부 변수에 접근하는 통로를 일련화해서 스레드 안전성을 확보했다.

  • 하지만 이렇다 보니 여러 스레드가 한번에 동기화된 컬렉션을 사용하려고 하면 동시 사용성은 상당 부분 손해를 볼 수 밖에 없다.

  • 하지만 병렬 컬렉션은 여러 스레드에서 동시에 ㅅ용할 수 있도록 설계되어 있다.

  • 병렬성을 극대화한 ConcurrentHashMap이 있다. 또 CopyOnWrighteArrayList는 추가되어 잇는 갹체 목록을 반복시키며 열람하는 연산의 성능을 최우선으로 구현한 List 클래스 하위 클래스 이다.

  • 또한 ConcurrentMap 인터페이스는 추가하려는 항목이 기존에 없을때만 추가하는 put-if-absent 연산, 대치 replace 연산, 조건부 제거 conditional remove 연산 등을 정의하고 있다.

기존에 사용하던 동기화 컬렉션 클래스를 병렬 컬렉션으로 교체하는 것만으로도 별다른 위험 요소 없이 전체적인 성능을 상당히 끌어 올릴 수 있다.
  • 또한 Queue 인터페이스를 구현하는 여러 종류의 Queue 또한 있다.
  • ConcurrentLinkedQueue는 널리 알려져 있는 전통적인 FIFO 큐이고 PriorityQueue는 특정한 우선 순위에 따라 큐에 쌓여 있는 항목이 추출되는 순서가 바뀌는 특성을 갖고 있다.
  • BlockingQueue는 큐에 항목을 추가하거나 뽑아낼 때 상황에 따라 대기할 수 있도록 구현되어 있다.
  • ConcurrentHashMap 클래스가 해시 기반의 동기화된 Map인것처럼 ConcurrentSkipListMap과 ConcurrentSkipListSet이 있는데 이 둘은 각각 StoredMap과 SortedSet 클래스의 병렬성을 높이도록 발전된 형태이다.

5.2.1. ConcurrentHashMap

  • ConcurrentHashMap은 HashMap과 같이 해시를 기반으로 하는 Map이다. 하지만 내부적으로는 이전과 다른 동기화 기법을 적용해 병렬성과 확장성이 훨씬 나아졌다.
  • 내부적으로 락 스트라이핑 lock striping이라 부르는 굉장히 세밀한 동기화 방법을 사용해 여러 스레드에서 공유하는 상태에 훨씬 잘 대응 할 수 있다.
    • 읽기 연산은 많은 수의 스레드라도 동시에 처리할 수 있다.
    • 읽기와 동시에 쓰기 연산도 수행 가능하다.
    • 쓰기 연산은 제한된 개수만큼 동시에 수행할 수 있다.
    • 속도면에서는 여러 스레드가 동시에 동작하는 환경에서 일반적으로 훨씬 높은 성능 결과를 볼 수 있다.
    • 단일 스레드 환경에서의 단점도 거의 찾아볼 수 없다.
  • 다른 병렬 컬렉션 클래스와 비슷하게 ConcurrentHashMap 클래스도 Iterator를 만들어 내는 부분에서 많이 발전했는데, ConcurrentHashMap이 만들어낸 Iterator는 ConcurrentModificationException을 발생시키지 않는다.
  • 강해진 병렬성 만큼 조금 약해진 부분도 있다
    • size나 isEmpty의 결과값을 받을 당시에 값이 바뀔수도 있다.
      • 추정값이 된다.
  • 하지만 핵심 연산인 get, put, containsKey, remove등의 핵심 연산의 병렬성과 성능은 좋다.

5.2.2 Map 기반의 또 다른 단일 연산

  • ConcurrentHashMap 클래스는 일반적으로 많이 사용하는 없을 경우 추가하는 put-if-absent, 동일한 경우에 제거하는 remove-if-equal, 동일한 경우 대치하는 replace-if-equal 연산과 같이 자주 필요한 몇가지 연산이 이미 구현되어 있다.
  • 만약 이미 구현되어 있지 않은 기능을 사용해야 한다면, ConcurrentHashMap 보다 ConcurrentMap을 사용해 보는 편이 났다.

5.2.3 CopyOnWriteArrayList

  • CopyOnWriteArrayList 클래스는 동기화된 List 클래스보다 병렬성을 훨씬 높이고자 만들어졌다. 예를 들어 대부분의 일반적인 용도에 쓰일때 병렬성이 ㅎㅇ상 됐고 특히 List를 복제할 필요가 없다.

5.3. BlockingQueue와 Producer-Consumer 패턴

  • BlockingQueue는 puttake라는 핵심 메소드와 offer, poll이라는 메소드를 가지고 있다.

  • put은 큐가 가득차면 공간이 생길때 까지 대기한다.

  • take는 큐가 비어있을때 값이 생길때까지 대기한다.

  • 큐와 함께 스레드 풀을 사용하는 경우가 바로 프로듀서-컨슈머 패턴을 활용하는 가장 흔한 경우이다.

  • 작업 큐와 스레드 풀을 사용하는 부분은 Excutor 프레임웍을 사용한다.

  • offer 메소드는 큐에 값을 넣을 수 없을 때 대기하지 않고 바로 공간이 모자라 추가할 수 없다는 오류를 알려준다.

  • offer 메소드를 잘 활용하면 프로듀서가 작업을 많이 만들어 과부하에 이르는 상태를 좀더 효과적으로 처리할 수 있다.

  • 예를 들어 부하를 분배하거나, 작업할 내용을 직렬화 serialized해서 디스크에 임시로 저장하거나, 아니면 프로듀서 스레드의 수를 동적으로 줄이거나, 기타 여러가지 방법을 사용해 프로듀서가 작업을 생성하는 양을 조절할 수 있다.

블로킹 큐는 애플리케이션이 안정적으로 동작하도록 만들고자 할 때 요긴하게 사용할 수 있는 도구이다. 블로킹 큐를 사용하면 처리할 수 있는 양보다 훨씬 많은 작업이 생겨 부하가 걸리는 상황에서 작업량을 조절해 애플리케이션이 안정적으로 동작하도록 유도할 수 있다.
  • 자바 클래스 라이브러리에는 BlockingQueue 인터페이스를 구현한 클래스가 몇가지 들어 있다.

  • LinkedBlockingQueue와 ArrayBlockingQueue는 FIFO 형태의 큐이다. 기존 LinkedList나 ArrayList에 대응 된다.

  • PriorityBlockingQueue는 우선 순위를 기준으로 동작하는 큐이다.

    • PriorityBlockingQueue는 항목을 정렬 시킬수 있는 Comparable 인터페이스를 구현했다.
  • 마지막으로 SynchronousQueue 클래스도 BlockingQueue 인터페이스를 구현한다.

    • 큐에 항목이 쌓이지 않으며 따라서 큐 내부에 값을 저장할 수 있도록 공간을 할당하지도 않는다.
    • 대신 큐에 값을 추가하려는 스레드나 값을 읽어가려는 스레드의 큐를 관리한다.
    • 이처럼 SynchronousQueue는 take나 put을 호출하면 호출한 메소드의 상대편 측에 해당하는 메소드를 다른 스레드가 호출할때까지 대기하고 그 즉시 넘겨준다.
    • 이처럼 SynchronousQueue는 데이터를 넘겨받을수 있는 스레드가 충분할때 사용하는 편이 좋다.

    5.3.1. 예제: 데스크탑 검색

프로듀서-컨슈머 모델을 적용해 볼 수 있는 좋은 프로그램 예제로 데스크탑 검색 프로그램을 들 수 있겠다. 데스크탑 검색 프로그램은 로컬 하드 디스크에 들어 있는 문서를 전부 읽어들이면서 나중에 검색하기 좋게 색인을 만들어 두는 작업을 한다. 많이 알려진 구글 데스크탑 검색이나 윈도우 인덱싱 서비스 등이 이런 일을 하는 프로그램의 대표적인 예이다.

5.3.2. 직렬 스레드 한정

java.util.concurrent 패키지에 들어있는 블로킹 큐 관련 클래스는 모두 프로듀서 스레드에서 객체를 가져와 컨슈머 스레드에 넘겨주는 과정이 세심하게 동기화 되어 있다.

프로듀서-컨슈머 패턴과 블로킹 큐는 가변 객체 mutable object를 사용할 때 객체의 소유권을 프로듀서에서 컨슈머로 넘기는 과정에서 직렬 스레드 한정 serial thread confinement 기법을 사용한다.

스레드에 한정된 객체는 특정 스레드 하나만이 소유권을 가질 수 있는데, 객체를 안전한 방법으로 공개하면 객체에 대한 소유권을 이전 transfer할 수 있다.

  • 객체 풀 object pool은 직렬 스레드 한정 기법을 잘 활용한 예이다.

  • 풀에서 소유하고 있던 객체를 외부 스레드에게 빌려주는 일이 본업이기 때문이다.

  • 가변 객체의 소유권을 이전해야 할 필요가 있다면, 위에서 설명한 것과 다른 객체 공개 방법을 사용할 수도 있다. 하지만 항상 소유권을 이전받는 스레드는 단 하나여야 한다는 점을 주의하자.

  • 블로킹 큐를 사용하면 이런 점을 정확하게 지킬 수 있다.

  • 덧붙여 ConcurrentMap의 remove 메소드를 사용하거나 AtomicReference의 compareAndSet 메소드를 사용하는 경우에도 약간의 추가 작업만 해준다면 원활하게 처리할 수 있다.

public class FileCrawler implements Runnable {
  private final BlockingQueue<File> fileQueue;
  private final FileFilter fileFilter;
  private final File root;
  ....
  ....
  public void run() {
    try {
      crawl(root);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private void crawl(File root) throws InterruptedException {
    File[] entries = root.listFiles(fileFilter);
    if(entries != null) {
      for(File entry : entries)
        if(entry.isDirectory())
          crawl(entry);
        else if(!alreadyIndexed(entry))
          fileQueue.put(entry);
    }
  }

  public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;

    public Indexer(BlockingQueue<File> queue) {
      this.queue = queue;
    }

    public void run() {
      try {
        while (true)
          indexFile(queue.take());
      }catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
  • 프로듀서-컨슈머 패턴을 활용한 데스크탑 검색 애플리케이션 구조
public static void startIndexing(File[] roots) {
  BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
  FileFilter filter = new FileFilter() {
    public boolean accept(File file) { return true; }
  };

  for(File root : roots)
    new Thread(new FileCrawler(queue, filter, root)).start();

  for(int i = 0; i < N_CONSUMERS; i++)
    new Thread(new Indexer(queue)).start();
}

5.3.3. 덱, 작업 가로채기

  • Deque('덱'이라고 읽는다)과 BlockingDeque
  • Deque과 BlockingDeque는 각각 Queue와 BlockingQueue를 상속받은 인터페이스이다.
  • Deque는 앞과 뒤 어느 쪽에서도 객체를 쉽게 삽입하거나 제거할 수 있도록 준비된 큐이다.
  • Deque를 상속받은 클래스는 ArrayDeque과 LinkedBlockingDeque가 있다.

deque 덱

  • 작업 가로채기 work stealing 패턴은 덱을 그대로 가져다 사용할 수 있다.

  • 프로듀서-컨슈머 패턴에서는 모든 컨슈머가 하나의 큐를 공유해 사용한다.

  • 하지만 작업 가로채기 패턴에서는 모든 컨슈머가 각자의 덱을 갖는다.

  • 만약 특정 컨슈머가 자신의 덱에 들어있던 작업을 모두 처리하고 나면 다른 컨슈머의 덱에 쌓여있는 작업 가운데 맨 뒤 추가된 작업을 가로채 가져올 수 있다.

  • 작업 가로채기의 특성상 컨슈머가 하나의 큐를 바라보면서 서로 작업을 가져가려고 경쟁하지 않기 때문에 일반적인 프로듀서-컨슈머 패턴보다 규모가 큰 시스템을 구현하기에 적당하다.

  • 컨슈머가 다른 컨슈머의 큐에서 작업을 가져올때 맨 뒤의 작업을 가져오기 때문에 맨 앞의 작업을 가져가려하는 원 소유자와 경쟁이 일어나지 않는다.

  • 작업 가로채기 패턴은 컨슈머가 프로듀서의 역할도 갖고 있는 경우에 적용하기 좋다.

  • 스레드가 작업을 진행하는 도중에 새로 처리해야 할 작업이 생기면 자신의 덱에 새로운 작업을 추가한다.(작업을 서로 공유하도록 구성하는 경우에는 다른 작업 스레드의 덱에 추가하기도 한다.).

  • 만약 자신의 덱이 비었다면 다른 작업 스레드의 덱을 살펴보고 밀린 작업이 있다면 가져다가 처리해 자신의 덱이 비었다고 쉬는 스레드가 없도록 관리한다.

5.4 블로킹 메소드, 인터럽터블 메소드

스레드는 여러가지 원인에 의해 블록 당하거나, 멈춰질 수 있다.
1. I/O 작업이 끝나기를 기다릴 수 있다.
2. 락을 확보하기 위해 기다릴 수 있다.
3. Thread.Sleep 메소드가 끝나기를 기다릴 수 있다.
4. 다른 스레드가 작업 중인 내용의 결과를 확인하기 위해 기다리는 경우도 있다.
5. 스레드가 블록되면 동작이 멈춰진 다음 블록된 상태(BLOCKED, WAITING, TIMED_WAITING) 가운데 하나를 갖게된다.
  • 호출하려는 메소드 가운데 InterruptedExceptuon이 발생할 수 있는 메소드가 있다면 그 메소드를 호출하는 메소드 역시 블로킹 메소드이다. 따라서 InterruptedException이 발생했을 때 그에 대처할 수 있는 방법을 마련해둬야 한다.
  • 일반적으로 라이브러리 형태의 코드라면 두가지를 사용해 볼 수 있다.
  1. InterruptedException을 전달 : 받아낸 InterruptedException을 그대로 호출한 메소드에 넘긴다. 받은 메소드는 catch로 잡지 않거나 cat로 잡은 다음 정리 작업을 진행하고 throw로 다시 던지는 방법도 있다.
  2. 인터럽트를 무시하고 복구: 특정 상황에서는 InterruptedException을 throw할 수 없을 수도 있는데, 예를 들어 Runnable 인터페이스를 구현한 경우이다.
public class TaskRunnable implements Runnable {
  BlockingQueue<Task> queue;
  ...
  public void run() {
    try {
      processTask(queue.take());
    } catch (InterruptedException e) {
      // 인터럽트가 발생한 사실을 저장한다.
      Thread.currentThread().interrupt();
    }
  }
}
  • 인터럽트가 발생했음을 저장해 인터럽트 상황을 잊지 않아야 한다.

인터럽트를 잘 활용하면 훨씬 세밀하게 고급 기능을 구현할 수 있지만, 위의 두가지 방법을 사용하면 대부분의 경우 대응할 수 있다.

AntiPattern : InterruptedException을 catch 하고 아무것도 하지 않은 일은 하지 말자.

  • 발생한 InterruptedException을 먹어버리고 더 이상 전파하지 않을 수 있는 경우는 Thread 클래스를 직접 상속하는 경우뿐이다.

출처 :브라이언 괴츠, 더그 리, 팀 피얼스, 조셉 보우비어, 데이빗 홈즈, 조슈아 블로쉬 공저, 『 멀티코어를 100% 활용하는 자바 병렬 프로그래밍』, 에이콘(2008.7.30), 5장 인용.