This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40eebc0a9eb [fix] [broker] Fix write all compacted out entry into 
compacted topic (#21917)
40eebc0a9eb is described below

commit 40eebc0a9eb9352ba31bca0d20d471f16a5515a9
Author: thetumbled <[email protected]>
AuthorDate: Sun Jan 21 19:47:40 2024 +0800

    [fix] [broker] Fix write all compacted out entry into compacted topic 
(#21917)
---
 .../pulsar/client/impl/RawBatchConverter.java      |  6 ++-
 .../apache/pulsar/compaction/CompactorTest.java    | 56 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index dfa65d19953..4c24f6d3036 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -134,7 +134,11 @@ public class RawBatchConverter {
                                                       
msg.getMessageIdData().getEntryId(),
                                                       
msg.getMessageIdData().getPartition(),
                                                       i);
-                if (!singleMessageMetadata.hasPartitionKey()) {
+                if (singleMessageMetadata.isCompactedOut()) {
+                    // we may read compacted out message from the compacted 
topic
+                    
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+                            Unpooled.EMPTY_BUFFER, batchBuffer);
+                } else if (!singleMessageMetadata.hasPartitionKey()) {
                     if (retainNullKey) {
                         messagesRetained++;
                         
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 4e442ac0513..71700ef83a4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -46,10 +48,15 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawMessageImpl;
@@ -177,6 +184,55 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         compactAndVerify(topic, expected, true);
     }
 
+    @Test
+    public void testAllCompactedOut() throws Exception {
+        String topicName = 
"persistent://my-property/use/my-ns/testAllCompactedOut";
+        // set retain null key to true
+        boolean oldRetainNullKey = 
pulsar.getConfig().isTopicCompactionRetainNullKey();
+        pulsar.getConfig().setTopicCompactionRetainNullKey(true);
+        this.restartBroker();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                
.enableBatching(true).topic(topicName).batchingMaxMessages(3).create();
+
+        producer.newMessage().key("K1").value("V1").sendAsync();
+        producer.newMessage().key("K2").value("V2").sendAsync();
+        producer.newMessage().key("K2").value(null).sendAsync();
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            
Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        producer.newMessage().key("K1").value(null).sendAsync();
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            
Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        @Cleanup
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .subscriptionName("reader-test")
+                .topic(topicName)
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .create();
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
+            Assert.assertNotNull(message);
+        }
+        // set retain null key back to avoid affecting other tests
+        pulsar.getConfig().setTopicCompactionRetainNullKey(oldRetainNullKey);
+    }
+
     @Test
     public void testCompactAddCompact() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";

Reply via email to