This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2618249b583 [fix][test] Fix flaky 
ReplicatorTest.testReplicatorClearBacklog NPE on inFlightTask (#25691)
2618249b583 is described below

commit 2618249b583f4fa914c4d68682527e3c04de241e
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 19:18:15 2026 -0700

    [fix][test] Fix flaky ReplicatorTest.testReplicatorClearBacklog NPE on 
inFlightTask (#25691)
---
 .../java/org/apache/pulsar/broker/service/ReplicatorTest.java    | 9 +++++++--
 .../persistent/BrokerServicePersistInternalMethodInvoker.java    | 5 +++++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 7f699419187..9753e68dfb1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
+import static 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.newInFlightTaskCtx;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleGaugeValue;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.mockito.ArgumentMatchers.eq;
@@ -663,7 +664,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
         PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
         PersistentReplicator replicator = (PersistentReplicator) spy(
                 
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
-        replicator.readEntriesFailed(new 
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
+        Object inFlightTaskCtx = newInFlightTaskCtx(replicator, 
PositionFactory.create(1, 1), 1);
+        replicator.readEntriesFailed(new 
ManagedLedgerException.InvalidCursorPositionException("failed"),
+                inFlightTaskCtx);
         replicator.clearBacklog().get();
         Thread.sleep(100);
         replicator.updateRates(); // for code-coverage
@@ -693,7 +696,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
         PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
         PersistentReplicator replicator = (PersistentReplicator) spy(
                 
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
-        replicator.readEntriesFailed(new 
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
+        Object inFlightTaskCtx = newInFlightTaskCtx(replicator, 
PositionFactory.create(1, 1), 1);
+        replicator.readEntriesFailed(new 
ManagedLedgerException.InvalidCursorPositionException("failed"),
+                inFlightTaskCtx);
         replicator.clearBacklog().get();
         Thread.sleep(100);
         replicator.updateRates(); // for code-coverage
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
index 8c26aece764..c52b9c6896b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BrokerServicePersistInternalMethodInvoker.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.Position;
 import org.awaitility.Awaitility;
 
 public class BrokerServicePersistInternalMethodInvoker {
@@ -42,4 +43,8 @@ public class BrokerServicePersistInternalMethodInvoker {
             return true;
         });
     }
+
+    public static Object newInFlightTaskCtx(PersistentReplicator replicator, 
Position readPos, int readingEntries) {
+        return new PersistentReplicator.InFlightTask(readPos, readingEntries, 
replicator.getReplicatorId());
+    }
 }

Reply via email to