poorbarcode commented on code in PR #22983: URL: https://github.com/apache/pulsar/pull/22983#discussion_r1667078640
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -184,8 +187,42 @@ public void startProducer() {
         }
         log.info("[{}] Starting replicator", replicatorId);
-        producerBuilder.createAsync().thenAccept(producer -> {
-            setProducerAndTriggerReadEntries(producer);
+        CompletableFuture<Void> checkPartitionsSameFuture = new CompletableFuture<>();
+        replicationClient.getPartitionedTopicMetadata(remoteTopicName, false).thenAccept(metadata -> {
+            // If there is an exists partitioned topic on the remote cluster, report an error.
+            if (metadata.partitions != 0) {
+                log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {}),"
+                        + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions,
+                        state);
+                // This exception will be caught below, so it can be any typed.
+                checkPartitionsSameFuture.completeExceptionally(new RuntimeException(replicatorId
+                        + "Can not replicate data to a partitioned topic."));
+            } else {
+                checkPartitionsSameFuture.complete(null);
+            }
+        }).exceptionally(ex -> {
+            Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+            if (actEx instanceof PulsarClientException.NotFoundException
+                    || actEx instanceof PulsarClientException.TopicDoesNotExistException
+                    || actEx instanceof PulsarAdminException.NotFoundException) {
+                // These 3 error means the topic has not been created on the remote cluster yet, and the current
+                // replicator will trigger an event to create it. So it is okay.
+                checkPartitionsSameFuture.complete(null);
+            } else {
+                log.warn("[{}] Failed to create remote producer due to get partitioned metadata failed",
+                        replicatorId, ex);
+                checkPartitionsSameFuture.completeExceptionally(ex);
+            }
+            return null;
+        });
+        checkPartitionsSameFuture.thenCompose(metadata -> {
+            // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
+            // the remote cluster.
+            ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
+            builderImpl.getConf().setForceOnoPartitioned(true);

Review Comment:

> Would you send the message to one topic? and skip the metadata fetching?

Yes. The internal producer connects to the topic if it already exists on the remote cluster, and triggers the creation of a non-partitioned topic if it does not.

> This looks like a hack code.

Other scenarios also need this config, such as Flink and Sink connectors. We can add the config internally now to fix the replication bug, and make it public later for those scenarios.
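For readers following the thread, the decision the check in this hunk makes can be sketched without any Pulsar dependency. This is only an illustration under stated assumptions: `TopicNotFoundException`, `checkRemoteNotPartitioned`, and `describe` are hypothetical names, the `CompletableFuture<Integer>` stands in for the remote partition-count lookup, and the sketch uses `handle` rather than the manually completed future in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RemotePartitionCheckSketch {

    /** Hypothetical stand-in for the "topic not found" exceptions named in the diff. */
    static class TopicNotFoundException extends RuntimeException {
        TopicNotFoundException(String message) {
            super(message);
        }
    }

    /**
     * Completes normally when the remote topic is missing or non-partitioned.
     * Completes exceptionally when it is partitioned or the lookup failed for another reason.
     */
    static CompletableFuture<Void> checkRemoteNotPartitioned(CompletableFuture<Integer> remotePartitions) {
        return remotePartitions.<Void>handle((partitions, ex) -> {
            if (ex != null) {
                // The failure may or may not be wrapped, so unwrap it before inspecting the type.
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                        ? ex.getCause() : ex;
                if (cause instanceof TopicNotFoundException) {
                    // Not created remotely yet: the producer will trigger creation, so this is fine.
                    return null;
                }
                throw new CompletionException(cause);
            }
            if (partitions != 0) {
                throw new IllegalStateException("Can not replicate data to a partitioned topic.");
            }
            return null;
        });
    }

    /** Turns the outcome of the check into a short string for printing. */
    static String describe(CompletableFuture<Void> check) {
        try {
            check.join();
            return "ok";
        } catch (CompletionException e) {
            return "rejected: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> missing = new CompletableFuture<>();
        missing.completeExceptionally(new TopicNotFoundException("topic not found"));

        System.out.println("missing topic:         " + describe(checkRemoteNotPartitioned(missing)));
        System.out.println("non-partitioned topic: "
                + describe(checkRemoteNotPartitioned(CompletableFuture.completedFuture(0))));
        System.out.println("partitioned topic:     "
                + describe(checkRemoteNotPartitioned(CompletableFuture.completedFuture(3))));
    }
}
```

Only the partitioned case, or a lookup failure other than not-found, blocks producer creation; the other two outcomes fall through to the producer, which is where the forced non-partitioned setting takes effect.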