Published on

RabbitMQ 튜토리얼

Authors
  • avatar
    Name
    이건창
    Twitter

개념 이해하기

publisher-subscriberproducer-consumer 이야기가 나왔고 어떤 차이가 있는지 분석한다.

발행자는 메시지를 발행해 모든 구독자에게 전송한다. 모든 구독자는 동일한 메시지를 받기 위한 방식이다.

Publish/Subscriber: Subscribers subscribe to the publisher. Each message the Publisher publishes is sent to all the subscribers. That is, all subscribers receive the same message. (Think of a newspaper or magazine subscription. All subscribers receive the same magazine or newspaper)

소비자는 생상자가 생산한 메시지를 소비한다. 이는 여러 소비자에게 작업 부하를 분산하는 방식이다. 소비자를 그룹으로 묶어 그룹내 소비자는 동일한 메시지를 소비하게 구성할 수도 있다.

Producer/Consumer: Each message the producer produces will be consumed by a single consumer. This is a mechanism to distribute the workload to multiple consumers. (Think of the several cash registers at the supermarket. Each customer goes to a single cash register. The customers are like the messages that are produced and the cash registers are the consumers)

수도 코드 작성해보기

publishersubscriber 관계라면 이렇게 동작할거다.

fun main() {
    val pubSub = PubSub()
    pubSub.publish("hello1")
    pubSub.publish("world1")

    val subscriber1 = Subscriber()
    pubSub.subscribe(subscriber1)

    pubSub.publish("hello2")
    pubSub.publish("world2")

    val subscriber2 = Subscriber()
    pubSub.subscribe(subscriber2)

    pubSub.publish("hello3")
    pubSub.publish("world3")

    Thread.sleep(1000)

    println("Done")

    pubSub.exit()
}

class PubSub {
    private val queue = Collections.synchronizedList(mutableListOf<String>())
    private val subscribers = mutableListOf<Subscriber>()
    private var flag = true

    init {
        thread {
            while (flag) {
                if (queue.isNotEmpty()) {
                    val message = queue.removeAt(0)
                    subscribers.forEach { it.receive(message) }
                }
            }
        }
    }

    fun publish(message: String) {
        queue.add(message)
    }

    fun subscribe(subscriber: Subscriber) {
        subscribers.add(subscriber)
    }

    fun exit() {
        flag = false
    }
}

class Subscriber() {
    fun receive(message: String) {
        println("Subscriber $message received message: $message")
    }
}

producerconsumer 관계라면 이렇게 갈거다.

fun main() {
    val producer = Producer()
    val consumer1 = Consumer("Consumer 1")

    producer.produce("hello1")
    producer.produce("world1")

    consumer1.consume(producer)

    producer.produce("hello2")
    producer.produce("world2")

    val consumer2 = Consumer("Consumer 2")
    consumer2.consume(producer)
}

class Producer(
    private val queue: MessageQueue = MessageQueue(),
) {
    fun produce(message: String) = queue.add(message)
    fun deliver(index: Int) = queue.getItem(index)
}

class Consumer(
    private val name: String,
) {
    fun consume(producer: Producer) {
        var index = 0
        thread {
            while (true) {
                val item = producer.deliver(index)
                index = item.nextIndex
                println("$name received message: $item")
                Thread.sleep(500)
            }
        }
    }
}

class MessageQueue {
    private val queue = Collections.synchronizedList(mutableListOf<String>())

    fun add(message: String) = queue.add(message)

    fun getItem(index: Int) = if (index < queue.size) Item(queue[index], index + 1) else Item("", index)
}

data class Item(
    val message: String,
    val nextIndex: Int,
)

여기에서 느낀 차이점은 BackPressure 여부처럼 보인다. 게시-구독 관계에서는 구독자가 요청양을 제어할 수 없다. 생산-소비 관계에서는 구독자가 요청양을 제어할 수 있다. 주는 책임이 어디에 있냐에 따라 달라보인다. 여기까지 내용을 정리해보겠다.

  • 개념 차이
    • 게시/구독 모델은 모든 구독자가 동일한 메시지를 전달받길 원한다.
    • 생산/소비 모델은 생산된 메시지를 분산해서 처리하길 원한다.
  • 코드 수준 차이
    • 게시/구독 모델에서 구독자는 게시자가 전달하는 메시지를 받을 수 있도록 대기한다.
    • 생산/소비 모델에서 생산자는 생산자가 생산한 메시지를 직접 가져간다.

