Published on

생산자 소비자 패턴에 대한 개인적인 정리

Authors
  • avatar
    Name
    이건창
    Twitter

Introduction

생산자 소비자 패턴 간단하게 알기

생산자 소비자 패턴은 아이템을 임시로 보관하는 버퍼에 여러 명의 생산자들과 소비자들이 접근해 동시 요청을 제어하는 방식이다. 생산자-소비자 패턴으로 작업 처리 순서를 결정하면 비결정적 이슈를 해결할 수 있다.

이미지

생산자 소비자 패턴 일반적인 이슈

생산자-소비자 패턴은 두 가지 이슈가 있다.

  • 가득 찬 곳에 담으려는 경우
  • 빈 곳에서 가져오려는 경우

아이템을 생산했지만 버퍼가 가득차면 요청을 담을 수 없다. 이런 경우 버리거나 대기해야 한다. 대기한다면 요청을 전부 처리하지만 생산자는 이후 요청을 받을 수 없다. 버린다면 생산자는 작업을 계속 받을 수 있지만 요청을 전부 처리할 수 없다.

이미지

소비할 아이템이 없는 경우도 문제가 발생한다. 이런 경우 아이템이 적재될 때까지 대기하다 데이터를 받아야 한다.

이미지

두 가지 이슈에서 공통점이 있다. 대기해야 한다는 점이다. 대기하는 방식은 두 개로 신호를 받을 때까지 대기하는 방식과 버퍼를 계속해서 확인하는 방식이다.

  • 신호 받을 때까지 대기한다. (뮤텍스)
  • 버퍼 안을 계속 확인한다. (스핀락)

이 중 뮤텍스가 적절한 이유는 스핀락은 작업자가 많아질 수록 경쟁 조건이 치열하기 때문이다. 생산자-소비자 패턴을 활용할 때, ReentrantLock이 제공하는 await, signal 기능을 사용하면 다음과 같이 구현할 수 있다.

private const val BUFFER_SIZE = 5

class AccountQueue(
) {
    private val buffer = PriorityQueue<AccountingBehavior>()
    private val lock = ReentrantLock()
    private val notFull: Condition = lock.newCondition()
    private val notEmpty: Condition = lock.newCondition()

    fun produce(item: AccountingBehavior) {
        lock.lock()
        try {
            while (buffer.size == BUFFER_SIZE) {
                println("Buffer is full, waiting... [$item]")
                notFull.await()
            }
            buffer.offer(item)
            println("Produced: $item")
            notEmpty.signal()
        } finally {
            lock.unlock()
        }
    }

    fun consume(): AccountingBehavior {
        lock.lock()
        try {
            while (buffer.isEmpty()) {
                println("Buffer is empty, waiting...")
                notEmpty.await()
            }
            val item = buffer.poll()
            println("Consumed: $item")
            notFull.signal()
            return item
        } finally {
            lock.unlock()
        }
    }
}

버퍼 가득차면 더이상 담지 않도록 대기(await)하고 버퍼가 비어있을 때 가져가는 시도를 하지 않도록 대기(await)한다. 가득찬 버퍼가 비게 된다면 대기하고 있는 생산자에게 신호(signal)를 보내어 작업을 처리하도록 한다. 마찬가지로 비어있는 버퍼에 아이템이 적재되면 대기하고 있는 소비자에게 신호(signal)를 보내어 작업을 처리하도록 한다.

생산자 소비자 패턴에서 발생하는 이슈와 해결 방법

위 모델의 단점은 소비자를 늘리기 어렵다. 작업을 순차적으로 처리하려면 소비자가 하나여야 가능하다. 만약 두 개 이상의 소비자라면 순차 처리가 어려워진다.

이미지

우리는 앞서 동시성과 병렬성에서 병렬성과 동시성의 차이를 알아보았다.

  • 병렬성은 다른 일을 함께 처리하는 일이다.
  • 동시성은 같은 일을 함께 처리하는 일이다.

동시성을 제어하고 병렬성을 높이기 위해서 순차 처리 개념을 구체화 할 수 있다. 만약 개인에 대해서만 순차적인 요청만 진행한다면 버퍼 공간을 샤딩하고 소비자를 늘려나가는 방식이 가능하다.

이미지

코드는 다음처럼 구현했다.

class ProducerConsumerService {
    private val map: Map<Long, AccountQueue> = mapOf(
        0L to AccountQueue(0L),
        1L to AccountQueue(1L),
        2L to AccountQueue(2L),
    )

    fun produce(item: AccountingBehavior) {
        val shardingKey = getShardingKey(item.member.id)
        val accountService = map[shardingKey]!!
        accountService.produce(item)
    }

    fun consume(shardingKey: Long): AccountingBehavior {
        val accountService = map[shardingKey]!!
        return accountService.consume()
    }

    private fun getShardingKey(memberId: Long): Long {
        return memberId % 3
    }
}

해당 방식의 단점은 producer, buffer, consumer 간 유연한 스케일링이 되지 않는다는 점이다. 어떻게 하면 서로 의존하지 않고 유연하게 스케일링할 수 있을지 정리해보겠다.