This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9ccc562e138 [fix][broker] Pass `bytesToRead` when reading compacted
entries (#20850)
9ccc562e138 is described below
commit 9ccc562e138d4caadad27014cc683f689328db34
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Jul 22 17:55:30 2023 +0800
[fix][broker] Pass `bytesToRead` when reading compacted entries (#20850)
---
.../PersistentDispatcherSingleActiveConsumer.java | 4 ++--
.../org/apache/pulsar/compaction/CompactedTopic.java | 4 ++--
.../apache/pulsar/compaction/CompactedTopicUtils.java | 16 ++++++++++++----
3 files changed, 16 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6de113d6db9..d96429693fd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -350,8 +350,8 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
havePendingRead = true;
if (consumer.readCompacted()) {
boolean readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId());
-
CompactedTopicUtils.readCompactedEntries(topic.getTopicCompactionService(),
cursor, messagesToRead,
- readFromEarliest, this, consumer);
+
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(),
cursor,
+ messagesToRead, bytesToRead, readFromEarliest,
this, true, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer,
consumer.getConsumerEpoch());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 99e2f8a9624..660c7ea7797 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -33,8 +33,8 @@ public interface CompactedTopic {
/**
* Read entries from compacted topic.
*
- * @deprecated Use {@link
CompactedTopicUtils#readCompactedEntries(TopicCompactionService, ManagedCursor,
- * int, boolean, ReadEntriesCallback, Consumer)} instead.
+ * @deprecated Use {@link
CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService,
ManagedCursor,
+ * int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead.
*/
@Deprecated
void asyncReadEntriesOrWait(ManagedCursor cursor,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index 4cd21cbb03e..cc5147c8e66 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -39,9 +39,11 @@ import org.apache.pulsar.common.util.FutureUtil;
public class CompactedTopicUtils {
@Beta
- public static void readCompactedEntries(TopicCompactionService
topicCompactionService, ManagedCursor cursor,
- int numberOfEntriesToRead, boolean
readFromEarliest,
- AsyncCallbacks.ReadEntriesCallback
callback, @Nullable Consumer consumer) {
+ public static void asyncReadCompactedEntries(TopicCompactionService
topicCompactionService,
+ ManagedCursor cursor, int
numberOfEntriesToRead,
+ long bytesToRead, boolean
readFromEarliest,
+
AsyncCallbacks.ReadEntriesCallback callback,
+ boolean wait, @Nullable
Consumer consumer) {
Objects.requireNonNull(topicCompactionService);
Objects.requireNonNull(cursor);
checkArgument(numberOfEntriesToRead > 0);
@@ -64,7 +66,13 @@ public class CompactedTopicUtils {
if (lastCompactedPosition == null
|| readPosition.compareTo(
lastCompactedPosition.getLedgerId(),
lastCompactedPosition.getEntryId()) > 0) {
- cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback,
readEntriesCtx, PositionImpl.LATEST);
+ if (wait) {
+ cursor.asyncReadEntriesOrWait(numberOfEntriesToRead,
bytesToRead, callback, readEntriesCtx,
+ PositionImpl.LATEST);
+ } else {
+ cursor.asyncReadEntries(numberOfEntriesToRead,
bytesToRead, callback, readEntriesCtx,
+ PositionImpl.LATEST);
+ }
return CompletableFuture.completedFuture(null);
}