ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [RabbitMQ] rabbitMQ 공부..
    5. 그 외 공부/5.5 기타 2024. 11. 18. 19:54

    회사에서 메시지 브로커의 일종인 RabbitMQ를 사용 중이다. RabbitMQ 얘기를 할때마다 이게 뭔소리지??  무슨말을 하는거지 의문이었다. 좀 한가한 틈을 타서 GPT에 물어물어.... RabbitMQ 를 공부하였다.

     

    처음에 궁금했던 점이 도대체 RabbitMQ가 뭔지.. 얘를 왜 쓰는지였다..

     

    RabbitMQ는 서버와 서버간에 메시지를 효율적으로 처리하는 역할을 한다고 한다.

    효율적으로... 처리한다???? 는게 뭘까 알아보았다.

    보통 rabbitMQ같은 메시지 브로커는 통신 하려는 서버들 사이에 위치한다고 한다.

    A 서버 -> MessageBroker -> B서버

    1. 서버와 서버간에 의존성이 줄어든다 : 만약 두 서버중에 하나의 서버가 고장나도 통신하려는 메시지는 rabbitMQ 서버에 남아있어서

     메시지 큐는 보존되어있고 나중에 처리 가능하다.

    2. 서버들 간에 비동기 메시지를 주고받는다 : A 서버가 B 서버에게 메시지를 보내게 될때 메시지브로커가 중간에서 비동기적으로 메시지를 전달해준다. 이렇게 되면 A 서버에서 B 서버로 메시지를 보내게 된 응답을 기다리지 않고 그 다음 작업을 진행할 수 있다.

    3. 로드밸런싱 : 여러 서버가 동일한 queue를 구독하고 있다면, 각 서버는 큐에 쌓인 메시지를 하나씩 받아서 처리하게 되는데 메시지 처리를 여러 서버에 분배할 수 있어 부하분산 및 성능이 향상된다. 

     

    rabbitMQ에서 사용 하고 있는 용어정리

    1. Queue : 메시지가 저장되는 대기열. 메시지 브로커가 처리할 수 있도록 메시지를 임시 보관하는 역할을 한다.  Queue로 매핑하지 않으면 메시지 브로커가 처리할 수 없다.

    설정에 따라 다를 수 있지만 최초 요청 시 메시지 브로커가 호출될때 함께 생성(자동 생성 or 개발자가 정확하게 정의하고 설정하여 생성)된다고 한다.

     그 이후 요청 시 Queue 이름을 조회하여 매핑한다. 

    2. Producer: 메시지를 Queue를 Exchange에 전달해주는 역할을 한다.

          - 최초 요청 시(Queue 생성 시) : Producer가 메시지를 보내기 전에 메시지와 Queue가 연결되어 있음

          - 이후 요청 시 (Queue 이미 생성된 상태) : Queue에 바인딩 되지 않은 메시지를 Exchange에 보내고 설정된 Queue에 전달한다.

    3. Exchange, Binding : 클라이언트로부터 받은 메시지는 Exchange에 전달되고, Exchange는 RoutingKey를 기반으로 어떤 Queue에 라우팅 할지

       결정한다(Binding : 라우팅 키를 기반으로 어떤 Queue로 메시지를 라우팅할지 연결 설정. 라우팅이 이해가 안갔는데  key(routingKey)-value(queue)로 생각해버렸다... Exchange는 RoutingKey를 토대로 어떤 Queue로 전달할지 결정)

    4. Consumer : Queue에 있는 메시지를 자동으로 읽고 처리한다  -> Spring에 @RabbitListener을 사용하여 지정된 메서드에 데이터를 전달해준다.

     

     

    코드구현

    1. 설정

    - application.properties

    #mq 서버를 로컬 에서 실행 한다
    spring.rabbitmq.host=localhost
    #mq 서버의 호스트 정보
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    #virtual-host 설정
    spring.rabbitmq.virtual-host=/

     

    - build.gradle : amqp 의존성 추가

    	implementation 'org.springframework.boot:spring-boot-starter-amqp'  // RabbitMQ 의존성 추가

     

     

    2. RabbitConfig : Bean 등록 파일

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
    
        public static final String topicExchangeName1 = "a-b-c-1";
        public static final String topicExchangeName2 = "a-b-c-2";
        public static final String queueName1 = "haneulQueue1";
        public static final String queueName2 = "haneulQueue2";
        public static final String routingKey1 = "abc1";
        public static final String routingKey2 = "abc2";
    
        //Jackson2JsonMessageConverter : JSON to String & 직렬화(객체를 JSON으로)/역직렬화(JSON을 객체로)
        @Bean
        public Jackson2JsonMessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        // 1. 큐 생성 시 내구성(durable)을 true로 설정
        @Bean
        public Queue queue1() {
            return new Queue(queueName1, false);
        }
    
        @Bean
        public Queue queue2() {
            return new Queue(queueName2, false);
        }
    
        // 2. TopicExchange 생성
        @Bean
        public TopicExchange exchange1() {
            return new TopicExchange(topicExchangeName1, true, false); // durable = true
        }
    
        @Bean
        public TopicExchange exchange2() {
            return new TopicExchange(topicExchangeName2, true, false); // durable = true
        }
    
        // 3. Queue와 Routing Key를 바인딩
        @Bean
        public Binding binding1(Queue queue1, TopicExchange exchange1) {
            // queue1을 exchange1과 routingKey1으로 바인딩
            return BindingBuilder.bind(queue1).to(exchange1).with(routingKey1);
        }
    
        @Bean
        public Binding binding2(Queue queue2, TopicExchange exchange2) {
            // queue2를 exchange2와 routingKey2로 바인딩
            return BindingBuilder.bind(queue2).to(exchange2).with(routingKey2);
        }
    
        // 4. Consumer 설정 : 메시지를 처리할 클래스와 메서드를 연결
        @Bean
        public MessageListenerAdapter listenerAdapter1(RabbitReceiver receiver) {
            // 메시지를 처리할 클래스 명 : RabbitReceiver, 메서드명 : received1
            MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "received1");
            adapter.setMessageConverter(messageConverter());
            return adapter;
        }
        @Bean
        public MessageListenerAdapter listenerAdapter2(RabbitReceiver receiver) {
            MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "received2");
            adapter.setMessageConverter(messageConverter());
            // 메시지를 처리할 클래스 명 : RabbitReceiver, 메서드명 : received2
            return adapter;
        }
    
        // 5. SimpleMessageListenerContainer 설정 : 큐에서 메시지를 소비하고 리스너에게 전달
        @Bean
        public SimpleMessageListenerContainer container1(ConnectionFactory connectionFactory,
                                                         MessageListenerAdapter listenerAdapter1) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName1);
            container.setMessageListener(listenerAdapter1);
            return container;
        }
        @Bean
        public SimpleMessageListenerContainer container2(ConnectionFactory connectionFactory,
                                                         MessageListenerAdapter listenerAdapter2) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName2);
            container.setMessageListener(listenerAdapter2);
            return container;
        }
    }



    3. RabbitReceiver

    import lombok.*;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    import static hello.itemservice.mq.RabbitConfig.*;
    import static hello.itemservice.mq.RabbitConfig.queueName2;
    
    @Component
    @Slf4j
    public class RabbitReceiver{
    
        //haneulQueue1를 구독하는 리스너
        @RabbitListener(queues = queueName1)
        public void received1(@Payload RabbitMessage message) {
            log.info("RabbitReceiver.received1 호출");
            log.info("Received message: {}", message);
        }
        
        //haneulQueue2를 구독하는 리스너
        @RabbitListener(queues = queueName2)
        public void received2(@Payload RabbitMessage message) {
            log.info("RabbitReceiver.received2 호출");
            log.info("Received message: {}", message);
        }
       
    }

    @Payload 어노테이션이 바디에서 본문만 추출해 준다고 한다.

     

    4. RabbitMqController

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.*;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    
    import static hello.itemservice.mq.RabbitConfig.*;
    import static hello.itemservice.mq.RabbitConfig.queueName2;
    
    @RestController
    @Slf4j
    public class RabbitMqController {
        @Autowired
        private ObjectMapper objectMapper;
    
        //RabbitMQ의 메시지 큐를 처리하는 템플릿
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostMapping(value = "/rabbit-msg11", consumes = MediaType.APPLICATION_JSON_VALUE)
        public MessageResponse messageInRabbitMQ1(@RequestBody SendMessageBody mqMessageBody){
            try {
                RabbitMessage message = new RabbitMessage(objectMapper.writeValueAsString(mqMessageBody));
                rabbitTemplate.convertAndSend(topicExchangeName1, routingKey1,message);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            return new MessageResponse(mqMessageBody,routingKey1,topicExchangeName1,queueName1);
        }
        @PostMapping(value="/rabbit-msg22",consumes = MediaType.APPLICATION_JSON_VALUE)
        public MessageResponse messageInRabbitMQ2(@RequestBody SendMessageBody mqMessageBody){
            try {
                RabbitMessage message = new RabbitMessage(objectMapper.writeValueAsString(mqMessageBody));
                rabbitTemplate.convertAndSend(topicExchangeName2, routingKey2,message);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            return new MessageResponse(mqMessageBody,routingKey2,topicExchangeName2,queueName2);
        }
    
        @Getter
        @Setter
        @NoArgsConstructor
        @AllArgsConstructor
        @ToString
        static class SendMessageBody{
            private String message;
    
        }
    
        @Getter
        @Setter
        @NoArgsConstructor
        @AllArgsConstructor
        @ToString
        static class MessageResponse{
            private SendMessageBody messageBody;
            private String routingKey;
            private String topicKey;
            private String queueName;
    
        }
    }

     

     

    RabbitTemplate >> convertAndSend : 지정한 TopicExchangeName(지정한 exchange 이름), RoutingKey(라우팅 키)를 통해 Queue를 찾아 메시지를 전달. 

    @RabbitListener가 호출이 안되어 삽질한결과 convertAndSend 파라미터에 TopicExchangeName, RoutingKey, 수신할 메시지 타입을 적어주고 RabbitListener에 메시지 타입을 적어줘야 이에 일치하는 RabbitListener가 응답한다.. 이걸로 삽질함..

     

    5. RabbitMessage.java

    import lombok.*;
    
    @Getter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    public class RabbitMessage {
        private String message;
    }

    결과

Designed by Tistory.