๋ฐ˜์‘ํ˜•

๐Ÿงจ RabbitMQ๋ž€


AMQP๋ฅผ ๋”ฐ๋ฅด๋Š” ์˜คํ”ˆ ์†Œ์Šค ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์ด๋‹ค.

์š”์ฒญ์— ๋Œ€ํ•œ ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์ด ๊ธธ ๋•Œ, ํ•ด๋‹น ์š”์ฒญ์„ Queue์— ์œ„์ž„ํ•˜๊ณ  ๋น ๋ฅด๊ฒŒ ์‘๋‹ตํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.

๋˜ํ•œ, MQ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ฐ„ ๊ฒฐํ•ฉ๋„๋ฅผ ๋‚ฎ์ถœ ์ˆ˜ ์žˆ๋‹ค.

๐Ÿงจ AMQP๋ž€


Advaced Message Queing Protocol์˜ ์•ฝ์ž๋กœ์จ, MQ์˜ ํ‘œ์ค€ ํ”„๋กœํ† ์ฝœ์ด๋‹ค.

์ด๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๋‚˜์˜จ ์ œํ’ˆ ์ค‘ ํ•˜๋‚˜๊ฐ€ RabbitMQ์ด๋‹ค.

๋“ฑ์žฅ ๋ฐฐ๊ฒฝ

์ด์ „์—๋Š” ๊ฐ ํ”Œ๋žซํผ์— ์ข…์†์ ์ธ ์ œํ’ˆ๋“ค์ด์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ์ด๊ธฐ์ข… ๊ฐ„ ๋ฉ”์‹œ์ง€ ๊ตํ™˜์„ ์œ„ํ•ด์„œ

ํฌ๋งท ์ปจ๋ฒ„์ „์„ ํ•ด์•ผ ํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ๋ฉ”์‹œ์ง€ ๋ธŒ๋ฆฟ์ง€๋ฅผ ์ด์šฉํ•˜๊ฑฐ๋‚˜(์†๋„ ์ €ํ•˜ ๋ฐœ์ƒ) ์‹œ์Šคํ…œ ์ž์ฒด๋ฅผ ํ†ต์ผ ์‹œ์ผœ์•ผ ํ•˜๋Š”

๋ถˆํŽธํ•จ๊ณผ ๋น„ํšจ์œจ์„ฑ์ด ์žˆ์—ˆ๋‹ค.์ด๋Ÿฌํ•œ ๋‹จ์ ์„ ๋ณด์™„ํ•˜๊ธฐ ์œ„ํ•ด ๋‚˜์˜จ ํ‘œ์ค€ ํ”„๋กœํ† ์ปฌ์ด AMQP์ด๋‹ค.

์ฆ‰, AMQP๊ฐ€ ๋‚˜์˜จ ๋ชฉ์ ์€ ์ด๊ธฐ์ข… ๊ฐ„ ์‹œ์Šคํ…œ๊ฐ„์— ์ตœ๋Œ€ํ•œ ํšจ์œจ์ ์ธ ๋ฐฉ๋ฒ•์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๊ตํ™˜ํ•˜๊ธฐ ์œ„ํ•ด์„œ๋‹ค.

AMQP ์ถฉ์กฑ ์กฐ๊ฑด

ํŠน์ • ๋ฒค๋”์— ์ข…์†๋˜๋Š” ๊ฒƒ์„ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ์•„๋ž˜ ์กฐ๊ฑด์„ ์ถฉ์กฑํ•œ๋‹ค.

  • ๋ชจ๋“  ๋ธŒ๋กœ์ปค๋“ค์€ ๋˜‘๊ฐ™์€ ๋ฐฉ์‹์œผ๋กœ ๋™์ž‘ํ•  ๊ฒƒ
  • ๋ชจ๋“  ํด๋ผ์ด์–ธํŠธ๋“ค์€ ๋˜‘๊ฐ™์€ ๋ฐฉ์‹์œผ๋กœ ๋™์ž‘ํ•  ๊ฒƒ
  • ๋„คํŠธ์›Œํฌ ์ƒ์œผ๋กœ ์ „์†ก๋˜๋Š” ๋ช…๋ น์–ด๋“ค์˜ ํ‘œ์ค€ํ™”
  • ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์–ธ์–ด ์ค‘๋ฆฝ

docker run


docker run -d --hostname cloud-stream-rabbit --name cloud-stream-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

๐Ÿงจ @Async๊ฐ€ ์•„๋‹ˆ๊ณ  ์™œ MQ์ธ๊ฐ€


@Async๋กœ ํ•  ๊ฒฝ์šฐ๋Š” ๋ฌธ์ œ์ 

  • ์„œ๋ฒ„๊ฐ€ ๋‹ค์šด๋  ๊ฒฝ์šฐ ๋ชจ๋“  ์“ฐ๋ ˆ๋“œ๋Š” ์‚ฌ๋ผ์ง€๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ ์œ ์‹ค
  • ์‹คํŒจํ•œ ๋ฉ”์‹œ์ง€๋ฅผ DLQ์™€ ๊ฐ™์€ ๊ณณ์— ๋ณด์กดํ•  ์ˆ˜ ์—†๋‹ค.
  • ์žฌ์‹œ๋„๊ฐ€ ๋ฒˆ๊ฑฐ๋กญ๋‹ค.

