This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 15dbe16630e1cddd0da75bcabb879fccff8e3410
Author: fengyubiao <[email protected]>
AuthorDate: Fri Oct 24 16:10:41 2025 +0800

    [fix][broker] Stop to retry to read entries if the replicator has 
terminated (#24880)
    
    (cherry picked from commit 313ae974ef01b7ed295a03c93906ccf9daf82fd5)
---
 .../service/persistent/PersistentReplicator.java   | 12 ++++++-
 .../PersistentReplicatorInflightTaskTest.java      | 42 +++++++++++++++++++++-
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index fffbb1b6d3d..4d8ea0e9529 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -274,6 +274,9 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     }
 
     protected void readMoreEntries() {
+        if (state.equals(Terminated) || state.equals(Terminating)) {
+            return;
+        }
         // Acquire permits and check state of producer.
         InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
         if (newInFlightTask == null) {
@@ -974,7 +977,9 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     protected boolean hasPendingRead() {
         synchronized (inFlightTasks) {
             for (InFlightTask task : inFlightTasks) {
-                if (task.readPos != null && task.entries == null) {
+                // "getReadPos()" is called here instead of reading the field 
"readPos" directly, so that the test
+                // "testReplicationTaskStoppedAfterTopicClosed" can count 
how many times "readMoreEntries" is called.
+                if (task.getReadPos() != null && task.entries == null) {
                     // Skip the current reading if there is a pending cursor 
reading.
                     return true;
                 }
@@ -982,4 +987,9 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         }
         return false;
     }
+
+    @VisibleForTesting
+    String getReplicatorId() {
+        return  replicatorId;
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
index 0d56d6fda29..31970f499fd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -34,6 +38,8 @@ import 
org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
 import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
 import 
org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
 import org.apache.pulsar.client.api.MessageId;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -66,6 +72,40 @@ public class PersistentReplicatorInflightTaskTest extends 
OneWayReplicatorTestBa
         admin1.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
     }
 
+    @Test
+    public void testReplicationTaskStoppedAfterTopicClosed() throws Exception {
+        // Close a topic, which has enabled replication.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicName);
+        waitReplicatorStarted(topicName);
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false)
+                .join().get();
+        PersistentReplicator replicator = (PersistentReplicator) 
topic.getReplicators().get(cluster2);
+        admin1.topics().unload(topicName);
+
+        // Inject a task into "inFlightTasks" to count how many times 
the method "replicator.readMoreEntries"
+        // has been called.
+        AtomicInteger counter = new AtomicInteger();
+        InFlightTask injectedTask = new InFlightTask(PositionImpl.get(1, 1), 
1, replicator.getReplicatorId());
+        injectedTask.setEntries(Collections.emptyList());
+        InFlightTask spyTask = spy(injectedTask);
+        replicator.inFlightTasks.add(spyTask);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                counter.incrementAndGet();
+                return invocationOnMock.callRealMethod();
+            }
+        }).when(spyTask).getReadPos();
+
+        // Verify: no task is scheduled to retry reading entries for 
replication.
+        // Call "readMoreEntries" to trigger the issue.
+        replicator.readMoreEntries();
+        Thread.sleep(PersistentTopic.MESSAGE_RATE_BACKOFF_MS * 10);
+        assertEquals(replicator.getState(), 
AbstractReplicator.State.Terminated);
+        assertTrue(counter.get() <= 1);
+    }
+
     @Test
     public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
         log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");

Reply via email to