This is an automated email from the ASF dual-hosted git repository.
poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new efc9299dcd6 [fix][broker]Replication is stuck because failed to read entries (#25625)
efc9299dcd6 is described below
commit efc9299dcd67630373316a9689e06e43bbabe0fc
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 6 00:42:40 2026 +0800
[fix][broker]Replication is stuck because failed to read entries (#25625)
---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 21 ++++++-
.../service/persistent/PersistentReplicator.java | 3 +
.../broker/service/OneWayReplicatorTest.java | 72 ++++++++++++++++++++++
...OneWayReplicatorUsingGlobalPartitionedTest.java | 6 ++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++
5 files changed, 107 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7e756f1d994..ad2a2a79d3f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -87,6 +88,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.CustomLog;
import lombok.Data;
@@ -167,7 +169,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
- private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean
addEntryFinished) throws Exception {
+ public static void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean
addEntryFinished) throws Exception {
LedgerHandle currentLedger = ml.currentLedger;
final LedgerHandle spyLedgerHandle = spy(currentLedger);
doAnswer(invocation -> {
@@ -183,6 +185,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ml.currentLedger = spyLedgerHandle;
}
+ public static void makeReadEntryProbFail(ManagedLedgerImpl ml,
Supplier<ManagedLedgerException> errorOrNot)
+ throws Exception {
+ ml.entryCache.clear();
+ LedgerHandle currentLedger = ml.currentLedger;
+ final LedgerHandle spyLedgerHandle = spy(currentLedger);
+ doAnswer(invocation -> {
+ long ledgerId = (long) invocation.getArguments()[0];
+ long entryId = (long) invocation.getArguments()[1];
+ ManagedLedgerException mightError = errorOrNot.get();
+ if (mightError != null) {
+ return CompletableFuture.failedFuture(mightError);
+ }
+ return currentLedger.readUnconfirmedAsync(ledgerId, entryId);
+ }).when(spyLedgerHandle).readUnconfirmedAsync(anyLong(), anyLong());
+ ml.currentLedger = spyLedgerHandle;
+ }
+
@Data
private static class DeleteLedgerInfo{
volatile boolean hasCalled;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0a6caea067c..bd68d2fb980 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -494,6 +494,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
+ InFlightTask inFlightTask = (InFlightTask) ctx;
if (state != Started) {
log.info("Replicator was disconnected while reading entries,
stopping reads");
return;
@@ -514,12 +515,14 @@ public abstract class PersistentReplicator extends AbstractReplicator
terminate();
return;
} else if (!(exception instanceof TooManyRequestsException)) {
+ inFlightTask.setEntries(Collections.emptyList());
log.error()
.attr("ctx", ctx)
.attr("waitTimeSec", waitTimeMillis / 1000.0)
.exception(exception)
.log("Error reading entries, retrying");
} else {
+ inFlightTask.setEntries(Collections.emptyList());
log.debug()
.attr("ctx", ctx)
.attr("waitTimeSec", waitTimeMillis / 1000.0)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index b0b12bd8f2c..0645877c144 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -76,6 +76,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
@@ -648,6 +649,77 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
});
}
+ @Test(timeOut = 45 * 1000)
+ public void testProbBKErrorWhenReplicating() throws Exception {
+ // creates topics.
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ final String subscription = "s1";
+ final int totalMsg = 10_000;
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin2.topics().createNonPartitionedTopic(topicName);
+ RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1);
+ admin1.topicPolicies().setRetention(topicName, retentionPolicies);
+ admin2.topicPolicies().setRetention(topicName, retentionPolicies);
+ PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName,
false).join().get();
+ ManagedLedgerImpl ml1 = (ManagedLedgerImpl) topic1.getManagedLedger();
+ PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName,
false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+ HierarchyTopicPolicies policies1 =
topic1.getHierarchyTopicPolicies();
+ HierarchyTopicPolicies policies2 =
topic2.getHierarchyTopicPolicies();
+
assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(),
10);
+
assertEquals(policies2.getRetentionPolicies().get().getRetentionTimeInMinutes(),
10);
+ });
+ // Publishes messages.
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).create();
+ Set<String> msgPublished = new HashSet<>();
+ for (int i = 0; i < totalMsg; i++) {
+ msgPublished.add("msg" + i);
+ producer1.send("msg" + i);
+ }
+
+ // Inject a probable error.
+ AtomicInteger roundrobin = new AtomicInteger();
+ Supplier<ManagedLedgerException> bkErrorOrNot = () -> {
+ if (roundrobin.incrementAndGet() % 2 == 0) {
+ return null;
+ }
+ return new ManagedLedgerException.TooManyRequestsException("mocked
error");
+ };
+ ManagedLedgerTest.makeReadEntryProbFail(ml1, bkErrorOrNot);
+
+ // Verify: the replication will finish even though received
ManagedLedgerException.TooManyRequestsException.
+ pulsar1.getConfig().setReplicationStartAt("earliest");
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+ waitReplicatorStarted(topicName);
+
Awaitility.await().atMost(Duration.ofSeconds(600)).pollInterval(Duration.ofSeconds(1)).untilAsserted(()
-> {
+ TopicStats topicStats = admin1.topics().getStats(topicName);
+
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(),
0);
+ });
+
+ // Verify: messages were replicated.
+ admin2.topics().createSubscription(topicName, subscription,
MessageId.earliest);
+ Set<String> received = new HashSet<>();
+ Consumer<String> consumer2 = client2.newConsumer(Schema.STRING)
+ .subscriptionName(subscription).topic(topicName).subscribe();
+ while (true) {
+ Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ received.add(msg.getValue());
+ }
+ assertEquals(received.size(), msgPublished.size());
+ assertEquals(received, msgPublished);
+
+ // cleanup.
+ producer1.close();
+ consumer2.close();
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ waitReplicatorStopped(topicName, false);
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ }
+
/**
* Since {@link NonPersistentReplicator} never implement the rate
limitation, the config
* "replicationProducerQueueSize" should not affect {@link
NonPersistentReplicator}.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 7f6124e62a7..f828f92f7db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -110,6 +110,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicator
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test(enabled = false)
+ public void testProbBKErrorWhenReplicating() throws Exception {
+ super.testProbBKErrorWhenReplicating();
+ }
+
@Override
@Test(enabled = false)
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws
Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index de7e0cf0a3e..4db26758beb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -321,6 +321,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
super.testReplicatorProducerStatInTopic();
}
+ @Override
+ @Test(enabled = false)
+ public void testProbBKErrorWhenReplicating() throws Exception {
+ super.testProbBKErrorWhenReplicating();
+ }
+
@Test(enabled = false)
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws
Exception {
super.testReplicatorProducerStatInTopic();