This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1fa9e3532b4 [fix][broker] Fix non-batched null-value messages not removed during topic compaction (#25817)
1fa9e3532b4 is described below
commit 1fa9e3532b4c8978b25f16b43855948b54e95d17
Author: grishaf <[email protected]>
AuthorDate: Fri May 29 10:13:15 2026 +0300
[fix][broker] Fix non-batched null-value messages not removed during topic compaction (#25817)
---
.../compaction/AbstractTwoPhaseCompactor.java | 20 ++++--
.../pulsar/compaction/EventTimeOrderCompactor.java | 14 ++--
.../apache/pulsar/compaction/CompactionTest.java | 74 ++++++++++++++++++++++
3 files changed, 93 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index 390bd9a0ce0..af989d181fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -462,14 +462,24 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
return bkf;
}
+ /**
+ * Extract the partition key and the payload size for a non-batch message.
+ *
+ * @return a pair of (partitionKey, payloadSize), or null if the message has
no partition key.
+ */
protected Pair<String, Integer> extractKeyAndSize(RawMessage m,
MessageMetadata msgMetadata) {
- ByteBuf headersAndPayload = m.getHeadersAndPayload();
if (msgMetadata.hasPartitionKey()) {
- int size = headersAndPayload.readableBytes();
- if (msgMetadata.hasUncompressedSize()) {
- size = msgMetadata.getUncompressedSize();
+ int payloadSize;
+ if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) {
+ payloadSize = 0;
+ } else if (msgMetadata.hasUncompressedSize()) {
+ payloadSize = msgMetadata.getUncompressedSize();
+ } else {
+ ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
+ Commands.skipMessageMetadata(headersAndPayload);
+ payloadSize = headersAndPayload.readableBytes();
}
- return Pair.of(msgMetadata.getPartitionKey(), size);
+ return Pair.of(msgMetadata.getPartitionKey(), payloadSize);
} else {
return null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
index f3255f02b7c..19269b2d52d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.compaction;
-import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -138,17 +137,12 @@ public class EventTimeOrderCompactor extends AbstractTwoPhaseCompactor<Pair<Mess
}
protected MessageCompactionData extractMessageCompactionData(RawMessage m,
MessageMetadata metadata) {
- ByteBuf headersAndPayload = m.getHeadersAndPayload();
- if (metadata.hasPartitionKey()) {
- int size = headersAndPayload.readableBytes();
- if (metadata.hasUncompressedSize()) {
- size = metadata.getUncompressedSize();
- }
- return new MessageCompactionData(m.getMessageId(),
metadata.getPartitionKey(),
- size, metadata.getEventTime());
- } else {
+ Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
+ if (keyAndSize == null) {
return null;
}
+ return new MessageCompactionData(m.getMessageId(), keyAndSize.getLeft(),
+ keyAndSize.getRight(), metadata.getEventTime());
}
private List<MessageCompactionData>
extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 99f3ce40409..6b625d1cd65 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -640,6 +640,80 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
assertEquals(messages.get(2).getKey(), "key5");
}
+ /**
+ * Write raw non-batch entries directly to the managed ledger without
+ * uncompressedSize, as seen with some non-Java clients. Verifies that
+ * null-value tombstones remove keys during compaction.
+ */
+ @Test
+ public void testNonBatchedMessageWithNullValue() throws Exception {
+ String topic =
"persistent://my-tenant/my-ns/non-batched-message-with-null-value";
+
+ admin.topics().createNonPartitionedTopic(topic);
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .receiverQueueSize(1).readCompacted(true).subscribe().close();
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+
+ long seqId = 0;
+
+ // key1: value then null-value tombstone
+ ml.addEntry(buildNonBatchEntry("key1", "my-message-1".getBytes(),
seqId++));
+ ml.addEntry(buildNonBatchEntry("key1", null, seqId++));
+
+ // key2: value only (should survive)
+ ml.addEntry(buildNonBatchEntry("key2", "my-message-3".getBytes(),
seqId++));
+
+ // key3: value then null-value tombstone
+ ml.addEntry(buildNonBatchEntry("key3", "my-message-4".getBytes(),
seqId++));
+ ml.addEntry(buildNonBatchEntry("key3", null, seqId++));
+
+ // key4: value only (should survive)
+ ml.addEntry(buildNonBatchEntry("key4", "my-message-6".getBytes(),
seqId++));
+
+ compact(topic);
+
+ List<Message<byte[]>> messages = new ArrayList<>();
+ try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
+
.subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe())
{
+ while (true) {
+ Message<byte[]> message = consumer.receive(5,
TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messages.add(message);
+ }
+ }
+
+ assertEquals(messages.size(), 2);
+ assertEquals(messages.get(0).getKey(), "key2");
+ assertEquals(messages.get(1).getKey(), "key4");
+ }
+
+ private byte[] buildNonBatchEntry(String key, byte[] payload, long
sequenceId) {
+ org.apache.pulsar.common.api.proto.MessageMetadata metadata =
+ new org.apache.pulsar.common.api.proto.MessageMetadata();
+ metadata.setPartitionKey(key);
+ metadata.setPublishTime(System.currentTimeMillis());
+ metadata.setProducerName("test-non-batch");
+ metadata.setSequenceId(sequenceId);
+ if (payload == null) {
+ metadata.setNullValue(true);
+ }
+ ByteBuf payloadBuf = io.netty.buffer.Unpooled.wrappedBuffer(
+ payload != null ? payload : new byte[0]);
+ ByteBuf entry =
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(
+ org.apache.pulsar.common.protocol.Commands.ChecksumType.Crc32c,
+ metadata, payloadBuf);
+ byte[] bytes = new byte[entry.readableBytes()];
+ entry.readBytes(bytes);
+ entry.release();
+ payloadBuf.release();
+ return bytes;
+ }
+
@Test
public void testWholeBatchCompactedOut() throws Exception {
String topic =
"persistent://my-tenant/my-ns/whole-batch-compacted-out";