DB Queue๊ฐ€ ์•„๋‹Œ ์ด์œ 

๊ธฐ์กด php์—์„œ๋Š” DB Queue๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ์—ˆ๋‹ค.

ํ•˜์ง€๋งŒ ์Šคํ”„๋ง์—์„œ๋Š” ์ด๋ฅผ ์ง€์›ํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— ๊ตณ์ด ๊ตฌํ˜„ํ•  ์ˆ˜๊ณ ๋ฅผ ๋œ๊ธฐ ์œ„ํ•ด

spring cloud stream์—์„œ ์ œ๊ณตํ•˜๋Š” RabbitMQ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

kafka๋Š” ๋น„์šฉ์ด ๋ฐœ์ƒํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š”๋‹ค.

๐Ÿงจ ์žฌ์‹œ๋„ ์ „๋žต


์žฌ์‹œ๋„ ์ „๋žต์ด๋ž€?

์žฌ์‹œ๋„ ์ „๋žต์€ consume์ด ์‹คํŒจํ–ˆ์„ ๊ฒฝ์šฐ ๋ช‡๋ณ€์„ ์žฌ์‹œ๋„ ํ•˜๊ณ , ์žฌ์‹œ๋„ ๊ฐ„๊ฒฉ์€ ์–ด๋–ป๊ฒŒ ํ• ์ง€ ๋“ฑ์˜ ์ „๋žต์ด๋‹ค.

์šฐ๋ฆฌ ์„œ๋น„์Šค์—์„œ ์žฌ์‹œ๋„ ์ „๋žต

ํฌ๊ฒŒ ๋ณต์žกํ•˜๊ฒŒ ๊ฐ€์ ธ๊ฐˆ ์ƒํ™ฉ์€ ์•„๋‹ˆ๋ผ์„œ ์•„์ฃผ ๊ฐ„๋‹จํ•˜๊ฒŒ ๊ฐ€์ ธ๊ฐ„๋‹ค.

  • ์žฌ์‹œ๋„๋Š” ์ผ๋‹จ 1๋ฒˆ์œผ๋กœ ํ•œ๋‹ค.
  • DLX๋กœ ์ „์†กํ•œ๋‹ค.
  • DLQ์— ๋‹ด๊ธด ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜๋™์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ฑฐ๋‚˜ ์Šคํฌ๋ฆฝํŠธ๋ฅผ ๋งŒ๋“ ๋‹ค.

๐Ÿงจ ์—๋Ÿฌ ํ•ธ๋“ค๋ง ์ „๋žต


์—๋Ÿฌ ํ•ธ๋“ค๋ง ์ „๋žต์ด๋ž€

๋ฉ”์‹œ์ง€ consume์ด ์‹คํŒจํ•  ๊ฒฝ์šฐ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฒ„๋ฆด์ง€, ๋ณด์กดํ• ์ง€ ๋ณด์กดํ•œ๋‹ค๋ฉด ์–ด๋–ป๊ฒŒ ๋ณด์กดํ• ์ง€ ๋“ฑ์˜ ์ „๋žต์ด๋‹ค.

์šฐ๋ฆฌ์˜ ์ „๋žต

์šฐ๋ฆฌ๋Š” DLX๋ฅผ ํ™œ์šฉํ•œ๋‹ค.

DLX์— ์ „์†ก๋œ ๋ฉ”์‹œ์ง€๋Š” ์ˆ˜๋™์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜๋„ ์žˆ๊ณ , ์ž๋™ํ™” ์Šคํฌ๋ฆฝํŠธ๋ฅผ ๊ฐœ๋ฐœํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

์ „๋žต ์ข…๋ฅ˜

์ „๋žต ์ข…๋ฅ˜์—๋Š” ๋Œ€๋žต ํฌ๊ฒŒ 3๊ฐ€์ง€๊ฐ€ ์žˆ๋‹ค.

  • Message Drop: ์‹คํŒจํ•œ ๋ฉ”์‹œ์ง€๋Š” ํ๊ธฐํ•œ๋‹ค.
  • Re Queue Message: ์‹คํŒจํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค์‹œ queue์— ๋„ฃ๋Š”๋‹ค.
  • Dead Letter Queue: ์‹คํŒจํ•œ ๋ฉ”์‹œ์ง€๋Š” DLQ๋กœ ๋ณด๋‚ธ๋‹ค.

Message Drop

ํŠน๋ณ„ํ•  ๊ฒƒ ์—†์ด ๋ฉ”์‹œ์ง€๋ฅผ ํ๊ธฐํ•˜๊ณ  ๋์ด๋‹ค.

Re Queue Message

๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค์‹œ queue์— ๋„ฃ๋Š”๋ฐ ๋ฌด์ž‘์ • ๊ณ„์† ๋ฐ˜๋ณตํ•˜๋ฉด ํ•ด๋‹น ๋ฉ”์‹œ์ง€๋Š” ํ‰์ƒ queue์— ๋‚จ์•„์žˆ๊ฒŒ ๋œ๋‹ค.