RabbitMQ tutorial - hello world!

RabbitMQ는 메시지 브로커로 메시지를 수신(accept)하고 전달(forward)한다. 우편함(post box)에 우편(mail)을 넣으면 집배원(letter carrier)가 수신자에게 배달한다. 여기에서 RabbitMQ는 우체국(post office)이며 우편함(post box), 집배원(letter carrier) 역할을 한다. RabbitMQ는 우편(mail)을 binary 형태의 BLOB 자료구조로 사용해 수신하고 저장하며 전달한다. Rabbitmq는 producer, queue, consumer로 프로세스를 설명한다. producer는 메시지를 전송하고, queue는 메시지를 저장하고, consumer는 메시지를 수신하기 위해 기다린다.

BLOB은 Binary Large Object 약자로 이진 데이터를 저장할 때 사용하는 데이터 자료구조다.

샘플코드를 작성해봤다. 다음처럼 의존성을 구성한다.


val slf4jVersion = "2.0.16"
val amqpClientVersion = "5.17.1"

dependencies {
    // https://mvnrepository.com/artifact/org.slf4j/slf4j-api
    implementation("org.slf4j:slf4j-api:$slf4jVersion")
    // https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
    implementation("org.slf4j:slf4j-simple:$slf4jVersion")
    // https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
    implementation("com.rabbitmq:amqp-client:$amqpClientVersion")
}

게시하는 경우는 다음처럼 코드를 구성한다.

fun main(args: Array<String>) {
    val factory = ConnectionFactory().apply {
        username = "myuser"
        password = "secret"
    }
    factory.newConnection().use {
        val channel = it.createChannel()
        channel.queueDeclare(QUEUE_NAME, false, false, false, null)
        val message = "Hello World!"
        channel.basicPublish("", QUEUE_NAME, null, message.toByteArray())
    }
}

구독하는 경우 다음처럼 코드를 구성한다.

fun main() {
    val connectionFactory = ConnectionFactory().apply {
        username = "myuser"
        password = "secret"
    }

    val integer = AtomicInteger()

    // connection 의 auto closable 을 활성화하는 경우 더이상 데이터받을 게 없으면 종료된다.
    val connection = connectionFactory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    println(" [*] Waiting for messages. To exit press CTRL+C");

    val deliverCallback = { _: String, delivery: Delivery ->
        val message = String(delivery.body, StandardCharsets.UTF_8)
        println(" [x] Received '$message ${integer.getAndIncrement()}'")
    }

    channel.basicConsume(QUEUE_NAME, true, deliverCallback) { _ -> }
}

try-with-resources를 사용하는 경우 데이터를 전부 읽었지만 읽는도중 connection이 닫힌다. 데이터를 하나만 출력했지만 모두 구독했다고 생각해서 메시지를 지운다.

// consume 할 경우 auto closable 주의!!
connectionFactory.newConnection().use {
    val channel = it.createChannel()
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    println(" [*] Waiting for messages. To exit press CTRL+C");

    val deliverCallback = { _: String, delivery: Delivery ->
        val message = String(delivery.body, StandardCharsets.UTF_8)
        println(" [x] Received '$message ${integer.getAndIncrement()}'")
    }

    channel.basicConsume(QUEUE_NAME, true, deliverCallback) { _ -> }
}

RabbitMQ tutorial - Work Queues

we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

작업 대기열(Work Queues)은 작업 분배를 위해 사용된다. 작업 대기열(Work Queues)는 완료될 때까지 기다리는 일을 피해 리소스 집약적인 작업을 수행하는 일을 목표로 한다. 이런 일은 HTTP 요청동안 복잡한 일을 하기 어려운 작업에 유용하다.

images

중요한건 두 개 이상의 작업자(worker)가 존재한다면 작업자(worker)가 작업(task)을 꺼내(pop) 실행하며 분산해서 처리하게 된다.

