chia7712 commented on PR #16686:
URL: https://github.com/apache/kafka/pull/16686#issuecomment-2407034568
> Another point that I would like to mention about ConsumerRebalanceListener
> is that it is very common to actually call the consumer in the listener to
> manually commit offset for instance. As you know, the Consumer is not supposed
> to be called from different threads at the moment.

Okay, that's a solid reason to maintain the current behavior. My main point
was to simplify the coordination between the foreground and background threads.
The strict happens-before events don't seem well suited to the architecture of
the async consumer.

@kirktrue Could you please consider adding tests or reviewing the existing
tests to ensure they cover the listener guarantees? For example:
```java
@ClusterTest(brokers = 3)
@Timeout(60)
public void testConsumerListener(ClusterInstance clusterInstance) throws InterruptedException {
    var threadName = Thread.currentThread().getName() + "-testConsumerListener";
    var s = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
    var onPartitionsAssignedLatch = new CountDownLatch(1);
    var onPartitionsRevokedThread = new AtomicReference<String>();
    var onPartitionsAssignedThread = new AtomicReference<String>();
    var onPartitionsRevokedInterrupted = new AtomicBoolean(false);
    CompletableFuture.runAsync(() -> {
        var consumer = new KafkaConsumer<>(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
                ConsumerConfig.GROUP_ID_CONFIG, "ikea",
                ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"),
            new ByteArrayDeserializer(), new ByteArrayDeserializer());
        try {
            consumer.subscribe(List.of("chia"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    onPartitionsRevokedThread.set(Thread.currentThread().getName());
                    onPartitionsRevokedInterrupted.set(Thread.currentThread().isInterrupted());
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    onPartitionsAssignedThread.set(Thread.currentThread().getName());
                    onPartitionsAssignedLatch.countDown();
                }
            });
            IntStream.range(0, 10).forEach(__ -> consumer.poll(Duration.ofSeconds(3)));
        } finally {
            consumer.close(Duration.ofSeconds(0));
        }
    }, s);

    Assertions.assertTrue(onPartitionsAssignedLatch.await(10, TimeUnit.SECONDS));
    s.shutdownNow();
    Assertions.assertTrue(s.awaitTermination(10, TimeUnit.SECONDS));

    // start to check all guarantees
    // 1) onPartitionsRevoked should be executed regardless of timeout or interruption
    Assertions.assertEquals(threadName, onPartitionsRevokedThread.get());
    // 2) listener should be executed by foreground thread
    Assertions.assertEquals(threadName, onPartitionsAssignedThread.get());
    // 3) listener should be able to see interrupted signal
    Assertions.assertTrue(onPartitionsRevokedInterrupted.get());
}
```
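
For anyone who wants to see the thread and interrupt mechanics in isolation, here is a minimal sketch that uses only the JDK and no Kafka classes. The class `InterruptVisibilitySketch`, its `Observation` holder, and the `run` method are hypothetical names made up for this illustration. The `Thread.sleep` call stands in for the poll loop and the `finally` block stands in for a listener callback invoked during close. It only shows that `shutdownNow()` interrupts the single worker thread and that a callback running on that same thread can observe the flag once it has been restored. It does not model how the consumer itself handles interruption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptVisibilitySketch {

    /** What the cleanup callback observed (hypothetical holder type for this sketch). */
    public static final class Observation {
        public final String threadName;
        public final boolean interrupted;

        Observation(String threadName, boolean interrupted) {
            this.threadName = threadName;
            this.interrupted = interrupted;
        }
    }

    public static Observation run(String threadName) throws InterruptedException {
        // Single worker thread with a known name, playing the role of the foreground thread.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> callbackThread = new AtomicReference<>();
        AtomicBoolean callbackInterrupted = new AtomicBoolean(false);

        executor.execute(() -> {
            try {
                started.countDown();
                // Stand-in for the poll loop: block until the thread is interrupted.
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // sleep() clears the interrupt flag when it throws, so restore it.
                Thread.currentThread().interrupt();
            } finally {
                // Stand-in for a listener callback that runs during cleanup.
                callbackThread.set(Thread.currentThread().getName());
                callbackInterrupted.set(Thread.currentThread().isInterrupted());
            }
        });

        if (!started.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not start in time");
        }
        // shutdownNow() interrupts the running worker thread.
        executor.shutdownNow();
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not terminate in time");
        }
        return new Observation(callbackThread.get(), callbackInterrupted.get());
    }

    public static void main(String[] args) throws InterruptedException {
        Observation o = run("foreground-sketch");
        System.out.println(o.threadName + " interrupted=" + o.interrupted);
    }
}
```

The test above checks the same two properties against a real consumer: the callback runs on the named thread, and it still sees the interrupt signal.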