This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fa9e3532b4 [fix][broker] Fix non-batched null-value messages not removed during topic compaction (#25817)
1fa9e3532b4 is described below

commit 1fa9e3532b4c8978b25f16b43855948b54e95d17
Author: grishaf <[email protected]>
AuthorDate: Fri May 29 10:13:15 2026 +0300

    [fix][broker] Fix non-batched null-value messages not removed during topic compaction (#25817)
---
 .../compaction/AbstractTwoPhaseCompactor.java      | 20 ++++--
 .../pulsar/compaction/EventTimeOrderCompactor.java | 14 ++--
 .../apache/pulsar/compaction/CompactionTest.java   | 74 ++++++++++++++++++++++
 3 files changed, 93 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index 390bd9a0ce0..af989d181fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -462,14 +462,24 @@ public abstract class AbstractTwoPhaseCompactor<T> 
extends Compactor {
     return bkf;
   }
 
+  /**
+   * Extract the partition key and the payload size for a non-batch message.
+   *
+   * @return a pair of (partitionKey, payloadSize), or null if the message has 
no partition key.
+   */
   protected Pair<String, Integer> extractKeyAndSize(RawMessage m, 
MessageMetadata msgMetadata) {
-    ByteBuf headersAndPayload = m.getHeadersAndPayload();
     if (msgMetadata.hasPartitionKey()) {
-      int size = headersAndPayload.readableBytes();
-      if (msgMetadata.hasUncompressedSize()) {
-        size = msgMetadata.getUncompressedSize();
+      int payloadSize;
+      if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) {
+        payloadSize = 0;
+      } else if (msgMetadata.hasUncompressedSize()) {
+        payloadSize = msgMetadata.getUncompressedSize();
+      } else {
+        ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
+        Commands.skipMessageMetadata(headersAndPayload);
+        payloadSize = headersAndPayload.readableBytes();
       }
-      return Pair.of(msgMetadata.getPartitionKey(), size);
+      return Pair.of(msgMetadata.getPartitionKey(), payloadSize);
     } else {
       return null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
index f3255f02b7c..19269b2d52d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.compaction;
 
-import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -138,17 +137,12 @@ public class EventTimeOrderCompactor extends 
AbstractTwoPhaseCompactor<Pair<Mess
   }
 
   protected MessageCompactionData extractMessageCompactionData(RawMessage m, 
MessageMetadata metadata) {
-    ByteBuf headersAndPayload = m.getHeadersAndPayload();
-    if (metadata.hasPartitionKey()) {
-      int size = headersAndPayload.readableBytes();
-      if (metadata.hasUncompressedSize()) {
-        size = metadata.getUncompressedSize();
-      }
-      return new MessageCompactionData(m.getMessageId(), 
metadata.getPartitionKey(),
-          size, metadata.getEventTime());
-    } else {
+    Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
+    if (keyAndSize == null) {
       return null;
     }
+    return new MessageCompactionData(m.getMessageId(), keyAndSize.getLeft(),
+        keyAndSize.getRight(), metadata.getEventTime());
   }
 
   private List<MessageCompactionData> 
extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 99f3ce40409..6b625d1cd65 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -640,6 +640,80 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(messages.get(2).getKey(), "key5");
     }
 
+    /**
+     * Write raw non-batch entries directly to the managed ledger without
+     * uncompressedSize, as seen with some non-Java clients. Verifies that
+     * null-value tombstones remove keys during compaction.
+     */
+    @Test
+    public void testNonBatchedMessageWithNullValue() throws Exception {
+        String topic = 
"persistent://my-tenant/my-ns/non-batched-message-with-null-value";
+
+        admin.topics().createNonPartitionedTopic(topic);
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .receiverQueueSize(1).readCompacted(true).subscribe().close();
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+
+        long seqId = 0;
+
+        // key1: value then null-value tombstone
+        ml.addEntry(buildNonBatchEntry("key1", "my-message-1".getBytes(), 
seqId++));
+        ml.addEntry(buildNonBatchEntry("key1", null, seqId++));
+
+        // key2: value only (should survive)
+        ml.addEntry(buildNonBatchEntry("key2", "my-message-3".getBytes(), 
seqId++));
+
+        // key3: value then null-value tombstone
+        ml.addEntry(buildNonBatchEntry("key3", "my-message-4".getBytes(), 
seqId++));
+        ml.addEntry(buildNonBatchEntry("key3", null, seqId++));
+
+        // key4: value only (should survive)
+        ml.addEntry(buildNonBatchEntry("key4", "my-message-6".getBytes(), 
seqId++));
+
+        compact(topic);
+
+        List<Message<byte[]>> messages = new ArrayList<>();
+        try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
+             
.subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) 
{
+            while (true) {
+                Message<byte[]> message = consumer.receive(5, 
TimeUnit.SECONDS);
+                if (message == null) {
+                    break;
+                }
+                messages.add(message);
+            }
+        }
+
+        assertEquals(messages.size(), 2);
+        assertEquals(messages.get(0).getKey(), "key2");
+        assertEquals(messages.get(1).getKey(), "key4");
+    }
+
+    private byte[] buildNonBatchEntry(String key, byte[] payload, long 
sequenceId) {
+        org.apache.pulsar.common.api.proto.MessageMetadata metadata =
+                new org.apache.pulsar.common.api.proto.MessageMetadata();
+        metadata.setPartitionKey(key);
+        metadata.setPublishTime(System.currentTimeMillis());
+        metadata.setProducerName("test-non-batch");
+        metadata.setSequenceId(sequenceId);
+        if (payload == null) {
+            metadata.setNullValue(true);
+        }
+        ByteBuf payloadBuf = io.netty.buffer.Unpooled.wrappedBuffer(
+                payload != null ? payload : new byte[0]);
+        ByteBuf entry = 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(
+                org.apache.pulsar.common.protocol.Commands.ChecksumType.Crc32c,
+                metadata, payloadBuf);
+        byte[] bytes = new byte[entry.readableBytes()];
+        entry.readBytes(bytes);
+        entry.release();
+        payloadBuf.release();
+        return bytes;
+    }
+
     @Test
     public void testWholeBatchCompactedOut() throws Exception {
         String topic = 
"persistent://my-tenant/my-ns/whole-batch-compacted-out";

Reply via email to