nodece commented on code in PR #22983: URL: https://github.com/apache/pulsar/pull/22983#discussion_r1666970494
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ########## @@ -184,8 +187,42 @@ public void startProducer() { } log.info("[{}] Starting replicator", replicatorId); - producerBuilder.createAsync().thenAccept(producer -> { - setProducerAndTriggerReadEntries(producer); + CompletableFuture<Void> checkPartitionsSameFuture = new CompletableFuture<>(); + replicationClient.getPartitionedTopicMetadata(remoteTopicName, false).thenAccept(metadata -> { + // If there is an exists partitioned topic on the remote cluster, report an error. + if (metadata.partitions != 0) { + log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {})," + + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions, + state); + // This exception will be caught below, so it can be any typed. + checkPartitionsSameFuture.completeExceptionally(new RuntimeException(replicatorId + + "Can not replicate data to a partitioned topic.")); + } else { + checkPartitionsSameFuture.complete(null); + } + }).exceptionally(ex -> { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof PulsarClientException.NotFoundException + || actEx instanceof PulsarClientException.TopicDoesNotExistException + || actEx instanceof PulsarAdminException.NotFoundException) { + // These 3 error means the topic has not been created on the remote cluster yet, and the current + // replicator will trigger an event to create it. So it is okay. + checkPartitionsSameFuture.complete(null); + } else { + log.warn("[{}] Failed to create remote producer due to get partitioned metadata failed", + replicatorId, ex); + checkPartitionsSameFuture.completeExceptionally(ex); + } + return null; + }); + checkPartitionsSameFuture.thenCompose(metadata -> { + // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on + // the remote cluster. 
+ ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; + builderImpl.getConf().setForceOnoPartitioned(true); Review Comment: This looks like hacky code. Could you send the messages to a single topic and skip the metadata fetching? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org