This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 951d4d3  Compaction considers messages with empty payload as deleting the key (#1525)
951d4d3 is described below

commit 951d4d3805ad0b7365aa83bfb5001c6513ddff68
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Apr 9 21:39:25 2018 +0200

    Compaction considers messages with empty payload as deleting the key (#1525)
    
    If the latest message for a key has an empty payload, compaction
    treats the key as deleted, and no message for that key is stored
    in the compacted topic ledger.
    
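    This patch also makes it possible to build messages with an empty
    payload, which was not previously possible: MessageBuilderImpl now
    defaults its content to an empty buffer instead of leaving it unset.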
---
 .../pulsar/client/impl/RawBatchConverter.java      |  3 +-
 .../pulsar/compaction/TwoPhaseCompactor.java       | 18 +++---
 .../apache/pulsar/compaction/CompactionTest.java   | 68 ++++++++++++++++++++++
 .../pulsar/client/impl/MessageBuilderImpl.java     |  4 +-
 4 files changed, 82 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 4e628bc..9ee31ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -113,7 +113,8 @@ public class RawBatchConverter {
                     messagesRetained++;
                     
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
                                                                       
singleMessagePayload, batchBuffer);
-                } else if 
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) {
+                } else if 
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)
+                           && singleMessagePayload.readableBytes() > 0) {
                     messagesRetained++;
                     
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
                                                                       
singleMessagePayload, batchBuffer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index fbad47e..2eaa8d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.Commands;
@@ -122,9 +123,9 @@ public class TwoPhaseCompactor extends Compactor {
                                          id, ioe);
                             }
                         } else {
-                            String key = extractKey(m);
-                            if (key != null) {
-                                latestForKey.put(key, id);
+                            Pair<String,Integer> keyAndSize = 
extractKeyAndSize(m);
+                            if (keyAndSize != null) {
+                                latestForKey.put(keyAndSize.getLeft(), id);
                             }
                         }
 
@@ -214,10 +215,11 @@ public class TwoPhaseCompactor extends Compactor {
                             messageToAdd = Optional.of(m);
                         }
                     } else {
-                        String key = extractKey(m);
-                        if (key == null) { // pass through messages without a 
key
+                        Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+                        if (keyAndSize == null) { // pass through messages 
without a key
                             messageToAdd = Optional.of(m);
-                        } else if (latestForKey.get(key).equals(id)) {
+                        } else if 
(latestForKey.get(keyAndSize.getLeft()).equals(id)
+                                   && keyAndSize.getRight() > 0) {
                             messageToAdd = Optional.of(m);
                         } else {
                             m.close();
@@ -307,11 +309,11 @@ public class TwoPhaseCompactor extends Compactor {
         return bkf;
     }
 
-    private static String extractKey(RawMessage m) {
+    private static Pair<String,Integer> extractKeyAndSize(RawMessage m) {
         ByteBuf headersAndPayload = m.getHeadersAndPayload();
         MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
         if (msgMetadata.hasPartitionKey()) {
-            return msgMetadata.getPartitionKey();
+            return Pair.of(msgMetadata.getPartitionKey(), 
headersAndPayload.readableBytes());
         } else {
             return null;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index a0f0f97..22e74f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -512,4 +512,72 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+
+    @Test
+    public void testEmptyPayloadDeletes() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producerNormal = 
pulsarClient.newProducer().topic(topic).create();
+             Producer producerBatch = 
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+
+            // key0 persists through it all
+            producerNormal.sendAsync(MessageBuilder.create()
+                                     .setKey("key0")
+                                     
.setContent("my-message-0".getBytes()).build()).get();
+
+            // key1 is added but then deleted
+            producerNormal.sendAsync(MessageBuilder.create()
+                                     .setKey("key1")
+                                     
.setContent("my-message-1".getBytes()).build()).get();
+
+            producerNormal.sendAsync(MessageBuilder.create()
+                                     .setKey("key1").build()).get();
+
+            // key2 is added but deleted in same batch
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key2")
+                                    
.setContent("my-message-2".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key3")
+                                    
.setContent("my-message-3".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key2").build()).get();
+
+            // key3 is added in previous batch, deleted in this batch
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key3").build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key4")
+                                    
.setContent("my-message-3".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key4").build()).get();
+
+            // key4 is added, deleted, then resurrected
+            producerNormal.sendAsync(MessageBuilder.create()
+                                     .setKey("key4")
+                                     
.setContent("my-message-4".getBytes()).build()).get();
+        }
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key0");
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-0");
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key4");
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-4");
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
index 7714b99..056064a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
@@ -33,10 +33,10 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import com.google.common.base.Preconditions;
 
 public class MessageBuilderImpl<T> implements MessageBuilder<T> {
-
+    private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
     private final MessageMetadata.Builder msgMetadataBuilder = 
MessageMetadata.newBuilder();
     private final Schema<T> schema;
-    private ByteBuffer content;
+    private ByteBuffer content = EMPTY_CONTENT;
 
     public MessageBuilderImpl(Schema<T> schema) {
         this.schema = schema;

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to