๊ทธ๋ ‡๊ธฐ ๋•Œ๋ฌธ์— ๋ฉ”์‹œ์ง€ ๊ธฐ๊ฐ„์ด๋‚˜ ์‹œ๋„ ํšŸ์ˆ˜์— ๋”ฐ๋ผ์„œ ํ•ธ๋“ค๋งํ•˜๋Š” ๋“ฑ์˜ ํ–‰์œ„๊ฐ€ ํ•„์š”ํ•˜๋‹ค.

Dead Letter Queue(์‚ฌ์šฉํ•  ์ „๋žต)

์‹คํŒจํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๊ด€ํ•˜๋Š” ๋ณ„๋„์˜ queue์ด๋‹ค.

DLQ๋กœ ๋ฉ”์‹œ์ง€๊ฐ€ ๋“ค์–ด์˜ฌ ๊ฒฝ์šฐ ๊ฐœ๋ฐœ์ž์—๊ฒŒ ์•Œ๋ฆผ์„ ๊ฐ€๊ฒŒ ํ•˜๋Š” ๋“ฑ์˜ ์ฒ˜๋ฆฌ๋กœ ์ˆ˜๋™์œผ๋กœ ์žฌ์‹œ๋„ํ•˜๋Š” ๋“ฑ์˜

๋ฐฉ์‹์„ ์“ธ ์ˆ˜ ์žˆ๋‹ค. ์•„๋‹ˆ๋ฉด ์ด๋ฅผ ์ž๋™ํ™”ํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•ด๋„ ์ข‹์„ ๊ฒƒ ๊ฐ™๋‹ค.

Error Handling

DLQ๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ฑฐ๋‚˜ Re Queue๋ฅผ ํ•˜์ง€ ์•Š์„ ๊ฒฝ์šฐ ๋ณ„๋„์˜ ํ•ธ๋“ค๋ง์ด ํ•„์š”ํ•  ๊ฒƒ์ด๋‹ค.

๊ทธ๋Ÿด ๊ฒฝ์šฐ ErrorMessage์šฉ consumer๋ฅผ ๋งŒ๋“ค์–ด์„œ ํ•ธ๋“ค๋งํ•  ์ˆ˜ ์žˆ๋‹ค.

๋Œ€๋žต ์‚ดํŽด๋ณด๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด ํ•ธ๋“ค๋งํ•  ์ˆ˜ ์žˆ๊ณ , ๋” ๊นŠ์€ ๋‚ด์šฉ์€ ๋‹ค๋ฃจ์ง€ ์•Š๋Š”๋‹ค.

java code

@Bean
public Consumer<Employee> sample() {
  return (employee) -> {
    System.out.println(employee.toString());
    if (employee.getAge() > 100) {
      throw new RuntimeException("occured consumer");
    }
  };
}

@Bean
public Consumer<ErrorMessage> errorHandler() {
  return e -> {
    errorOccur++;
    System.out.println("์—๋Ÿฌ ๋ฐœ์ƒ: " + e);
  };
}
  • errorHandler()
    • ์ด ์—ญ์‹œ ๋˜‘๊ฐ™์€ Consumer์ด๊ธฐ ์ƒ์„ฑ๋˜๋Š” exchange๋Š” errorHandler-in-0์ด๋‹ค.
    • ๋‹ค๋งŒ T ํƒ€์ž…๋งŒ ErrorMessage์ผ ๋ฟ์ด๋‹ค. ์ด๋ ‡๊ฒŒ ํƒ€์ž…๋งŒ ๋ฐ”๊ฟ”์ฃผ๋ฉด ์•Œ์•„์„œ
      ์—๋Ÿฌ ์ „์šฉ excahnge์™€ consumer๊ฐ€ ๋ฐ˜์ธ๋”ฉ ๋œ๋‹ค.
    • ์žฌ์‹œ๋„๊นŒ์ง€ ๋ชจ๋‘ ์‹คํŒจํ–ˆ์„ ๊ฒฝ์šฐ ์—๋Ÿฌ๋กœ ํŒ๋‹จํ•ด์„œ ๊ทธ ๋•Œ queue์— ๋‹ด๊ฒจ์„œ consumeํ•˜๊ฒŒ ๋œ๋‹ค.
    • ๋ฌผ๋ก  ์ด๋ ‡๊ฒŒ ํ•ด๋„ ์•„๋ฌด ์ฒ˜๋ฆฌ๋„ ํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฒ„๋ ค์ง€๋Š” ๊ฒƒ์€ ๋˜‘๊ฐ™๋‹ค.

๐Ÿงจ ์šฉ์–ด ์ •๋ฆฌ


Connection

