JeffBolle opened a new issue, #19457: URL: https://github.com/apache/pulsar/issues/19457
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

Linux, Java 17, Pulsar 2.11.0

### Minimal reproduce step

I wrote the following test case, assuming a configured `PulsarClient` and a `PulsarAdministration` (used only to create the topic):

```java
@Test
public void reproduceDuplicateMessageWarning() throws Exception {
    String topicName = "topic-1";
    int partitions = 10;
    pulsarAdministration.createAdminClient().topics().createPartitionedTopic(topicName, partitions);
    try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topicName)
            .accessMode(ProducerAccessMode.Shared)
            .batcherBuilder(BatcherBuilder.KEY_BASED)
            .enableChunking(false)
            .autoUpdatePartitions(true)
            .blockIfQueueFull(true)
            .enableBatching(true)
            .compressionType(CompressionType.ZSTD)
            .batchingMaxMessages(20_000)
            .batchingMaxPublishDelay(1_000, TimeUnit.MILLISECONDS)
            .maxPendingMessagesAcrossPartitions(500_000)
            .batchingMaxBytes(1024 * 1024)
            .create()) {
        List<String> keys = IntStream.range(0, 100).mapToObj(Integer::toString).toList();
        // Build and send 10,000 messages from parallel-stream worker threads, all sharing one producer.
        List<CompletableFuture<MessageId>> futList = IntStream.range(0, 10_000).parallel()
                .mapToObj(i -> {
                    TypedMessageBuilder<String> mbs = producer.newMessage(Schema.STRING);
                    String value = "pushing obj number " + i;
                    String key = keys.get(i % 100);
                    mbs.value(value)
                            .key(key)
                            .orderingKey(key.getBytes(StandardCharsets.UTF_8))
                            .eventTime(Instant.now().toEpochMilli());
                    return mbs.sendAsync();
                })
                .toList();
        // Wait for every send to complete.
        CompletableFuture.allOf(futList.toArray(new CompletableFuture[0])).get();
    }
}
```

I have observed the same errors inside my Pulsar Functions as well (which is not surprising, since they just use the Java Pulsar client).

### What did you expect to see?

The messages are produced cleanly to the specified topic, with only the usual metrics logging.

### What did you see instead?
A lot of logging that seems to imply there is a problem with the published messages:

```
2023-02-07 18:36:05.800 WARN 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx : [id: 0x9961cedd, L:/127.0.0.1:52778 - R:localhost/127.0.0.1:32844] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2023-02-07 18:36:05.801 INFO 1146756 --- [onPool-worker-1] o.a.pulsar.client.impl.ProducerImpl : Message with sequence id 7 might be a duplicate but cannot be determined at this time.
2023-02-07 18:36:05.801 INFO 1146756 --- [onPool-worker-5] o.a.pulsar.client.impl.ProducerImpl : Message with sequence id 9 might be a duplicate but cannot be determined at this time.
2023-02-07 18:36:05.801 WARN 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx : [id: 0x9961cedd, L:/127.0.0.1:52778 ! R:localhost/127.0.0.1:32844] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2023-02-07 18:36:05.801 WARN 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx : [id: 0x9961cedd, L:/127.0.0.1:52778 ! R:localhost/127.0.0.1:32844] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2023-02-07 18:36:05.801 INFO 1146756 --- [nPool-worker-10] o.a.pulsar.client.impl.ProducerImpl : Message with sequence id 6 might be a duplicate but cannot be determined at this time.
2023-02-07 18:36:05.802 INFO 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx : [id: 0x9961cedd, L:/127.0.0.1:52778 ! R:localhost/127.0.0.1:32844] Disconnected
2023-02-07 18:36:05.802 INFO 1146756 --- [nPool-worker-15] o.a.pulsar.client.impl.ProducerImpl : Message with sequence id 8 might be a duplicate but cannot be determined at this time.
2023-02-07 18:36:05.802 INFO 1146756 --- [onPool-worker-2] o.a.pulsar.client.impl.ProducerImpl : Message with sequence id 5 might be a duplicate but cannot be determined at this time.
```

Additionally, my logs show messages about the producer connection dropping and about messages being dropped as duplicates:

```
2023-02-07 18:36:05.967 WARN 1146756 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx : [id: 0x9aa71776, L:/127.0.0.1:52782 - R:localhost/127.0.0.1:32844] Message with sequence-id 0 published by producer 6 has been dropped
```

### Anything else?

The "expected" output can be produced by synchronizing access to the producer around the `sendAsync` call. The version of my test case below produces clean output and seemingly correct results:

```java
@Test
public void reproduceDuplicateMessageWarning() throws Exception {
    String topicName = "topic-1";
    int partitions = 10;
    pulsarAdministration.createAdminClient().topics().createPartitionedTopic(topicName, partitions);
    try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topicName)
            .accessMode(ProducerAccessMode.Shared)
            .batcherBuilder(BatcherBuilder.KEY_BASED)
            .enableChunking(false)
            .autoUpdatePartitions(true)
            .blockIfQueueFull(true)
            .enableBatching(true)
            .compressionType(CompressionType.ZSTD)
            .batchingMaxMessages(20_000)
            .batchingMaxPublishDelay(1_000, TimeUnit.MILLISECONDS)
            .maxPendingMessagesAcrossPartitions(500_000)
            .batchingMaxBytes(1024 * 1024)
            .create()) {
        List<String> keys = IntStream.range(0, 100).mapToObj(Integer::toString).toList();
        List<CompletableFuture<MessageId>> futList = IntStream.range(0, 10_000).parallel()
                .mapToObj(i -> {
                    TypedMessageBuilder<String> mbs = producer.newMessage(Schema.STRING);
                    String value = "pushing obj number " + i;
                    String key = keys.get(i % 100);
                    mbs.value(value)
                            .key(key)
                            .orderingKey(key.getBytes(StandardCharsets.UTF_8))
                            .eventTime(Instant.now().toEpochMilli());
                    // The only change: serialize sendAsync calls on the shared producer.
                    synchronized (producer) {
                        return mbs.sendAsync();
                    }
                })
                .toList();
        CompletableFuture.allOf(futList.toArray(new CompletableFuture[0])).get();
    }
}
```

The only difference is the `synchronized` block.

For the historical troubleshooting and discussion of this issue, see:

- https://github.com/spring-projects-experimental/spring-pulsar/issues/310
- https://stackoverflow.com/questions/75323708/multithreaded-use-of-spring-pulsar

I am also available on the Pulsar Slack. I would submit a PR, but after a quick look at the Pulsar client publishing code, I thought it best to leave this to people much more familiar with that code.

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
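For triage, a simplified model of the broker-side deduplication check may help explain the two kinds of log messages above. It assumes deduplication is enabled for the topic, and that the broker tracks, per producer, the highest sequence id it has accepted for writing ("pushed") and the highest it has confirmed as persisted. A sequence id at or below the pushed mark is a duplicate if it is also at or below the persisted mark, and otherwise cannot be classified yet. The `DedupModel` class and its `classify` and `markPersisted` methods are hypothetical illustrations, not Pulsar's code or API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a per-producer deduplication check.
// Not Pulsar's implementation; names and structure are illustrative only.
final class DedupModel {
    enum Status { NOT_DUP, DUP, UNKNOWN }

    // Highest sequence id accepted for writing, per producer (assumed state).
    private final Map<String, Long> highestPushed = new HashMap<>();
    // Highest sequence id confirmed as persisted, per producer (assumed state).
    private final Map<String, Long> highestPersisted = new HashMap<>();

    Status classify(String producer, long sequenceId) {
        Long pushed = highestPushed.get(producer);
        if (pushed != null && sequenceId <= pushed) {
            long persisted = highestPersisted.getOrDefault(producer, -1L);
            // At or below the persisted mark: treated as a duplicate.
            // Between persisted and pushed: cannot be decided yet.
            return sequenceId <= persisted ? Status.DUP : Status.UNKNOWN;
        }
        highestPushed.put(producer, sequenceId);
        return Status.NOT_DUP;
    }

    void markPersisted(String producer, long sequenceId) {
        highestPersisted.merge(producer, sequenceId, Math::max);
    }

    public static void main(String[] args) {
        DedupModel broker = new DedupModel();
        // In-order arrival: every id is new and gets persisted.
        for (long seq = 0; seq <= 4; seq++) {
            System.out.println(seq + " -> " + broker.classify("p", seq));
            broker.markPersisted("p", seq);
        }
        // Out-of-order arrival: 9 overtakes 5..8 and is not yet persisted.
        System.out.println("9 -> " + broker.classify("p", 9));
        for (long seq = 5; seq <= 8; seq++) {
            System.out.println(seq + " -> " + broker.classify("p", seq));
        }
        // After 9 is persisted, a resend of 7 is classified as a duplicate.
        broker.markPersisted("p", 9);
        System.out.println("7 (resend) -> " + broker.classify("p", 7));
    }
}
```

In this model, ids 5 to 8 come back as `UNKNOWN` once 9 has overtaken them, and a resend of any of them after 9 is persisted is classified as `DUP` even though it was never stored. That pattern would be consistent with the "cannot be determined" and "has been dropped" messages, and with the `synchronized` workaround, if concurrent `sendAsync` calls let one producer's messages reach the broker out of sequence-id order. It is a hypothesis to check against the client code, not a confirmed root cause.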
