This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ccc562e138 [fix][broker] Pass `bytesToRead` when reading compacted entries (#20850)
9ccc562e138 is described below

commit 9ccc562e138d4caadad27014cc683f689328db34
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Jul 22 17:55:30 2023 +0800

    [fix][broker] Pass `bytesToRead` when reading compacted entries (#20850)
---
 .../PersistentDispatcherSingleActiveConsumer.java        |  4 ++--
 .../org/apache/pulsar/compaction/CompactedTopic.java     |  4 ++--
 .../apache/pulsar/compaction/CompactedTopicUtils.java    | 16 ++++++++++++----
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6de113d6db9..d96429693fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -350,8 +350,8 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
                     boolean readFromEarliest = isFirstRead && 
MessageId.earliest.equals(consumer.getStartMessageId());
-                    
CompactedTopicUtils.readCompactedEntries(topic.getTopicCompactionService(), 
cursor, messagesToRead,
-                            readFromEarliest, this, consumer);
+                    
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(),
 cursor,
+                            messagesToRead, bytesToRead, readFromEarliest, 
this, true, consumer);
                 } else {
                     ReadEntriesCtx readEntriesCtx =
                             ReadEntriesCtx.create(consumer, 
consumer.getConsumerEpoch());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 99e2f8a9624..660c7ea7797 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -33,8 +33,8 @@ public interface CompactedTopic {
     /**
      * Read entries from compacted topic.
      *
-     * @deprecated Use {@link 
CompactedTopicUtils#readCompactedEntries(TopicCompactionService, ManagedCursor,
-     * int, boolean, ReadEntriesCallback, Consumer)} instead.
+     * @deprecated Use {@link 
CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, 
ManagedCursor,
+     * int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead.
      */
     @Deprecated
     void asyncReadEntriesOrWait(ManagedCursor cursor,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index 4cd21cbb03e..cc5147c8e66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -39,9 +39,11 @@ import org.apache.pulsar.common.util.FutureUtil;
 public class CompactedTopicUtils {
 
     @Beta
-    public static void readCompactedEntries(TopicCompactionService 
topicCompactionService, ManagedCursor cursor,
-                                            int numberOfEntriesToRead, boolean 
readFromEarliest,
-                                            AsyncCallbacks.ReadEntriesCallback 
callback, @Nullable Consumer consumer) {
+    public static void asyncReadCompactedEntries(TopicCompactionService 
topicCompactionService,
+                                                 ManagedCursor cursor, int 
numberOfEntriesToRead,
+                                                 long bytesToRead, boolean 
readFromEarliest,
+                                                 
AsyncCallbacks.ReadEntriesCallback callback,
+                                                 boolean wait, @Nullable 
Consumer consumer) {
         Objects.requireNonNull(topicCompactionService);
         Objects.requireNonNull(cursor);
         checkArgument(numberOfEntriesToRead > 0);
@@ -64,7 +66,13 @@ public class CompactedTopicUtils {
             if (lastCompactedPosition == null
                     || readPosition.compareTo(
                     lastCompactedPosition.getLedgerId(), 
lastCompactedPosition.getEntryId()) > 0) {
-                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
readEntriesCtx, PositionImpl.LATEST);
+                if (wait) {
+                    cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, 
bytesToRead, callback, readEntriesCtx,
+                        PositionImpl.LATEST);
+                } else {
+                    cursor.asyncReadEntries(numberOfEntriesToRead, 
bytesToRead, callback, readEntriesCtx,
+                        PositionImpl.LATEST);
+                }
                 return CompletableFuture.completedFuture(null);
             }
 

Reply via email to