๋‹จ์ผ TCP ์—ฐ๊ฒฐ

  • TCP ๊ธฐ๋ฐ˜
  • ํšจ์œจ์„ฑ์„ ์œ„ํ•ด ์ˆ˜๋ช…์ด ๊ธด ์—ฐ๊ฒฐ์„ ํ•œ๋‹ค. ์ฆ‰ ํ”„๋กœํ† ์ฝœ ์ž‘์—…๋‹น ์ƒˆ ์—ฐ๊ฒฐ์ด ์—ด๋ฆฌ์ง€ ์•Š๋Š”๋‹ค.
  • ํ•˜๋‚˜์˜ ํด๋ผ์ด์–ธํŠธ๋Š” ๋‹จ์ผ TCP ์—ฐ๊ฒฐ์„ ์‚ฌ์šฉํ•œ๋‹ค.
  • ์ฆ‰, ์ปค๋„ฅ์…˜ ํ’€์ฒ˜๋Ÿผ ์ปค๋„ฅ์…˜์„ ์œ ์ง€ํ•œ๋‹ค.
  • ์˜ˆ๋ฅผ ๋“ค์–ด a ์„œ๋ฒ„์—์„œ 10๊ฐœ์˜ ์ปค๋„ฅ์…˜์„ ํ•œ ๋ฒˆ ๋งŒ๋“ค์—ˆ๋‹ค๋ฉด 10๊ฐœ๊ฐ€ ๊ณ„์† ์œ ์ง€๋œ๋‹ค.

AMQP 0-9-1

  • ํ•˜๋‚˜์˜ ์ปค๋„ฅ์…˜์— Channel์ด๋ผ๊ณ  ํ•˜๋Š” ์—ฌ๋Ÿฌ ๊ฒฝ๋Ÿ‰ ์—ฐ๊ฒฐ์„ ์—ด ์ˆ˜ ์žˆ๋‹ค.
    ์ด ํ”„๋กœํ† ์ฝœ์„ AMQP 0-9-1์ด๋ผ๊ณ  ํ•œ๋‹ค.
  • AMQP 0-9-1๋Š” ํ•˜๋‚˜ ์ด์ƒ์˜ ์ฑ„๋„์„ ์—ด๊ณ  ์ฑ„๋„์—์„œ ํ”„๋กœํ† ์ฝœ ์ž‘์—…(๊ตฌ๋…, ์†Œ๋น„)๋ฅผ ์ˆ˜ํ–‰ํ•œ๋‹ค.

Channel

  • channel์€ connection์—†์ด๋Š” ์กด์žฌํ•  ์ˆ˜ ์—†๋‹ค.
  • ๋‹จ์ผ TCP ์—ฐ๊ฒฐ์— ๊ฒฝ๋Ÿ‰ ์—ฐ๊ฒฐ์„ Channel์ด๋ผ๊ณ  ํ•œ๋‹ค.
  • ๋ชจ๋“  ์ž‘์—…์€ ์ฑ„๋„์—์„œ ๋ฐœ์ƒํ•œ๋‹ค.
  • ์ตœ๋Œ€ ์ฑ„๋„ ์ˆ˜๋ฅผ ์ž๋ฐ”์—์„œ ์ œ์–ดํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ์†ก์‹ ์„ ํ•˜๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด pulisher ์ „์šฉ ์ฑ„๋„์ด ์ƒ๊ธด๋‹ค. publish channel์€ consume channel๊ณผ๋Š”
    ๋‹ค๋ฅด๋‹ค. ๋ณ„๊ฐœ์ด๋‹ค.
  • ํ”„๋กœ์„ธ์Šค์™€ ์“ฐ๋ ˆ๋“œ์™€ ๋น„๊ตํ•˜์ž๋ฉด, connection์€ ํ”„๋กœ์„ธ์Šค์ด๊ณ , channel์ด ์“ฐ๋ ˆ๋“œ์™€ ๋น„์Šทํ•˜๊ฒŒ ์ƒ๊ฐํ•  ์ˆ˜ ์žˆ์„ ๊ฒƒ ๊ฐ™๋‹ค.

