[ https://issues.apache.org/jira/browse/ROCKETMQ-80?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875413#comment-15875413 ]
ASF GitHub Bot commented on ROCKETMQ-80:
----------------------------------------

Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101990343

    --- Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---
    @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
             return response;
         }
     
    +    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
    +                                 final RemotingCommand request, //
    +                                 final SendMessageContext sendMessageContext, //
    +                                 final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    +
    +        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    +        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    +
    +
    +        response.setOpaque(request.getOpaque());
    +
    +        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    +        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    +
    +        if (log.isDebugEnabled()) {
    +            log.debug("receive SendMessage request command, " + request);
    +        }
    +
    +        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    +        if (this.brokerController.getMessageStore().now() < startTimstamp) {
    +            response.setCode(ResponseCode.SYSTEM_ERROR);
    +            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
    +            return response;
    +        }
    +
    +        response.setCode(-1);
    +        super.msgCheck(ctx, requestHeader, response);
    +        if (response.getCode() != -1) {
    +            return response;
    +        }
    +
    +
    +        int queueIdInt = requestHeader.getQueueId();
    +        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    +
    +        if (queueIdInt < 0) {
    +            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    +        }
    +
    +        int sysFlag = requestHeader.getSysFlag();
    +        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
    +            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    +        }
    +
    +        String newTopic = requestHeader.getTopic();
    +        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    +            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
    +            SubscriptionGroupConfig subscriptionGroupConfig =
    +                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
    +            if (null == subscriptionGroupConfig) {
    +                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    +                response.setRemark(
    +                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
    +                return response;
    +            }
    +
    +
    +            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    +            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    +                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    +            }
    +            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
    +            if (reconsumeTimes >= maxReconsumeTimes) {
    +                newTopic = MixAll.getDLQTopic(groupName);
    +                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    +                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
    +                    DLQ_NUMS_PER_GROUP, //
    +                    PermName.PERM_WRITE, 0
    +                );
    +                if (null == topicConfig) {
    +                    response.setCode(ResponseCode.SYSTEM_ERROR);
    +                    response.setRemark("topic[" + newTopic + "] not exist");
    +                    return response;
    +                }
    +            }
    +        }
    +        if (newTopic.length() > Byte.MAX_VALUE) {
    +            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
    +            response.setRemark("message topic length too long " + newTopic.length());
    +            return response;
    +        }
    +
    +
    +        MessageExtBatch messageExtBatch = new MessageExtBatch();
    +        messageExtBatch.setTopic(newTopic);
    --- End diff --

    Drawback 1: only supports the same topic


> Add batch feature
> -----------------
>
>                 Key: ROCKETMQ-80
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-80
>             Project: Apache RocketMQ
>          Issue Type: New Feature
>    Affects Versions: 4.1.0-incubating
>            Reporter: dongeforever
>            Assignee: dongeforever
>             Fix For: 4.1.0-incubating
>
>
> Tests show that Kafka's million-level TPS is mainly owed to batching. When the
> batch size is set to 1, the TPS drops by an order of magnitude. So I am trying
> to add this feature to RocketMQ.
> For a minimal effort, it works as follows:
> Only add synchronous send functions to the MQProducer interface, just like
> send(final Collection msgs).
> Use MessageBatch, which extends Message and implements Iterable<Message>.
> Use a byte buffer instead of a list of objects to avoid too much GC in the Broker.
> Split the decode and encode logic from lockForPutMessage to avoid too many
> race conditions.
> Tests:
> On Linux with 24 cores, 48 GB RAM and an SSD, using 50 threads to send 50-byte
> (body) messages in batches of 50, we get about 150w (1.5 million) TPS until
> the disk is full.
> Potential problems:
> Although the messages can be accumulated in the Broker very quickly, it needs
> time to dispatch them to the consume queue, which is much slower than accepting
> messages. So the messages may not be able to be consumed immediately.
> We may need to refactor the ReputMessageService to solve this problem.
> And if you have any ideas, please let me know or just share them in this
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
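
For readers following the thread, the design in the issue description (a MessageBatch that extends Message, implements Iterable<Message>, and carries its members in one byte buffer) and the "same topic" drawback raised in the review can be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the pull request: the Message class here is a simplified stand-in with just a topic and a body, and the of(), encode() and decode() helpers and the 4-byte length-prefixed layout are hypothetical names and choices made for the example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

public class BatchSketch {

    // Hypothetical, simplified stand-in for RocketMQ's Message (topic and body only).
    static class Message {
        final String topic;
        final byte[] body;

        Message(String topic, byte[] body) {
            this.topic = topic;
            this.body = body;
        }
    }

    // Hypothetical batch: itself a Message whose body is the encoded members.
    static class MessageBatch extends Message implements Iterable<Message> {
        private final List<Message> messages;

        private MessageBatch(String topic, List<Message> messages) {
            super(topic, encode(messages));
            this.messages = messages;
        }

        // Builds a batch and rejects mixed topics, mirroring the "same topic" limitation.
        static MessageBatch of(Collection<Message> msgs) {
            if (msgs == null || msgs.isEmpty()) {
                throw new IllegalArgumentException("batch must not be empty");
            }
            List<Message> copy = new ArrayList<>(msgs);
            String topic = copy.get(0).topic;
            for (Message m : copy) {
                if (!Objects.equals(topic, m.topic)) {
                    throw new UnsupportedOperationException("all messages in a batch must share one topic");
                }
            }
            return new MessageBatch(topic, copy);
        }

        // Assumed layout: [4-byte body length][body bytes], repeated per message.
        static byte[] encode(List<Message> msgs) {
            int size = 0;
            for (Message m : msgs) {
                size += 4 + m.body.length;
            }
            ByteBuffer buf = ByteBuffer.allocate(size);
            for (Message m : msgs) {
                buf.putInt(m.body.length);
                buf.put(m.body);
            }
            return buf.array();
        }

        // Reverses encode(): walks the buffer and returns each body in order.
        static List<byte[]> decode(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            List<byte[]> bodies = new ArrayList<>();
            while (buf.remaining() > 0) {
                byte[] body = new byte[buf.getInt()];
                buf.get(body);
                bodies.add(body);
            }
            return bodies;
        }

        @Override
        public Iterator<Message> iterator() {
            return messages.iterator();
        }
    }

    public static void main(String[] args) {
        List<Message> msgs = new ArrayList<>();
        msgs.add(new Message("TopicA", new byte[] {1}));
        msgs.add(new Message("TopicA", new byte[] {2, 3}));
        MessageBatch batch = MessageBatch.of(msgs);
        System.out.println("encoded bytes: " + batch.body.length);
        System.out.println("decoded messages: " + MessageBatch.decode(batch.body).size());
    }
}
```

Carrying the batch as a single byte array means the receiving side handles one buffer rather than one object per message, which is the GC argument made in the issue. Supporting several topics in one batch would require the topic to be encoded per message, which this sketch deliberately does not attempt.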