images

작업자(worker)를 4개로 설정하고 실행하는 경우 다음처럼 분산 처리하는 모습을 볼 수 있다. 기본적으로 라운드로빈에 맞춰 작업을 분산하게 된다.

images

소비자가 작업중 종료되면 어떻게 될까?

메시지는 소비자(consumer)에게 전달되자마자 삭제된다. 작업을 처리하는 동안 작업자(worker)가 종료된다면 처리 중인 메시지는 손실된다. RabbitMQ는 메시지 손실을 방지하기 위해 message acknowledgments를 지원한다. 즉, 메시지를 수신하고 처리했음을 알린 후 삭제하도록 해 손실을 방지한다. 설정하는 방법은 basicConsume(…) 에서 autoAck 를 false로 설정하고 basicAck(…)를 이용해 delivery에 대한 ack를 전송하도록 한다. 다음은 예제 코드다.

fun main() {
    val connectionFactory = ConnectionFactory().apply {
        username = "myuser"
        password = "secret"
    }

    // connection 의 auto closable 을 활성화하는 경우 더이상 데이터받을 게 없으면 종료된다.
    val connection = connectionFactory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    println(" [*] Waiting for messages. To exit press CTRL+C");

    val deliverCallback = { _: String, delivery: Delivery ->
        val message = String(delivery.body, StandardCharsets.UTF_8)
        println(" [x] Received $message")
        try {
            taskWithMessage(message)
        } finally {
		        // task를 처리하면 delivery 에 대한 ACK를 전송한다.
            println(" [x] Send ACK")
            channel.basicAck(delivery.envelope.deliveryTag, false)
        }
    }

    channel.basicConsume(QUEUE_NAME, false, deliverCallback) { _ -> }
}

private fun taskWithMessage(message: String) {
    for (c in message.toCharArray()) {
        if (c == '.') Thread.sleep(1000)
    }
}

다음은 실행과 종료를 반복하면서 ACK 받은 메시지만 지우는 모습을 확인할 수 있다.

 % Receiver 실행

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received message number 0...
 [x] Send ACK
 [x] Received message number 1...
 [x] Send ACK
 [x] Received message number 2...

 % Receiver 재실행

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received message number 2...
 [x] Send ACK
 [x] Received message number 3...
 [x] Send ACK
 [x] Received message number 4...
 [x] Send ACK
 [x] Received message number 5...

 % Receiver 재실행

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received message number 5...
 [x] Send ACK
 [x] Received message number 6...
 [x] Send ACK
 [x] Received message number 7...
 [x] Send ACK
 [x] Received message number 8...

작업은 처리됐지만 basicAck가 전송되지 않는 경우 꽤나 심각하다. RabbitMQ는 ack 전달받지 않는 메시지를 스스로 방출(release)할 수 없으므로 메모리에 계속 남게된다. 이련 케이스를 디버깅하기 위해 messages_unacknowledeged를 파악해야 한다. (이것저것 해봤는데 messages_unacknowledeged 상태 파악이 어렵다. 어떻게 해야 나오는거지..?)

% rabbitmqctl list_queues name messages_ready messages_unacknowledged

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
hello   12              0

auto ack 달린게 있고 달리지 않은게 함께 붙었다면 어떻게 동작할까?

RabbitMQ가 종료되면 어떻게 될까?

RabbitMQ 서버가 종료되면 작업은 여전히 손실된다. 이런 경우 RabbitMQ 에서 durable 옵션을 설정하면 쉽게 달성할 수 있다.

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

durable을 false로 설정한 대기열(Work Queues)은 durable을 true로 변경할 수 없으니 주의해야 한다.

% rabbitmqctl list_queues name messages_ready messages_unacknowledged

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
hello   12              0

# 재실행했더니 채널이 날라갔다.
% rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

다음 durable 옵션을 추가하면 channel 에 대한 영속성이 보장된다.

% rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
durable-queue   21      0

# 채널은 살아있다.
% rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
durable-queue   0       0

메시지에 대한 PERSISTENT_TEXT_PLAIN 옵션을 설정해야 메시지도 남게된다.

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.toByteArray())

