개요
- 김영한 강사님의 강의를 듣고 정리하였다.
생산자 소비자 문제는 여러 스레드가 동시에 데이터를 생산하고 소비하는 경우 발생하는 문제이다.
알아보자!.
본론
개념
- 생산자, Producer는 데이터를 생산하는 역할을 한다.
- 소비자, Consumer는 데이터를 소비하는 역할을 한다.
- 버퍼, Buffer는 생산자가 생성한 데이터를 소비자가 소비하기 전까지 임시로 저장하는 공간이다.
문제
- 만약 생산자가 너무 빠르게 데이터를 생산할 경우, 버퍼가 가득차서 생산자는 버퍼의 빈공간이 생길 때 까지 대기해야 한다.
- 만약 소비자가 너무 빠르게 데이터를 소비할 경우, 버퍼에 새로운 데이터가 생산될 때까지 대기해야 한다.
예제
버퍼(큐)
package thread.start.bounded;
public interface BoundedQueue {
void put(String data);
String take();
}
- 버퍼 역할을 수행하는 큐 역할을 하며 다음의 작업을 수행한다.
- put : 큐에 값을 넣는다.
- take : 큐에 값을 소비한다.
버퍼(구현체)
package thread.start.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import static thread.start.util.MyLogger.log;
public class BoundedQueueV1 implements BoundedQueue{
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV1(int max){
this.max = max;
}
@Override
public synchronized void put(String data) {
if(queue.size() == max){
log("[put] 큐가 가득 참, 버림 : " + data);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
if(queue.isEmpty()){
return null;
}
return queue.poll();
}
@Override
public String toString(){
return queue.toString();
}
}
- 여기서 queue는 여러 스레드가 사용하는 공유 자원이므로, synchronized 키워드를 사용하였다.
스레드 작업(producer)
public class ProducerTask implements Runnable{
private BoundedQueue queue;
private String request;
public ProducerTask(BoundedQueue queue, String request){
this.queue=queue;
this.request=request;
}
@Override
public void run() {
log("[생산 시도] " + request + " -> " + queue);
queue.put(request);
log("[생산 완료] " + request + " -> " + queue);
}
}
- 생산자는 새로운 데이터를 버퍼에 저장한다.
스레드 작업(consumer)
public class ConsumerTask implements Runnable{
private BoundedQueue queue;
public ConsumerTask(BoundedQueue queue){
this.queue=queue;
}
@Override
public void run() {
log("[소비 시도] ? <- " + queue);
String data = queue.take();
log("[소비 완료] " + data + " <- " + queue);
}
}
- 소비자는 버퍼의 데이터를 소비한다.
예제1 실행 결과
이 작업들을 예제 코드와 함께 실행하면 다음의 결과가 나온다.
16:44:52.007 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV1 ==
16:44:52.011 [ main] 생산자 시작
16:44:52.016 [producer1] [생산 시도] data 1 -> []
16:44:52.017 [producer1] [생산 완료] data 1 -> [data 1]
16:44:52.118 [producer2] [생산 시도] data 2 -> [data 1]
16:44:52.118 [producer2] [생산 완료] data 2 -> [data 1, data 2]
16:44:52.223 [producer3] [생산 시도] data 3 -> [data 1, data 2]
16:44:52.223 [producer3] [put] 큐가 가득 참, 버림 : data 3
16:44:52.223 [producer3] [생산 완료] data 3 -> [data 1, data 2]
16:44:52.328 [ main] 현재 상태 출력, 큐 데이터 : [data 1, data 2]
16:44:52.328 [ main] producer1: TERMINATED
16:44:52.328 [ main] producer2: TERMINATED
16:44:52.328 [ main] producer3: TERMINATED
16:44:52.329 [ main] 소비자 시작
16:44:52.329 [consumer1] [소비 시도] ? <- [data 1, data 2]
16:44:52.329 [consumer1] [소비 완료] data 1 <- [data 2]
16:44:52.434 [consumer2] [소비 시도] ? <- [data 2]
16:44:52.434 [consumer2] [소비 완료] data 2 <- []
16:44:52.539 [consumer3] [소비 시도] ? <- []
16:44:52.540 [consumer3] [소비 완료] null <- []
16:44:52.644 [ main] 현재 상태 출력, 큐 데이터 : []
16:44:52.645 [ main] producer1: TERMINATED
16:44:52.645 [ main] producer2: TERMINATED
16:44:52.645 [ main] producer3: TERMINATED
16:44:52.646 [ main] consumer1: TERMINATED
16:44:52.646 [ main] consumer2: TERMINATED
16:44:52.646 [ main] consumer3: TERMINATED
16:44:52.647 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV1 ==
- 해당 예제의 경우, 생산자 스레드 3은 버퍼가 가득차 있어 데이터를 버리게 되고, 소비자 스레드3은 데이터가 없으므로 null값을 받게 된다.
이렇게 데이터를 버리지 않고, null값을 소비하지 않도록 하려면, 버퍼가 비어있거나 버퍼가 가득 찬 경우, 버퍼가 비워지거나 채워질 때까지 스레드들이 대기하면 문제가 해결될 것이다.
- 단순히 생각하여 sleep()을 통해 대기해보자.!
예제 2
package thread.start.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import static thread.start.util.MyLogger.log;
import static thread.start.util.ThreadUtils.sleep;
public class BoundedQueueV2 implements BoundedQueue{
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV2(int max){
this.max = max;
}
@Override
public synchronized void put(String data) {
while(queue.size() == max){
log("[put] 큐가 가득 참, 생산자 대기");
sleep(1000);
}
queue.offer(data);
}
@Override
public synchronized String take() {
while(queue.isEmpty()){
log("[take] 큐에 데이터가 없음, 소비자 대기");
sleep(1000);
}
return queue.poll();
}
@Override
public String toString(){
return queue.toString();
}
}
- 버퍼가 비어있거나, 버퍼가 가득 차 있는 경우 스레드가 sleep을 통해 대기 상태로 들어가도록 수정하였다.
예제2 실행 결과
16:56:12.934 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV2 ==
16:56:12.938 [ main] 생산자 시작
16:56:12.944 [producer1] [생산 시도] data 1 -> []
16:56:12.944 [producer1] [생산 완료] data 1 -> [data 1]
16:56:13.046 [producer2] [생산 시도] data 2 -> [data 1]
16:56:13.046 [producer2] [생산 완료] data 2 -> [data 1, data 2]
16:56:13.148 [producer3] [생산 시도] data 3 -> [data 1, data 2]
16:56:13.148 [producer3] [put] 큐가 가득 참, 생산자 대기
16:56:13.254 [ main] 현재 상태 출력, 큐 데이터 : [data 1, data 2]
16:56:13.254 [ main] producer1: TERMINATED
16:56:13.254 [ main] producer2: TERMINATED
16:56:13.254 [ main] producer3: TIMED_WAITING
16:56:13.254 [ main] 소비자 시작
16:56:13.255 [consumer1] [소비 시도] ? <- [data 1, data 2]
16:56:13.360 [consumer2] [소비 시도] ? <- [data 1, data 2]
16:56:13.465 [consumer3] [소비 시도] ? <- [data 1, data 2]
16:56:13.569 [ main] 현재 상태 출력, 큐 데이터 : [data 1, data 2]
16:56:13.569 [ main] producer1: TERMINATED
16:56:13.570 [ main] producer2: TERMINATED
16:56:13.570 [ main] producer3: TIMED_WAITING
16:56:13.570 [ main] consumer1: BLOCKED
16:56:13.570 [ main] consumer2: BLOCKED
16:56:13.570 [ main] consumer3: BLOCKED
16:56:13.571 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV2 ==
16:56:14.154 [producer3] [put] 큐가 가득 참, 생산자 대기
16:56:15.159 [producer3] [put] 큐가 가득 참, 생산자 대기
16:56:16.165 [producer3] [put] 큐가 가득 참, 생산자 대기
16:56:17.171 [producer3] [put] 큐가 가득 참, 생산자 대기
- 실행 결과를 확인해보면, 데이터가 소비되지 못하고 생산자 스레드가 계속 버퍼가 비어있는지 확인하고, 대기하고를 반복한다.
문제점
해당 코드는 단순히 스레드를 기다리도록 해서 버퍼가 비워지거나 채워지게 했지만, 우리가 간과한 점이 존재한다.
바로 생산자 스레드3이 락을 가지고 대기한다는 것이다.
- 따라서 다른 스레드들이 임계 영역인 ,synchronized 메서드 내부로 진입하여 작업하지 못하고, 따라서 버퍼가 비워지는 일이 없기 때문에 생산자 스레드3은 무한히 대기하게 된다.
그렇다면 이것을 어떻게 해결할까?
예제3
Object.wait(), notify()
자바는 멀티 스레드를 고려하여 탄생한 언어이기 때문에 이런 문제에 대한 해결책을 이미 제공해주고 있다.
- Object.wait()
- 현재 스레드가 가진 락을 반납하고 WAITING 상태로 들어간다.
- 락을 소유한 경우에만 호출할 수 있다.
- 이 WAITING 상태는 notify() 혹은 notifyAll()이 호출될 때까지 대기 상태를 유지한다.
- Object.notify()
- 대기 중인 스레드 중 하나를 깨운다.
- 깨운 스레드는 락을 획득할 기회를 얻게 되며, 여러 스레드 중 하나의 스레드만 깨운다.
- Object.notifyAll()
- 대기 중인 모든 스레드를 깨운다.
- 모든 스레드들은 락을 획득할 기회를 얻게 된다.
이 메서드들을 활용하면 락을 양보할 수 있기 때문에 무한히 대기하고 있는 문제를 해결할 수 있다.
public class BoundedQueueV3 implements BoundedQueue{
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV3(int max){
this.max = max;
}
@Override
public synchronized void put(String data) {
while(queue.size() == max){
log("[put] 큐가 가득 참, 생산자 대기");
try {
wait(); // RUNNABLE -> WAITING, 락을 반납한다.
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify 호출");
notify();
}
@Override
public synchronized String take() {
while(queue.isEmpty()){
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
wait();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
notify();
return data;
}
@Override
public String toString(){
return queue.toString();
}
}
- 예제 2와의 차이점은, 만약 버퍼에 데이터가 가득차있거나, 비어있는 경우, Object.wait() 메서드를 호출해서 락을 반납하고 대기 상태로 들어간다.
- 그리고 생산자가 데이터를 생산하거나, 소비하게되면 락을 반납하고 대기상태로 들어간 스레드에게 이를 알려야 하기 때문에 메서드를 종료할 때, Object.notify()를 호출하고 메서드를 종료한다.
예제3 실행 결과
17:13:14.797 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV3 ==
17:13:14.801 [ main] 생산자 시작
17:13:14.805 [producer1] [생산 시도] data 1 -> []
17:13:14.805 [producer1] [put] 생산자 데이터 저장, notify 호출
17:13:14.806 [producer1] [생산 완료] data 1 -> [data 1]
17:13:14.908 [producer2] [생산 시도] data 2 -> [data 1]
17:13:14.908 [producer2] [put] 생산자 데이터 저장, notify 호출
17:13:14.908 [producer2] [생산 완료] data 2 -> [data 1, data 2]
17:13:15.013 [producer3] [생산 시도] data 3 -> [data 1, data 2]
17:13:15.013 [producer3] [put] 큐가 가득 참, 생산자 대기
17:13:15.119 [ main] 현재 상태 출력, 큐 데이터 : [data 1, data 2]
17:13:15.119 [ main] producer1: TERMINATED
17:13:15.119 [ main] producer2: TERMINATED
17:13:15.119 [ main] producer3: WAITING
17:13:15.119 [ main] 소비자 시작
17:13:15.120 [consumer1] [소비 시도] ? <- [data 1, data 2]
17:13:15.120 [consumer1] [take] 소비자 데이터 획득, notify() 호출
17:13:15.120 [producer3] [put] 생산자 깨어남
17:13:15.120 [consumer1] [소비 완료] data 1 <- [data 2]
17:13:15.120 [producer3] [put] 생산자 데이터 저장, notify 호출
17:13:15.120 [producer3] [생산 완료] data 3 -> [data 2, data 3]
17:13:15.225 [consumer2] [소비 시도] ? <- [data 2, data 3]
17:13:15.225 [consumer2] [take] 소비자 데이터 획득, notify() 호출
17:13:15.225 [consumer2] [소비 완료] data 2 <- [data 3]
17:13:15.330 [consumer3] [소비 시도] ? <- [data 3]
17:13:15.330 [consumer3] [take] 소비자 데이터 획득, notify() 호출
17:13:15.331 [consumer3] [소비 완료] data 3 <- []
17:13:15.435 [ main] 현재 상태 출력, 큐 데이터 : []
17:13:15.436 [ main] producer1: TERMINATED
17:13:15.436 [ main] producer2: TERMINATED
17:13:15.436 [ main] producer3: TERMINATED
17:13:15.437 [ main] consumer1: TERMINATED
17:13:15.437 [ main] consumer2: TERMINATED
17:13:15.437 [ main] consumer3: TERMINATED
17:13:15.438 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV3 ==
- 이를 통해 무한 대기 문제와, 데이터가 버려지거나 null 데이터를 소비하는 문제가 해결되었다.
Object.wait(), notify()의 한계점
이 방식의 한계점은, 스레드들이 대기하는 집합이 하나만 존재하기 때문에 notify()를 통해 깨울 수 있는 스레드를 특정할 수 없다.
- 즉, 생산자 스레드는 버퍼에 새로운 데이터가 채워졌음을 소비자 스레드에게 알려야하고, 소비자 스레드는 버퍼에 빈 공간이 생겼음을 생산자 스레드에게 알려야하지만 알리는 방법은 랜덤이다.
- 따라서 소비자가 소비자 스레드를 깨우고 생산자가 생산자 스레드를 깨우는 비효율이 존재한다.
또한 어떤 스레드를 깨우는지 알 수 없기 때문에 하나의 스레드가 지속적으로 깨어나지 못하는 기아 문제도 발생할 수 있다.
- notifyAll()을 사용하여 해결할 수 있지만, 여전히 비효율적이다.
결론
임계 구역에서 공유 자원의 상태에 따라 스레드가 대기해야할 때, 어떻게 락을 반납하고, 대기할 수 있는지 알아보았다.
- Object.notify(), wait(), notifyAll() 메서드를 사용
다음 포스트에서는 Object.wait()과 notify() notifyAll()의 한계점을 해결하는 방안에 대해서 알아보자!
'JAVA' 카테고리의 다른 글
[JAVA] ReentrantLock이 뭔가요? (0) | 2024.09.24 |
---|---|
[JAVA] java.util.concurrent.LockSupport 알아보자 (0) | 2024.09.24 |
[JAVA] 자바의 임계 영역과 동기화, synchronized (0) | 2024.09.23 |
[JAVA] 자바의 volatile 키워드와 메모리 가시성 (1) | 2024.09.23 |
[JAVA] 스레드의 양보 Thread.yield() (0) | 2024.09.21 |