This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d49415e  Return message ID from compacted ledger while the compaction 
cursor reach the end of the topic (#13533)
d49415e is described below

commit d49415e5a558ea9a82c93e55e77b2c3542eacb10
Author: lipenghui <peng...@apache.org>
AuthorDate: Wed Dec 29 13:26:57 2021 +0800

    Return message ID from compacted ledger while the compaction cursor reach 
the end of the topic (#13533)
    
    ### Motivation
    
    The problem occurs when the compaction cursor reaches the end of the topic,
    but the tail messages of the topic have been removed by compaction because
    the producer wrote null-value messages for their keys.
    
    For example:
    
    - 5 messages are written to the original topic with keys 0, 1, 2, 3, 4
    - the corresponding message IDs are 1:0, 1:1, 1:2, 1:3, 1:4
    - the producer sends null-value messages for keys 3 and 4 (message IDs 1:5 and 1:6)
    - the topic compaction task is triggered
    
    After the compaction task completes:
    
    - 7 messages remain in the original topic: 1:0 ... 1:6
    - 3 messages are in the compacted ledger: 1:0, 1:1, 1:2
    
    At this point, if a reader asks for the last message ID of the topic,
    the broker should return `1:2`, not `1:6`, because the reader cannot read
    the messages with keys `3` and `4` from the compacted topic. Otherwise,
    `reader.readNext()` blocks until a new message is written to the topic.
    
    ### Modifications
    
    The fix is straightforward: when the broker receives a get-last-message-ID
    request, it checks whether the compaction cursor (the compaction horizon)
    has reached the end of the original topic. If it has, the broker responds
    with the last message ID from the compacted ledger.
    
    ### Verifying this change
    
    Added a new test, `testHasMessageAvailableWithNullValueMessage`, which ensures
    that `hasMessageAvailable()` returns false when no more messages can be read
    from the compacted topic and the compaction cursor has reached the end of the topic.
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++-
 .../apache/pulsar/compaction/CompactedTopic.java   |  2 +
 .../pulsar/compaction/CompactedTopicImpl.java      |  6 +--
 .../pulsar/compaction/CompactedTopicTest.java      | 48 +++++++++++++++++++++-
 4 files changed, 55 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index deecb36..a7a7c2d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1729,7 +1729,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the 
current position.
-        if (lastPosition.getEntryId() == -1) {
+        // If the compaction cursor reach the end of the topic, respond 
messageId from compacted ledger
+        Optional<Position> compactionHorizon = 
persistentTopic.getCompactedTopic().getCompactionHorizon();
+        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
+                        && lastPosition.compareTo((PositionImpl) 
compactionHorizon.get()) <= 0)) {
             handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, 
partitionIndex,
                     markDeletePosition);
             return;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 31955a5..9e50fc0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -34,4 +35,5 @@ public interface CompactedTopic {
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
     CompletableFuture<Entry> readLastEntryOfCompactedLedger();
+    Optional<Position> getCompactionHorizon();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index a6d6fc9..aac213f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.compaction;
 
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
@@ -311,9 +310,8 @@ public class CompactedTopicImpl implements CompactedTopic {
             .compare(p.getEntryId(), m.getEntryId()).result();
     }
 
-    @VisibleForTesting
-    PositionImpl getCompactionHorizon() {
-        return this.compactionHorizon;
+    public synchronized Optional<Position> getCompactionHorizon() {
+        return Optional.ofNullable(this.compactionHorizon);
     }
     private static final Logger log = 
LoggerFactory.getLogger(CompactedTopicImpl.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index d44868a..4d00d28 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -254,7 +254,8 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent());
         
Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(),
                 newCompactedLedger.getId());
-        Assert.assertEquals(compactedTopic.getCompactionHorizon(), newHorizon);
+        Assert.assertTrue(compactedTopic.getCompactionHorizon().isPresent());
+        Assert.assertEquals(compactedTopic.getCompactionHorizon().get(), 
newHorizon);
         
compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();
 
         // old ledger should be deleted, new still there
@@ -688,4 +689,49 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertFalse(reader.hasMessageAvailable());
     }
 
+    @Test
+    public void testHasMessageAvailableWithNullValueMessage() throws Exception 
{
+        String topic = 
"persistent://my-property/use/my-ns/testHasMessageAvailable-" +
+                UUID.randomUUID();
+        final int numMessages = 10;
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + 
"").value(String.format("msg [%d]", i)).sendAsync();
+        }
+
+        for (int i = numMessages / 2; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + 
"").value(null).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, numMessages / 
2);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+            Assert.assertEquals(stats.lastConfirmedEntry, 
stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+        });
+
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .create();
+        for (int i = numMessages / 2; i < numMessages; ++i) {
+            reader.readNext();
+        }
+        Assert.assertFalse(reader.hasMessageAvailable());
+        Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
+    }
+
 }

Reply via email to