This is an automated email from the ASF dual-hosted git repository.
poorbarcode pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 172a1438adb [fix][broker] Replication is stuck because it failed to read entries (#25625)
172a1438adb is described below
commit 172a1438adb763e87cf48a372e5c54a2b4788b0e
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 6 17:29:57 2026 +0800
[fix][broker] Replication is stuck because it failed to read entries (#25625)
(cherry picked from commit efc9299dcd67630373316a9689e06e43bbabe0fc)
---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 21 ++++++-
.../service/persistent/PersistentReplicator.java | 11 ++--
.../broker/service/OneWayReplicatorTest.java | 73 ++++++++++++++++++++++
...OneWayReplicatorUsingGlobalPartitionedTest.java | 6 ++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++
5 files changed, 111 insertions(+), 6 deletions(-)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b3201a6def5..3804ff7ade9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -87,6 +88,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -168,7 +170,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
- private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean
addEntryFinished) throws Exception {
+ public static void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean
addEntryFinished) throws Exception {
LedgerHandle currentLedger = ml.currentLedger;
final LedgerHandle spyLedgerHandle = spy(currentLedger);
doAnswer(invocation -> {
@@ -184,6 +186,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ml.currentLedger = spyLedgerHandle;
}
+ public static void makeReadEntryProbFail(ManagedLedgerImpl ml,
Supplier<ManagedLedgerException> errorOrNot)
+ throws Exception {
+ ml.entryCache.clear();
+ LedgerHandle currentLedger = ml.currentLedger;
+ final LedgerHandle spyLedgerHandle = spy(currentLedger);
+ doAnswer(invocation -> {
+ long ledgerId = (long) invocation.getArguments()[0];
+ long entryId = (long) invocation.getArguments()[1];
+ ManagedLedgerException mightError = errorOrNot.get();
+ if (mightError != null) {
+ return CompletableFuture.failedFuture(mightError);
+ }
+ return currentLedger.readUnconfirmedAsync(ledgerId, entryId);
+ }).when(spyLedgerHandle).readUnconfirmedAsync(anyLong(), anyLong());
+ ml.currentLedger = spyLedgerHandle;
+ }
+
@Data
private static class DeleteLedgerInfo{
volatile boolean hasCalled;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index e0a31476fc9..fc338c02e4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -496,6 +496,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
+ InFlightTask inFlightTask = (InFlightTask) ctx;
if (state != Started) {
log.info("[{}] Replicator was disconnected while reading entries."
+ " Stop reading. Replicator state: {}",
@@ -516,14 +517,14 @@ public abstract class PersistentReplicator extends AbstractReplicator
terminate();
return;
} else if (!(exception instanceof TooManyRequestsException)) {
+ inFlightTask.setEntries(Collections.emptyList());
log.error("[{}] Error reading entries at {}. Retrying to read in
{}s. ({})",
replicatorId, ctx, waitTimeMillis / 1000.0,
exception.getMessage(), exception);
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Throttled by bookies while reading at {}.
Retrying to read in {}s. ({})",
- replicatorId, ctx, waitTimeMillis / 1000.0,
exception.getMessage(),
- exception);
- }
+ inFlightTask.setEntries(Collections.emptyList());
+ log.debug("[{}] Throttled by bookies while reading at {}. Retrying
to read in {}s. ({})",
+ replicatorId, ctx, waitTimeMillis / 1000.0,
exception.getMessage(),
+ exception);
}
brokerService.executor().schedule(this::readMoreEntries,
waitTimeMillis, TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index aa9b7053719..644bb1f2b1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -70,6 +70,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
@@ -103,6 +104,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -440,6 +442,77 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
});
}
+ @Test(timeOut = 45 * 1000)
+ public void testProbBKErrorWhenReplicating() throws Exception {
+ // creates topics.
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ final String subscription = "s1";
+ final int totalMsg = 10_000;
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin2.topics().createNonPartitionedTopic(topicName);
+ RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1);
+ admin1.topicPolicies().setRetention(topicName, retentionPolicies);
+ admin2.topicPolicies().setRetention(topicName, retentionPolicies);
+ PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName,
false).join().get();
+ ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+ PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName,
false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+ HierarchyTopicPolicies policies1 =
topic1.getHierarchyTopicPolicies();
+ HierarchyTopicPolicies policies2 =
topic2.getHierarchyTopicPolicies();
+
assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(),
10);
+
assertEquals(policies2.getRetentionPolicies().get().getRetentionTimeInMinutes(),
10);
+ });
+ // Publishes messages.
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).create();
+ Set<String> msgPublished = new HashSet<>();
+ for (int i = 0; i < totalMsg; i++) {
+ msgPublished.add("msg" + i);
+ producer1.send("msg" + i);
+ }
+
+ // Inject a probable error.
+ AtomicInteger roundrobin = new AtomicInteger();
+ Supplier<ManagedLedgerException> bkErrorOrNot = () -> {
+ if (roundrobin.incrementAndGet() % 2 == 0) {
+ return null;
+ }
+ return new ManagedLedgerException.TooManyRequestsException("mocked
error");
+ };
+ ManagedLedgerTest.makeReadEntryProbFail(ml1, bkErrorOrNot);
+
+ // Verify: the replication will finish even though received
ManagedLedgerException.TooManyRequestsException.
+ pulsar1.getConfig().setReplicationStartAt("earliest");
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+ waitReplicatorStarted(topicName);
+
Awaitility.await().atMost(Duration.ofSeconds(600)).pollInterval(Duration.ofSeconds(1)).untilAsserted(()
-> {
+ TopicStats topicStats = admin1.topics().getStats(topicName);
+
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(),
0);
+ });
+
+ // Verify: messages were replicated.
+ admin2.topics().createSubscription(topicName, subscription,
MessageId.earliest);
+ Set<String> received = new HashSet<>();
+ Consumer<String> consumer2 = client2.newConsumer(Schema.STRING)
+ .subscriptionName(subscription).topic(topicName).subscribe();
+ while (true) {
+ Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ received.add(msg.getValue());
+ }
+ assertEquals(received.size(), msgPublished.size());
+ assertEquals(received, msgPublished);
+
+ // cleanup.
+ producer1.close();
+ consumer2.close();
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ waitReplicatorStopped(topicName, false);
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ }
+
/**
* Since {@link NonPersistentReplicator} never implement the rate
limitation, the config
* "replicationProducerQueueSize" should not affect {@link
NonPersistentReplicator}.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 3b719551b95..8c1ae9ea08b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -102,6 +102,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicator
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test(enabled = false)
+ public void testProbBKErrorWhenReplicating() throws Exception {
+ super.testProbBKErrorWhenReplicating();
+ }
+
@Override
@Test(enabled = false)
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws
Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 4a0d2131000..2ccbc56df13 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -310,6 +310,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test(enabled = false)
+ public void testProbBKErrorWhenReplicating() throws Exception {
+ super.testProbBKErrorWhenReplicating();
+ }
+
@Test(enabled = false)
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws
Exception {
super.testReplicatorProducerStatInTopic();