๐Ÿงจ python์œผ๋กœ connection, channel ์•Œ์•„๋ณด๊ธฐ


  1. consum ์ฝ”๋“œ ์ž‘์„ฑ

     #!/usr/bin/env python
     import pika, sys, os
    
     def main():
         def callback(ch, method, properties, body):
             print(" [x] Received %r" % body)
    
             # ์ปค๋„ฅ์…˜ ๋ฐ ์ฑ„๋„ ์ƒ์„ฑ
         connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
         channel = connection.channel()
    
             # ํ ์„ ์–ธ, ์ต์Šค์ฒด์ธ์ง€ ์„ ์–ธ, ํ ๋ฐ”์ธ๋”ฉ
         channel.queue_declare(queue='hello')
         channel.exchange_declare(exchange='defaultExchange',exchange_type='direct')
         channel.queue_bind(queue='hello', exchange='defaultExchange', routing_key='defaultRoutingKey')
    
             # ์ปจ์ˆ˜๋จธ ์„ผํŒ…
         channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    
             # ์ปจ์ˆจ ์‹œ์ž‘
         print(' [*] Waiting for messages. To exit press CTRL+C')
         channel.start_consuming()
    
     if __name__ == '__main__':
         try:
             main()
         except KeyboardInterrupt:
             print('Interrupted')
             try:
                 sys.exit(0)
             except SystemExit:
                 os._exit(0)
  2. receiver.py

  3. consume ์‹คํ–‰

    • ์‹คํ–‰ํ•˜๊ฒŒ ๋˜๋ฉด ์ปจ์ˆจ์„ ํ•˜๊ธฐ ์œ„ํ•ด ๋Œ€๊ธฐํ•˜๊ฒŒ ๋œ๋‹ค.
    • ๋ธŒ๋กœ์ปค์— defaultExchange, queue, binding์ด ๋ชจ๋‘ ์ƒ์„ฑ๋œ๋‹ค.(๋งค๋‹ˆ์ง€๋จผํŠธ ํˆด์—์„œ ํ™•์ธ ํ•ด๋ณด๋ฉด ๋œ๋‹ค.)
  4. python receiver.py

  5. publisher ์ฝ”๋“œ ์ž‘์„ฑ

     #!/usr/bin/env python
     import pika
    
     if __name__ == "__main__":
    
         connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
         channel = connection.channel()
    
         channel.queue_declare(queue='hello')
         channel.exchange_declare(exchange='defaultExchange', exchange_type='direct')
         channel.queue_bind(queue='hello', exchange='defaultExchange', routing_key='defaultRoutingKey')
    
         channel.basic_publish(exchange='defaultExchange', routing_key='defaultRoutingKey', body='Hello World!')
         print(" [x] Sent 'Hello World!'")
         connection.close()

    receiver์ฝ”๋“œ์™€ ๊ฑฐ์˜ ๋™์ผํ•˜๊ณ  ํ, ์ต์Šค์ฒด์ธ, ๋ฐ”์ธ๋”ฉ ์ฝ”๋“œ๊ฐ€ ์ค‘๋ณต๋œ๋‹ค.๊ทธ๋Ÿฐ๋ฐ send.py๊ฐ€ ๋จผ์ € ์‹คํ–‰๋ ์ง€ receiver.py๊ฐ€ ๋จผ์ € ์‹คํ–‰๋ ์ง€๋Š” ์•Œ ์ˆ˜ ์—†๊ธฐ ๋•Œ๋ฌธ์— ๋™์ผํ•œ ์…‹ํŒ…์ด ๋“ค์–ด๊ฐ€๊ฒŒ ๋œ๋‹ค.

  6. ํ•ด๋‹น ์ฝ”๋“œ๋Š” ๋ธŒ๋กœ์ปค์— ํ, ์ต์Šค์ฒด์ธ์ง€, ๋ฐ”์ธ๋”ฉ์ด ์žˆ๋Š”์ง€ ์—†๋Š”์ง€ ํŒ๋‹จํ•˜๊ณ  ์žˆ์œผ๋ฉด ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•˜๊ณ  ์—†์œผ๋ฉด ์ƒˆ๋กœ ๋งŒ๋“ ๋‹ค.

  7. send.py

  8. publisher ์‹คํ–‰

    • publishing์„ ํ–ˆ์œผ๋ฏ€๋กœ ๋Œ€๊ธฐ ์ค‘์ด๋˜ consumer๋Š” consume์„ ํ•˜๊ฒŒ ๋œ๋‹ค.
  9. python send.py

  10. connections ๋ฐ channels ํ™•์ธํ•˜๊ธฐ

์œ„ ์‚ฌ์ง„์„ ๋ณด๋ฉด connection์ด 2๊ฐœ์ด๋‹ค.

60370์ด consume connection์ด๊ณ , 58306์ด publish connection์ด๋‹ค.

