maskleo opened a new issue #401: Questions about the official example
URL: https://github.com/apache/rocketmq/issues/401
 
 
   https://rocketmq.apache.org/docs/order-example/
   version  4.2.0
   
   - Producer
   ```java
   DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
           producer.setNamesrvAddr("127.0.0.1:9876");
           producer.setVipChannelEnabled(false);
           producer.start();
           String[] tags = new String[]{"TagA", "TagB", "TagC"};
           try {
               for (int i = 0; i < 100; i++) {
                   Message message = new Message("TopicTestJ", tags[i % 
tags.length], "k" + i, (new Date() + " Hello RocketMQ ,QuickStart" + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                   SendResult sr = producer.send(message, new 
MessageQueueSelector() {
                       @Override
                       public MessageQueue select(List<MessageQueue> mqs, 
Message msg, Object arg) {
                           Integer id = (Integer) arg;
                           int index = id % mqs.size();
                           return mqs.get(index);
                       }
                   }, orderId);
                   System.out.println(sr);
               }
           } catch (Exception e) {
               e.printStackTrace();
           }
           producer.shutdown();
   ```
   
   - Consumer
   
   ```java
   public static void main(String[] args) throws MQClientException {
           DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("example_group_name");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           consumer.subscribe("TopicTestJ", "TagA || TagB || TagC");
           consumer.registerMessageListener(new MyMessageListenerOrderly());
           consumer.start();
           System.out.println("Consumer Started.");
       }
   
       static class MyMessageListenerOrderly implements MessageListenerOrderly {
   
           AtomicLong consumeTimes = new AtomicLong(0);
   
           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               System.out.printf(Thread.currentThread().getName() + " Receive 
New Messages: " + msgs + "%n");
               return ConsumeOrderlyStatus.SUCCESS;
   
           }
       }
   ```
   
   Consumers can only get 4 pieces of data, and it is disordered
   
   log 
detail:https://note.youdao.com/share/?id=66e7e49f4c7639fd9c63429b30f8e58b&type=note#/

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to