본문 바로가기
Spring

[Spring] Amazon MQ (Active MQ) : 개발

by 가드 2022. 11. 9.
728x90

[Spring] Amazone MQ (Active MQ) : 개념편을 이어서 실제 Spring Boot + Active MQ 개발에 대해서 기록해보겠다.

우선적으로 Active MQ 서버가 이미 구성되어 있다는 가정하에 Spring Framework에서 어떻게 설정하고 개발했는지에 대해 살펴보겠다.

 

AWS Amazon MQ Document에 예제가 있으니 참고해도 좋다.

Gradle 설정

먼저 gradle에 dependncies를 아래와 같이 'activemq-client'와 'activemq-pool'를 설정해준다.

implementation 'org.apache.activemq:activemq-client:5.15.8'
implementation 'org.apache.activemq:activemq-pool:5.15.8'

 

Cofig 설정

@Configuration
public class AmazonMQConfig{
	private final static String ACTIVE_MQ_URL = "엔드포인트 URL"
	private final static String ACTIVE_MQ_USERNAME = "계정 ID";
	private final static String ACTIVE_MQ_PASSWORD = "계정 PW";
    
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory()  {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL, ACTIVE_MQ_USERNAME, ACTIVE_MQ_PASSWORD);
        RedeliveryPolicyMap policyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
        policyMap.put(new ActiveMQQueue(">"), redeliveryPolicy());
        return activeMQConnectionFactory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(pooledConnectionFactory());
        jmsTemplate.setMessageConverter(messageConverter());
        jmsTemplate.setReceiveTimeout(10000);
        return jmsTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setObjectMapper(objectMapper);
        messageConverter.setTargetType(MessageType.TEXT);
        messageConverter.setTypeIdPropertyName("_type");
        return messageConverter;
    }
    
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory, MessageConverter messageConverter) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        factory.setMessageConverter(messageConverter);
        return factory;
    }
    
    private RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(0);
        return redeliveryPolicy;
    }
    
    private PooledConnectionFactory pooledConnectionFactory() {
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
        pooledConnectionFactory.start();
        return pooledConnectionFactory;
    }
}
  • ActiveMqConnectionFactory : ActiveMQ 브로커 연결 설정
  • JmsTemplate : 메시지 송,수신하고 읽을 수 있도록 설정
  • PooledConnectionFactory : Connection을 재사용하기 위한 Connection Pool 설정
  • DefaultJmsListenerContainerFactory : Consumer가 메시지를 수신받을 수 있도록 설정 Consumer는 아래에 내용을 추가했다.
  • RedeliveryPolicy : 메시지 실패 시 재배달 정책에 대한 설정(maximumRedeliveries = 0으로 설정하면 재시도하지 않는다. DLQ로 바로 빠지게 하기 위함)

더 많은 옵션들이 있지만 Producer, Broker, Consumer에 대한 기본적인 설정이 모두 완료되었다.

 

Producer

@Component
public class ActiveMqProducer {

	private static final String DESTINATION = "브로커 큐 이름";

	private final JmsTemplate jmsTemplate;

	@Autowired
	public ActiveMqProducer(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}


	public void sendMessage(Object message) {
		jmsTemplate.convertAndSend(DESTINATION, message);
	}

}

이미 Config에서 JmsTemplate 설정이 되었으므로 jmsTemplate.convertAndSend에 전달할 "브로커 큐 이름"과 전달할 메시지 Object를 파라미터도 전달하며 호출해주면 된다. 정말 간단하게 메시지를 Producing 할 수 있다.

 

Consumer

@Component
public class ActiveMqConsumer {

	@JmsListener(destination = "브로커 큐 이름")
	public void onMessage(@Payload Object message) {
		System.out.println("onMessage!!");
	}

}

Consumer는 Producer보다 더 간단하다 메시지를 받을 메소드에 @JmsListener 어노테이션만 선언하면 된다. 파라미터 값으로 Object 객체라고 표현했지만 MessageConverter를 이미 설정했으므로 실제 받을 Object 객체를 선언하면 된다.

ex) @Payload Person Message (참고로 @Payload 어노테이션을 선언하지 않아도 된다.)

 

Spring Framwork에서 ActiveMq Producer, Consumer 기본적인 구성을 기록하였다.

이제는 서비스에 맞춰서 전략적으로 구성해야 한다.

예를 들어 대부분 Consumer 서버들이 클러스터링으로 구성될 경우 한 서버에서 Consume을 해야 하는 Thread 수 또는 Message에 대한 전달 전략 일 수 있다. Message 처리의 우선순위 중요하지 않아 Round Robin 방식으로 전달할지 아니면 Message 처리의 우선순위가 중요하다면 Consumer를 단일화할지 이런 결정들이 실제로 남았다.

 

실제로도 서비스에 맞춰서 어떻게 MQ 운영 전략을 가져갈지 고민도 많이 했고 테스트도 상당히 많이 한 기억이 있다.

 

이런 운영적인 것도 까먹기전에 다음에 또 기록해야겠다.

300x250

댓글