liuliuzo opened a new issue #293:
URL: https://github.com/apache/rocketmq-spring/issues/293
是这样的,我们这边需要支持原生Listener的使用方式自己控制ConsumeConcurrentlyStatus。
目前做法是优化了下加载并且在Lifecycle set自己的Listener,希望这边又更优雅的方式。
public class MyMessageListenerConcurrently implements
MessageListenerConcurrently {
private static final Logger log =
LoggerFactory.getLogger(MyMessageListenerConcurrently.class);
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
//do something here
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(),
costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}",
messageExt, e);
context.setDelayLevelWhenNextConsume(0);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
@Service
@RocketMQMessageListener(topic = "topic", consumerGroup = "consumerGroup")
public class StringConsumer implements RocketMQPushConsumerLifecycleListener
{
@Autowired
private MyMessageListenerConcurrently myMyMessageListenerConcurrently;
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setMessageListener(myMyMessageListenerConcurrently);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]