This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aeda5ca1b28e7dceaeb21772420eb0fe6b7d001b
Author: sinan liu <[email protected]>
AuthorDate: Mon May 18 09:49:50 2026 +0800

    [improve][broker] Prevent stale replicator pending reads after termination (#25767)
    
    (cherry picked from commit 8f9f5b49d631e235e86d79e48a63722e74db4413)
---
 .../service/persistent/PersistentReplicator.java   | 21 ++++++--
 .../PersistentReplicatorInflightTaskTest.java      | 60 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index fc338c02e4d..ecb88818b05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -496,7 +496,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-        InFlightTask inFlightTask = (InFlightTask) ctx;
+        completeFailedReadTask(ctx);
         if (state != Started) {
             log.info("[{}] Replicator was disconnected while reading entries."
                             + " Stop reading. Replicator state: {}",
@@ -517,11 +517,9 @@ public abstract class PersistentReplicator extends AbstractReplicator
             terminate();
             return;
         } else if (!(exception instanceof TooManyRequestsException)) {
-            inFlightTask.setEntries(Collections.emptyList());
             log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})",
                     replicatorId, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception);
         } else {
-            inFlightTask.setEntries(Collections.emptyList());
             log.debug("[{}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})",
                     replicatorId, ctx, waitTimeMillis / 1000.0, exception.getMessage(),
                     exception);
@@ -530,6 +528,21 @@ public abstract class PersistentReplicator extends AbstractReplicator
         brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS);
     }
 
+    private void completeFailedReadTask(Object ctx) {
+        if (!(ctx instanceof InFlightTask)) {
+            log.error("[{}] Unexpected read entries failed context {}", replicatorId, ctx);
+            return;
+        }
+
+        InFlightTask inFlightTask = (InFlightTask) ctx;
+        if (inFlightTask.entries == null) {
+            inFlightTask.setEntries(Collections.emptyList());
+        } else {
+            log.error("[{}] Unexpected completed in-flight task in read entries failed callback. inFlightTask={}",
+                    replicatorId, inFlightTask);
+        }
+    }
+
     public CompletableFuture<Void> clearBacklog() {
         CompletableFuture<Void> future = new CompletableFuture<>();
 
@@ -985,4 +998,4 @@ public abstract class PersistentReplicator extends AbstractReplicator
     String getReplicatorId() {
         return  replicatorId;
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
index e8a7a11a2db..c825d49caa6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -28,17 +28,25 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
 import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
@@ -107,6 +115,58 @@ public class PersistentReplicatorInflightTaskTest extends OneWayReplicatorTestBa
         assertTrue(counter.get() <= 1);
     }
 
+    @Test
+    public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated() throws Exception {
+        String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        CountDownLatch readStarted = new CountDownLatch(1);
+        CountDownLatch failRead = new CountDownLatch(1);
+        Producer<String> producer = null;
+        try {
+            admin1.topics().createNonPartitionedTopic(topicName);
+            admin2.topics().createNonPartitionedTopic(topicName);
+            producer = client1.newProducer(Schema.STRING).topic(topicName).create();
+            producer.send("msg");
+
+            PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false)
+                    .join().get();
+            ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+            ManagedLedgerTest.makeReadEntryProbFail(ml, () -> {
+                readStarted.countDown();
+                try {
+                    if (!failRead.await(30, TimeUnit.SECONDS)) {
+                        return new ManagedLedgerException("Timed out waiting to fail read entries");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return new ManagedLedgerException(e);
+                }
+                return new ManagedLedgerException.TooManyRequestsException("mocked read failure");
+            });
+
+            pulsar1.getConfig().setReplicationStartAt("earliest");
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+            assertTrue(readStarted.await(30, TimeUnit.SECONDS));
+
+            PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2);
+            Assert.assertNotNull(replicator, "Replicator should not be null");
+            Assert.assertTrue(replicator.hasPendingRead());
+
+            replicator.terminate();
+            Assert.assertTrue(replicator.getState() != AbstractReplicator.State.Started);
+            failRead.countDown();
+
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                    Assert.assertFalse(replicator.hasPendingRead()));
+        } finally {
+            failRead.countDown();
+            if (producer != null) {
+                producer.close();
+            }
+            admin1.topics().delete(topicName, true);
+            admin2.topics().delete(topicName, true);
+        }
+    }
+
     @Test
     public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
         log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");

Reply via email to