send.py, [receiver.py](http://receiver.py) ์ฝ”๋“œ์—์„œ ๊ฐ๊ฐ connection์„ ์ƒ์„ฑํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ๊ทธ๋ ‡๋‹ค.

publishing, consume ์„œ๋ฒ„๊ฐ€ ๋ถ„๋ฆฌ ๋˜์–ด ์žˆ๋‹ค๋ฉด ๋‹น์—ฐํžˆ ์ด๋Ÿฐ ๊ตฌ์กฐ๊ฐ€ ๋‚˜์˜ค๋Š” ๊ฒƒ์ด๋‹ค.
  1. publishing, consume์ด ๊ฐ™์€ connection์˜ ๊ฐ™์€ channel ์‚ฌ์šฉํ•˜๊ธฐ

  2. ์œ„ ์‚ฌ์ง„๊ณผ ๊ฐ™์ด ํ•˜๋‚˜์˜ channel์—์„œ publising, consume์„ ๋ชจ๋‘ ํ•˜๊ฒŒ ๋œ๋‹ค.

  3. #!/usr/bin/env python import pika if __name__ == "__main__": connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.exchange_declare(exchange='defaultExchange', exchange_type='direct') channel.queue_bind(queue='hello', exchange='defaultExchange', routing_key='defaultRoutingKey') # publisher channel.basic_publish(exchange='defaultExchange', routing_key='defaultRoutingKey', body='Hello World!') print(" [x] Sent 'Hello World!'") # consumer def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) channel.start_consuming() print('a') print('b') print('c') connection.close()

  4. rabbitMQ๊ฐ€ ์ถ”๊ตฌํ•˜๋Š” ๊ฒฝ๋Ÿ‰ connection ๋งŒ๋“ค๊ธฐ

     #!/usr/bin/env python
     import pika
    
     if __name__ == "__main__":
    
         connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
         channel1 = connection.channel()
         channel2 = connection.channel()
    
         channel1.queue_declare(queue='hello1')
         channel1.exchange_declare(exchange='defaultExchange1', exchange_type='direct')
         channel1.queue_bind(queue='hello1', exchange='defaultExchange1', routing_key='defaultRoutingKey1')
    
         channel2.queue_declare(queue='hello2')
         channel2.exchange_declare(exchange='defaultExchange2', exchange_type='direct')
         channel2.queue_bind(queue='hello2', exchange='defaultExchange2', routing_key='defaultRoutingKey2')
    
         # publisher
         channel1.basic_publish(exchange='defaultExchange1', routing_key='defaultRoutingKey1', body='Hello World1!')
         channel2.basic_publish(exchange='defaultExchange2', routing_key='defaultRoutingKey2', body='Hello World2!')
         print(" [x] Sent 'Hello World!'")
         connection.close()

    ์œ„ ์ฝ”๋“œ์™€ ๊ฐ™์ด connection์—์„œ channel๋งŒ ์ƒˆ๋กœ ๋งŒ๋“œ๋Š” ๊ฒƒ์ด๋‹ค. ๊ทธ๋Ÿฌ๋ฉด connection 1๊ฐœ์— 2๊ฐœ์˜ publishing channel์ด ๋งŒ๋“ค์–ด์ง„๋‹ค.

  5. rabbitMQ๋Š” ๊ฒฝ๋Ÿ‰ connection์€ connection ํ•˜๋‚˜์— ์—ฌ๋Ÿฌ๊ฐœ์˜ channel์„ ์šด์˜ํ•˜๋Š” ๊ฒƒ์ด๋‹ค.

channel ์ƒ์„ธํ•˜๊ฒŒ ์•Œ์•„๋ณด๊ธฐ

  • ๊ตฌ์กฐ
    • spring boot server ํ•˜๋‚˜์—์„œ pruducer, rabbitMQ, consume ๋ชจ๋‘ ํ•จ.
    • exchage: defaultExchange, customExchange
    • queue: defaultExchange.defaultQueue, customExchange.customQueue
  1. spring boot server ์‹คํ–‰
    • connection ์ •๋ณด
    • channels ์ •๋ณด
    • queue, consumer ์ •๋ณด
      • channel: 62740(1)
      • channel: 62740(2)
    • connection: 62740
  2. defaultExchange, customExchange์— ๋ชจ๋‘ ์ „์†ก
    • connection ์ •๋ณด
    • server ์‹คํ–‰ ์‹œ ์ƒ๊ฒผ๋˜ connection(62740)์— channel(consume)์ด 1๊ฐœ ์ถ”๊ฐ€๋˜๊ณ ,
      62868 connection์ด ์ƒˆ๋กœ ์ƒ๊ธฐ๋ฉด์„œ channel(publisher)์ด 1๊ฐœ ์ถ”๊ฐ€๋˜์–ด ์ด 4๊ฐœ์˜ channels๊ฐ€ ๋œ๋‹ค.
    • channel ์ •๋ณด
      • 62868(1): publisher ๋ผ๊ณ  ๋‚˜์™€์žˆ์Œ.
      • 62740(1), 62740(2), 62740(3)(1), (2)์— ๊ธฐ์กด์— exchange๊ฐ€ ์—ฐ๊ฒฐ๋˜์–ด ์žˆ์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ์ด ์ฑ„๋„๋กœ consumeํ•œ๋‹ค.
      • (1), (2)๋Š” ๊ธฐ์กด์— ์žˆ๋˜ ๊ฒƒ์ด๊ณ  (3)์ด ์ถ”๊ฐ€๋œ ๊ฒƒ์ธ๋ฐ ์–ด๋–ค exchange๋„ ์—ฐ๊ฒฐ๋˜์ง€ ์•Š์•˜๋‹ค. ???!@

channel ์ •๋ฆฌ

  • 62868(1) ์ฑ„๋„์€ publisher ๋ผ๊ณ  ๋‚˜์™€ ์žˆ๋Š”๋ฐ ์™œ channel์ด 1๊ฐœ์ผ๊นŒ? exchange๊ฐ€ 2๊ฐœ์ธ๋ฐ 2๊ฐœ ์ƒ๊ฒจ์•ผ ํ•˜๋Š”๊ฑฐ ์•„๋‹Œ๊ฐ€?
  • 62740(3)๋Š” ๋ญ˜๊นŒ? ์•„๋ฌด๋Ÿฐ ์—ญํ• ์ด ์—†์–ด ๋ณด์ด๋Š”๋ฐ?
  • consumer๋Š” ๊ฐ๊ฐ์˜ ์ฑ„๋„์„ ์ด์šฉํ•œ๋‹ค๋ผ๋Š”๊ฒŒ ๋งž์„๊นŒ?

exchange์™€ prefetch count

https://blog.dudaji.com/general/2020/05/25/rabbitmq.html

๐Ÿงจ ๊ตฌ์กฐ


๋ชจ๋“  ์žก์„ ํ•˜๋‚˜์˜ queue, exchange๋กœ ๊ด€๋ฆฌํ•œ๋‹ค. ์—ฌ๊ฑฐ ํ๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ๊ฒƒ ๋ณด๋‹ค๋Š” ํ•˜๋‚˜์˜ ํ์—์„œ ๊ด€๋ฆฌํ•˜๋Š” ๊ฒƒ์ด ํŽธํ•  ๊ฒƒ์œผ๋กœ ํŒ๋‹จํ–ˆ๊ณ 

๋ถ„๋ฆฌ๊ฐ€ ํ•„์š”ํ•˜๋‹ค๋ฉด ๊ทธ ๋•Œ ๊ฐ€์„œ ๋ถ„๋ฆฌํ•˜๋Š” ๋ฐฉํ–ฅ์„ ์ƒ๊ฐํ•ด ๋ณด์ž.

๐Ÿงจ ๊ตฌํ˜„ ๋ ˆ๋ฒจ


์œ„์˜ ๋„์‹๊ณผ ๊ฐ™์ด ๋ชจ๋“  ์ž‘์—…์„ ํ•˜๋‚˜์˜ exchange, queue์—์„œ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด์„œ ๋ฆฌํ”Œ๋ ‰์…˜์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ํ•ต์‹ฌํžˆ๋‹ค.

์›๋ž˜๋Š” ์–ด๋–ค ์žก์„ ์‹คํ–‰ํ• ์ง€ ์ผ€์ด์Šค ๋ณ„๋กœ if ๋ฌธ์„ consumer์—์„œ ์‚ฌ์šฉํ•ด์„œ ์ฝ”๋“œ๊ฐ€ ๊ต‰์žฅํžˆ ์ง€์ €๋ถ„ ํ–ˆ๋Š”๋ฐ

if ๋ฌธ์ด ํ•„์š”ํ•œ ๋ถ€๋ถ„์„ producer๊ฐ€ pushํ•  ๋•Œ ๋ฆฌํ”Œ๋ ‰์…˜์„ payload๋กœ ๋„˜๊ฒจ consumer๋ฅผ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌํ™” ํ•˜์—ฌ

์‹ฌํ”Œํ•˜๊ฒŒ ๋งŒ๋“ค์—ˆ๋‹ค.

์ถ”๊ฐ€๋กœ exchange, queue๋ฅผ ๋งŒ๋“ค์ง€ ์•Š์„ ๊ฒฝ์šฐ ์‚ฌ์šฉ ๋ฐฉ๋ฒ•

  1. ํด๋ผ์ด์–ธํŠธ๋Š” DefaultProducer๋ฅผ ์ฃผ์ž…๋ฐ›์•„ push๋ฅผ ํ•œ๋‹ค.
    • push๋ฉ”์„œ๋“œ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ๋Š” JobSpringBean ํด๋ž˜์Šค์˜ Method ํƒ€์ž…๊ณผ method์˜ ํŒŒ๋ผ๋ฏธํ„ฐ๊ฐ€ ํ•„์š”ํ•˜๋‹ค.
    • ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” ๊ธฐ๋ณธ ๋ฐ์ดํ„ฐ ํƒ€์ž…๋งŒ ๊ฐ€๋Šฅํ•˜๋‹ค.
    • private final DefaultProducer defaultProducer; // bean public void client() throws NoSuchMethodException { defaultProducer.push( JobSpringBean.class.getMethod("jobMethod", Long.class), List.of(1L) ); }
  2. Job์„ ์‹คํ–‰ํ•  JobSpringBean ํด๋ž˜์Šค๋ฅผ ์ƒ์„ฑํ•ด์„œ ๋กœ์ง์„ ์ž‘์„ฑํ•œ๋‹ค.
  3. @Component public class JobSpringBean { public void jobMethod(Long id) { Member member = findById(id); ... } }

๋ฆฌํ”Œ๋ ‰์…˜์„ ์‚ฌ์šฉํ•ด์„œ ๊ฐœ์„  ๋œ ์ ์„ ์ •๋ฆฌ ํ•ด๋ณด๋ฉด ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

  • consumer์—์„œ์˜ ๋ถ„๊ธฐ ์ฒ˜๋ฆฌ ๋ณต์žก๋„ ๋‹จ์ˆœํ™” ๋˜์—ˆ๋‹ค.

  • ์• ์ดˆ์— ๊ด€๋ฆฌ์˜ ์šฉ์ด์„ฑ์„ ์œ„ํ•ด์„œ ํ•˜๋‚˜์˜ exchange, queue๋กœ ๋ชจ๋‘ ๊ด€๋ฆฌํ•˜๋Š” ๊ฒƒ์„ ์›ํ–ˆ๋Š”๋ฐ
    ์ด๋ฅผ ๋ฆฌํ”Œ๋ ‰์…˜ ์—†์ด ๊ตฌํ˜„ํ•  ๊ฒฝ์šฐ consumer์— ๋ถ„๊ธฐ ์ฒ˜๋ฆฌ๊ฐ€ ๋„ˆ๋ฌด ๋งŽ์•„์ง€๊ฒŒ ๋œ๋‹ค.
    ์ด๋ฅผ ๊ฐœ์„ ํ•˜๊ธฐ ์œ„ํ•ด ํ”„๋ก์‹œ ํŒจํ„ด๋„ ์‚ฌ์šฉํ•˜๊ณ  ํ–ˆ์ง€๋งŒ ๊ทธ๋ž˜๋„ ๊ฒฐ๊ตญ ๋ณต์žกํ•œ ๊ฒƒ์€ ๋น„์Šทํ•˜๋‹ค.

  • ๊ฐœ์„  ํ›„์—๋Š” ํด๋ผ์ด์–ธํŠธ์—์„œ Job ํด๋ž˜์Šค์˜ Method๋งŒ ๋„˜๊ฒจ์ฃผ๋ฉด ๋˜๊ธฐ ๋•Œ๋ฌธ์— Consumer๊ฐ€ consumeํ•˜๋Š” ์—ญํ• ๋งŒ ํ•˜๋„๋ก ๊ฐœ์„  ๋˜์—ˆ๋‹ค.

  • ์ „์ฒด ์ฝ”๋“œ

    DefaultConsumer.java

      @Slf4j
      @RequiredArgsConstructor
      @Service
      public class DefaultConsumer {
    
        private final ApplicationContext applicationContext;
    
        @Bean
        public Consumer<DefaultPayload> defaultConsume() {
          return (message) -> {
            Object bean = applicationContext.getBean(message.getJobBeanClass());
            String methodName = message.getMethodName();
            List<Serializable> args = message.getNormalizedArgs();
    
            try {
              message.getJobBeanClass()
                  .getMethod(methodName, message.getParameterTypes())
                  .invoke(bean, args.toArray());
            } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
              throw new RuntimeException(e);
            }
          };
        }
      }

    DefaultProducer.java

      @RequiredArgsConstructor
      @Component
      public class DefaultProducer {
    
        private final StreamBridge streamBridge;
    
        public void push(Method method, List<Serializable> args) {
          Class<?> clazz = method.getDeclaringClass();
          String methodName = method.getName();
    
          DefaultPayload payload = new DefaultPayload(clazz, methodName, args);
              streamBridge.send("defaultExchange", MessageBuilder.withPayload(payload).build());
        }
      }

