chia7712 commented on PR #18089:
URL: https://github.com/apache/kafka/pull/18089#issuecomment-2525265697
@brandboat there is another inconsistent behavior between classic consumer
and async consumer. see following test:
```java
@ClusterTest(brokers = 3)
public void testFetchPartitionsAfterFailedListener(ClusterInstance
clusterInstance) throws InterruptedException {
var topic = "topic";
try (var producer =
clusterInstance.producer(Map.of(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class,
VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class))) {
producer.send(new ProducerRecord<>(topic, "key".getBytes(),
"value".getBytes()));
}
try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
"consumer"))) {
consumer.subscribe(List.of(topic), new
ConsumerRebalanceListener() {
private int count = 0;
@Override
public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
count++;
if (count == 1) throw new
IllegalArgumentException("temporary error");
}
});
// classic consumer passes
// async consumer fails
TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofSeconds(1)).count() == 1,
5000,
"failed to poll data");
}
}
```
In the classic consumer, assigned partitions remain fetchable even if the
onPartitionsAssigned method throws an exception. In contrast, the async
consumer marks those assigned partitions as non-fetchable, preventing us from
polling records from them.
As a simple solution, maybe we should set the `pendingOnAssignedCallback` to
`false` even though the callback is failed.
```java
private CompletableFuture<Void> assignPartitions(
SortedSet<TopicIdPartition> assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {
...
// Invoke user call back.
CompletableFuture<Void> result =
signalPartitionsAssigned(addedPartitions);
result.whenComplete((__, exception) -> {
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
});
}
```
@kirktrue @lianetm FYI
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]