This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new dd90657 #12429 only fixed the compactor skips data issue, but the normal reader/consumer (#12464) dd90657 is described below commit dd90657c33aa71ca86c9988513d92bf79aa68335 Author: lipenghui <peng...@apache.org> AuthorDate: Sun Oct 24 23:11:15 2021 +0800 #12429 only fixed the compactor skips data issue, but the normal reader/consumer (#12464) also skips data when reading compacted data is enabled and the read starts from the earliest position. --- .../org/apache/pulsar/compaction/CompactedTopicImpl.java | 4 +--- .../org/apache/pulsar/compaction/CompactedTopicTest.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 4375188..4bc1664 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.compaction; -import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ComparisonChain; @@ -128,8 +127,7 @@ public class CompactedTopicImpl implements CompactedTopic { // need to force seek the read position to ensure the compactor can read // the complete last snapshot because of the compactor will read the data // before the compaction cursor mark delete position - cursor.seek(lastEntry.getPosition().getNext(), - cursor.getName().equals(COMPACTION_SUBSCRIPTION)); + cursor.seek(lastEntry.getPosition().getNext(), true); callback.readEntriesComplete(entries, consumer); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index adb6f46..2dd6f8a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -481,5 +481,20 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); Assert.assertEquals(stats.compactedLedger.entries, keys + 1); }); + + // Make sure the reader can get all data from the compacted ledger and original ledger. + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .readCompacted(true) + .create(); + int received = 0; + while (reader.hasMessageAvailable()) { + reader.readNext(); + received++; + } + Assert.assertEquals(received, keys + 1); + reader.close(); + producer.close(); } }