DefaultPayload.java

@NoArgsConstructor(access = AccessLevel.PUBLIC) // StreamBridge๊ฐ€ ์ง๋ ฌํ™”๋ฅผ ํ•˜๊ธฐ ์œ„ํ•ด์„œ ๊ธฐ๋ณธ ์ƒ์„ฑ์ž๊ฐ€ ํ•„์š”ํ•จ.
@Getter
public class DefaultPayload {

  private Class<?> jobBeanClass;
  private String methodName;
  private List<Serializable> args;
  private final List<Class<?>> argTypes = new ArrayList<>();

  public DefaultPayload(Class<?> jobBeanClass, String methodName, List<Serializable> args) {
    this.jobBeanClass = jobBeanClass;
    this.methodName = methodName;
    this.args = args;
    args.forEach(argValue -> this.argTypes.add(argValue.getClass()));
  }

  /**
   * consumer์—์„œ ์—ญ์ง๋ ฌํ™” ์‹œ Long type์„ Integer๋กœ ๋ฐ›๊ธฐ ๋•Œ๋ฌธ์— Integer๋ฅผ Long์œผ๋กœ ๋ณ€ํ™˜
   * args ํ•„๋“œ๋ฅผ ๊นŠ์€ ๋ณต์‚ฌํ•ด์„œ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
   */
  public List<Serializable> getNormalizedArgs() {
    int size = this.argTypes.size();
    return IntStream.range(0, size)
        .mapToObj(i -> convertArgIntegerToLong(this.argTypes.get(i), this.args.get(i)))
        .toList();
  }

