This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ae246fe7d71495b440a2dc83afe9930f7756ab90 Author: lipenghui <peng...@apache.org> AuthorDate: Thu Jul 16 09:25:01 2020 +0800 Decompression payload if needed in KeyShared subscription (#7416) Decompression payload if needed in KeyShared subscription (cherry picked from commit ed3583a5bd750661f8643617fc618151f87019b2) --- .../broker/service/AbstractBaseDispatcher.java | 47 +++++------------- .../client/api/KeySharedSubscriptionTest.java | 57 ++++++++++++++++++++++ .../client/impl/BatchMessageKeyBasedContainer.java | 9 ---- .../org/apache/pulsar/client/impl/MessageImpl.java | 10 +++- .../apache/pulsar/common/protocol/Commands.java | 8 +++ 5 files changed, 85 insertions(+), 46 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 18b29f2..7cf9793 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -33,6 +33,8 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -149,43 +151,16 @@ public abstract class AbstractBaseDispatcher implements Dispatcher { public static final String NONE_KEY = "NONE_KEY"; protected byte[] peekStickyKey(ByteBuf metadataAndPayload) { - int readerIndex = metadataAndPayload.readerIndex(); + metadataAndPayload.markReaderIndex(); PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); - - try { - if (metadata.hasNumMessagesInBatch()) { - // If the message was part of a batch (eg: a batch of 1 message), we need - // to read the key from the first single-message-metadata entry - PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata - .newBuilder(); - ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(metadataAndPayload, - singleMessageMetadataBuilder, 0, metadata.getNumMessagesInBatch()); - try { - if (singleMessageMetadataBuilder.hasOrderingKey()) { - return singleMessageMetadataBuilder.getOrderingKey().toByteArray(); - } else if (singleMessageMetadataBuilder.hasPartitionKey()) { - return singleMessageMetadataBuilder.getPartitionKey().getBytes(); - } - } finally { - singleMessagePayload.release(); - singleMessageMetadataBuilder.recycle(); - } - } else { - // Message was not part of a batch - if (metadata.hasOrderingKey()) { - return metadata.getOrderingKey().toByteArray(); - } else if (metadata.hasPartitionKey()) { - return metadata.getPartitionKey().getBytes(); - } - } - - return NONE_KEY.getBytes(); - } catch (IOException e) { - // If we fail to deserialize medata, return null key - return NONE_KEY.getBytes(); - } finally { - metadataAndPayload.readerIndex(readerIndex); - metadata.recycle(); + metadataAndPayload.resetReaderIndex(); + byte[] key = NONE_KEY.getBytes(); + if (metadata.hasOrderingKey()) { + return metadata.getOrderingKey().toByteArray(); + } else if (metadata.hasPartitionKey()) { + return metadata.getPartitionKey().getBytes(); } + metadata.recycle(); + return key; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 511f8c2..2a7a20b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -710,6 +710,63 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { consumer4.close(); } + @Test + public void testWithMessageCompression() throws Exception { + final String topic = "testWithMessageCompression" + UUID.randomUUID().toString(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .compressionType(CompressionType.LZ4) + .create(); + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("test") + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + final int messages = 10; + for (int i = 0; i < messages; i++) { + producer.send(("Hello Pulsar > " + i).getBytes()); + } + List<Message<byte[]>> receives = new ArrayList<>(); + for (int i = 0; i < messages; i++) { + Message<byte[]> received = consumer.receive(); + receives.add(received); + consumer.acknowledge(received); + } + Assert.assertEquals(receives.size(), messages); + producer.close(); + consumer.close(); + } + + @Test + public void testAttachKeyToMessageMetadata() + throws PulsarClientException { + this.conf.setSubscriptionKeySharedEnable(true); + String topic = "persistent://public/default/key_shared-" + UUID.randomUUID(); + + @Cleanup + Consumer<Integer> consumer1 = createConsumer(topic); + + @Cleanup + Consumer<Integer> consumer2 = createConsumer(topic); + + @Cleanup + Consumer<Integer> consumer3 = createConsumer(topic); + + @Cleanup + Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .create(); + + for (int i = 0; i < 1000; i++) { + producer.newMessage() + .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .value(i) + .send(); + } + + receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3)); + } + private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException { return pulsarClient.newConsumer(Schema.STRING) .topic(topic) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java index c9328c8..d9c1c6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java @@ -208,15 +208,6 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { private void addMsg(MessageImpl<?> msg, SendCallback callback) { if (messages.size() == 0) { sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); - if (msg.hasKey()) { - messageMetadata.setPartitionKey(msg.getKey()); - if (msg.hasBase64EncodedKey()) { - messageMetadata.setPartitionKeyB64Encoded(true); - } - } - if (msg.hasOrderingKey()) { - messageMetadata.setOrderingKey(ByteString.copyFrom(msg.getOrderingKey())); - } batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize())); firstCallback = callback; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index a07834f..ac387a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -140,10 +140,18 @@ public class MessageImpl<T> implements Message<T> { } else { properties = Collections.emptyMap(); } - if (singleMessageMetadata.hasPartitionKey()) { msgMetadataBuilder.setPartitionKeyB64Encoded(singleMessageMetadata.getPartitionKeyB64Encoded()); msgMetadataBuilder.setPartitionKey(singleMessageMetadata.getPartitionKey()); + } else if (msgMetadataBuilder.hasPartitionKey()) { + msgMetadataBuilder.clearPartitionKey(); + msgMetadataBuilder.clearPartitionKeyB64Encoded(); + } + + if (singleMessageMetadata.hasOrderingKey()) { + msgMetadataBuilder.setOrderingKey(singleMessageMetadata.getOrderingKey()); + } else if (msgMetadataBuilder.hasOrderingKey()) { + msgMetadataBuilder.clearOrderingKey(); } if (singleMessageMetadata.hasEventTime()) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7ae21e7..d867139 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1622,6 +1622,14 @@ public class Commands { messageMetadata.setPublishTime(builder.getPublishTime()); messageMetadata.setProducerName(builder.getProducerName()); messageMetadata.setSequenceId(builder.getSequenceId()); + // Attach the key to the message metadata. + if (builder.hasPartitionKey()) { + messageMetadata.setPartitionKey(builder.getPartitionKey()); + messageMetadata.setPartitionKeyB64Encoded(builder.getPartitionKeyB64Encoded()); + } + if (builder.hasOrderingKey()) { + messageMetadata.setOrderingKey(builder.getOrderingKey()); + } if (builder.hasReplicatedFrom()) { messageMetadata.setReplicatedFrom(builder.getReplicatedFrom()); }