This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dd90657  #12429 only fixed the compactor skips data issue, but the 
normal reader/consumer (#12464)
dd90657 is described below

commit dd90657c33aa71ca86c9988513d92bf79aa68335
Author: lipenghui <peng...@apache.org>
AuthorDate: Sun Oct 24 23:11:15 2021 +0800

    #12429 only fixed the compactor skips data issue, but the normal 
reader/consumer (#12464)
    
    also skips data while enabled read compacted data and read from the 
earliest position.
---
 .../org/apache/pulsar/compaction/CompactedTopicImpl.java  |  4 +---
 .../org/apache/pulsar/compaction/CompactedTopicTest.java  | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 4375188..4bc1664 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.compaction;
 
-import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.ComparisonChain;
@@ -128,8 +127,7 @@ public class CompactedTopicImpl implements CompactedTopic {
                                         // need to force seek the read 
position to ensure the compactor can read
                                         // the complete last snapshot because 
of the compactor will read the data
                                         // before the compaction cursor mark 
delete position
-                                        
cursor.seek(lastEntry.getPosition().getNext(),
-                                                
cursor.getName().equals(COMPACTION_SUBSCRIPTION));
+                                        
cursor.seek(lastEntry.getPosition().getNext(), true);
                                         callback.readEntriesComplete(entries, 
consumer);
                                     });
                             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index adb6f46..2dd6f8a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -481,5 +481,20 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
             PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
             Assert.assertEquals(stats.compactedLedger.entries, keys + 1);
         });
+
+        // Make sure the reader can get all data from the compacted ledger and 
original ledger.
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .create();
+        int received = 0;
+        while (reader.hasMessageAvailable()) {
+            reader.readNext();
+            received++;
+        }
+        Assert.assertEquals(received, keys + 1);
+        reader.close();
+        producer.close();
     }
 }

Reply via email to