chia7712 commented on PR #18089:
URL: https://github.com/apache/kafka/pull/18089#issuecomment-2525265697

   @brandboat there is another inconsistent behavior between classic consumer 
and async consumer. see following test:
   ```java
       @ClusterTest(brokers = 3)
       public void testFetchPartitionsAfterFailedListener(ClusterInstance 
clusterInstance) throws InterruptedException {
           var topic = "topic";
           try (var producer = 
clusterInstance.producer(Map.of(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class,
                   VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class))) {
               producer.send(new ProducerRecord<>(topic, "key".getBytes(), 
"value".getBytes()));
           }
           try (var consumer = 
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
"consumer"))) {
               consumer.subscribe(List.of(topic), new 
ConsumerRebalanceListener() {
                   private int count = 0;
                   @Override
                   public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
                   }
   
                   @Override
                   public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
                       count++;
                       if (count == 1) throw new 
IllegalArgumentException("temporary error");
                   }
               });
   
               // classic consumer passes
               // async consumer fails
               TestUtils.waitForCondition(() -> 
consumer.poll(Duration.ofSeconds(1)).count() == 1,
                       5000,
                       "failed to poll data");
           }
       }
   ```
   
   In the classic consumer, assigned partitions remain fetchable even if the 
onPartitionsAssigned method throws an exception. In contrast, the async 
consumer marks those assigned partitions as non-fetchable, preventing us from 
polling records from them.
   
   As a simple solution, maybe we should set the `pendingOnAssignedCallback` to 
`false` even though the callback is failed.
   
   ```java
       private CompletableFuture<Void> assignPartitions(
               SortedSet<TopicIdPartition> assignedPartitions,
               SortedSet<TopicPartition> addedPartitions) {
         ...
   
           // Invoke user call back.
           CompletableFuture<Void> result = 
signalPartitionsAssigned(addedPartitions);
           result.whenComplete((__, exception) -> {
               subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
           });
      }
   ```
   
   @kirktrue @lianetm FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to