This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d49415e Return message ID from compacted ledger while the compaction cursor reach the end of the topic (#13533) d49415e is described below commit d49415e5a558ea9a82c93e55e77b2c3542eacb10 Author: lipenghui <peng...@apache.org> AuthorDate: Wed Dec 29 13:26:57 2021 +0800 Return message ID from compacted ledger while the compaction cursor reach the end of the topic (#13533) ### Motivation The problem happens when the compaction cursor reaches the end of the topic but the tail messages of the topic have been removed during topic compaction because the producer wrote null-value messages for their keys. For example: - 5 messages in the original topic with keys: 0,1,2,3,4 - the corresponding message IDs are: 1:0, 1:1, 1:2, 1:3, 1:4 - the producer sends null-value messages for keys 3 and 4 - the topic compaction task is triggered After the compaction task completes: - 5 messages in the original topic: 1:0, 1:1, 1:2, 1:3, 1:4 - 3 messages in the compacted ledger: 1:0, 1:1, 1:2 At this moment, if the reader tries to get the last message ID of the topic, we should return `1:2`, not `1:4`, because the reader is not able to read the messages with keys `3` and `4` from the compacted topic. Otherwise, the `reader.readNext()` method will block until a new message is written to the topic. ### Modifications The fix is straightforward: when the broker receives a get-last-message-ID request, it checks whether the compaction cursor has reached the end of the original topic. If so, it responds with the last message ID from the compacted ledger. ### Verifying this change A new test, `testHasMessageAvailableWithNullValueMessage`, was added. It ensures that `hasMessageAvailable()` returns false when there are no more messages to read from the compacted topic and the compaction cursor has reached the end of the topic. 
--- .../apache/pulsar/broker/service/ServerCnx.java | 5 ++- .../apache/pulsar/compaction/CompactedTopic.java | 2 + .../pulsar/compaction/CompactedTopicImpl.java | 6 +-- .../pulsar/compaction/CompactedTopicTest.java | 48 +++++++++++++++++++++- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index deecb36..a7a7c2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1729,7 +1729,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); // If it's not pointing to a valid entry, respond messageId of the current position. - if (lastPosition.getEntryId() == -1) { + // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger + Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon(); + if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent() + && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) { handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex, markDeletePosition); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 31955a5..9e50fc0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.compaction; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import 
org.apache.bookkeeper.mledger.Entry; @@ -34,4 +35,5 @@ public interface CompactedTopic { ReadEntriesCallback callback, Consumer consumer); CompletableFuture<Entry> readLastEntryOfCompactedLedger(); + Optional<Position> getCompactionHorizon(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index a6d6fc9..aac213f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -20,7 +20,6 @@ package org.apache.pulsar.compaction; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; import io.netty.buffer.ByteBuf; import java.util.ArrayList; @@ -311,9 +310,8 @@ public class CompactedTopicImpl implements CompactedTopic { .compare(p.getEntryId(), m.getEntryId()).result(); } - @VisibleForTesting - PositionImpl getCompactionHorizon() { - return this.compactionHorizon; + public synchronized Optional<Position> getCompactionHorizon() { + return Optional.ofNullable(this.compactionHorizon); } private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index d44868a..4d00d28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -254,7 +254,8 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent()); 
Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(), newCompactedLedger.getId()); - Assert.assertEquals(compactedTopic.getCompactionHorizon(), newHorizon); + Assert.assertTrue(compactedTopic.getCompactionHorizon().isPresent()); + Assert.assertEquals(compactedTopic.getCompactionHorizon().get(), newHorizon); compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join(); // old ledger should be deleted, new still there @@ -688,4 +689,49 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { Assert.assertFalse(reader.hasMessageAvailable()); } + @Test + public void testHasMessageAvailableWithNullValueMessage() throws Exception { + String topic = "persistent://my-property/use/my-ns/testHasMessageAvailable-" + + UUID.randomUUID(); + final int numMessages = 10; + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .blockIfQueueFull(true) + .enableBatching(false) + .create(); + CompletableFuture<MessageId> lastMessage = null; + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync(); + } + + for (int i = numMessages / 2; i < numMessages; ++i) { + lastMessage = producer.newMessage().key(i + "").value(null).sendAsync(); + } + producer.flush(); + lastMessage.join(); + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); + Assert.assertEquals(stats.compactedLedger.entries, numMessages / 2); + Assert.assertEquals(admin.topics().getStats(topic) + .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); + Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition); + }); + + @Cleanup + Reader<byte[]> reader = pulsarClient.newReader() + .topic(topic) + 
.startMessageIdInclusive() + .startMessageId(MessageId.earliest) + .readCompacted(true) + .create(); + for (int i = numMessages / 2; i < numMessages; ++i) { + reader.readNext(); + } + Assert.assertFalse(reader.hasMessageAvailable()); + Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS)); + } + }