This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 51e681a9a40 [fix][broker] replication does not work due to the mixed
and repetitive sending of user messages and replication markers (#24453)
51e681a9a40 is described below
commit 51e681a9a403309362a74be3438fbca96f2e6f08
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 8 10:58:44 2025 +0800
[fix][broker] replication does not work due to the mixed and repetitive
sending of user messages and replication markers (#24453)
(cherry picked from commit af27c43a72e7f91f9346c572dab5ec999ea6e553)
---
.../service/OneWayReplicatorDeduplicationTest.java | 108 ++++++++++++++++++++-
.../client/impl/GeoReplicationProducerImpl.java | 25 ++++-
2 files changed, 128 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
index edbb6b2c3fb..8a195a5f48f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -50,8 +51,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
@@ -71,6 +74,7 @@ import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
+import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
@@ -153,17 +157,117 @@ public class OneWayReplicatorDeduplicationTest extends
OneWayReplicatorTestBase
final var replicationClients = brokerService.getReplicationClients();
PulsarClientImpl internalClient = (PulsarClientImpl)
replicationClients.get(cluster2);
PulsarClientImpl injectedClient =
InjectedClientCnxClientBuilder.create(clientBuilder2, clientCnxFactory);
- assertTrue(replicationClients.remove(cluster2, internalClient));
+ if (internalClient != null) {
+ assertTrue(replicationClients.remove(cluster2, internalClient));
+ }
assertNull(replicationClients.putIfAbsent(cluster2, injectedClient));
// Return a cleanup injection task;
return () -> {
assertTrue(replicationClients.remove(cluster2, injectedClient));
- assertNull(replicationClients.putIfAbsent(cluster2,
internalClient));
+ if (internalClient != null) {
+ assertNull(replicationClients.putIfAbsent(cluster2,
internalClient));
+ }
injectedClient.closeAsync();
};
}
+    /**
+     * Verifies that the replicator's internal producer neither gets stuck nor publishes duplicates when a run of
+     * interleaved user messages and replication markers is re-sent after a reconnection.
+     *
+     * <p>Scenario: every send receipt of the replicator's internal producer is discarded, so all replicated
+     * messages stay pending. The producer's connection is then closed, so it republishes everything to the remote
+     * cluster, which has deduplication enabled. Every user message must arrive on the remote cluster exactly once
+     * and in order.
+     */
+    @Test
+    public void testRepeatedlyPublishMixedMessageAndReplMarkers() throws Exception {
+        // Inject a mechanism that drops all send receipts, to implement a scenario in which the messages of the
+        // replicator's internal producer are resent after a disconnection.
+        AtomicBoolean stuckSendReceipt = new AtomicBoolean(true);
+        Runnable cleanInjection = injectReplicatorClientCnx((conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
+            @Override
+            protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
+                if (stuckSendReceipt.get()) {
+                    // Discard all send receipts, to make the producer republish.
+                } else {
+                    super.handleSendReceipt(sendReceipt);
+                }
+            }
+        });
+
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        final String subscription = "s1";
+        final int messageCount = 15;
+        try {
+            // Create topics.
+            // - Enable deduplication on the remote cluster.
+            // - Enable replicated subscription.
+            admin1.topics().createNonPartitionedTopic(topicName);
+            Consumer<Integer> consumer1 = client1.newConsumer(Schema.INT32).topic(topicName)
+                    .subscriptionName(subscription).replicateSubscriptionState(true).subscribe();
+            admin2.topics().createSubscription(topicName, subscription, MessageId.earliest);
+            consumer1.close();
+            waitReplicatorStarted(topicName);
+            PersistentTopic persistentTopic1 =
+                    (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+            PersistentTopic persistentTopic2 =
+                    (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            admin2.topicPolicies().setDeduplicationStatus(topicName, true);
+            Awaitility.await().untilAsserted(() -> {
+                MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication();
+                assertEquals(String.valueOf(messageDeduplication2.getStatus()), "Enabled");
+            });
+            assertTrue(persistentTopic1.getSubscriptions().get(subscription).isReplicated());
+            assertTrue(persistentTopic1.getReplicatedSubscriptionController().isPresent());
+            ReplicatedSubscriptionsController replicatedSubscriptionsController =
+                    persistentTopic1.getReplicatedSubscriptionController().get();
+            admin2.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+
+            // Publish interleaved user messages and replication markers.
+            // - Publish a user message.
+            // - A replication marker is sent internally about once per second.
+            Producer<Integer> producer = client1.newProducer(Schema.INT32).topic(topicName).create();
+            for (int i = 0; i < messageCount; i++) {
+                producer.send(i);
+                log.info("Sent message: {}", i);
+                // Wait 1.2s here, so that a replication marker is sent before the next user message.
+                Thread.sleep(1200);
+                log.info("latest replication snapshot: {}",
+                        replicatedSubscriptionsController.getLastCompletedSnapshotId());
+            }
+            producer.close();
+
+            // Send receipts are discarded, so the replication cursor cannot advance: wait until the backlog holds
+            // at least one user message and one marker per iteration.
+            GeoPersistentReplicator replicator =
+                    (GeoPersistentReplicator) persistentTopic1.getReplicators().get(cluster2);
+            Awaitility.await().untilAsserted(() -> {
+                // Read the backlog on every poll; a value captured outside the lambda would never change.
+                long backlog = replicator.getCursor().getNumberOfEntriesInBacklog(true);
+                log.info("replication backlog: {}", backlog);
+                assertTrue(backlog >= messageCount * 2);
+            });
+
+            // 1. Remove the injection, which discarded send receipts of the internal producer of the replicator.
+            // 2. Trigger a reconnection of the internal producer of the replicator.
+            //    The internal producer will retry to publish.
+            stuckSendReceipt.set(false);
+            replicator.producer.getClientCnx().ctx().channel().close();
+            Producer<Integer> producer2 = client1.newProducer(Schema.INT32).topic(topicName).create();
+            producer2.close();
+            waitReplicatorStarted(topicName);
+
+            // Verify: all messages are sent, and no duplicated messages.
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertEquals(replicator.getCursor().getNumberOfEntriesInBacklog(true), 0);
+            });
+            Consumer<Integer> consumer2 = client2.newConsumer(Schema.INT32).topic(topicName)
+                    .subscriptionName(subscription).replicateSubscriptionState(true).subscribe();
+            try {
+                for (int i = 0; i < messageCount; i++) {
+                    Message<Integer> msg = consumer2.receive(2, TimeUnit.SECONDS);
+                    assertNotNull(msg);
+                    log.info("Received message: {}", msg.getValue());
+                    assertEquals(msg.getValue(), i);
+                }
+                // A duplicated user message would show up after the expected ones.
+                assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+            } finally {
+                consumer2.close();
+            }
+        } finally {
+            // Always restore the original replication client; otherwise the injected client, which may still be
+            // discarding send receipts, leaks into the following tests.
+            cleanInjection.run();
+        }
+
+        // cleanup.
+        admin1.topics().deleteSubscription(topicName, subscription);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        admin2.topics().setReplicationClusters(topicName, Arrays.asList(cluster2));
+        admin1.topics().unload(topicName);
+        admin2.topics().unload(topicName);
+    }
+
+
@DataProvider(name = "deduplicationArgs")
public Object[][] deduplicationArgs() {
return new Object[][] {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
index 2595ce4ba61..b5d6afc5432 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Markers;
@@ -160,9 +161,27 @@ public class GeoReplicationProducerImpl extends
ProducerImpl{
return;
}
- // Case-4: Unexpected
- // 4-1: got null source cluster's entry position, which is
unexpected.
- // 4-2: unknown error, which is unexpected.
+ // Case-4: Received an out-of-order msg send receipt, and the first
item in pending queue is a marker.
+ if (op.msg.getMessageBuilder().hasMarkerType()
+ &&
Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType())) {
+ log.warn("[{}] [{}] Received an out-of-order msg send receipt
because enabled replicated subscription,"
+ + " which is expected, it always happens when repeatedly
publishing a pair of mixed"
+ + " replication marker messages and user messages."
+ + " source position {}:{}, pending send[marker type: {}]:
{}:{}, latest persisted: {}:{}."
+ + " Drop the pending publish marker command because it is
a marker and it almost no effect.",
+ topic, producerName, sourceLId, sourceEId,
+
MarkerType.valueOf(op.msg.getMessageBuilder().getMarkerType()), pendingLId,
pendingEId,
+ lastPersistedSourceLedgerId, lastPersistedSourceEntryId);
+ // Drop pending marker. The next ack receipt of this marker
message will be dropped after it come in.
+ ackReceivedReplMarker(cnx, op, op.sequenceId, -1 /*non-batch
message*/, -1, -1);
+ // Handle the current send receipt.
+ ackReceived(cnx, sourceLId, sourceEId, targetLId, targetEid);
+ return;
+ }
+
+ // Case-5: Unexpected
+ // 5-1: got null source cluster's entry position, which is
unexpected.
+ // 5-2: unknown error, which is unexpected.
log.error("[{}] [{}] Received an msg send receipt[error]: source entry
{}:{}, target entry: {}:{},"
+ " pending send: {}:{}, latest persisted: {}:{}, queue-size:
{}",
topic, producerName, sourceLId, sourceEId, targetLId,
targetEid, pendingLId, pendingEId,