codelipenghui commented on a change in pull request #7416: URL: https://github.com/apache/pulsar/pull/7416#discussion_r448344535
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -153,13 +155,18 @@ public void resetCloseFuture() {
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
         try {
-            if (metadata.hasNumMessagesInBatch()) {
+            if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()) {
                 // If the message was part of a batch (eg: a batch of 1 message), we need
                 // to read the key from the first single-message-metadata entry
+                PulsarApi.CompressionType compressionType = metadata.getCompression();
+                CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);

Review comment:
I'm not sure how to handle the case that #7107 fixed. Maybe we can use the key of the first single message as the batch message key, but this requires all clients to catch up. So I think we can merge this PR first to ensure correctness, and I will create an issue for the client catch-up work.

I have also added the check `if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch())`. If a client sets the key of the batch message to the key of the first message, the broker can skip reading the first single-message-metadata entry, which reduces overhead. I have tested this with the KeyBasedBatcher; it already sets the batch message key.
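To make the intent of the check concrete, here is a minimal, self-contained sketch of the decision it encodes. It does not use the Pulsar classes: `StickyKeySketch`, `BatchMetadata`, `mustReadFirstEntry`, and `resolveKey` are hypothetical names introduced for illustration, and the `Supplier` stands in for the costly path that decompresses the payload and reads the first single-message-metadata entry. The order in which the two batch-level keys are consulted is also an assumption.

```java
import java.util.function.Supplier;

public class StickyKeySketch {

    /** Hypothetical stand-in for the batch-level fields of the message metadata. */
    public static final class BatchMetadata {
        final String orderingKey;         // null when the field is not set
        final String partitionKey;        // null when the field is not set
        final Integer numMessagesInBatch; // null when the entry is not a batch

        public BatchMetadata(String orderingKey, String partitionKey, Integer numMessagesInBatch) {
            this.orderingKey = orderingKey;
            this.partitionKey = partitionKey;
            this.numMessagesInBatch = numMessagesInBatch;
        }

        boolean hasOrderingKey() { return orderingKey != null; }
        boolean hasPartitionKey() { return partitionKey != null; }
        boolean hasNumMessagesInBatch() { return numMessagesInBatch != null; }
    }

    /** Mirrors the condition in the diff: only batches without a batch-level key need the fallback. */
    public static boolean mustReadFirstEntry(BatchMetadata m) {
        return !m.hasOrderingKey() && !m.hasPartitionKey() && m.hasNumMessagesInBatch();
    }

    /**
     * Returns the key to dispatch on. The supplier models the expensive path
     * (decompress, then parse the first single-message metadata) and is only
     * invoked when the batch metadata carries no key of its own.
     */
    public static String resolveKey(BatchMetadata m, Supplier<String> firstEntryKey) {
        if (mustReadFirstEntry(m)) {
            return firstEntryKey.get();
        }
        if (m.hasOrderingKey()) {
            return m.orderingKey; // assumption: ordering key is consulted first
        }
        return m.partitionKey;    // may be null when no key is set at all
    }

    public static void main(String[] args) {
        Supplier<String> expensive = () -> "key-from-first-entry";

        // Batch whose key was already set by the client: the supplier is never called.
        BatchMetadata keyed = new BatchMetadata(null, "batch-key", 3);
        System.out.println(resolveKey(keyed, expensive));

        // Batch from a client that has not caught up: fall back to the first entry.
        BatchMetadata unkeyed = new BatchMetadata(null, null, 3);
        System.out.println(resolveKey(unkeyed, expensive));
    }
}
```

With this shape, clients that already set the batch-level key never pay for the fallback, while older clients still get a key taken from the first entry.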