-
[RabbitMQ] rabbitMQ 공부..5. 그 외 공부/5.5 기타 2024. 11. 18. 19:54
회사에서 메시지 브로커의 일종인 RabbitMQ를 사용 중이다. RabbitMQ 얘기를 할때마다 이게 뭔소리지?? 무슨말을 하는거지 의문이었다. 좀 한가한 틈을 타서 GPT에 물어물어.... RabbitMQ 를 공부하였다.
처음에 궁금했던 점이 도대체 RabbitMQ가 뭔지.. 얘를 왜 쓰는지였다..
RabbitMQ는 서버와 서버간에 메시지를 효율적으로 처리하는 역할을 한다고 한다.
효율적으로... 처리한다???? 는게 뭘까 알아보았다.
보통 rabbitMQ같은 메시지 브로커는 통신 하려는 서버들 사이에 위치한다고 한다.
A 서버 -> MessageBroker -> B서버
1. 서버와 서버간에 의존성이 줄어든다 : 만약 두 서버중에 하나의 서버가 고장나도 통신하려는 메시지는 rabbitMQ 서버에 남아있어서
메시지 큐는 보존되어있고 나중에 처리 가능하다.
2. 서버들 간에 비동기 메시지를 주고받는다 : A 서버가 B 서버에게 메시지를 보내게 될때 메시지브로커가 중간에서 비동기적으로 메시지를 전달해준다. 이렇게 되면 A 서버에서 B 서버로 메시지를 보내게 된 응답을 기다리지 않고 그 다음 작업을 진행할 수 있다.
3. 로드밸런싱 : 여러 서버가 동일한 queue를 구독하고 있다면, 각 서버는 큐에 쌓인 메시지를 하나씩 받아서 처리하게 되는데 메시지 처리를 여러 서버에 분배할 수 있어 부하분산 및 성능이 향상된다.
rabbitMQ에서 사용 하고 있는 용어정리
1. Queue : 메시지가 저장되는 대기열. 메시지 브로커가 처리할 수 있도록 메시지를 임시 보관하는 역할을 한다. Queue로 매핑하지 않으면 메시지 브로커가 처리할 수 없다.
설정에 따라 다를 수 있지만 최초 요청 시 메시지 브로커가 호출될때 함께 생성(자동 생성 or 개발자가 정확하게 정의하고 설정하여 생성)된다고 한다.
그 이후 요청 시 Queue 이름을 조회하여 매핑한다.
2. Producer: 메시지를 Queue를 Exchange에 전달해주는 역할을 한다.
- 최초 요청 시(Queue 생성 시) : Producer가 메시지를 보내기 전에 메시지와 Queue가 연결되어 있음
- 이후 요청 시 (Queue 이미 생성된 상태) : Queue에 바인딩 되지 않은 메시지를 Exchange에 보내고 설정된 Queue에 전달한다.
3. Exchange, Binding : 클라이언트로부터 받은 메시지는 Exchange에 전달되고, Exchange는 RoutingKey를 기반으로 어떤 Queue에 라우팅 할지
결정한다(Binding : 라우팅 키를 기반으로 어떤 Queue로 메시지를 라우팅할지 연결 설정. 라우팅이 이해가 안갔는데 key(routingKey)-value(queue)로 생각해버렸다... Exchange는 RoutingKey를 토대로 어떤 Queue로 전달할지 결정)
4. Consumer : Queue에 있는 메시지를 자동으로 읽고 처리한다 -> Spring에 @RabbitListener을 사용하여 지정된 메서드에 데이터를 전달해준다.
코드구현
1. 설정
- application.properties
#mq 서버를 로컬 에서 실행 한다 spring.rabbitmq.host=localhost #mq 서버의 호스트 정보 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #virtual-host 설정 spring.rabbitmq.virtual-host=/
- build.gradle : amqp 의존성 추가
implementation 'org.springframework.boot:spring-boot-starter-amqp' // RabbitMQ 의존성 추가
2. RabbitConfig : Bean 등록 파일
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String topicExchangeName1 = "a-b-c-1"; public static final String topicExchangeName2 = "a-b-c-2"; public static final String queueName1 = "haneulQueue1"; public static final String queueName2 = "haneulQueue2"; public static final String routingKey1 = "abc1"; public static final String routingKey2 = "abc2"; //Jackson2JsonMessageConverter : JSON to String & 직렬화(객체를 JSON으로)/역직렬화(JSON을 객체로) @Bean public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } // 1. 큐 생성 시 내구성(durable)을 true로 설정 @Bean public Queue queue1() { return new Queue(queueName1, false); } @Bean public Queue queue2() { return new Queue(queueName2, false); } // 2. TopicExchange 생성 @Bean public TopicExchange exchange1() { return new TopicExchange(topicExchangeName1, true, false); // durable = true } @Bean public TopicExchange exchange2() { return new TopicExchange(topicExchangeName2, true, false); // durable = true } // 3. Queue와 Routing Key를 바인딩 @Bean public Binding binding1(Queue queue1, TopicExchange exchange1) { // queue1을 exchange1과 routingKey1으로 바인딩 return BindingBuilder.bind(queue1).to(exchange1).with(routingKey1); } @Bean public Binding binding2(Queue queue2, TopicExchange exchange2) { // queue2를 exchange2와 routingKey2로 바인딩 return BindingBuilder.bind(queue2).to(exchange2).with(routingKey2); } // 4. Consumer 설정 : 메시지를 처리할 클래스와 메서드를 연결 @Bean public MessageListenerAdapter listenerAdapter1(RabbitReceiver receiver) { // 메시지를 처리할 클래스 명 : RabbitReceiver, 메서드명 : received1 MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "received1"); adapter.setMessageConverter(messageConverter()); return adapter; } @Bean public MessageListenerAdapter listenerAdapter2(RabbitReceiver receiver) { MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "received2"); adapter.setMessageConverter(messageConverter()); // 메시지를 처리할 클래스 명 : RabbitReceiver, 메서드명 : received2 return adapter; } // 5. SimpleMessageListenerContainer 설정 : 큐에서 메시지를 소비하고 리스너에게 전달 @Bean public SimpleMessageListenerContainer container1(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter1) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName1); container.setMessageListener(listenerAdapter1); return container; } @Bean public SimpleMessageListenerContainer container2(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter2) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName2); container.setMessageListener(listenerAdapter2); return container; } }
3. RabbitReceiver
import lombok.*; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import static hello.itemservice.mq.RabbitConfig.*; import static hello.itemservice.mq.RabbitConfig.queueName2; @Component @Slf4j public class RabbitReceiver{ //haneulQueue1를 구독하는 리스너 @RabbitListener(queues = queueName1) public void received1(@Payload RabbitMessage message) { log.info("RabbitReceiver.received1 호출"); log.info("Received message: {}", message); } //haneulQueue2를 구독하는 리스너 @RabbitListener(queues = queueName2) public void received2(@Payload RabbitMessage message) { log.info("RabbitReceiver.received2 호출"); log.info("Received message: {}", message); } }
@Payload 어노테이션이 바디에서 본문만 추출해 준다고 한다.
4. RabbitMqController
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import static hello.itemservice.mq.RabbitConfig.*; import static hello.itemservice.mq.RabbitConfig.queueName2; @RestController @Slf4j public class RabbitMqController { @Autowired private ObjectMapper objectMapper; //RabbitMQ의 메시지 큐를 처리하는 템플릿 @Autowired private RabbitTemplate rabbitTemplate; @PostMapping(value = "/rabbit-msg11", consumes = MediaType.APPLICATION_JSON_VALUE) public MessageResponse messageInRabbitMQ1(@RequestBody SendMessageBody mqMessageBody){ try { RabbitMessage message = new RabbitMessage(objectMapper.writeValueAsString(mqMessageBody)); rabbitTemplate.convertAndSend(topicExchangeName1, routingKey1,message); } catch (JsonProcessingException e) { throw new RuntimeException(e); } return new MessageResponse(mqMessageBody,routingKey1,topicExchangeName1,queueName1); } @PostMapping(value="/rabbit-msg22",consumes = MediaType.APPLICATION_JSON_VALUE) public MessageResponse messageInRabbitMQ2(@RequestBody SendMessageBody mqMessageBody){ try { RabbitMessage message = new RabbitMessage(objectMapper.writeValueAsString(mqMessageBody)); rabbitTemplate.convertAndSend(topicExchangeName2, routingKey2,message); } catch (JsonProcessingException e) { throw new RuntimeException(e); } return new MessageResponse(mqMessageBody,routingKey2,topicExchangeName2,queueName2); } @Getter @Setter @NoArgsConstructor @AllArgsConstructor @ToString static class SendMessageBody{ private String message; } @Getter @Setter @NoArgsConstructor @AllArgsConstructor @ToString static class MessageResponse{ private SendMessageBody messageBody; private String routingKey; private String topicKey; private String queueName; } }
RabbitTemplate >> convertAndSend : 지정한 TopicExchangeName(지정한 exchange 이름), RoutingKey(라우팅 키)를 통해 Queue를 찾아 메시지를 전달.
@RabbitListener가 호출이 안되어 삽질한결과 convertAndSend 파라미터에 TopicExchangeName, RoutingKey, 수신할 메시지 타입을 적어주고 RabbitListener에 메시지 타입을 적어줘야 이에 일치하는 RabbitListener가 응답한다.. 이걸로 삽질함..
5. RabbitMessage.java
import lombok.*; @Getter @ToString @NoArgsConstructor @AllArgsConstructor public class RabbitMessage { private String message; }
결과