This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d98e51f7a54 [improve][broker] Reschedule reads with increasing backoff
when no messages are dispatched (#23226)
d98e51f7a54 is described below
commit d98e51f7a54463d68d4521189d24566888888514
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 29 22:27:34 2024 +0300
[improve][broker] Reschedule reads with increasing backoff when no messages
are dispatched (#23226)
---
conf/broker.conf | 10 +++
conf/standalone.conf | 10 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 14 ++++
.../PersistentDispatcherMultipleConsumers.java | 59 ++++++++++-----
...istentStickyKeyDispatcherMultipleConsumers.java | 3 +
.../broker/auth/MockedPulsarServiceBaseTest.java | 3 +
...ntStickyKeyDispatcherMultipleConsumersTest.java | 84 ++++++++++++++++++++--
.../broker/transaction/TransactionTestBase.java | 3 +
8 files changed, 163 insertions(+), 23 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index fc32246adea..ed59e5c4566 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -467,6 +467,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the initial backoff delay in milliseconds.
+dispatcherRetryBackoffInitialTimeInMs=100
+
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the maximum backoff delay in milliseconds.
+dispatcherRetryBackoffMaxTimeInMs=1000
+
# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ae696410d86..d5d79e0383e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -283,6 +283,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the initial backoff delay in milliseconds.
+dispatcherRetryBackoffInitialTimeInMs=100
+
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the maximum backoff delay in milliseconds.
+dispatcherRetryBackoffMaxTimeInMs=1000
+
# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6488ace991e..60f37f52b6b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1196,6 +1196,20 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "On Shared and KeyShared subscriptions, if all available
messages in the subscription are filtered "
+ + "out and not dispatched to any consumer, message
dispatching will be rescheduled with a backoff "
+ + "delay. This parameter sets the initial backoff delay in
milliseconds.")
+ private int dispatcherRetryBackoffInitialTimeInMs = 100;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "On Shared and KeyShared subscriptions, if all available
messages in the subscription are filtered "
+ + "out and not dispatched to any consumer, message
dispatching will be rescheduled with a backoff "
+ + "delay. This parameter sets the maximum backoff delay in
milliseconds.")
+ private int dispatcherRetryBackoffMaxTimeInMs = 1000;
+
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 20dbc4925d1..631a728ccce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -47,6 +47,7 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -85,7 +86,6 @@ import org.slf4j.LoggerFactory;
*/
public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {
-
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Range<Position>
lastIndividualDeletedRangeFromCursorRecovery;
@@ -134,7 +134,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new
AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
-
+ protected int lastNumberOfEntriesDispatched;
+ private final Backoff retryBackoff;
protected enum ReadType {
Normal, Replay
}
@@ -159,10 +160,15 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer,
this::addMessageToReplay);
+ ServiceConfiguration serviceConfiguration =
topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
-
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
+
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+ retryBackoff = new Backoff(
+
serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
+ serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(),
TimeUnit.MILLISECONDS,
+ 0, TimeUnit.MILLISECONDS);
}
@Override
@@ -437,16 +443,20 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
@Override
protected void reScheduleRead() {
+ reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS);
+ }
+
+ protected void reScheduleReadInMs(long readAfterMs) {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Reschedule message read in {} ms",
topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
+ log.debug("[{}] [{}] Reschedule message read in {} ms",
topic.getName(), name, readAfterMs);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
- MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+ readAfterMs, TimeUnit.MILLISECONDS);
}
}
@@ -659,8 +669,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
log.debug("[{}] Distributing {} messages to {} consumers", name,
entries.size(), consumerList.size());
}
- long size = entries.stream().mapToLong(Entry::getLength).sum();
- updatePendingBytesToDispatch(size);
+ long totalBytesSize =
entries.stream().mapToLong(Entry::getLength).sum();
+ updatePendingBytesToDispatch(totalBytesSize);
// dispatch messages to a separate thread, but still in order for this
subscription
// sendMessagesToConsumers is responsible for running broker-side
filters
@@ -670,19 +680,28 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
dispatchMessagesThread.execute(() -> {
- if (sendMessagesToConsumers(readType, entries, false)) {
- updatePendingBytesToDispatch(-size);
- readMoreEntries();
- } else {
- updatePendingBytesToDispatch(-size);
- }
+ handleSendingMessagesAndReadingMore(readType, entries, false,
totalBytesSize);
});
} else {
- if (sendMessagesToConsumers(readType, entries, true)) {
- updatePendingBytesToDispatch(-size);
- readMoreEntriesAsync();
- } else {
- updatePendingBytesToDispatch(-size);
+ handleSendingMessagesAndReadingMore(readType, entries, true,
totalBytesSize);
+ }
+ }
+
+ private synchronized void handleSendingMessagesAndReadingMore(ReadType
readType, List<Entry> entries,
+ boolean
needAcquireSendInProgress,
+ long
totalBytesSize) {
+ boolean triggerReadingMore = sendMessagesToConsumers(readType,
entries, needAcquireSendInProgress);
+ int entriesDispatched = lastNumberOfEntriesDispatched;
+ updatePendingBytesToDispatch(-totalBytesSize);
+ if (triggerReadingMore) {
+ if (entriesDispatched > 0) {
+ // Reset the backoff when we successfully dispatched messages
+ retryBackoff.reset();
+ // Call readMoreEntries in the same thread to trigger the next
read
+ readMoreEntries();
+ } else if (entriesDispatched == 0) {
+ // If no messages were dispatched, we need to reschedule a new
read with an increasing backoff delay
+ reScheduleReadInMs(retryBackoff.next());
}
}
}
@@ -721,6 +740,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
+ lastNumberOfEntriesDispatched = 0;
int entriesToDispatch = entries.size();
// Trigger read more messages
@@ -828,6 +848,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(),
stickyKeyHash);
entry.release();
});
+
+ lastNumberOfEntriesDispatched = entriesToDispatch;
}
return true;
}
@@ -890,6 +912,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalBytesSent += sendMessageInfo.getTotalBytes();
}
+ lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries,
totalMessagesSent, totalBytesSent);
return numConsumers.get() == 0; // trigger a new readMoreEntries() call
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 91cec1f8e90..97e6c943b0b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -201,6 +201,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType
readType, List<Entry> entries) {
+ lastNumberOfEntriesDispatched = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
@@ -420,6 +421,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
}
}
+ lastNumberOfEntriesDispatched = (int) totalEntries;
+
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries,
totalMessagesSent, totalBytesSent);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index e155e399e24..c83888b8022 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -242,6 +242,9 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
this.conf.setWebServicePort(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
this.conf.setExposeBundlesMetricsInPrometheus(true);
+ // Disable the dispatcher retry backoff in tests by default
+ this.conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+ this.conf.setDispatcherRetryBackoffMaxTimeInMs(0);
}
protected final void init() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 1a205d0f686..af99741d09b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
@@ -53,6 +54,7 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -60,10 +62,10 @@ import java.util.stream.Collectors;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
@@ -107,6 +109,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
+ private AtomicInteger consumerMockAvailablePermits;
@BeforeMethod
public void setup() throws Exception {
@@ -117,7 +120,8 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doReturn(false).when(configMock).isAllowOverrideEntryFilters();
-
+
doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
+ doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -188,7 +192,8 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
doReturn("consumer1").when(consumerMock).consumerName();
- doReturn(1000).when(consumerMock).getAvailablePermits();
+ consumerMockAvailablePermits = new AtomicInteger(1000);
+ doAnswer(invocation ->
consumerMockAvailablePermits.get()).when(consumerMock).getAvailablePermits();
doReturn(true).when(consumerMock).isWritable();
doReturn(channelMock).when(consumerMock).sendMessages(
anyList(),
@@ -511,8 +516,6 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
allEntries.forEach(entry -> entry.release());
}
-
-
@DataProvider(name = "initializeLastSentPosition")
private Object[][] initialLastSentPositionProvider() {
return new Object[][] { { false }, { true } };
@@ -822,6 +825,77 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
assertEquals(persistentDispatcher.getLastSentPosition(),
initialLastSentPosition.toString());
}
+ @DataProvider(name = "dispatchMessagesInSubscriptionThread")
+ private Object[][] dispatchMessagesInSubscriptionThread() {
+ return new Object[][] { { false }, { true } };
+ }
+
+ @Test(dataProvider = "dispatchMessagesInSubscriptionThread")
+ public void testBackoffDelayWhenNoMessagesDispatched(boolean
dispatchMessagesInSubscriptionThread)
+ throws Exception {
+ persistentDispatcher.close();
+
+ List<Long> retryDelays = new CopyOnWriteArrayList<>();
+
doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
+ persistentDispatcher = new
PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+
+ // add a consumer without permits to trigger the retry behavior
+ consumerMockAvailablePermits.set(0);
+ persistentDispatcher.addConsumer(consumerMock);
+
+ // call "readEntriesComplete" directly to test the retry behavior
+ List<Entry> entries = List.of(EntryImpl.create(1, 1,
createMessage("message1", 1)));
+ persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 1);
+ assertEquals(retryDelays.get(0), 10, "Initial retry delay
should be 10ms");
+ }
+ );
+ // test the second retry delay
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1",
1)));
+ persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 2);
+ double delay = retryDelays.get(1);
+ assertEquals(delay, 20.0, 2.0, "Second retry delay should
be 20ms (jitter <-10%)");
+ }
+ );
+ // verify the max retry delay
+ for (int i = 0; i < 100; i++) {
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1",
1)));
+ persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 102);
+ double delay = retryDelays.get(101);
+ assertEquals(delay, 50.0, 5.0, "Max delay should be 50ms
(jitter <-10%)");
+ }
+ );
+ // unblock to check that the retry delay is reset
+ consumerMockAvailablePermits.set(1000);
+ entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1,
"key2")));
+ persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ // wait until the possibly async handling has completed
+ Awaitility.await().untilAsserted(() ->
assertFalse(persistentDispatcher.isSendInProgress()));
+
+ // now block again and check the next retry delay to verify that it was reset
+ consumerMockAvailablePermits.set(0);
+ entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1,
"key3")));
+ persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 103);
+ assertEquals(retryDelays.get(0), 10, "Resetted retry delay
should be 10ms");
+ }
+ );
+ }
+
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 4ab886492a4..34af94f2c31 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -163,6 +163,9 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
conf.setBrokerDeduplicationEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
conf.setTransactionBufferSnapshotMinTimeInMillis(2000);
+ // Disable the dispatcher retry backoff in tests by default
+ conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+ conf.setDispatcherRetryBackoffMaxTimeInMs(0);
serviceConfigurationList.add(conf);
PulsarTestContext.Builder testContextBuilder =