This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0c3c175eb13 [improve][broker] Get lowest PositionImpl from
NavigableSet (#18278)
0c3c175eb13 is described below
commit 0c3c175eb132d4ef0fb3a12842127dcb4fa1933d
Author: WJL3333 <[email protected]>
AuthorDate: Wed Nov 2 22:32:42 2022 +0800
[improve][broker] Get lowest PositionImpl from NavigableSet (#18278)
* [cleanup] Directly get lowest PositionImpl from TreeMap
change signature from Set<T> to NavigableSet<T>,
which lets the caller get the lowest PositionImpl more efficiently.
* change poll to first when calling `NavigableSet`
* fix checkstyle: remove unused import
Co-authored-by: wangjinlong <[email protected]>
---
.../broker/delayed/DelayedDeliveryTracker.java | 4 ++--
.../delayed/InMemoryDelayedDeliveryTracker.java | 6 +++---
.../persistent/MessageRedeliveryController.java | 3 ++-
.../PersistentDispatcherMultipleConsumers.java | 20 ++++++++++++--------
...rsistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++----
.../utils/ConcurrentBitmapSortedLongPairSet.java | 4 ++--
6 files changed, 27 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index b54cbc6982f..2f248a441cd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed;
import com.google.common.annotations.Beta;
-import java.util.Set;
+import java.util.NavigableSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
/**
@@ -58,7 +58,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable
{
/**
* Get a set of position of messages that have already reached the
delivery time.
*/
- Set<PositionImpl> getScheduledMessages(int maxMessages);
+ NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);
/**
* Tells whether the dispatcher should pause any message deliveries, until
the DelayedDeliveryTracker has
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index f77fcebfb6a..da28ff19234 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -22,7 +22,7 @@ import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
-import java.util.Set;
+import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -146,9 +146,9 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
* Get a set of position of messages that have already reached.
*/
@Override
- public Set<PositionImpl> getScheduledMessages(int maxMessages) {
+ public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
int n = maxMessages;
- Set<PositionImpl> positions = new TreeSet<>();
+ NavigableSet<PositionImpl> positions = new TreeSet<>();
long cutoffTime = getCutoffTime();
while (n > 0 && !priorityQueue.isEmpty()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 21dd272acfe..d8667def552 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import com.google.common.collect.ComparisonChain;
import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -100,7 +101,7 @@ public class MessageRedeliveryController {
return isContained.get();
}
- public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+ public NavigableSet<PositionImpl> getMessagesToReplayNow(int
maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2e4cd7dcce5..b777087ba2f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -276,7 +277,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
return;
}
- Set<PositionImpl> messagesToReplayNow =
getMessagesToReplayNow(messagesToRead);
+ NavigableSet<PositionImpl> messagesToReplayNow =
getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
if (log.isDebugEnabled()) {
@@ -285,7 +286,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
havePendingReplayRead = true;
- minReplayedPosition =
messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
+ minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages =
topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) :
asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
@@ -309,11 +310,14 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
consumerList.size());
}
havePendingRead = true;
- Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
- minReplayedPosition =
toReplay.stream().findFirst().orElse(null);
- if (minReplayedPosition != null) {
+ NavigableSet<PositionImpl> toReplay =
getMessagesToReplayNow(1);
+ if (!toReplay.isEmpty()) {
+ minReplayedPosition = toReplay.first();
redeliveryMessages.add(minReplayedPosition.getLedgerId(),
minReplayedPosition.getEntryId());
+ } else {
+ minReplayedPosition = null;
}
+
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
@@ -1020,17 +1024,17 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
}
}
- protected synchronized Set<PositionImpl> getMessagesToReplayNow(int
maxMessagesToRead) {
+ protected synchronized NavigableSet<PositionImpl>
getMessagesToReplayNow(int maxMessagesToRead) {
if (!redeliveryMessages.isEmpty()) {
return
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() &&
delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
- Set<PositionImpl> messagesAvailableNow =
+ NavigableSet<PositionImpl> messagesAvailableNow =
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
messagesAvailableNow.forEach(p ->
redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
return messagesAvailableNow;
} else {
- return Collections.emptySet();
+ return Collections.emptyNavigableSet();
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d6f85e11316..0eeb403d291 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -173,9 +174,10 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// A corner case that we have to retry a readMoreEntries in order to
preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
- Set<PositionImpl> messagesToReplayNow =
this.getMessagesToReplayNow(1);
+ NavigableSet<PositionImpl> messagesToReplayNow =
this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty())
{
- PositionImpl replayPosition =
messagesToReplayNow.stream().findFirst().get();
+ PositionImpl replayPosition = messagesToReplayNow.first();
+
// We have received a message potentially from the delayed
tracker and, since we're not using it
// right now, it needs to be added to the redelivery tracker
or we won't attempt anymore to
// resend it (until we disconnect consumer).
@@ -435,13 +437,13 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
@Override
- protected synchronized Set<PositionImpl> getMessagesToReplayNow(int
maxMessagesToRead) {
+ protected synchronized NavigableSet<PositionImpl>
getMessagesToReplayNow(int maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
// If we're stuck on replay, we want to move forward reading on
the topic (until the overall max-unacked
// messages kicks in), instead of keep replaying the same old
messages, since the consumer that these
// messages are routing to might be busy at the moment
this.isDispatcherStuckOnReplays = false;
- return Collections.emptySet();
+ return Collections.emptyNavigableSet();
} else {
return super.getMessagesToReplayNow(maxMessagesToRead);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
index ae7b495272b..e42cae2580b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
-import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
@@ -95,7 +94,8 @@ public class ConcurrentBitmapSortedLongPairSet {
}
- public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T>
longPairConverter) {
+ public <T extends Comparable<T>> NavigableSet<T> items(int numberOfItems,
+
LongPairSet.LongPairFunction<T> longPairConverter) {
NavigableSet<T> items = new TreeSet<>();
lock.readLock().lock();
try {