๐งจ RabbitMQ๋
AMQP๋ฅผ ๋ฐ๋ฅด๋ ์คํ ์์ค ๋ฉ์์ง ๋ธ๋ก์ปค์ด๋ค.
์์ฒญ์ ๋ํ ์ฒ๋ฆฌ ์๊ฐ์ด ๊ธธ ๋, ํด๋น ์์ฒญ์ Queue์ ์์ํ๊ณ ๋น ๋ฅด๊ฒ ์๋ตํ ๋ ์ฌ์ฉํ๋ค.
๋ํ, MQ๋ฅผ ์ฌ์ฉํ์ฌ ์ ํ๋ฆฌ์ผ์ด์ ๊ฐ ๊ฒฐํฉ๋๋ฅผ ๋ฎ์ถ ์ ์๋ค.
๐งจ AMQP๋
Advaced Message Queing Protocol์ ์ฝ์๋ก์จ, MQ์ ํ์ค ํ๋กํ ์ฝ์ด๋ค.
์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๋์จ ์ ํ ์ค ํ๋๊ฐ RabbitMQ์ด๋ค.
๋ฑ์ฅ ๋ฐฐ๊ฒฝ
์ด์ ์๋ ๊ฐ ํ๋ซํผ์ ์ข ์์ ์ธ ์ ํ๋ค์ด์๊ธฐ ๋๋ฌธ์ ์ด๊ธฐ์ข ๊ฐ ๋ฉ์์ง ๊ตํ์ ์ํด์
ํฌ๋งท ์ปจ๋ฒ์ ์ ํด์ผ ํ๊ธฐ ๋๋ฌธ์ ๋ฉ์์ง ๋ธ๋ฆฟ์ง๋ฅผ ์ด์ฉํ๊ฑฐ๋(์๋ ์ ํ ๋ฐ์) ์์คํ ์์ฒด๋ฅผ ํต์ผ ์์ผ์ผ ํ๋
๋ถํธํจ๊ณผ ๋นํจ์จ์ฑ์ด ์์๋ค.์ด๋ฌํ ๋จ์ ์ ๋ณด์ํ๊ธฐ ์ํด ๋์จ ํ์ค ํ๋กํ ์ปฌ์ด AMQP์ด๋ค.
์ฆ, AMQP๊ฐ ๋์จ ๋ชฉ์ ์ ์ด๊ธฐ์ข ๊ฐ ์์คํ ๊ฐ์ ์ต๋ํ ํจ์จ์ ์ธ ๋ฐฉ๋ฒ์ผ๋ก ๋ฉ์์ง๋ฅผ ๊ตํํ๊ธฐ ์ํด์๋ค.
AMQP ์ถฉ์กฑ ์กฐ๊ฑด
ํน์ ๋ฒค๋์ ์ข ์๋๋ ๊ฒ์ ๋ฐฉ์งํ๊ธฐ ์ํด ์๋ ์กฐ๊ฑด์ ์ถฉ์กฑํ๋ค.
- ๋ชจ๋ ๋ธ๋ก์ปค๋ค์ ๋๊ฐ์ ๋ฐฉ์์ผ๋ก ๋์ํ ๊ฒ
- ๋ชจ๋ ํด๋ผ์ด์ธํธ๋ค์ ๋๊ฐ์ ๋ฐฉ์์ผ๋ก ๋์ํ ๊ฒ
- ๋คํธ์ํฌ ์์ผ๋ก ์ ์ก๋๋ ๋ช ๋ น์ด๋ค์ ํ์คํ
- ํ๋ก๊ทธ๋๋ฐ ์ธ์ด ์ค๋ฆฝ
docker run
docker run -d --hostname cloud-stream-rabbit --name cloud-stream-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
๐งจ @Async๊ฐ ์๋๊ณ ์ MQ์ธ๊ฐ
@Async๋ก ํ ๊ฒฝ์ฐ๋ ๋ฌธ์ ์
- ์๋ฒ๊ฐ ๋ค์ด๋ ๊ฒฝ์ฐ ๋ชจ๋ ์ฐ๋ ๋๋ ์ฌ๋ผ์ง๊ธฐ ๋๋ฌธ์ ๋ฐ์ดํฐ ์ ์ค
- ์คํจํ ๋ฉ์์ง๋ฅผ DLQ์ ๊ฐ์ ๊ณณ์ ๋ณด์กดํ ์ ์๋ค.
- ์ฌ์๋๊ฐ ๋ฒ๊ฑฐ๋กญ๋ค.
DB Queue๊ฐ ์๋ ์ด์
๊ธฐ์กด php์์๋ DB Queue๋ฅผ ์ฌ์ฉํ๊ณ ์์๋ค.
ํ์ง๋ง ์คํ๋ง์์๋ ์ด๋ฅผ ์ง์ํ์ง ์๊ธฐ ๋๋ฌธ์ ๊ตณ์ด ๊ตฌํํ ์๊ณ ๋ฅผ ๋๊ธฐ ์ํด
spring cloud stream์์ ์ ๊ณตํ๋ RabbitMQ๋ฅผ ์ฌ์ฉํ๋ค.
kafka๋ ๋น์ฉ์ด ๋ฐ์ํ๊ธฐ ๋๋ฌธ์ ์ฌ์ฉํ์ง ์๋๋ค.
๐งจ ์ฌ์๋ ์ ๋ต
์ฌ์๋ ์ ๋ต์ด๋?
์ฌ์๋ ์ ๋ต์ consume์ด ์คํจํ์ ๊ฒฝ์ฐ ๋ช๋ณ์ ์ฌ์๋ ํ๊ณ , ์ฌ์๋ ๊ฐ๊ฒฉ์ ์ด๋ป๊ฒ ํ ์ง ๋ฑ์ ์ ๋ต์ด๋ค.
์ฐ๋ฆฌ ์๋น์ค์์ ์ฌ์๋ ์ ๋ต
ํฌ๊ฒ ๋ณต์กํ๊ฒ ๊ฐ์ ธ๊ฐ ์ํฉ์ ์๋๋ผ์ ์์ฃผ ๊ฐ๋จํ๊ฒ ๊ฐ์ ธ๊ฐ๋ค.
- ์ฌ์๋๋ ์ผ๋จ 1๋ฒ์ผ๋ก ํ๋ค.
- DLX๋ก ์ ์กํ๋ค.
- DLQ์ ๋ด๊ธด ๋ฉ์์ง๋ฅผ ์๋์ผ๋ก ์ฒ๋ฆฌํ๊ฑฐ๋ ์คํฌ๋ฆฝํธ๋ฅผ ๋ง๋ ๋ค.
๐งจ ์๋ฌ ํธ๋ค๋ง ์ ๋ต
์๋ฌ ํธ๋ค๋ง ์ ๋ต์ด๋
๋ฉ์์ง consume์ด ์คํจํ ๊ฒฝ์ฐ ๋ฉ์์ง๋ฅผ ๋ฒ๋ฆด์ง, ๋ณด์กดํ ์ง ๋ณด์กดํ๋ค๋ฉด ์ด๋ป๊ฒ ๋ณด์กดํ ์ง ๋ฑ์ ์ ๋ต์ด๋ค.
์ฐ๋ฆฌ์ ์ ๋ต
์ฐ๋ฆฌ๋ DLX๋ฅผ ํ์ฉํ๋ค.
DLX์ ์ ์ก๋ ๋ฉ์์ง๋ ์๋์ผ๋ก ์ฒ๋ฆฌํ ์๋ ์๊ณ , ์๋ํ ์คํฌ๋ฆฝํธ๋ฅผ ๊ฐ๋ฐํ ์๋ ์๋ค.
์ ๋ต ์ข ๋ฅ
์ ๋ต ์ข ๋ฅ์๋ ๋๋ต ํฌ๊ฒ 3๊ฐ์ง๊ฐ ์๋ค.
- Message Drop: ์คํจํ ๋ฉ์์ง๋ ํ๊ธฐํ๋ค.
- Re Queue Message: ์คํจํ ๋ฉ์์ง๋ฅผ ๋ค์ queue์ ๋ฃ๋๋ค.
- Dead Letter Queue: ์คํจํ ๋ฉ์์ง๋ DLQ๋ก ๋ณด๋ธ๋ค.
Message Drop
ํน๋ณํ ๊ฒ ์์ด ๋ฉ์์ง๋ฅผ ํ๊ธฐํ๊ณ ๋์ด๋ค.
Re Queue Message
๋ฉ์์ง๋ฅผ ๋ค์ queue์ ๋ฃ๋๋ฐ ๋ฌด์์ ๊ณ์ ๋ฐ๋ณตํ๋ฉด ํด๋น ๋ฉ์์ง๋ ํ์ queue์ ๋จ์์๊ฒ ๋๋ค.
๊ทธ๋ ๊ธฐ ๋๋ฌธ์ ๋ฉ์์ง ๊ธฐ๊ฐ์ด๋ ์๋ ํ์์ ๋ฐ๋ผ์ ํธ๋ค๋งํ๋ ๋ฑ์ ํ์๊ฐ ํ์ํ๋ค.
Dead Letter Queue(์ฌ์ฉํ ์ ๋ต)
์คํจํ ๋ฉ์์ง๋ฅผ ๋ณด๊ดํ๋ ๋ณ๋์ queue์ด๋ค.
DLQ๋ก ๋ฉ์์ง๊ฐ ๋ค์ด์ฌ ๊ฒฝ์ฐ ๊ฐ๋ฐ์์๊ฒ ์๋ฆผ
์ ๊ฐ๊ฒ ํ๋ ๋ฑ์ ์ฒ๋ฆฌ๋ก ์๋์ผ๋ก ์ฌ์๋ํ๋ ๋ฑ์
๋ฐฉ์์ ์ธ ์ ์๋ค. ์๋๋ฉด ์ด๋ฅผ ์๋ํํ๋ ์ฝ๋๋ฅผ ์์ฑํด๋ ์ข์ ๊ฒ ๊ฐ๋ค.
Error Handling
DLQ๋ฅผ ์ฌ์ฉํ์ง ์๊ฑฐ๋ Re Queue๋ฅผ ํ์ง ์์ ๊ฒฝ์ฐ ๋ณ๋์ ํธ๋ค๋ง์ด ํ์ํ ๊ฒ์ด๋ค.
๊ทธ๋ด ๊ฒฝ์ฐ ErrorMessage์ฉ consumer๋ฅผ ๋ง๋ค์ด์ ํธ๋ค๋งํ ์ ์๋ค.
๋๋ต ์ดํด๋ณด๋ฉด ์๋์ ๊ฐ์ด ํธ๋ค๋งํ ์ ์๊ณ , ๋ ๊น์ ๋ด์ฉ์ ๋ค๋ฃจ์ง ์๋๋ค.
java code
@Bean
public Consumer<Employee> sample() {
return (employee) -> {
System.out.println(employee.toString());
if (employee.getAge() > 100) {
throw new RuntimeException("occured consumer");
}
};
}
@Bean
public Consumer<ErrorMessage> errorHandler() {
return e -> {
errorOccur++;
System.out.println("์๋ฌ ๋ฐ์: " + e);
};
}
- errorHandler()
- ์ด ์ญ์ ๋๊ฐ์ Consumer์ด๊ธฐ ์์ฑ๋๋ exchange๋ errorHandler-in-0์ด๋ค.
- ๋ค๋ง T ํ์
๋ง ErrorMessage์ผ ๋ฟ์ด๋ค. ์ด๋ ๊ฒ ํ์
๋ง ๋ฐ๊ฟ์ฃผ๋ฉด ์์์
์๋ฌ ์ ์ฉ excahnge์ consumer๊ฐ ๋ฐ์ธ๋ฉ ๋๋ค. - ์ฌ์๋๊น์ง ๋ชจ๋ ์คํจํ์ ๊ฒฝ์ฐ ์๋ฌ๋ก ํ๋จํด์ ๊ทธ ๋ queue์ ๋ด๊ฒจ์ consumeํ๊ฒ ๋๋ค.
- ๋ฌผ๋ก ์ด๋ ๊ฒ ํด๋ ์๋ฌด ์ฒ๋ฆฌ๋ ํ์ง ์๊ธฐ ๋๋ฌธ์ ๋ฉ์์ง๊ฐ ๋ฒ๋ ค์ง๋ ๊ฒ์ ๋๊ฐ๋ค.
๐งจ ์ฉ์ด ์ ๋ฆฌ
๋จ์ผ TCP ์ฐ๊ฒฐ
- TCP ๊ธฐ๋ฐ
- ํจ์จ์ฑ์ ์ํด ์๋ช ์ด ๊ธด ์ฐ๊ฒฐ์ ํ๋ค. ์ฆ ํ๋กํ ์ฝ ์์ ๋น ์ ์ฐ๊ฒฐ์ด ์ด๋ฆฌ์ง ์๋๋ค.
- ํ๋์ ํด๋ผ์ด์ธํธ๋ ๋จ์ผ TCP ์ฐ๊ฒฐ์ ์ฌ์ฉํ๋ค.
- ์ฆ, ์ปค๋ฅ์ ํ์ฒ๋ผ ์ปค๋ฅ์ ์ ์ ์งํ๋ค.
- ์๋ฅผ ๋ค์ด a ์๋ฒ์์ 10๊ฐ์ ์ปค๋ฅ์ ์ ํ ๋ฒ ๋ง๋ค์๋ค๋ฉด 10๊ฐ๊ฐ ๊ณ์ ์ ์ง๋๋ค.
AMQP 0-9-1
- ํ๋์ ์ปค๋ฅ์
์ Channel์ด๋ผ๊ณ ํ๋ ์ฌ๋ฌ ๊ฒฝ๋ ์ฐ๊ฒฐ์ ์ด ์ ์๋ค.
์ด ํ๋กํ ์ฝ์ AMQP 0-9-1์ด๋ผ๊ณ ํ๋ค. - AMQP 0-9-1๋ ํ๋ ์ด์์ ์ฑ๋์ ์ด๊ณ ์ฑ๋์์ ํ๋กํ ์ฝ ์์ (๊ตฌ๋ , ์๋น)๋ฅผ ์ํํ๋ค.
Channel
- channel์ connection์์ด๋ ์กด์ฌํ ์ ์๋ค.
- ๋จ์ผ TCP ์ฐ๊ฒฐ์ ๊ฒฝ๋ ์ฐ๊ฒฐ์ Channel์ด๋ผ๊ณ ํ๋ค.
- ๋ชจ๋ ์์ ์ ์ฑ๋์์ ๋ฐ์ํ๋ค.
- ์ต๋ ์ฑ๋ ์๋ฅผ ์๋ฐ์์ ์ ์ดํ ์ ์๋ค.
- ์ก์ ์ ํ๋ฉด ์๋์ ๊ฐ์ด pulisher ์ ์ฉ ์ฑ๋์ด ์๊ธด๋ค. publish channel์ consume channel๊ณผ๋
๋ค๋ฅด๋ค. ๋ณ๊ฐ์ด๋ค. - ํ๋ก์ธ์ค์ ์ฐ๋ ๋์ ๋น๊ตํ์๋ฉด, connection์ ํ๋ก์ธ์ค์ด๊ณ , channel์ด ์ฐ๋ ๋์ ๋น์ทํ๊ฒ ์๊ฐํ ์ ์์ ๊ฒ ๊ฐ๋ค.
๐งจ python์ผ๋ก connection, channel ์์๋ณด๊ธฐ
consum ์ฝ๋ ์์ฑ
#!/usr/bin/env python import pika, sys, os def main(): def callback(ch, method, properties, body): print(" [x] Received %r" % body) # ์ปค๋ฅ์ ๋ฐ ์ฑ๋ ์์ฑ connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # ํ ์ ์ธ, ์ต์ค์ฒด์ธ์ง ์ ์ธ, ํ ๋ฐ์ธ๋ฉ channel.queue_declare(queue='hello') channel.exchange_declare(exchange='defaultExchange',exchange_type='direct') channel.queue_bind(queue='hello', exchange='defaultExchange', routing_key='defaultRoutingKey') # ์ปจ์๋จธ ์ผํ channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) # ์ปจ์จ ์์ print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)
receiver.py
consume ์คํ
- ์คํํ๊ฒ ๋๋ฉด ์ปจ์จ์ ํ๊ธฐ ์ํด ๋๊ธฐํ๊ฒ ๋๋ค.
- ๋ธ๋ก์ปค์ defaultExchange, queue, binding์ด ๋ชจ๋ ์์ฑ๋๋ค.(๋งค๋์ง๋จผํธ ํด์์ ํ์ธ ํด๋ณด๋ฉด ๋๋ค.)
- ์คํํ๊ฒ ๋๋ฉด ์ปจ์จ์ ํ๊ธฐ ์ํด ๋๊ธฐํ๊ฒ ๋๋ค.
python receiver.py
publisher ์ฝ๋ ์์ฑ
#!/usr/bin/env python import pika if __name__ == "__main__": connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.exchange_declare(exchange='defaultExchange', exchange_type='direct') channel.queue_bind(queue='hello', exchange='defaultExchange', routing_key='defaultRoutingKey') channel.basic_publish(exchange='defaultExchange', routing_key='defaultRoutingKey', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receiver์ฝ๋์ ๊ฑฐ์ ๋์ผํ๊ณ ํ, ์ต์ค์ฒด์ธ, ๋ฐ์ธ๋ฉ ์ฝ๋๊ฐ ์ค๋ณต๋๋ค.๊ทธ๋ฐ๋ฐ send.py๊ฐ ๋จผ์ ์คํ๋ ์ง receiver.py๊ฐ ๋จผ์ ์คํ๋ ์ง๋ ์ ์ ์๊ธฐ ๋๋ฌธ์ ๋์ผํ ์ ํ ์ด ๋ค์ด๊ฐ๊ฒ ๋๋ค.
ํด๋น ์ฝ๋๋ ๋ธ๋ก์ปค์ ํ, ์ต์ค์ฒด์ธ์ง, ๋ฐ์ธ๋ฉ์ด ์๋์ง ์๋์ง ํ๋จํ๊ณ ์์ผ๋ฉด ๊ทธ๋๋ก ์ฌ์ฉํ๊ณ ์์ผ๋ฉด ์๋ก ๋ง๋ ๋ค.
send.py
publisher ์คํ
- publishing์ ํ์ผ๋ฏ๋ก ๋๊ธฐ ์ค์ด๋ consumer๋ consume์ ํ๊ฒ ๋๋ค.
- publishing์ ํ์ผ๋ฏ๋ก ๋๊ธฐ ์ค์ด๋ consumer๋ consume์ ํ๊ฒ ๋๋ค.
python send.py
connections ๋ฐ channels ํ์ธํ๊ธฐ
์ ์ฌ์ง์ ๋ณด๋ฉด connection์ด 2๊ฐ์ด๋ค.
60370์ด consume connection์ด๊ณ , 58306์ด publish connection์ด๋ค.
send.py, [receiver.py](http://receiver.py) ์ฝ๋์์ ๊ฐ๊ฐ connection์ ์์ฑํ๊ธฐ ๋๋ฌธ์ ๊ทธ๋ ๋ค.
publishing, consume ์๋ฒ๊ฐ ๋ถ๋ฆฌ ๋์ด ์๋ค๋ฉด ๋น์ฐํ ์ด๋ฐ ๊ตฌ์กฐ๊ฐ ๋์ค๋ ๊ฒ์ด๋ค.
publishing, consume์ด ๊ฐ์ connection์ ๊ฐ์ channel ์ฌ์ฉํ๊ธฐ
์ ์ฌ์ง๊ณผ ๊ฐ์ด ํ๋์ channel์์ publising, consume์ ๋ชจ๋ ํ๊ฒ ๋๋ค.
#!/usr/bin/env python import pika if __name__ == "__main__": connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.exchange_declare(exchange='defaultExchange', exchange_type='direct') channel.queue_bind(queue='hello', exchange='defaultExchange', routing_key='defaultRoutingKey') # publisher channel.basic_publish(exchange='defaultExchange', routing_key='defaultRoutingKey', body='Hello World!') print(" [x] Sent 'Hello World!'") # consumer def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) channel.start_consuming() print('a') print('b') print('c') connection.close()
rabbitMQ๊ฐ ์ถ๊ตฌํ๋ ๊ฒฝ๋ connection ๋ง๋ค๊ธฐ
#!/usr/bin/env python import pika if __name__ == "__main__": connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel1 = connection.channel() channel2 = connection.channel() channel1.queue_declare(queue='hello1') channel1.exchange_declare(exchange='defaultExchange1', exchange_type='direct') channel1.queue_bind(queue='hello1', exchange='defaultExchange1', routing_key='defaultRoutingKey1') channel2.queue_declare(queue='hello2') channel2.exchange_declare(exchange='defaultExchange2', exchange_type='direct') channel2.queue_bind(queue='hello2', exchange='defaultExchange2', routing_key='defaultRoutingKey2') # publisher channel1.basic_publish(exchange='defaultExchange1', routing_key='defaultRoutingKey1', body='Hello World1!') channel2.basic_publish(exchange='defaultExchange2', routing_key='defaultRoutingKey2', body='Hello World2!') print(" [x] Sent 'Hello World!'") connection.close()
์ ์ฝ๋์ ๊ฐ์ด connection์์ channel๋ง ์๋ก ๋ง๋๋ ๊ฒ์ด๋ค. ๊ทธ๋ฌ๋ฉด connection 1๊ฐ์ 2๊ฐ์ publishing channel์ด ๋ง๋ค์ด์ง๋ค.
rabbitMQ๋ ๊ฒฝ๋ connection์ connection ํ๋์ ์ฌ๋ฌ๊ฐ์ channel์ ์ด์ํ๋ ๊ฒ์ด๋ค.
channel ์์ธํ๊ฒ ์์๋ณด๊ธฐ
- ๊ตฌ์กฐ
- spring boot server ํ๋์์ pruducer, rabbitMQ, consume ๋ชจ๋ ํจ.
- exchage:
defaultExchange
,customExchange
- queue:
defaultExchange.defaultQueue
,customExchange.customQueue
- spring boot server ์คํ
- connection ์ ๋ณด
- channels ์ ๋ณด
- queue, consumer ์ ๋ณด
- channel: 62740(1)
- channel: 62740(2)
- channel: 62740(1)
- connection: 62740
- connection ์ ๋ณด
defaultExchange
,customExchange
์ ๋ชจ๋ ์ ์ก- connection ์ ๋ณด
- server ์คํ ์ ์๊ฒผ๋ connection(
62740
)์ channel(consume)์ด 1๊ฐ ์ถ๊ฐ๋๊ณ ,62868
connection์ด ์๋ก ์๊ธฐ๋ฉด์ channel(publisher)์ด 1๊ฐ ์ถ๊ฐ๋์ด ์ด 4๊ฐ์ channels๊ฐ ๋๋ค. - channel ์ ๋ณด
- 62868(1): publisher ๋ผ๊ณ ๋์์์.
- 62740(1), 62740(2), 62740(3)(1), (2)์ ๊ธฐ์กด์ exchange๊ฐ ์ฐ๊ฒฐ๋์ด ์์๊ธฐ ๋๋ฌธ์ ์ด ์ฑ๋๋ก consumeํ๋ค.
(1), (2)๋ ๊ธฐ์กด์ ์๋ ๊ฒ์ด๊ณ (3)์ด ์ถ๊ฐ๋ ๊ฒ์ธ๋ฐ ์ด๋ค exchange๋ ์ฐ๊ฒฐ๋์ง ์์๋ค.
???!@
- 62868(1): publisher ๋ผ๊ณ ๋์์์.
channel ์ ๋ฆฌ
- 62868(1) ์ฑ๋์ publisher ๋ผ๊ณ ๋์ ์๋๋ฐ ์ channel์ด 1๊ฐ์ผ๊น? exchange๊ฐ 2๊ฐ์ธ๋ฐ 2๊ฐ ์๊ฒจ์ผ ํ๋๊ฑฐ ์๋๊ฐ?
- 62740(3)๋ ๋ญ๊น? ์๋ฌด๋ฐ ์ญํ ์ด ์์ด ๋ณด์ด๋๋ฐ?
- consumer๋ ๊ฐ๊ฐ์ ์ฑ๋์ ์ด์ฉํ๋ค๋ผ๋๊ฒ ๋ง์๊น?
exchange์ prefetch count
https://blog.dudaji.com/general/2020/05/25/rabbitmq.html
๐งจ ๊ตฌ์กฐ
๋ชจ๋ ์ก์ ํ๋์ queue, exchange๋ก ๊ด๋ฆฌํ๋ค. ์ฌ๊ฑฐ ํ๋ฅผ ๊ด๋ฆฌํ๋ ๊ฒ ๋ณด๋ค๋ ํ๋์ ํ์์ ๊ด๋ฆฌํ๋ ๊ฒ์ด ํธํ ๊ฒ์ผ๋ก ํ๋จํ๊ณ
๋ถ๋ฆฌ๊ฐ ํ์ํ๋ค๋ฉด ๊ทธ ๋ ๊ฐ์ ๋ถ๋ฆฌํ๋ ๋ฐฉํฅ์ ์๊ฐํด ๋ณด์.
๐งจ ๊ตฌํ ๋ ๋ฒจ
์์ ๋์๊ณผ ๊ฐ์ด ๋ชจ๋ ์์ ์ ํ๋์ exchange, queue์์ ๊ด๋ฆฌํ๊ธฐ ์ํด์ ๋ฆฌํ๋ ์ ์ ์ฌ์ฉํ๋ ๊ฒ์ด ํต์ฌํ๋ค.
์๋๋ ์ด๋ค ์ก์ ์คํํ ์ง ์ผ์ด์ค ๋ณ๋ก if ๋ฌธ์ consumer์์ ์ฌ์ฉํด์ ์ฝ๋๊ฐ ๊ต์ฅํ ์ง์ ๋ถ ํ๋๋ฐ
if ๋ฌธ์ด ํ์ํ ๋ถ๋ถ์ producer๊ฐ pushํ ๋ ๋ฆฌํ๋ ์ ์ payload๋ก ๋๊ฒจ consumer๋ฅผ ๋ผ์ด๋ธ๋ฌ๋ฆฌํ ํ์ฌ
์ฌํํ๊ฒ ๋ง๋ค์๋ค.
์ถ๊ฐ๋ก exchange, queue๋ฅผ ๋ง๋ค์ง ์์ ๊ฒฝ์ฐ ์ฌ์ฉ ๋ฐฉ๋ฒ
- ํด๋ผ์ด์ธํธ๋ DefaultProducer๋ฅผ ์ฃผ์
๋ฐ์ push๋ฅผ ํ๋ค.
- push๋ฉ์๋ ํ๋ผ๋ฏธํฐ๋ก๋ JobSpringBean ํด๋์ค์ Method ํ์ ๊ณผ method์ ํ๋ผ๋ฏธํฐ๊ฐ ํ์ํ๋ค.
- ํ๋ผ๋ฏธํฐ๋ ๊ธฐ๋ณธ ๋ฐ์ดํฐ ํ์ ๋ง ๊ฐ๋ฅํ๋ค.
private final DefaultProducer defaultProducer; // bean public void client() throws NoSuchMethodException { defaultProducer.push( JobSpringBean.class.getMethod("jobMethod", Long.class), List.of(1L) ); }
- Job์ ์คํํ JobSpringBean ํด๋์ค๋ฅผ ์์ฑํด์ ๋ก์ง์ ์์ฑํ๋ค.
@Component public class JobSpringBean { public void jobMethod(Long id) { Member member = findById(id); ... } }
๋ฆฌํ๋ ์ ์ ์ฌ์ฉํด์ ๊ฐ์ ๋ ์ ์ ์ ๋ฆฌ ํด๋ณด๋ฉด ์๋์ ๊ฐ๋ค.
consumer์์์ ๋ถ๊ธฐ ์ฒ๋ฆฌ ๋ณต์ก๋ ๋จ์ํ ๋์๋ค.
์ ์ด์ ๊ด๋ฆฌ์ ์ฉ์ด์ฑ์ ์ํด์ ํ๋์ exchange, queue๋ก ๋ชจ๋ ๊ด๋ฆฌํ๋ ๊ฒ์ ์ํ๋๋ฐ
์ด๋ฅผ ๋ฆฌํ๋ ์ ์์ด ๊ตฌํํ ๊ฒฝ์ฐ consumer์ ๋ถ๊ธฐ ์ฒ๋ฆฌ๊ฐ ๋๋ฌด ๋ง์์ง๊ฒ ๋๋ค.
์ด๋ฅผ ๊ฐ์ ํ๊ธฐ ์ํด ํ๋ก์ ํจํด๋ ์ฌ์ฉํ๊ณ ํ์ง๋ง ๊ทธ๋๋ ๊ฒฐ๊ตญ ๋ณต์กํ ๊ฒ์ ๋น์ทํ๋ค.๊ฐ์ ํ์๋ ํด๋ผ์ด์ธํธ์์ Job ํด๋์ค์ Method๋ง ๋๊ฒจ์ฃผ๋ฉด ๋๊ธฐ ๋๋ฌธ์ Consumer๊ฐ consumeํ๋ ์ญํ ๋ง ํ๋๋ก ๊ฐ์ ๋์๋ค.
์ ์ฒด ์ฝ๋
DefaultConsumer.java
@Slf4j @RequiredArgsConstructor @Service public class DefaultConsumer { private final ApplicationContext applicationContext; @Bean public Consumer<DefaultPayload> defaultConsume() { return (message) -> { Object bean = applicationContext.getBean(message.getJobBeanClass()); String methodName = message.getMethodName(); List<Serializable> args = message.getNormalizedArgs(); try { message.getJobBeanClass() .getMethod(methodName, message.getParameterTypes()) .invoke(bean, args.toArray()); } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new RuntimeException(e); } }; } }
DefaultProducer.java
@RequiredArgsConstructor @Component public class DefaultProducer { private final StreamBridge streamBridge; public void push(Method method, List<Serializable> args) { Class<?> clazz = method.getDeclaringClass(); String methodName = method.getName(); DefaultPayload payload = new DefaultPayload(clazz, methodName, args); streamBridge.send("defaultExchange", MessageBuilder.withPayload(payload).build()); } }
DefaultPayload.java
@NoArgsConstructor(access = AccessLevel.PUBLIC) // StreamBridge๊ฐ ์ง๋ ฌํ๋ฅผ ํ๊ธฐ ์ํด์ ๊ธฐ๋ณธ ์์ฑ์๊ฐ ํ์ํจ.
@Getter
public class DefaultPayload {
private Class<?> jobBeanClass;
private String methodName;
private List<Serializable> args;
private final List<Class<?>> argTypes = new ArrayList<>();
public DefaultPayload(Class<?> jobBeanClass, String methodName, List<Serializable> args) {
this.jobBeanClass = jobBeanClass;
this.methodName = methodName;
this.args = args;
args.forEach(argValue -> this.argTypes.add(argValue.getClass()));
}
/**
* consumer์์ ์ญ์ง๋ ฌํ ์ Long type์ Integer๋ก ๋ฐ๊ธฐ ๋๋ฌธ์ Integer๋ฅผ Long์ผ๋ก ๋ณํ
* args ํ๋๋ฅผ ๊น์ ๋ณต์ฌํด์ ๋ฐํํ๋ค.
*/
public List<Serializable> getNormalizedArgs() {
int size = this.argTypes.size();
return IntStream.range(0, size)
.mapToObj(i -> convertArgIntegerToLong(this.argTypes.get(i), this.args.get(i)))
.toList();
}
/**
* @return Class<?>[]
* reflection invoke๋ฅผ ์ํด์ ๋ฐฐ์ด์ด ํ์ํ๊ธฐ ๋๋ฌธ์ ์ ๊ณตํ๋ ๋ฉ์๋. consumer์์ ์ฌ์ฉ
*/
public Class<?>[] getParameterTypes() {
return argTypes.toArray(Class[]::new);
}
private Serializable convertArgIntegerToLong(Class<?> argType, Serializable arg) {
if (argType == Long.class && arg.getClass() == Integer.class) {
return Long.valueOf(String.valueOf(arg));
}
return arg;
}
}
application.yml ์์ ์ฝ๋
spring:
cloud:
function:
definition: defaultConsume;customConsume
stream:
bindings:
defaultConsume-in-0:
destination: defaultExchange
group: defaultQueue
consumer:
max-attempts: 1
customConsume-in-0:
destination: customExchange
group: customQueue
consumer:
max-attempts: 1
rabbit:
bindings:
consumeDefault-in-0:
consumer:
dead-letter-queue-name: defaultDlxQueue
auto-bind-dlq: true
customExchange-in-0:
consumer:
dead-letter-queue-name: customtDlxQueue
auto-bind-dlq: true
rabbitmq:
addresses: localhost
username: guest
password: guest
port: 5672
'Spring' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
SpringBoot Request ๋ฐ๋ ์ฌ๋ฌ๊ฐ์ง ๋ฐฉ๋ฒ (2) | 2022.12.04 |
---|---|
@JsonFromat, @DateTimeForamt์ด ๊ณ์ ํท๊ฐ๋ฆฐ๋ค! (0) | 2022.11.19 |
Redis ๋์์ฑ ์ด์ ๊ฐ์ ํ๊ธฐ (0) | 2022.05.14 |
์คํ๋ง ๋ถํธ Logger ์ฌ์ฉ๋ฒ ๋ฐ ํ (0) | 2022.05.11 |
์คํ๋ง ์ํ ์ฐธ์กฐ์ ์์ฑ์ ์ฃผ์ ์ ์ฌ์ฉํด์ผ ํ๋ ์ด์ (0) | 2022.04.20 |