This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 951d4d3 Compaction considers messages with empty payload as deleting the key (#1525) 951d4d3 is described below commit 951d4d3805ad0b7365aa83bfb5001c6513ddff68 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Mon Apr 9 21:39:25 2018 +0200 Compaction considers messages with empty payload as deleting the key (#1525) If the latest message with a key has an empty payload, compaction will take this to mean that the key has been deleted, so it will not be stored in the compacted topic ledger. This patch also introduces empty messages, which were not previously possible. --- .../pulsar/client/impl/RawBatchConverter.java | 3 +- .../pulsar/compaction/TwoPhaseCompactor.java | 18 +++--- .../apache/pulsar/compaction/CompactionTest.java | 68 ++++++++++++++++++++++ .../pulsar/client/impl/MessageBuilderImpl.java | 4 +- 4 files changed, 82 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 4e628bc..9ee31ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -113,7 +113,8 @@ public class RawBatchConverter { messagesRetained++; Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, singleMessagePayload, batchBuffer); - } else if (filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) { + } else if (filter.test(singleMessageMetadataBuilder.getPartitionKey(), id) + && singleMessagePayload.readableBytes() > 0) { messagesRetained++; Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, singleMessagePayload, batchBuffer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index fbad47e..2eaa8d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.api.Commands; @@ -122,9 +123,9 @@ public class TwoPhaseCompactor extends Compactor { id, ioe); } } else { - String key = extractKey(m); - if (key != null) { - latestForKey.put(key, id); + Pair<String,Integer> keyAndSize = extractKeyAndSize(m); + if (keyAndSize != null) { + latestForKey.put(keyAndSize.getLeft(), id); } } @@ -214,10 +215,11 @@ public class TwoPhaseCompactor extends Compactor { messageToAdd = Optional.of(m); } } else { - String key = extractKey(m); - if (key == null) { // pass through messages without a key + Pair<String,Integer> keyAndSize = extractKeyAndSize(m); + if (keyAndSize == null) { // pass through messages without a key messageToAdd = Optional.of(m); - } else if (latestForKey.get(key).equals(id)) { + } else if (latestForKey.get(keyAndSize.getLeft()).equals(id) + && keyAndSize.getRight() > 0) { messageToAdd = Optional.of(m); } else { m.close(); @@ -307,11 +309,11 @@ public class TwoPhaseCompactor extends Compactor { return bkf; } - private static String extractKey(RawMessage m) { + private static Pair<String,Integer> extractKeyAndSize(RawMessage m) { ByteBuf headersAndPayload = m.getHeadersAndPayload(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); if (msgMetadata.hasPartitionKey()) { - return msgMetadata.getPartitionKey(); + return Pair.of(msgMetadata.getPartitionKey(), headersAndPayload.readableBytes()); } else { return null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index a0f0f97..22e74f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -512,4 +512,72 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { } } + + @Test + public void testEmptyPayloadDeletes() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create(); + Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) + .enableBatching(true).batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + + // key0 persists through it all + producerNormal.sendAsync(MessageBuilder.create() + .setKey("key0") + .setContent("my-message-0".getBytes()).build()).get(); + + // key1 is added but then deleted + producerNormal.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-1".getBytes()).build()).get(); + + producerNormal.sendAsync(MessageBuilder.create() + .setKey("key1").build()).get(); + + // key2 is added but deleted in same batch + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-2".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key3") + .setContent("my-message-3".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key2").build()).get(); + + // key3 is added in previous batch, deleted in this batch + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key3").build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key4") + .setContent("my-message-3".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key4").build()).get(); + + // key4 is added, deleted, then resurrected + producerNormal.sendAsync(MessageBuilder.create() + .setKey("key4") + .setContent("my-message-4".getBytes()).build()).get(); + } + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message1 = consumer.receive(); + Assert.assertEquals(message1.getKey(), "key0"); + Assert.assertEquals(new String(message1.getData()), "my-message-0"); + + Message message2 = consumer.receive(); + Assert.assertEquals(message2.getKey(), "key4"); + Assert.assertEquals(new String(message2.getData()), "my-message-4"); + } + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java index 7714b99..056064a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java @@ -33,10 +33,10 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import com.google.common.base.Preconditions; public class MessageBuilderImpl<T> implements MessageBuilder<T> { - + private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0); private final MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder(); private final Schema<T> schema; - private ByteBuffer content; + private ByteBuffer content = EMPTY_CONTENT; public MessageBuilderImpl(Schema<T> schema) { this.schema = schema; -- To stop receiving notification emails like this one, please contact si...@apache.org.