wlliqipeng commented on a change in pull request #659: [RIP-9] The best 
practice of rocketmq
URL: https://github.com/apache/rocketmq/pull/659#discussion_r250080813
 
 

 ##########
 File path: docs/cn/best_practise.md
 ##########
 @@ -0,0 +1,339 @@
+# 7 最佳实践(best practice)
+
+## 7.1 客户端
+
+​       相对于RocketMQ的Broker集群,生产者和消费者都是客户端。本小节主要描述生产者和消费者公共的行为配置。
+
+### 7.1.1 客户端寻址方式
+
+RocketMQ可以令客户端找到Name Server, 然后通过Name 
Server再找到Broker,分别如下,如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。
+
+1. 代码中指定Name Server地址,多个namesrv地址之间用分号分割   
+```java
+producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");  
+```
+```java
+consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
+```
+2. Java启动参数中指定Name Server地址
+
+```text
+-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876  
+```
+3. 环境变量指定Name Server地址
+
+```text
+export   NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876   
+```
+4. HTTP静态服务器寻址(默认)
+
+客户端启动后,会定时访问一个静态HTTP服务器,地址如下:<http://jmenv.tbsite.net:8080/rocketmq/nsaddr>,这个URL的返回内容如下:
+```text
+192.168.0.1:9876;192.168.0.2:9876   
+```
+客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name 
Server地址。URL已经在代码中写死,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:
+```text
+10.232.22.67    jmenv.taobao.net   
+```
+推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且Name Server集群可以热升级。
+
+### 7.1.2 客户端配置
+
+DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,其他参数同理。
+```java
+   producer.setNamesrvAddr("192.168.0.1:9876");   
+```
+
+####  1  客户端的公共配置
+
+| 参数名                        | 默认值  | 说明                                       
                  |
+| ----------------------------- | ------- | 
------------------------------------------------------------ |
+| namesrvAddr                   |         | Name 
Server地址列表,多个NameServer地址用分号隔开            |
+| clientIP                      | 本机IP  | 
客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
+| instanceName                  | DEFAULT | 
客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
+| clientCallbackExecutorThreads | 4       | 通信层异步回调线程数                         
                |
+| pollNameServerInteval         | 30000   | 轮询Name Server间隔时间,单位毫秒             
               |
+| heartbeatBrokerInterval       | 30000   | 向Broker发送心跳间隔时间,单位毫秒               
            |
+| persistConsumerOffsetInterval | 5000    | 持久化Consumer消费进度间隔时间,单位毫秒           
          |
+
+#### 2  Producer配置
+
+| 参数名                           | 默认值           | 说明                           
                              |
+| -------------------------------- | ---------------- | 
------------------------------------------------------------ |
+| producerGroup                    | DEFAULT_PRODUCER | 
Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 |
+| createTopicKey                   | TBW102           | 
在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 |
+| defaultTopicQueueNums            | 4                | 
在发送消息,自动创建服务器不存在的topic时,默认创建的队列数  |
+| sendMsgTimeout                   | 10000            | 发送消息超时时间,单位毫秒          
                         |
+| compressMsgBodyOverHowmuch       | 4096             | 
消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
+| retryAnotherBrokerWhenNotStoreOK | FALSE            | 
如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
+| retryTimesWhenSendFailed         | 2                | 
如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用 |
+| maxMessageSize                   | 4MB              | 
客户端限制的消息大小,超过报错,同时服务端也会限制,所以需要跟服务端配合使用。 |
+| transactionCheckListener         |                  | 
事务消息回查监听器,如果发送事务消息,必须设置               |
+| checkThreadPoolMinSize           | 1                | 
Broker回查Producer事务状态时,线程池大小                     |
+| checkThreadPoolMaxSize           | 1                | 
Broker回查Producer事务状态时,线程池大小                     |
+| checkRequestHoldMax              | 2000             | 
Broker回查Producer事务状态时,Producer本地缓冲请求队列大小   |
+| RPCHook                          | null             | 
该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。 |
+
+#### 3  PushConsumer配置
+
+| 参数名                       | 默认值                        | 说明                  
                                       |
+| ---------------------------- | ----------------------------- | 
------------------------------------------------------------ |
+| consumerGroup                | DEFAULT_CONSUMER              | 
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
+| messageModel                 | CLUSTERING                    | 消息模型,支持以下两种   
1、集群消费   2、广播消费           |
+| consumeFromWhere             | CONSUME_FROM_LAST_OFFSET      | 
Consumer启动后,默认从上次消费的位置开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费
 |
+| consumeTimestamp             | 半个小时前                    | 
只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作用。 |
+| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | 
Rebalance算法实现策略                                        |
+| subscription                 | {}                            | 订阅关系          
                                           |
+| messageListener              |                               | 消息监听器         
                                          |
+| offsetStore                  |                               | 消费进度存储        
                                         |
+| consumeThreadMin             | 10                            | 消费线程池数量       
                                        |
+| consumeThreadMax             | 20                            | 消费线程池数量       
                                        |
+|                              |                               |               
                                               |
+| consumeConcurrentlyMaxSpan   | 2000                          | 
单队列并行消费允许的最大跨度                                 |
+| pullThresholdForQueue        | 1000                          | 
拉消息本地队列缓存消息最大数                                 |
+| pullInterval                 | 0                             | 
拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒 |
+| consumeMessageBatchMaxSize   | 1                             | 
批量消费,一次消费多少条消息                                 |
+| pullBatchSize                | 32                            | 
批量拉消息,一次最多拉多少条                                 |
+
+#### 4  PullConsumer配置
+
+| 参数名                           | 默认值                        | 说明              
                                           |
