This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d98e51f7a54 [improve][broker] Reschedule reads with increasing backoff when no messages are dispatched (#23226)
d98e51f7a54 is described below

commit d98e51f7a54463d68d4521189d24566888888514
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 29 22:27:34 2024 +0300

    [improve][broker] Reschedule reads with increasing backoff when no messages are dispatched (#23226)
---
 conf/broker.conf                                   | 10 +++
 conf/standalone.conf                               | 10 +++
 .../apache/pulsar/broker/ServiceConfiguration.java | 14 ++++
 .../PersistentDispatcherMultipleConsumers.java     | 59 ++++++++++-----
 ...istentStickyKeyDispatcherMultipleConsumers.java |  3 +
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  3 +
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 84 ++++++++++++++++++++--
 .../broker/transaction/TransactionTestBase.java    |  3 +
 8 files changed, 163 insertions(+), 23 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index fc32246adea..ed59e5c4566 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -467,6 +467,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
 # The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
 dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the initial backoff delay in milliseconds.
+dispatcherRetryBackoffInitialTimeInMs=100
+
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the maximum backoff delay in milliseconds.
+dispatcherRetryBackoffMaxTimeInMs=1000
+
 # Precise dispatcher flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ae696410d86..d5d79e0383e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -283,6 +283,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
 # The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
 dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the initial backoff delay in milliseconds.
+dispatcherRetryBackoffInitialTimeInMs=100
+
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the maximum backoff delay in milliseconds.
+dispatcherRetryBackoffMaxTimeInMs=1000
+
 # Precise dispatcher flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6488ace991e..60f37f52b6b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1196,6 +1196,20 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;
 
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "On Shared and KeyShared subscriptions, if all available 
messages in the subscription are filtered "
+                    + "out and not dispatched to any consumer, message 
dispatching will be rescheduled with a backoff "
+                    + "delay. This parameter sets the initial backoff delay in 
milliseconds.")
+    private int dispatcherRetryBackoffInitialTimeInMs = 100;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "On Shared and KeyShared subscriptions, if all available 
messages in the subscription are filtered "
+                    + "out and not dispatched to any consumer, message 
dispatching will be rescheduled with a backoff "
+                    + "delay. This parameter sets the maximum backoff delay in 
milliseconds.")
+    private int dispatcherRetryBackoffMaxTimeInMs = 1000;
+
     @FieldContext(
             dynamic = true,
             category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 20dbc4925d1..631a728ccce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -47,6 +47,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -85,7 +86,6 @@ import org.slf4j.LoggerFactory;
  */
 public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMultipleConsumers
         implements Dispatcher, ReadEntriesCallback {
-
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
     protected volatile Range<Position> 
lastIndividualDeletedRangeFromCursorRecovery;
@@ -134,7 +134,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     private AtomicBoolean isRescheduleReadInProgress = new 
AtomicBoolean(false);
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
-
+    protected int lastNumberOfEntriesDispatched;
+    private final Backoff retryBackoff;
     protected enum ReadType {
         Normal, Replay
     }
@@ -159,10 +160,15 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
         this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addMessageToReplay);
+        ServiceConfiguration serviceConfiguration = 
topic.getBrokerService().pulsar().getConfiguration();
         this.readFailureBackoff = new Backoff(
-                
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
+                
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
                 TimeUnit.MILLISECONDS,
                 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+        retryBackoff = new Backoff(
+                
serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), 
TimeUnit.MILLISECONDS,
+                serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), 
TimeUnit.MILLISECONDS,
+                0, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -437,16 +443,20 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 
     @Override
     protected void reScheduleRead() {
+        reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS);
+    }
+
+    protected void reScheduleReadInMs(long readAfterMs) {
         if (isRescheduleReadInProgress.compareAndSet(false, true)) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Reschedule message read in {} ms", 
topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
+                log.debug("[{}] [{}] Reschedule message read in {} ms", 
topic.getName(), name, readAfterMs);
             }
             topic.getBrokerService().executor().schedule(
                     () -> {
                         isRescheduleReadInProgress.set(false);
                         readMoreEntries();
                         },
-                    MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+                    readAfterMs, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -659,8 +669,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             log.debug("[{}] Distributing {} messages to {} consumers", name, 
entries.size(), consumerList.size());
         }
 
-        long size = entries.stream().mapToLong(Entry::getLength).sum();
-        updatePendingBytesToDispatch(size);
+        long totalBytesSize = 
entries.stream().mapToLong(Entry::getLength).sum();
+        updatePendingBytesToDispatch(totalBytesSize);
 
         // dispatch messages to a separate thread, but still in order for this 
subscription
         // sendMessagesToConsumers is responsible for running broker-side 
filters
@@ -670,19 +680,28 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
             // in a separate thread, and we want to prevent more reads
             acquireSendInProgress();
             dispatchMessagesThread.execute(() -> {
-                if (sendMessagesToConsumers(readType, entries, false)) {
-                    updatePendingBytesToDispatch(-size);
-                    readMoreEntries();
-                } else {
-                    updatePendingBytesToDispatch(-size);
-                }
+                handleSendingMessagesAndReadingMore(readType, entries, false, 
totalBytesSize);
             });
         } else {
-            if (sendMessagesToConsumers(readType, entries, true)) {
-                updatePendingBytesToDispatch(-size);
-                readMoreEntriesAsync();
-            } else {
-                updatePendingBytesToDispatch(-size);
+            handleSendingMessagesAndReadingMore(readType, entries, true, 
totalBytesSize);
+        }
+    }
+
+    private synchronized void handleSendingMessagesAndReadingMore(ReadType 
readType, List<Entry> entries,
+                                                                  boolean 
needAcquireSendInProgress,
+                                                                  long 
totalBytesSize) {
+        boolean triggerReadingMore = sendMessagesToConsumers(readType, 
entries, needAcquireSendInProgress);
+        int entriesDispatched = lastNumberOfEntriesDispatched;
+        updatePendingBytesToDispatch(-totalBytesSize);
+        if (triggerReadingMore) {
+            if (entriesDispatched > 0) {
+                // Reset the backoff when we successfully dispatched messages
+                retryBackoff.reset();
+                // Call readMoreEntries in the same thread to trigger the next 
read
+                readMoreEntries();
+            } else if (entriesDispatched == 0) {
+                // If no messages were dispatched, we need to reschedule a new 
read with an increasing backoff delay
+                reScheduleReadInMs(retryBackoff.next());
             }
         }
     }
@@ -721,6 +740,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
         }
