nodece commented on code in PR #22983:
URL: https://github.com/apache/pulsar/pull/22983#discussion_r1667710677


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -184,8 +187,42 @@ public void startProducer() {
         }
 
         log.info("[{}] Starting replicator", replicatorId);
-        producerBuilder.createAsync().thenAccept(producer -> {
-            setProducerAndTriggerReadEntries(producer);
+        CompletableFuture<Void> checkPartitionsSameFuture = new CompletableFuture<>();
+        replicationClient.getPartitionedTopicMetadata(remoteTopicName, false).thenAccept(metadata -> {
+            // If there is an exists partitioned topic on the remote cluster, report an error.
+            if (metadata.partitions != 0) {
+                log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {}),"
+                                + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions,
+                        state);
+                // This exception will be caught below, so it can be any typed.
+                checkPartitionsSameFuture.completeExceptionally(new RuntimeException(replicatorId
+                        + "Can not replicate data to a partitioned topic."));
+            } else {
+                checkPartitionsSameFuture.complete(null);
+            }
+        }).exceptionally(ex -> {
+            Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+            if (actEx instanceof PulsarClientException.NotFoundException
+                    || actEx instanceof PulsarClientException.TopicDoesNotExistException
+                    || actEx instanceof PulsarAdminException.NotFoundException) {
+                // These 3 error means the topic has not been created on the remote cluster yet, and the current
+                // replicator will trigger an event to create it. So it is okay.
+                checkPartitionsSameFuture.complete(null);
+            } else {
+                log.warn("[{}] Failed to create remote producer due to get partitioned metadata failed",
+                        replicatorId, ex);
+                checkPartitionsSameFuture.completeExceptionally(ex);
+            }
+            return null;
+        });
+        checkPartitionsSameFuture.thenCompose(metadata -> {
+            // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
+            // the remote cluster.
+            ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
+            builderImpl.getConf().setForceOnoPartitioned(true);

Review Comment:
   The name `forceNoPartitioned` looks confusing. What do you think of `skipPartitionedCheck`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

