chia7712 commented on code in PR #18089:
URL: https://github.com/apache/kafka/pull/18089#discussion_r1880949459


##########
core/src/test/java/kafka/clients/consumer/ConsumerIntegrationTest.java:
##########
@@ -70,4 +77,52 @@ public void 
testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInst
             }, "Should get UnsupportedVersionException and how to revert to 
classic protocol");
         }
     }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    })
+    public void 
testFetchPartitionsAfterFailedListenerWithGroupProtocolClassic(ClusterInstance 
clusterInstance)
+            throws InterruptedException {
+        testFetchPartitionsAfterFailedListener(clusterInstance, 
GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    })
+    public void 
testFetchPartitionsAfterFailedListenerWithGroupProtocolConsumer(ClusterInstance 
clusterInstance)
+            throws InterruptedException {
+        testFetchPartitionsAfterFailedListener(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private static void testFetchPartitionsAfterFailedListener(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol)
+            throws InterruptedException {
+        var topic = "topic";
+        try (var producer = clusterInstance.producer(Map.of(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class))) {
+            producer.send(new ProducerRecord<>(topic, "key".getBytes(), 
"value".getBytes()));
+        }
+
+        try (var consumer = clusterInstance.consumer(Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()))) {
+            consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() 
{
+                private int count = 0;
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    count++;
+                    if (count == 1) throw new 
IllegalArgumentException("temporary error");

Review Comment:
   Please add a test case where the callback never succeeds (i.e. 
`onPartitionsAssigned` throws on every invocation), and ensure that no 
records can be polled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to