[ 
https://issues.apache.org/jira/browse/ROCKETMQ-80?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875413#comment-15875413
 ] 

ASF GitHub Bot commented on ROCKETMQ-80:
----------------------------------------

Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101990343
  
    --- Diff: 
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 ---
    @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final 
ChannelHandlerContext ctx, //
             return response;
         }
     
    +    private RemotingCommand sendBatchMessage(final ChannelHandlerContext 
ctx, //
    +                                        final RemotingCommand request, //
    +                                        final SendMessageContext 
sendMessageContext, //
    +                                        final SendMessageRequestHeader 
requestHeader) throws RemotingCommandException {
    +
    +        final RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    +        final SendMessageResponseHeader responseHeader = 
(SendMessageResponseHeader) response.readCustomHeader();
    +
    +
    +        response.setOpaque(request.getOpaque());
    +
    +        response.addExtField(MessageConst.PROPERTY_MSG_REGION, 
this.brokerController.getBrokerConfig().getRegionId());
    +        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, 
String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    +
    +        if (log.isDebugEnabled()) {
    +            log.debug("receive SendMessage request command, " + request);
    +        }
    +
    +        final long startTimstamp = 
this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    +        if (this.brokerController.getMessageStore().now() < startTimstamp) 
{
    +            response.setCode(ResponseCode.SYSTEM_ERROR);
    +            response.setRemark(String.format("broker unable to service, 
until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
    +            return response;
    +        }
    +
    +        response.setCode(-1);
    +        super.msgCheck(ctx, requestHeader, response);
    +        if (response.getCode() != -1) {
    +            return response;
    +        }
    +
    +
    +        int queueIdInt = requestHeader.getQueueId();
    +        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    +
    +        if (queueIdInt < 0) {
    +            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % 
topicConfig.getWriteQueueNums();
    +        }
    +
    +        int sysFlag = requestHeader.getSysFlag();
    +        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) 
{
    +            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    +        }
    +
    +        String newTopic = requestHeader.getTopic();
    +        if (null != newTopic && 
newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    +            String groupName = 
newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
    +            SubscriptionGroupConfig subscriptionGroupConfig =
    +                    
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
    +            if (null == subscriptionGroupConfig) {
    +                
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    +                response.setRemark(
    +                        "subscription group not exist, " + groupName + " " 
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
    +                return response;
    +            }
    +
    +
    +            int maxReconsumeTimes = 
subscriptionGroupConfig.getRetryMaxTimes();
    +            if (request.getVersion() >= 
MQVersion.Version.V3_4_9.ordinal()) {
    +                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    +            }
    +            int reconsumeTimes = requestHeader.getReconsumeTimes() == null 
? 0 : requestHeader.getReconsumeTimes();
    +            if (reconsumeTimes >= maxReconsumeTimes) {
    +                newTopic = MixAll.getDLQTopic(groupName);
    +                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % 
DLQ_NUMS_PER_GROUP;
    +                topicConfig = 
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
 //
    +                        DLQ_NUMS_PER_GROUP, //
    +                        PermName.PERM_WRITE, 0
    +                );
    +                if (null == topicConfig) {
    +                    response.setCode(ResponseCode.SYSTEM_ERROR);
    +                    response.setRemark("topic[" + newTopic + "] not 
exist");
    +                    return response;
    +                }
    +            }
    +        }
    +        if (newTopic.length() > Byte.MAX_VALUE) {
    +            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
    +            response.setRemark("message topic length too long " + 
newTopic.length());
    +            return response;
    +        }
    +
    +
    +        MessageExtBatch messageExtBatch = new MessageExtBatch();
    +        messageExtBatch.setTopic(newTopic);
    --- End diff --
    
    Drawback 1: only supports messages of the same topic within a batch


> Add batch feature
> -----------------
>
>                 Key: ROCKETMQ-80
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-80
>             Project: Apache RocketMQ
>          Issue Type: New Feature
>    Affects Versions: 4.1.0-incubating
>            Reporter: dongeforever
>            Assignee: dongeforever
>             Fix For: 4.1.0-incubating
>
>
> Tests show that Kafka's million-level TPS is mainly owed to batching. When the 
> batch size is set to 1, the TPS drops by an order of magnitude. So I am trying 
> to add this feature to RocketMQ.
> For a minimal effort, it works as follows:
> Only add synchronous send functions to the MQProducer interface, such as 
> send(final Collection msgs).
> Use MessageBatch which extends Message and implements Iterable<Message>.
> Use byte buffer instead of list of objects to avoid too much GC in Broker.
> Split the decode and encode logic from lockForPutMessage to avoid too many 
> race conditions.
> Tests:
> On Linux with 24 cores, 48 GB RAM and an SSD, using 50 threads to send messages 
> with 50-byte bodies in batches of 50, we get about 1.5 million (150w) TPS until 
> the disk is full.
> Potential problems:
> Although the messages can be accumulated in the Broker very quickly, it takes 
> time to dispatch them to the consume queue, which is much slower than accepting 
> messages. So the messages may not be able to be consumed immediately.
> We may need to refactor the ReputMessageService to solve this problem.
> If you have any ideas, please let me know or share them in this 
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to