This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 02f9fd3 [pulsar-client] Fix broken replication msg to specific cluster (#4930) 02f9fd3 is described below commit 02f9fd3e055f404501e41413ec1081331ff945c3 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Sun Aug 11 18:41:19 2019 -0700 [pulsar-client] Fix broken replication msg to specific cluster (#4930) --- .../pulsar/broker/service/ReplicatorTest.java | 39 ++++++++++++++++++++++ .../apache/pulsar/common/protocol/Commands.java | 3 ++ 2 files changed, 42 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index eddb7cd..f68214a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -54,12 +54,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -812,6 +814,43 @@ public class ReplicatorTest extends ReplicatorTestBase { } + @Test + public void testReplicatedCluster() throws Exception { + + log.info("--- Starting ReplicatorTest::testReplicatedCluster ---"); + + final String namespace = "pulsar/global/repl"; + final String topicName = String.format("persistent://%s/topic1", namespace); + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); + admin1.topics().createPartitionedTopic(topicName, 4); + + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create(); + org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); + org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); + byte[] value = "test".getBytes(); + + // publish message local only + TypedMessageBuilder<byte[]> msg = producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value); + msg.send(); + assertEquals(consumer1.receive().getValue(), value); + + Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS); + if (msg2 != null) { + fail("msg should have not been replicated to remote cluster"); + } + + consumer1.close(); + consumer2.close(); + producer1.close(); + + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7d9de6f..70ac8e4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1420,6 +1420,9 @@ public class Commands { if (builder.hasReplicatedFrom()) { messageMetadata.setReplicatedFrom(builder.getReplicatedFrom()); } + if (builder.getReplicateToCount() > 0) { + messageMetadata.addAllReplicateTo(builder.getReplicateToList()); + } if (builder.hasSchemaVersion()) { messageMetadata.setSchemaVersion(builder.getSchemaVersion()); }