  /**
   * @return Class<?>[]
   * reflection invoke๋ฅผ ์œ„ํ•ด์„œ ๋ฐฐ์—ด์ด ํ•„์š”ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ œ๊ณตํ•˜๋Š” ๋ฉ”์„œ๋“œ. consumer์—์„œ ์‚ฌ์šฉ
   */
  public Class<?>[] getParameterTypes() {
    return argTypes.toArray(Class[]::new);
  }

  private Serializable convertArgIntegerToLong(Class<?> argType, Serializable arg) {
    if (argType == Long.class && arg.getClass() == Integer.class) {
      return Long.valueOf(String.valueOf(arg));
    }

    return arg;
  }
}

application.yml ์˜ˆ์ œ ์ฝ”๋“œ


spring:
  cloud:
    function:
      definition: defaultConsume;customConsume
    stream:
      bindings:
        defaultConsume-in-0:
          destination: defaultExchange
          group: defaultQueue
          consumer:
            max-attempts: 1
        customConsume-in-0:
          destination: customExchange
          group: customQueue
          consumer:
            max-attempts: 1
      rabbit:
        bindings:
          consumeDefault-in-0:
            consumer:
              dead-letter-queue-name: defaultDlxQueue
              auto-bind-dlq: true
          customExchange-in-0:
            consumer:
              dead-letter-queue-name: customtDlxQueue
              auto-bind-dlq: true
  rabbitmq:
    addresses: localhost
    username: guest
    password: guest
    port: 5672
๋ฐ˜์‘ํ˜•
๋ณต์‚ฌํ–ˆ์Šต๋‹ˆ๋‹ค!