This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02f9fd3  [pulsar-client] Fix broken replication msg to specific 
cluster (#4930)
02f9fd3 is described below

commit 02f9fd3e055f404501e41413ec1081331ff945c3
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Sun Aug 11 18:41:19 2019 -0700

    [pulsar-client] Fix broken replication msg to specific cluster (#4930)
---
 .../pulsar/broker/service/ReplicatorTest.java      | 39 ++++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  3 ++
 2 files changed, 42 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index eddb7cd..f68214a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -54,12 +54,14 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -812,6 +814,43 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
     }
 
+    @Test
+    public void testReplicatedCluster() throws Exception {
+
+        log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
+
+        final String namespace = "pulsar/global/repl";
+        final String topicName = String.format("persistent://%s/topic1", 
namespace);
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2", "r3"));
+        admin1.topics().createPartitionedTopic(topicName, 4);
+
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+
+        Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).create();
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = 
client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = 
client2.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
+        byte[] value = "test".getBytes();
+
+        // publish message local only
+        TypedMessageBuilder<byte[]> msg = 
producer1.newMessage().replicationClusters(Lists.newArrayList("r1")).value(value);
+        msg.send();
+        assertEquals(consumer1.receive().getValue(), value);
+
+        Message<byte[]> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+        if (msg2 != null) {
+            fail("msg should have not been replicated to remote cluster");
+        }
+        
+        consumer1.close();
+        consumer2.close();
+        producer1.close();
+
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7d9de6f..70ac8e4 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1420,6 +1420,9 @@ public class Commands {
         if (builder.hasReplicatedFrom()) {
             messageMetadata.setReplicatedFrom(builder.getReplicatedFrom());
         }
+        if (builder.getReplicateToCount() > 0) {
+            messageMetadata.addAllReplicateTo(builder.getReplicateToList());
+        }
         if (builder.hasSchemaVersion()) {
             messageMetadata.setSchemaVersion(builder.getSchemaVersion());
         }

Reply via email to