codelipenghui commented on a change in pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#discussion_r448344535



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -153,13 +155,18 @@ public void resetCloseFuture() {
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         try {
-            if (metadata.hasNumMessagesInBatch()) {
+            if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()) {
                 // If the message was part of a batch (eg: a batch of 1 message), we need
                 // to read the key from the first single-message-metadata entry
+                PulsarApi.CompressionType compressionType = metadata.getCompression();
+                CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);

Review comment:
       I'm not sure how to handle the case that #7107 fixed. Maybe we can use the 
key of the first single message as the batch message key, but that requires 
all clients to catch up.
   
   So I think we can merge this PR first to ensure correctness, and I will 
create an issue for the client catch-up work.
   
   I have also added the check `if (!metadata.hasOrderingKey() && 
!metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch())`. If a 
client sets the key of the batch message to the key of the first message, the 
broker can skip this branch, which reduces overhead.
   
   I have tested this with the KeyBasedBatcher. It already sets the batch 
message key.
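
   To illustrate the intent of the check, here is a minimal, self-contained 
sketch. It does not use the real Pulsar classes: `BatchKeySketch`, 
`BatchMetadata`, `firstSingleMessageKey`, `stickyKey` and the `payloadParses` 
counter are all hypothetical names, and the list of strings only stands in 
for the (possibly compressed) batch payload. The precedence of the ordering 
key over the partition key is also an assumption of this sketch.

```java
import java.util.List;

public class BatchKeySketch {

    /** Hypothetical stand-in for the batch-level metadata; not the PulsarApi type. */
    public static final class BatchMetadata {
        final String orderingKey;             // null when unset
        final String partitionKey;            // null when unset
        final int numMessagesInBatch;         // 0 when the entry is not a batch
        final List<String> singleMessageKeys; // stands in for the batch payload

        public BatchMetadata(String orderingKey, String partitionKey,
                             int numMessagesInBatch, List<String> singleMessageKeys) {
            this.orderingKey = orderingKey;
            this.partitionKey = partitionKey;
            this.numMessagesInBatch = numMessagesInBatch;
            this.singleMessageKeys = singleMessageKeys;
        }

        boolean hasOrderingKey() { return orderingKey != null; }
        boolean hasPartitionKey() { return partitionKey != null; }
        boolean hasNumMessagesInBatch() { return numMessagesInBatch > 0; }
    }

    /** Counts how often the expensive path (decompress + parse) is taken. */
    public static int payloadParses = 0;

    /** Simulates decompressing the payload and reading the first single-message key. */
    static String firstSingleMessageKey(BatchMetadata m) {
        payloadParses++;
        return m.singleMessageKeys.isEmpty() ? null : m.singleMessageKeys.get(0);
    }

    /** Returns the key used for dispatch, or null if no key is available. */
    public static String stickyKey(BatchMetadata m) {
        // Same shape as the check in this PR: only open the payload when the
        // batch-level metadata carries no key at all.
        if (!m.hasOrderingKey() && !m.hasPartitionKey() && m.hasNumMessagesInBatch()) {
            return firstSingleMessageKey(m);
        }
        // Assumption: the ordering key takes precedence over the partition key.
        if (m.hasOrderingKey()) {
            return m.orderingKey;
        }
        return m.partitionKey; // may be null for an unkeyed, non-batch message
    }
}
```

   With a batch whose metadata already carries a key (as the KeyBasedBatcher 
produces), `stickyKey` returns that key and `payloadParses` stays unchanged. 
Only batches without a batch-level key take the slower path.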




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