+| -------------------------------- | ----------------------------- | 
------------------------------------------------------------ |
+| consumerGroup                    | DEFAULT_CONSUMER              | 
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
+| brokerSuspendMaxTimeMillis       | 20000                         | 
长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒     |
+| consumerTimeoutMillisWhenSuspend | 30000                         | 
长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒 |
+| consumerPullTimeoutMillis        | 10000                         | 
非长轮询,拉消息超时时间,单位毫秒                           |
+| messageModel                     | BROADCASTING                  | 
消息模型,支持以下两种   1、集群消费   2、广播消费           |
+| messageQueueListener             |                               | 监听队列变化    
                                             |
+| offsetStore                      |                               | 消费进度存储    
                                             |
+| registerTopics                   | []                            | 
注册的topic集合                                              |
+| allocateMessageQueueStrategy     | AllocateMessageQueueAveragely | 
Rebalance算法实现策略                                        |
+
+#### 5  Message数据结构
+
+| 字段名         | 默认值 | 说明                                                       
  |
+| -------------- | ------ | 
------------------------------------------------------------ |
+| Topic          | null   | 必填,线下环境不需要申请,线上环境需要申请后才能使用         |
+| Body           | null   | 必填,字节类型,序列化由应用决定,Producer与Consumer要协商好序列化形式。 |
+| Tags           | null   | 
选填,类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念 |
+| Keys           | null   | 
选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。
 |
+| Flag           | 0      | 选填,完全由应用来设置,RocketMQ不做干预                     |
+| DelayTimeLevel | 0      | 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 |
+| WaitStoreMsgOK | TRUE   | 选填,表示消息是否在服务器落盘后才返回应答。                 |
+
+
+## 7.2   生产者
+
+### 7.2.1 发送消息注意事项
+
+​      
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置。只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:
+```java
+   message.setTags("TagA")   
+```
+每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
+```java
+   // 订单Id   
+   String orderId = "20034568923546";   
+   message.setKeys(orderId);   
+```
+​     
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明:
+
+●        SEND_OK(发送成功)
+
+​      
消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
+
+●        FLUSH_DISK_TIMEOUT(刷盘超时)
+
+​      
消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
+
+●        FLUSH_SLAVE_TIMEOUT(数据同步到slave服务器超时)
+
+​      
消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
+
+●     SLAVE_NOT_AVAILABLE(无Slave服务器可用)
+
+​      
消息发送成功,但是此时Slave不可用。此时消息已经进入Master服务器队列,只有Master服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave
 Broker服务器,则将返回该状态——无Slave服务器可用。
+
+​      
如果返回值是刷盘超时(FLUSH_DISK_TIMEOUT)、同步Slave服务器超时(FLUSH_SLAVE_TIMEOUT)并且Broker服务器正好关闭,那么丢失的消息是可以找到的。此时有两个选择:一个是放弃该消息,这可能会导致此消息丢失;另一种方法是重新发送消息,这可能会使消息重复。通常推荐重新发送消息,然后在消费端过滤重复消息,除非业务场景能够忍受个别消息的丢失。但请记住,当返回值为无Slave服务器可用(SLAVE_NOT_AVAILABLE)时,重新发送消息是无用的。如果发生这种情况,用户应该保留场景、并提醒群集管理器。
+
+### 7.2.2 消息发送失败处理方式
+
+​     Producer的send方法本身支持内部重试,重试逻辑如下:
+
+1)      至多重试2次(同步发送为2次,异步发送为0次)。
+2)      如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
+3)      如果本身向broker发送消息产生超时异常,就不会再重试。
+
+​       
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
+
+​      
上述db重试方式为什么没有集成到MQ客户端内部做,而是要求应用自己去完成?我们基于以下几点考虑:首先、MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络。第二、如果MQ客户端内部集成一个KV存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受MQ运维人员控制,可能经常会发生
 kill -9 
这样暴力方式关闭,造成数据没有及时落盘而丢失。第三、Producer所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。综上,建议重试过程交由应用来控制。
+
+### 7.2.3选择oneway形式发送
+
+​     通常消息的发送是这样一个过程:
+​      1)      客户端发送请求到服务器
+​      2)      服务器处理请求
+​      3)      服务器向客户端返回应答
+
+​      
所以,一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,oneway形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级。
+
+### 7.2.4 其他
+
+1)      建议消息的大小最好不要超过512K。
+
+异步发送(AsyncSending) 
默认方法send(msg)会在得到返回响应之前一直被阻塞。因此,如果用户重视性能,那么建议使用send(msg、callback),这样消息发送将以异步方式进行。
 
+
+2)      生产者是线程安全的。
+
+3)      
如果希望在一个JVM中有多个生产者来进行大数据处理,有以下建议:通过一些生产者(3~5个足以)使用异步发送;为每个生产者的设置实例名,即setInstanceName。
+
+## 7.3   消费者
+
+### 7.3.1 消费过程幂等
+
+​       
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
+
+msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
+
+### 7.3.2 消费速度慢的处理方式
+
+1   提高消费并行度
+
+​     绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 
RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。
 如下有几种修改消费并行度的方法:
+​      1)  同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 
Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
 
 Review comment:
   项目编号-

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to