PERSISTENT_TEXT_PLAIN 옵션 덕분에 메시지도 남은 모습 확인 가능하다.

% rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
durable-queue   21      0

# 메시지도 살아있다.
% rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
durable-queue   21      0

메시지를 공정하게 처리하고 싶은 경우 워커에 해당 옵션을 추가하면 된다. basicQos 를 이용해 승인된 메시지 수만 보내도록 설정한다. 다음처럼 설정한다면 두 개 이상 메시지를 제공하지 않는다.

channel.basicQos(1);

메시지를 받아 임의로 시간을 조절해봤다.

private fun taskWithMessage(message: String) {
    for (c in message.toCharArray()) {
        if (c == '.') Thread.sleep(Random.nextLong(100, 10000))
    }
}

다음처럼 라운드 로빈으로 실행되는게 아니라 종료가 될 때마다 작업을 가져와 처리하는 모습을 볼 수 있다.

images

RabbitMQ tutorial - Publish/Subscribe

작업 대기열(Work Queues)에서는 각 작업이 정확히 한 명 작업자에게 전달된다. 그러나 게시(publish)/구독(subscribe)은 여러 소비자에게 메시지를 전달한다. 다음처럼 channelexchange로 선언해서 메시지를 게시한다.

fun main() {
    val factory = ConnectionFactory().apply {
        username = "myuser"
        password = "secret"
    }

    factory.newConnection().use {
        val channel = it.createChannel()
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
        val message = "info: Hello World!"
        channel.basicPublish(EXCHANGE_NAME, "", null, message.toByteArray())
        println(" [x] Sent '$message'")
    }
}

구독도 channelexchange로 선언해서 구독한다.

fun main() {
    val factory = ConnectionFactory().apply {
        username = "myuser"
        password = "secret"
    }

    val connection = factory.newConnection()
    val channel = connection.createChannel()

    // fanout 형태 채널 구독
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
    val queueName = channel.queueDeclare().queue
    channel.queueBind(queueName, EXCHANGE_NAME, "")

    val deliverCallback = { _: String, delivery: Delivery ->
        val message = String(delivery.body, StandardCharsets.UTF_8)
        println(" [x] Received '$message'")
    }

    channel.basicConsume(queueName, true, deliverCallback) { _ -> }
}

메시지를 게시(pubilsh)하는 순간 누군가 읽지 않아도 방출된다.

rabbitmqctl list_bindings 명령어를 사용하면 큐에 바인딩된 상태를 확인할 수 있다.

# subscriber가 하나인 경우
% rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name                destination_kind      routing_key                     arguments
                exchange        amq.gen-AxjfNp23VQLRDyf6T1-HXw  queue                 amq.gen-AxjfNp23VQLRDyf6T1-HXw  []
logs            exchange        amq.gen-AxjfNp23VQLRDyf6T1-HXw  queue                                                 []

# subscriber를 4개로 늘린다면 다음처럼 변경된다.
% rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name                destination_kind      routing_key                     arguments
                exchange        amq.gen-AxjfNp23VQLRDyf6T1-HXw  queue                 amq.gen-AxjfNp23VQLRDyf6T1-HXw  []
                exchange        amq.gen-CugFhbNmIttpFhIAFcEu2Q  queue                 amq.gen-CugFhbNmIttpFhIAFcEu2Q  []
                exchange        amq.gen-2jlAgUK4fr8VLon3nK83hg  queue                 amq.gen-2jlAgUK4fr8VLon3nK83hg  []
                exchange        amq.gen-Ka5dyEWJ9XK7NnKZjJxgzA  queue                 amq.gen-Ka5dyEWJ9XK7NnKZjJxgzA  []
logs            exchange        amq.gen-2jlAgUK4fr8VLon3nK83hg  queue                                                 []
logs            exchange        amq.gen-AxjfNp23VQLRDyf6T1-HXw  queue                                                 []
logs            exchange        amq.gen-CugFhbNmIttpFhIAFcEu2Q  queue                                                 []
logs            exchange        amq.gen-Ka5dyEWJ9XK7NnKZjJxgzA  queue                                                 []