JeffBolle opened a new issue, #19457:
URL: https://github.com/apache/pulsar/issues/19457

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Version
   
   Linux, Java 17, Pulsar 2.11.0
   
   ### Minimal reproduce step
   
   I wrote the following test case, which assumes an already configured `PulsarClient` and `PulsarAdministration` (the latter is used only to create the topic):
   
   ```java
   @Test
   public void reproduceDuplicateMessageWarning() throws Exception {
       String topicName = "topic-1";
       int partitions = 10;
       pulsarAdministration.createAdminClient().topics().createPartitionedTopic(topicName, partitions);
       try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
               .topic(topicName)
               .accessMode(ProducerAccessMode.Shared)
               .batcherBuilder(BatcherBuilder.KEY_BASED)
               .enableChunking(false)
               .autoUpdatePartitions(true)
               .blockIfQueueFull(true)
               .enableBatching(true)
               .compressionType(CompressionType.ZSTD)
               .batchingMaxMessages(20_000)
               .batchingMaxPublishDelay(1_000, TimeUnit.MILLISECONDS)
               .maxPendingMessagesAcrossPartitions(500_000)
               .batchingMaxBytes(1024 * 1024)
               .create()) {
           List<String> keys = IntStream.range(0, 100).mapToObj(Integer::toString).toList();
           // parallel() runs the lambda on ForkJoinPool common-pool threads, so sendAsync()
           // is called on the shared producer from several threads at the same time.
           List<CompletableFuture<MessageId>> futList = IntStream.range(0, 10_000).parallel()
                   .mapToObj(i -> {
                       TypedMessageBuilder<String> mbs = producer.newMessage(Schema.STRING);
                       String value = "pushing obj number " + i;
                       String key = keys.get(i % 100);
                       mbs.value(value)
                               .key(key)
                               .orderingKey(key.getBytes(StandardCharsets.UTF_8))
                               .eventTime(Instant.now().toEpochMilli());
                       return mbs.sendAsync();
                   })
                   .toList();
           // Block until all sends have completed; get() throws if any send failed.
           CompletableFuture.allOf(futList.toArray(new CompletableFuture[0])).get();
       }
   }
   ```
   
   I have observed the same errors inside my Pulsar Functions as well (not surprising, since they also use the Java Pulsar client).
   
   ### What did you expect to see?
   
   The messages produced cleanly to the specified topic, with only the usual metrics logging.
   
   
   ### What did you see instead?
   
   A large amount of log output suggesting a problem with the published messages:
   
   ```
   2023-02-07 18:36:05.800  WARN 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx    : [id: 0x9961cedd, L:/127.0.0.1:52778 - R:localhost/127.0.0.1:32844] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
   2023-02-07 18:36:05.801  INFO 1146756 --- [onPool-worker-1] o.a.pulsar.client.impl.ProducerImpl      : Message with sequence id 7 might be a duplicate but cannot be determined at this time.
   2023-02-07 18:36:05.801  INFO 1146756 --- [onPool-worker-5] o.a.pulsar.client.impl.ProducerImpl      : Message with sequence id 9 might be a duplicate but cannot be determined at this time.
   2023-02-07 18:36:05.801  WARN 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx    : [id: 0x9961cedd, L:/127.0.0.1:52778 ! R:localhost/127.0.0.1:32844] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
   2023-02-07 18:36:05.801  WARN 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx    : [id: 0x9961cedd, L:/127.0.0.1:52778 ! R:localhost/127.0.0.1:32844] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
   2023-02-07 18:36:05.801  INFO 1146756 --- [nPool-worker-10] o.a.pulsar.client.impl.ProducerImpl      : Message with sequence id 6 might be a duplicate but cannot be determined at this time.
   2023-02-07 18:36:05.802  INFO 1146756 --- [r-client-io-1-5] o.apache.pulsar.client.impl.ClientCnx    : [id: 0x9961cedd, L:/127.0.0.1:52778 ! R:localhost/127.0.0.1:32844] Disconnected
   2023-02-07 18:36:05.802  INFO 1146756 --- [nPool-worker-15] o.a.pulsar.client.impl.ProducerImpl      : Message with sequence id 8 might be a duplicate but cannot be determined at this time.
   2023-02-07 18:36:05.802  INFO 1146756 --- [onPool-worker-2] o.a.pulsar.client.impl.ProducerImpl      : Message with sequence id 5 might be a duplicate but cannot be determined at this time.
   ```
   
   Additionally, my logs show messages about the producer connection dropping and indicating that messages are duplicates:
   ```
   2023-02-07 18:36:05.967  WARN 1146756 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx    : [id: 0x9aa71776, L:/127.0.0.1:52782 - R:localhost/127.0.0.1:32844] Message with sequence-id 0 published by producer 6 has been dropped
   ```
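
   The two kinds of log message above (the broker saying it cannot determine whether a message is a duplicate, and a message reported as dropped) are consistent with broker-side deduplication being enabled. As a rough aid to reading them, here is a simplified model of a per-producer sequence-ID check. It is an assumption based on how Pulsar deduplication is generally described, not the broker's actual code; `DedupModel`, `check`, `markPersisted`, and the "pushed"/"persisted" marks are hypothetical names.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /**
    * Hypothetical, simplified model of per-producer deduplication (not Pulsar's broker code).
    * It tracks the highest sequence ID accepted ("pushed") and the highest one confirmed
    * as persisted for each producer.
    */
   final class DedupModel {
       enum Status { NOT_DUP, DUP, UNKNOWN }

       private final Map<String, Long> highestPushed = new HashMap<>();
       private final Map<String, Long> highestPersisted = new HashMap<>();

       Status check(String producer, long sequenceId) {
           long pushed = highestPushed.getOrDefault(producer, -1L);
           if (sequenceId > pushed) {
               // Higher than anything seen so far: accept it and advance the pushed mark.
               highestPushed.put(producer, sequenceId);
               return Status.NOT_DUP;
           }
           long persisted = highestPersisted.getOrDefault(producer, -1L);
           // At or below the persisted mark: already stored, so treat it as a duplicate
           // (assumed to correspond to the client's "has been dropped" warning).
           // Between the persisted and pushed marks: still in flight, outcome unknown
           // (assumed to correspond to "Cannot determine whether the message is a duplicate").
           return sequenceId <= persisted ? Status.DUP : Status.UNKNOWN;
       }

       void markPersisted(String producer, long sequenceId) {
           highestPersisted.merge(producer, sequenceId, Math::max);
       }

       public static void main(String[] args) {
           DedupModel broker = new DedupModel();
           // Sequence IDs 6 and 5 from the same producer arrive out of order.
           System.out.println(broker.check("producer-6", 6)); // NOT_DUP
           System.out.println(broker.check("producer-6", 5)); // UNKNOWN: 6 is not persisted yet
           broker.markPersisted("producer-6", 6);
           System.out.println(broker.check("producer-6", 5)); // DUP: now below the persisted mark
       }
   }
   ```

   In this model, `UNKNOWN` only appears when a sequence ID arrives that is not higher than one already accepted, but is higher than anything persisted so far; out-of-order sends from a single producer are one way to reach that state.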
   
   
   ### Anything else?
   
   The "expected" output can be produced by synchronizing access to the producer around the `sendAsync` call. The version of my test case below produces clean output and seemingly correct results:
   
   ```java
   @Test
   public void reproduceDuplicateMessageWarning() throws Exception {
       String topicName = "topic-1";
       int partitions = 10;
       pulsarAdministration.createAdminClient().topics().createPartitionedTopic(topicName, partitions);
       try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
               .topic(topicName)
               .accessMode(ProducerAccessMode.Shared)
               .batcherBuilder(BatcherBuilder.KEY_BASED)
               .enableChunking(false)
               .autoUpdatePartitions(true)
               .blockIfQueueFull(true)
               .enableBatching(true)
               .compressionType(CompressionType.ZSTD)
               .batchingMaxMessages(20_000)
               .batchingMaxPublishDelay(1_000, TimeUnit.MILLISECONDS)
               .maxPendingMessagesAcrossPartitions(500_000)
               .batchingMaxBytes(1024 * 1024)
               .create()) {
           List<String> keys = IntStream.range(0, 100).mapToObj(Integer::toString).toList();
           List<CompletableFuture<MessageId>> futList = IntStream.range(0, 10_000).parallel()
                   .mapToObj(i -> {
                       TypedMessageBuilder<String> mbs = producer.newMessage(Schema.STRING);
                       String value = "pushing obj number " + i;
                       String key = keys.get(i % 100);
                       mbs.value(value)
                               .key(key)
                               .orderingKey(key.getBytes(StandardCharsets.UTF_8))
                               .eventTime(Instant.now().toEpochMilli());
                       // Serialize sendAsync() calls on the shared producer
                       // (the only change from the first test).
                       synchronized (producer) {
                           return mbs.sendAsync();
                       }
                   })
                   .toList();
           // Block until all sends have completed; get() throws if any send failed.
           CompletableFuture.allOf(futList.toArray(new CompletableFuture[0])).get();
       }
   }
   ```
   The only difference is the `synchronized` block.
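
   The effect of the `synchronized` block can be illustrated with a toy model. This is a hypothetical sketch, not Pulsar's `ProducerImpl`: it assumes (unconfirmed against the Pulsar source) that a sequence ID is assigned and the message appended to the outgoing queue in two separate steps, so that concurrent callers can append IDs out of order, and the broker would then see a lower ID arrive after a higher one. `ToyProducer`, `send`, `sendSerialized`, and `inOrder` are illustrative names.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.stream.IntStream;

   /**
    * Hypothetical toy producer (not Pulsar's ProducerImpl). A sequence ID is taken from a
    * counter in one step and appended to the outgoing queue in another; nothing ties the
    * two steps together, so concurrent callers can enqueue IDs out of order.
    */
   final class ToyProducer {
       private final AtomicLong nextSequenceId = new AtomicLong();
       private final ConcurrentLinkedQueue<Long> outgoing = new ConcurrentLinkedQueue<>();

       void send() {
           long id = nextSequenceId.getAndIncrement(); // step 1: assign a unique ID
           outgoing.add(id);                           // step 2: enqueue; other threads may run in between
       }

       /** Mirrors the workaround: hold the producer's monitor across both steps. */
       void sendSerialized() {
           synchronized (this) {
               send();
           }
       }

       List<Long> sent() {
           return new ArrayList<>(outgoing);
       }

       /** True if the IDs were enqueued in strictly increasing order. */
       static boolean inOrder(List<Long> ids) {
           for (int i = 1; i < ids.size(); i++) {
               if (ids.get(i) <= ids.get(i - 1)) {
                   return false;
               }
           }
           return true;
       }

       public static void main(String[] args) {
           ToyProducer racy = new ToyProducer();
           IntStream.range(0, 100_000).parallel().forEach(i -> racy.send());
           ToyProducer serialized = new ToyProducer();
           IntStream.range(0, 100_000).parallel().forEach(i -> serialized.sendSerialized());
           System.out.println("unsynchronized in order: " + inOrder(racy.sent()));
           System.out.println("serialized in order:     " + inOrder(serialized.sent()));
       }
   }
   ```

   With `sendSerialized`, IDs are assigned and enqueued in lock-acquisition order, so the queue is always 0, 1, 2, and so on. Whether the unsynchronized run prints `false` depends on thread timing and can vary from run to run.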
   
   For the earlier troubleshooting and discussion of this issue, please see:
   - https://github.com/spring-projects-experimental/spring-pulsar/issues/310
   - https://stackoverflow.com/questions/75323708/multithreaded-use-of-spring-pulsar
   
   Additionally, I am available on the Pulsar Slack.
   I would submit a PR, but after a quick look at the PulsarClient publishing code, I thought it best to leave this to people much more familiar with that code.
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