+        lastNumberOfEntriesDispatched = 0;
 
         int entriesToDispatch = entries.size();
         // Trigger read more messages
@@ -828,6 +848,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
                 entry.release();
             });
+
+            lastNumberOfEntriesDispatched = entriesToDispatch;
         }
         return true;
     }
@@ -890,6 +912,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
+        lastNumberOfEntriesDispatched = (int) totalEntries;
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
 
         return numConsumers.get() == 0; // trigger a new readMoreEntries() call
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 91cec1f8e90..97e6c943b0b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -201,6 +201,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
 
     @Override
     protected synchronized boolean trySendMessagesToConsumers(ReadType 
readType, List<Entry> entries) {
+        lastNumberOfEntriesDispatched = 0;
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         long totalEntries = 0;
@@ -420,6 +421,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             }
         }
 
+        lastNumberOfEntriesDispatched = (int) totalEntries;
+
         // acquire message-dispatch permits for already delivered messages
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index e155e399e24..c83888b8022 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -242,6 +242,9 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         this.conf.setWebServicePort(Optional.of(0));
         this.conf.setNumExecutorThreadPoolSize(5);
         this.conf.setExposeBundlesMetricsInPrometheus(true);
+        // Disable the dispatcher retry backoff in tests by default
+        this.conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+        this.conf.setDispatcherRetryBackoffMaxTimeInMs(0);
     }
 
     protected final void init() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 1a205d0f686..af99741d09b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -60,10 +62,10 @@ import java.util.stream.Collectors;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -107,6 +109,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
 
     final String topicName = "persistent://public/default/testTopic";
     final String subscriptionName = "testSubscription";
+    private AtomicInteger consumerMockAvailablePermits;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -117,7 +120,8 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
         
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
         doReturn(false).when(configMock).isAllowOverrideEntryFilters();
-
+        
doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
+        doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
         pulsarMock = mock(PulsarService.class);
         doReturn(configMock).when(pulsarMock).getConfiguration();
 
