This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ae246fe7d71495b440a2dc83afe9930f7756ab90
Author: lipenghui <peng...@apache.org>
AuthorDate: Thu Jul 16 09:25:01 2020 +0800

    Decompression payload if needed in KeyShared subscription (#7416)
    
    Decompression payload if needed in KeyShared subscription
    
    (cherry picked from commit ed3583a5bd750661f8643617fc618151f87019b2)
---
 .../broker/service/AbstractBaseDispatcher.java     | 47 +++++-------------
 .../client/api/KeySharedSubscriptionTest.java      | 57 ++++++++++++++++++++++
 .../client/impl/BatchMessageKeyBasedContainer.java |  9 ----
 .../org/apache/pulsar/client/impl/MessageImpl.java | 10 +++-
 .../apache/pulsar/common/protocol/Commands.java    |  8 +++
 5 files changed, 85 insertions(+), 46 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 18b29f2..7cf9793 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -33,6 +33,8 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -149,43 +151,16 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
     public static final String NONE_KEY = "NONE_KEY";
 
     protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
-        int readerIndex = metadataAndPayload.readerIndex();
+        metadataAndPayload.markReaderIndex();
         PulsarApi.MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
-
-        try {
-            if (metadata.hasNumMessagesInBatch()) {
-                // If the message was part of a batch (eg: a batch of 1 
message), we need
-                // to read the key from the first single-message-metadata entry
-                PulsarApi.SingleMessageMetadata.Builder 
singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
-                        .newBuilder();
-                ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(metadataAndPayload,
-                        singleMessageMetadataBuilder, 0, 
metadata.getNumMessagesInBatch());
-                try {
-                    if (singleMessageMetadataBuilder.hasOrderingKey()) {
-                        return 
singleMessageMetadataBuilder.getOrderingKey().toByteArray();
-                    } else if (singleMessageMetadataBuilder.hasPartitionKey()) 
{
-                        return 
singleMessageMetadataBuilder.getPartitionKey().getBytes();
-                    }
-                } finally {
-                    singleMessagePayload.release();
-                    singleMessageMetadataBuilder.recycle();
-                }
-            } else {
-                // Message was not part of a batch
-                if (metadata.hasOrderingKey()) {
-                    return metadata.getOrderingKey().toByteArray();
-                } else if (metadata.hasPartitionKey()) {
-                    return metadata.getPartitionKey().getBytes();
-                }
-            }
-
-            return NONE_KEY.getBytes();
-        } catch (IOException e) {
-            // If we fail to deserialize medata, return null key
-            return NONE_KEY.getBytes();
-        } finally {
-            metadataAndPayload.readerIndex(readerIndex);
-            metadata.recycle();
+        metadataAndPayload.resetReaderIndex();
+        byte[] key = NONE_KEY.getBytes();
+        if (metadata.hasOrderingKey()) {
+            return metadata.getOrderingKey().toByteArray();
+        } else if (metadata.hasPartitionKey()) {
+            return metadata.getPartitionKey().getBytes();
         }
+        metadata.recycle();
+        return key;
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 511f8c2..2a7a20b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -710,6 +710,63 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         consumer4.close();
     }
 
+    @Test
+    public void testWithMessageCompression() throws Exception {
+        final String topic = "testWithMessageCompression" + 
UUID.randomUUID().toString();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .compressionType(CompressionType.LZ4)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(("Hello Pulsar > " + i).getBytes());
+        }
+        List<Message<byte[]>> receives = new ArrayList<>();
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            receives.add(received);
+            consumer.acknowledge(received);
+        }
+        Assert.assertEquals(receives.size(), messages);
+        producer.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testAttachKeyToMessageMetadata()
+            throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 1000; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, 
consumer3));
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, 
String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index c9328c8..d9c1c6c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -208,15 +208,6 @@ class BatchMessageKeyBasedContainer extends 
AbstractBatchMessageContainer {
         private void addMsg(MessageImpl<?> msg, SendCallback callback) {
             if (messages.size() == 0) {
                 sequenceId = 
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
-                if (msg.hasKey()) {
-                    messageMetadata.setPartitionKey(msg.getKey());
-                    if (msg.hasBase64EncodedKey()) {
-                        messageMetadata.setPartitionKeyB64Encoded(true);
-                    }
-                }
-                if (msg.hasOrderingKey()) {
-                    
messageMetadata.setOrderingKey(ByteString.copyFrom(msg.getOrderingKey()));
-                }
                 batchedMessageMetadataAndPayload = 
PulsarByteBufAllocator.DEFAULT
                         .buffer(Math.min(maxBatchSize, 
ClientCnx.getMaxMessageSize()));
                 firstCallback = callback;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index a07834f..ac387a4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -140,10 +140,18 @@ public class MessageImpl<T> implements Message<T> {
         } else {
             properties = Collections.emptyMap();
         }
-
         if (singleMessageMetadata.hasPartitionKey()) {
             
msgMetadataBuilder.setPartitionKeyB64Encoded(singleMessageMetadata.getPartitionKeyB64Encoded());
             
msgMetadataBuilder.setPartitionKey(singleMessageMetadata.getPartitionKey());
+        } else if (msgMetadataBuilder.hasPartitionKey()) {
+            msgMetadataBuilder.clearPartitionKey();
+            msgMetadataBuilder.clearPartitionKeyB64Encoded();
+        }
+
+        if (singleMessageMetadata.hasOrderingKey()) {
+            
msgMetadataBuilder.setOrderingKey(singleMessageMetadata.getOrderingKey());
+        } else if (msgMetadataBuilder.hasOrderingKey()) {
+            msgMetadataBuilder.clearOrderingKey();
         }
 
         if (singleMessageMetadata.hasEventTime()) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7ae21e7..d867139 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1622,6 +1622,14 @@ public class Commands {
         messageMetadata.setPublishTime(builder.getPublishTime());
         messageMetadata.setProducerName(builder.getProducerName());
         messageMetadata.setSequenceId(builder.getSequenceId());
+        // Attach the key to the message metadata.
+        if (builder.hasPartitionKey()) {
+            messageMetadata.setPartitionKey(builder.getPartitionKey());
+            
messageMetadata.setPartitionKeyB64Encoded(builder.getPartitionKeyB64Encoded());
+        }
+        if (builder.hasOrderingKey()) {
+            messageMetadata.setOrderingKey(builder.getOrderingKey());
+        }
         if (builder.hasReplicatedFrom()) {
             messageMetadata.setReplicatedFrom(builder.getReplicatedFrom());
         }

Reply via email to