BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059323030


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   I think it's very hacky to acknowledge the remaining messages in a batch
   after a seek. As in the example I mentioned, if you consider that message
   loss, users can easily simulate the **message loss** like this:
   
   ```java
       private Consumer<String> subscribe(String topic) throws Exception {
           return pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionName("test")
                   .subscriptionType(SubscriptionType.Shared)
                   .isAckReceiptEnabled(true)
                   .subscribe();
       }
   
       @Test
       public void testAckAfterSeek() throws Exception {
           final String topic = "testAckAfterSeek";
   
           Consumer<String> consumer = subscribe(topic);
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .topic(topic)
                   .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
                   .create();
   
           final int messages = 10;
           for (int i = 0; i < messages; i++) {
               producer.sendAsync("New message - " + i);
           }
           producer.flush();
   
           final List<MessageId> messageIdList = new ArrayList<>();
           for (int i = 0; i < messages; i++) {
               final MessageId messageId = consumer.receive().getMessageId();
               messageIdList.add(messageId);
               if (i != 4) {
                   consumer.acknowledge(messageId);
               }
           }
           consumer.seek(MessageId.earliest);
           Thread.sleep(100);
           consumer.acknowledge(messageIdList.get(4));
           consumer.close();
           consumer = subscribe(topic);
           final Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
           // NOTE: message is null
       }
   ```
   
   How would you explain to users that they should not acknowledge
   `messageIdList.get(4)` because otherwise message loss will happen? BTW, if
   users leave one message ID (`messageIdList.get(4)`) to acknowledge after
   `seek` and have not acknowledged the other message IDs, what would they
   expect? Would they expect the messages to be received again?
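   
   To make the scenario concrete, here is a minimal sketch of the per-batch
   bookkeeping under discussion. `SketchBatchAcker` is a hypothetical stand-in,
   not the actual `BatchMessageAcker`, and it assumes the acker for a batch is
   kept across `seek`, which appears to be what the `batchMessageToAcker` map
   in this diff enables:
   
   ```java
   import java.util.BitSet;

   // Hypothetical stand-in for a per-batch acker; NOT Pulsar's BatchMessageAcker.
   public class SketchBatchAcker {
       // A set bit means the message at that batch index is still unacknowledged.
       private final BitSet pending;

       public SketchBatchAcker(int batchSize) {
           pending = new BitSet(batchSize);
           pending.set(0, batchSize);
       }

       // Marks one batch index as acknowledged.
       // Returns true once every message in the batch has been acknowledged.
       public synchronized boolean ackIndividual(int batchIndex) {
           pending.clear(batchIndex);
           return pending.isEmpty();
       }

       public synchronized int outstandingAcks() {
           return pending.cardinality();
       }

       public static void main(String[] args) {
           SketchBatchAcker acker = new SketchBatchAcker(10);
           for (int i = 0; i < 10; i++) {
               if (i != 4) {
                   acker.ackIndividual(i); // acknowledged before the seek
               }
           }
           System.out.println(acker.outstandingAcks()); // prints 1

           // Assumption: seek happens here and the acker state is NOT reset.
           System.out.println(acker.ackIndividual(4)); // prints true
       }
   }
   ```
   
   Under that assumption, the single late acknowledgment completes the batch,
   so the whole entry would presumably be acknowledged even though the
   subscription was just reset by `seek`. That is consistent with `receive`
   returning null in the test above.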
   
   In short, I think we should not handle this corner case. It's more like a
   "bug" triggered on purpose.