@@ -188,7 +192,8 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         consumerMock = mock(Consumer.class);
         channelMock = mock(ChannelPromise.class);
         doReturn("consumer1").when(consumerMock).consumerName();
-        doReturn(1000).when(consumerMock).getAvailablePermits();
+        consumerMockAvailablePermits = new AtomicInteger(1000);
+        doAnswer(invocation -> 
consumerMockAvailablePermits.get()).when(consumerMock).getAvailablePermits();
         doReturn(true).when(consumerMock).isWritable();
         doReturn(channelMock).when(consumerMock).sendMessages(
                 anyList(),
@@ -511,8 +516,6 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         allEntries.forEach(entry -> entry.release());
     }
 
-
-
     @DataProvider(name = "initializeLastSentPosition")
     private Object[][] initialLastSentPositionProvider() {
         return new Object[][] { { false }, { true } };
@@ -822,6 +825,77 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         assertEquals(persistentDispatcher.getLastSentPosition(), 
initialLastSentPosition.toString());
     }
 
+    @DataProvider(name = "dispatchMessagesInSubscriptionThread")
+    private Object[][] dispatchMessagesInSubscriptionThread() {
+        return new Object[][] { { false }, { true } };
+    }
+
+    @Test(dataProvider = "dispatchMessagesInSubscriptionThread")
+    public void testBackoffDelayWhenNoMessagesDispatched(boolean 
dispatchMessagesInSubscriptionThread)
+            throws Exception {
+        persistentDispatcher.close();
+
+        List<Long> retryDelays = new CopyOnWriteArrayList<>();
+        
doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
+        persistentDispatcher = new 
PersistentStickyKeyDispatcherMultipleConsumers(
+                topicMock, cursorMock, subscriptionMock, configMock,
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+            @Override
+            protected void reScheduleReadInMs(long readAfterMs) {
+                retryDelays.add(readAfterMs);
+            }
+        };
+
+        // add a consumer without permits to trigger the retry behavior
+        consumerMockAvailablePermits.set(0);
+        persistentDispatcher.addConsumer(consumerMock);
+
+        // call "readEntriesComplete" directly to test the retry behavior
+        List<Entry> entries = List.of(EntryImpl.create(1, 1, 
createMessage("message1", 1)));
+        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 1);
+                    assertEquals(retryDelays.get(0), 10, "Initial retry delay 
should be 10ms");
+                }
+        );
+        // test the second retry delay
+        entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)));
+        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 2);
+                    double delay = retryDelays.get(1);
+                    assertEquals(delay, 20.0, 2.0, "Second retry delay should 
be 20ms (jitter <-10%)");
+                }
+        );
+        // verify the max retry delay
+        for (int i = 0; i < 100; i++) {
+            entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)));
+            persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        }
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 102);
+                    double delay = retryDelays.get(101);
+                    assertEquals(delay, 50.0, 5.0, "Max delay should be 50ms 
(jitter <-10%)");
+                }
+        );
+        // unblock to check that the retry delay is reset
+        consumerMockAvailablePermits.set(1000);
+        entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, 
"key2")));
+        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        // wait that the possibly async handling has completed
+        Awaitility.await().untilAsserted(() -> 
assertFalse(persistentDispatcher.isSendInProgress()));
+
+        // now block again to check the next retry delay so verify it was reset
+        consumerMockAvailablePermits.set(0);
+        entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, 
"key3")));
+        persistentDispatcher.readEntriesComplete(entries, 
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(retryDelays.size(), 103);
+                    assertEquals(retryDelays.get(0), 10, "Resetted retry delay 
should be 10ms");
+                }
+        );
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
         return createMessage(message, sequenceId, "testKey");
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 4ab886492a4..34af94f2c31 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -163,6 +163,9 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
             conf.setBrokerDeduplicationEnabled(true);
             conf.setTransactionBufferSnapshotMaxTransactionCount(2);
             conf.setTransactionBufferSnapshotMinTimeInMillis(2000);
+            // Disable the dispatcher retry backoff in tests by default
+            conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+            conf.setDispatcherRetryBackoffMaxTimeInMs(0);
             serviceConfigurationList.add(conf);
 
             PulsarTestContext.Builder testContextBuilder =

Reply via email to