wlliqipeng commented on a change in pull request #655: [RIP-9] Add the simple
example description
URL: https://github.com/apache/rocketmq/pull/655#discussion_r248920198
##########
File path: docs/cn/RocketMQ_Example.md
##########
@@ -0,0 +1,957 @@
+样例(sample)
+============
+
+基本样例
+--------
+
+在基本样例中我们提供如下的功能场景:
+
+* 使用RocketMQ发送三种类型的消息:同步消息,异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
+* 使用RocketMQ来消费接收到的消息。
+
+### 1、加入依赖:
+
+`maven:`
+```
+<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>4.3.0</version>
+</dependency>
+```
+`gradle`
+```
+compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
+```
+### 2、消息发送
+
+#### 1. Producer端发送同步消息
+
+这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
+```java
+public class SyncProducer {
+ public static void main(String[] args) throws Exception {
+ // 实例化消息生产者Producer
+ DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
+ // 设置NameServer的地址
+ producer.setNamesrvAddr("localhost:9876");
+ // 启动Producer实例
+ producer.start();
+ for (int i = 0; i < 100; i++) {
+ // 创建消息,并指定Topic,Tag和消息体
+ Message msg = new Message("TopicTest" /* Topic */,
+ "TagA" /* Tag */,
+ ("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+ );
+ // 发送消息到一个Broker
+ SendResult sendResult = producer.send(msg);
+ // 通过sendResult返回消息是否成功送达
+ System.out.printf("%s%n", sendResult);
+ }
+ // 如果不再发送消息,关闭Producer实例。
+ producer.shutdown();
+ }
+}
+```
+#### 2. 发送异步消息
+
+异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
+
+```java
+public class AsyncProducer {
+ public static void main(String[] args) throws Exception {
+ // 实例化消息生产者Producer
+ DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
+ // 设置NameServer的地址
+ producer.setNamesrvAddr("localhost:9876");
+ // 启动Producer实例
+ producer.start();
+ producer.setRetryTimesWhenSendAsyncFailed(0);
+ for (int i = 0; i < 100; i++) {
+ final int index = i;
+ // 创建消息,并指定Topic,Tag和消息体
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ // SendCallback接收异步返回结果的回调
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.printf("%-10d OK %s %n", index,
+ sendResult.getMsgId());
+ }
+ @Override
+ public void onException(Throwable e) {
+ System.out.printf("%-10d Exception %s %n", index, e);
+ e.printStackTrace();
+ }
+ });
+ }
+ // 如果不再发送消息,关闭Producer实例。
+ producer.shutdown();
+ }
+}
+```
+
+#### 3. 单向发送消息
+
+这种方式主要用在不特别关心发送结果的场景,例如日志发送。
+
+```java
+
+public class OnewayProducer {
+ public static void main(String[] args) throws Exception{
+ // 实例化消息生产者Producer
+ DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
+ // 设置NameServer的地址
+ producer.setNamesrvAddr("localhost:9876");
+ // 启动Producer实例
+ producer.start();
+ for (int i = 0; i < 100; i++) {
+ // 创建消息,并指定Topic,Tag和消息体
+ Message msg = new Message("TopicTest" /* Topic */,
+ "TagA" /* Tag */,
+ ("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+ );
+ // 发送单向消息,没有任何返回结果
+ producer.sendOneway(msg);
+
+ }
+ // 如果不再发送消息,关闭Producer实例。
+ producer.shutdown();
+ }
+```
+
+### 3、消费消息
+
+```java
+
+public class Consumer {
+
+ public static void main(String[] args) throws InterruptedException,
MQClientException {
+
+ // 实例化消费者
+ DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name");
+
+ // 设置NameServer的地址
+ consumer.setNamesrvAddr("localhost:9876");
+
+ // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
+ consumer.subscribe("TopicTest", "*");
+ // 注册回调实现类来处理从broker拉取回来的消息
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+ @Override
+ public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
+ // 标记该消息已经被成功消费
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+
+ // 启动消费者实例
+ consumer.start();
+
+ System.out.printf("Consumer Started.%n");
+ }
+}
+
+```
+
+
+顺序消息样例
+----------
+
+消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
+
+顺序消费的原理解析,在默认的情况下消息发送会采取Round
Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
+
+下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
+
+### 1、顺序消息生产
+
+```java
+
+package org.apache.rocketmq.example.order2;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+* Producer,发送顺序消息
+*/
+public class Producer {
+
+ public static void main(String[] args) throws Exception {
+ DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
+
+ producer.setNamesrvAddr("127.0.0.1:9876");
+
+ producer.start();
+
+ String[] tags = new String[]{"TagA", "TagC", "TagD"};
+
+ // 订单列表
+ List<OrderStep> orderList = new Producer().buildOrders();
+
+ Date date = new Date();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String dateStr = sdf.format(date);
+ for (int i = 0; i < 10; i++) {
+ // 加个时间前缀
+ String body = dateStr + " Hello RocketMQ " + orderList.get(i);
+ Message msg = new Message("TopicTest", tags[i % tags.length], "KEY"
+ i, body.getBytes());
+
+ SendResult sendResult = producer.send(msg, new
MessageQueueSelector() {
+ @Override
+ public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
+ Long id = (Long) arg; //根据订单id选择发送queue
+ long index = id % mqs.size();
+ return mqs.get((int) index);
+ }
+ }, orderList.get(i).getOrderId());//订单id
+
+ System.out.println(String.format("SendResult status:%s, queueId:%d,
body:%s",
+ sendResult.getSendStatus(),
+ sendResult.getMessageQueue().getQueueId(),
+ body));
+ }
+
+ producer.shutdown();
+ }
+
+ /**
+ * 订单的步骤
+ */
+ private static class OrderStep {
+ private long orderId;
+ private String desc;
+
+ public long getOrderId() {
+ return orderId;
+ }
+
+ public void setOrderId(long orderId) {
+ this.orderId = orderId;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public void setDesc(String desc) {
+ this.desc = desc;
+ }
+
+ @Override
+ public String toString() {
+ return "OrderStep{" +
+ "orderId=" + orderId +
+ ", desc='" + desc + '\'' +
+ '}';
+ }
+ }
+
+ /**
+ * 生成模拟订单数据
+ */
+ private List<OrderStep> buildOrders() {
+ List<OrderStep> orderList = new ArrayList<OrderStep>();
+
+ OrderStep orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("创建");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111065L);
+ orderDemo.setDesc("创建");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("付款");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103117235L);
+ orderDemo.setDesc("创建");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111065L);
+ orderDemo.setDesc("付款");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103117235L);
+ orderDemo.setDesc("付款");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111065L);
+ orderDemo.setDesc("完成");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("推送");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103117235L);
+ orderDemo.setDesc("完成");
+ orderList.add(orderDemo);
+
+ orderDemo = new OrderStep();
+ orderDemo.setOrderId(15103111039L);
+ orderDemo.setDesc("完成");
+ orderList.add(orderDemo);
+
+ return orderList;
+ }
+}
+
+```
+
+### 2、顺序消费消息
+
+```java
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+package org.apache.rocketmq.example.order2;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
+*/
+public class ConsumerInOrder {
+
+ public static void main(String[] args) throws Exception {
+ DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_3");
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ /**
+ * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
+ * 如果非第一次启动,那么按照上次消费的位置继续消费
+ */
+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+ consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+ consumer.registerMessageListener(new MessageListenerOrderly() {
+
+ Random random = new Random();
+
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
+ context.setAutoCommit(true);
+ for (MessageExt msg : msgs) {
+ // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
+ System.out.println("consumeThread=" +
Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:"
+ new String(msg.getBody()));
+ }
+
+ try {
+ //模拟业务逻辑处理中...
+ TimeUnit.SECONDS.sleep(random.nextInt(10));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ });
+
+ consumer.start();
+
+ System.out.println("Consumer Started.");
+ }
+}
+
+```
+
+延时消息样例
+----------
+
+### 1、启动消费者等待传入订阅消息
+
+```java
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+public class ScheduledMessageConsumer {
+ public static void main(String[] args) throws Exception {
+ // 实例化消费者
+ DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("ExampleConsumer");
+ // 订阅Topics
+ consumer.subscribe("TestTopic", "*");
+ // 注册消息监听者
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
messages, ConsumeConcurrentlyContext context) {
+ for (MessageExt message : messages) {
+ // Print approximate delay time period
+ System.out.println("Receive message[msgId=" +
message.getMsgId() + "] " + (System.currentTimeMillis() -
message.getStoreTimestamp()) + "ms later");
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ // 启动消费者
+ consumer.start();
+ }
+}
+
+```
+
+### 2、发送延时消息
+
+```java
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class ScheduledMessageProducer {
+ public static void main(String[] args) throws Exception {
+ // 实例化一个生产者来产生延时消息
+ DefaultMQProducer producer = new
DefaultMQProducer("ExampleProducerGroup");
+ // 启动生产者
+ producer.start();
+ int totalMessagesToSend = 100;
+ for (int i = 0; i < totalMessagesToSend; i++) {
+ Message message = new Message("TestTopic", ("Hello scheduled message
" + i).getBytes());
+ // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
+ message.setDelayTimeLevel(3);
+ // 发送消息
+ producer.send(message);
+ }
+ // 关闭生产者
+ producer.shutdown();
+ }
+}
+```
+
+### 3、验证
+
+您将会看到消息的消费比存储时间晚10秒。
+
+批量消息样例
+----------
+
+批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
+
+### 1、发送批量消息
+
+如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
+
+```java
+String topic = "BatchTest";
+List<Message> messages = new ArrayList<>();
+messages.add(new Message(topic, "TagA", "OrderID001", "Hello world
0".getBytes()));
+messages.add(new Message(topic, "TagA", "OrderID002", "Hello world
1".getBytes()));
+messages.add(new Message(topic, "TagA", "OrderID003", "Hello world
2".getBytes()));
+try {
+ producer.send(messages);
+} catch (Exception e) {
+ e.printStackTrace();
+ //处理error
+}
+
+```
+
+### 2、消息列表分割
+
+复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
+
+```java
+
+public class ListSplitter implements Iterator<List<Message>> {
+ private final int SIZE_LIMIT = 1024 * 1024 * 4;
+ private final List<Message> messages;
+ private int currIndex;
+ public ListSplitter(List<Message> messages) {
+ this.messages = messages;
+ }
+ @Override public boolean hasNext() {
+ return currIndex < messages.size();
+ }
+ @Override public List<Message> next() {
+ int nextIndex = currIndex;
+ int totalSize = 0;
+ for (; nextIndex < messages.size(); nextIndex++) {
+ Message message = messages.get(nextIndex);
+ int tmpSize = message.getTopic().length() +
message.getBody().length;
+ Map<String, String> properties = message.getProperties();
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ tmpSize += entry.getKey().length() + entry.getValue().length();
+ }
+ tmpSize = tmpSize + 20; // 增加日志的开销20字节
+ if (tmpSize > SIZE_LIMIT) {
+ //单个消息超过了最大的限制
+ //忽略,否则会阻塞分裂的进程
+ if (nextIndex - currIndex == 0) {
+ //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
+ nextIndex++;
+ }
+ break;
+ }
+ if (tmpSize + totalSize > SIZE_LIMIT) {
+ break;
+ } else {
+ totalSize += tmpSize;
+ }
+
+ }
+ List<Message> subList = messages.subList(currIndex, nextIndex);
+ currIndex = nextIndex;
+ return subList;
+ }
+}
+//把大的消息分裂成若干个小的消息
+ListSplitter splitter = new ListSplitter(messages);
+while (splitter.hasNext()) {
+ try {
+ List<Message> listItem = splitter.next();
+ producer.send(listItem);
+ } catch (Exception e) {
+ e.printStackTrace();
+ //处理error
+ }
+}
+```
+
+过滤消息样例
+----------
+
+在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
+
+```java
+DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
+consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
+```
+
+消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
+
+\-\-\-\-\-\-\-\-\-\-\-\-
+| message |
+|-------- | a > 5 AND b = 'abc'
Review comment:
可以再优化下格式
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services