5.5 동기화 클래스
블로킹 큐는 다양한 컬렉션 클래스 가운데 특히나 눈에 띄는 특성이 많다.
- 객체를 담을 수 있는 컬렉션 클래스
- 프로듀서-컨슈머 사이에서 take와 put 등의 블로킹 메소드를 사용해 작업의 흐름을 조절할 수 있다.
상태 정보를 사용해 스레드 간 작업 흐름을 조절할 수 있도록 만들어진 클래스를 동기화 클래스 synchronizer
라고 한다.
또 다른 동기화 클래스의 예로는
세마포어 semaphore
,배리어 barrier
,래치 latch
등이 있다.모든 동기화 클래스는 구조적인 특징을 갖고 있다.
동기화 클래스에 접근하려는 스레드가 어느 경우에 통과하고 어느 경우에는 대기하도록 멈추게 해야 하는지를 결정하는 상태 정보를 갖고 있고 그 상태를 변경할 수 있는 메소드를 제공하고, 동기화 클래스가 특정 상태에 진입할 떄까지 효과적으로 대기할 수 있는 메소드도 제공한다.
5.5.1 래치
- 래치는 스레드가 동작하는 과정을 늦출 수 있도록 해주는 동기화 클래스이다.
- 래치는 일종의 관문과 같은 형태로 동작한다.
- 래치가 터미널 상태에 이르기 전에는 관문이 닫혀 있고 어떤 스레드도 통과할 수 없다.
- 래치가 터미널 상태에 다다르면 관문이 열리고 모든 스레드가 통과한다.
- 한번 열린 관문은 계속해서 열린 상태로 유지된다.
- 이런 특성의 래치는 특정한 단일 동작이 완료되기 이전에는 어떤 기능도 동작하지 않도록 막아내야 하는 경우 요긴하다.
1. 특정 자원을 확보하기 전에는 작업을 시작하지 말아야 하는 경우.
2. 의존성을 갖고 있는 다른 서비스가 시작하기 전에는 특정 서비스가 실행되지 않도록 막아야 하는 경우.
3. 특정 작업이 필요한 모든 객체가 실행할 준비를 갖출 때까지 기다리는 경우.
- CountDownLatch 클래스의 countDown 메소드는 대기하던 이벤트가 발생했을 때 내부에 갖고 있는 이벤트 카운터를 하나 낮춘다.
- await 메소드는 래치 내부의 카운터가 0이 될때까지, 대기하던 모든 이벤트가 호출될때 까지 대기한다.
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1); // 1초기화
final CountDownLatch endGate = new CountDownLatch(nThreads); // 쓰레드만큼
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await(); // start는 기다린다.
try {
task.run(); // 태스크 실행
} finally {
endGate.countDown(); // 끝나면 갯수를 낮춘다.
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
- 왜 TestHarness는 스레드가 만들어지는대로 바로 작업을 시작하도록 하지 않았을까?
- n개의 스레드가 동시에 동작할 때 전체 작업 시간이 얼마나 걸리는지 확인하려고 만들어진 클래스이다.
5.5.2 FutureTask
- FutureTask는 Future 인터페이스를 구현한다.
- Future 인터페이스는 연산 결과를 알려준다.
- FutureTask가 나타내는 연산 작업은 Callable 인터페이스를 구현하도록 되어있는데, 시작 전 대기, 시작됨, 종료됨과 같은 세 가지 상태를 가질 수 있다.
- 종료된 상태는 정상적인 종료, 취소, 예외 상황 발생과 같이 연산이 끝나는 모든 종류의 상태를 의미한다.
- FutureTask가 한번 종료됨 상태에 이르면 더 이상 상태가 바뀌지 않는다.
- Future.get 메소드도 실행 상태에 따라 동작 상태가 다르다.
- FutureTask의 작업이 종료 됐으면 get 메소드는 그 결과를 즉시 알려준다.
- 종료 상태에 이르지 못했다면 get 메소드는 끝날때 까지 대기한다.
- FutureTask는 실제로 연산을 실행했던 스레드에서 만들어 낸 결과 객체를 실행시킨 스레드에게 넘겨준다.
- FutureTask 클래스에 명시된 것처럼 결과 객체는 안전한 공개 방법을 통해 넘겨주게 되어 있다.
- FutureTask는 Executor 프레임웍에서 비동기적인 작업을 실행하고자 할 때 사용된다.
- 기타 시간이 많이 필요한 모든 작업이 있을 떄 실제 결과가 필요한 시점 이전에 미리 작업을 실행시켜두는 용도로 사용한다.
public class Preloader {
ProductInfo loadProductInfo() throws DataLoadException {
return null;
}
private final FutureTask<ProductInfo> future =
new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);
public void start() { thread.start(); }
public ProductInfo get()
throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw LaunderThrowable.launderThrowable(cause);
}
}
interface ProductInfo {
}
}
class DataLoadException extends Exception { }
- Preloader 클래스는 FutureTask를 사용해 결과 값이 필요한 시점 이전에 시간이 많이 걸리는 작업을 미리 실행시켜둔다.
- 시간이 많이 걸리는 작업을 미리 시작시켜두면 실제로 결과를 필요로 하는 시점이 됐을 때 기다리는 시간을 줄일 수 있다.
Callable
Callable 인터페이스로 정의되어 있는 작업에서는 예외를 발생시킬 수 있으며 어디에서든 Error도 발생 시킬 수 있따.
/** 변수 t의 내용이 Error 라면 그대로 throw한다.
* 변수 t의 내용이 RuntimeException이라면 그대로 리턴한다.
* 다른 모든 경우에는 IllegalStateException을 throw한다.
**/
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("RuntimeException이 아님", t);
}
5.5.3 세마포어
카운팅 세마포어 counting semaphore
는 특정 자원이나 특정 연산을 동시에 사용하거나 호출할 수 있는 스레드의 수를 제한하고자 할 때 사용한다.이런 기능을 활용하면 자원
풀 pool
이나 컬렉션의 크기에 제한을 두고자 할 때 유용하다.Semaphore 클래스는 가상의
퍼밋 permit
을 만들어 내부 상태를 관리한다.Semaphore를 처음 생성할 떄 생성 메소드에 최초로 생성할 퍼밋 수를 넘겨 준다.
외부 스레드는 퍼밋을 요청해 확보하거나, 이전에 확보한 퍼밋을 반납할 수도 있다.
현재 사용할 수 있는 퍼밋이 없는 경우, acquire 메소드는 남은 퍼밋이 생기거나 인터럽트가 걸리거나 지정한 시간을 넘겨 타임아웃이 걸리기 전까지 대기한다.
release 메소드는 확보했던 퍼밋을 다시 세마포어에게 반납하는 기능을 한다.
세마포어는 데이터베이스 연결 풀과같은 자원 풀에서 요긴하게 사용할 수 있다.
세마포어를 사용하면 어떤 클래스라도 크기가 제한된 컬렉션 클래스로 활용할 수 있다. ex BoundedHashSet
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
- 세마포어를 이용해 컬렉션의 크기 제한하기
5.5.4 배리어
관문을 만들수 있는 래치는 일회성이다.
래치가 한 번 터미널 상태에 다다르면 다시는 이전 상태로 회복할 수 없다.
배리어 barrier
는 특정 이벤트가 발생할 때까지 여러 개의 스레드를 대기 상태로 잡아둘 수 있다는 측면에서 래치와 비슷하다.래치와의 차이점은 모든 스레드가 배리어 위치에 동시에 이르러야 관문이 열리고 계속해서 실행할 수 있다는 점이 다르다.
래치는
이벤트
를 기다리기 위한 동기화 클래스이다.배리어는
다른 스레드
를 기다리기 위한 동기화 클래스이다.CyclicBarrier
클래스는 여러 스레드가 특정한 배리어 포인트에서 반복적으로 서로 만나는 기능을 모델링할 수 있다.커다란 문제 하나를 여러 개의 작은 부분 문제로 분리해 반복적으로 병렬 처리하는 알고리즘을 구현할 수 있따.
스레드는 각자가 배리어 포인트에 다다르면 await 메소드를 호출하며, await 메소드는 모든 스레드가 배리어 포인트에 도달할 때까지 대기한다.
모든 스레드가 배리어 포인트에 도달하면 배리어는 모든 스레드를 통과시키며, await 메소드에서 대기하고 있던 스레드는 대기 상태가 모두 풀려 실행되고, 배리어는 다시 초기상태로 돌아가 다음 배리어 포인트를 준비한다.
만약 await을 호출 후 시간이 너무 오래 지나 타임아웃이 걸리거나 await 메소드에서 대기하던 스레드에 인터럽트가 걸리면 배리어는 깨진 것으로 간주하고 await에서 대기하던 모든 스레드에 BrokenBarrierException이 발생한다.
배리어가 성공적으로 통과하면 await 메소드는 각 스레드별로 배리어 포인트에 도착한 순서를 알려주며, 다음 배리어 포인트로 반복 작업을 하는동안 뭔가 특별한 작업을 진행할 일종의 리더를 선출하는데 이 값을 사용할 수도 있다.
CyclicBarrier는 생성자를 통해 배리어 작업을 넘겨받을 수 있도록 설계되어 있다.
배리어 작업은 Runnable 인터페이스를 구현한 클래스이다. 배리어 작업은 배리어가 성공적으로 통과된 이후 대기하던 스레드를 놓아주기 직전에 실행된다.
배리어는 대부분 실제 작업은 모두 여러 스레드에서 병렬로 처리하고, 다음 단계로 넘어가기 전에 이번 단계에서 계산해야 할 내용을 모두 취합해야 하는 등의 작업이 많이 일어나는 시뮬레이션 알고리즘에서 유용하게 사용할 수 있다.
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
private class Worker implements Runnable {
private final Board board;
public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 0;
}
}
public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}
interface Board {
int getMaxX();
int getMaxY();
int getValue(int x, int y);
int setNewValue(int x, int y, int value);
void commitNewValues();
boolean hasConverged();
void waitForConvergence();
Board getSubBoard(int numPartitions, int index);
}
}
출처 :브라이언 괴츠, 더그 리, 팀 피얼스, 조셉 보우비어, 데이빗 홈즈, 조슈아 블로쉬 공저, 『 멀티코어를 100% 활용하는 자바 병렬 프로그래밍』, 에이콘(2008.7.30), 5장 인용.