kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1702416223
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -911,4 +911,42 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertThrows(classOf[WakeupException], () =>
consumer.position(topicPartition, Duration.ofSeconds(100)))
}
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String):
Unit = {
+ val adminClient = createAdminClient()
+ val consumer = createConsumer()
+ val groupId = consumerConfig.getProperty("group.id")
+
+ def hasMembers: Boolean = {
+ try {
+ val groupDescription = adminClient.describeConsumerGroups
(Collections.singletonList (groupId) ).describedGroups.get (groupId).get
+ groupDescription.members.size() > 0
+ } catch {
+ case _: ExecutionException | _: InterruptedException =>
+ false
+ }
+ }
+
+ val listener = new TestConsumerReassignmentListener()
+ consumer.subscribe(List(topic).asJava, listener)
+ awaitRebalance(consumer, listener)
+
+ assertEquals(1, listener.callsToAssigned)
+ assertEquals(0, listener.callsToRevoked)
+ TestUtils.waitUntilTrue(() => hasMembers, s"Consumer did not join the
consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of subscribe")
+
+ try {
+ Thread.currentThread().interrupt()
+ assertThrows(classOf[InterruptException], () => consumer.close())
+ } finally {
+ // Clear the interrupted flag so we don't create problems for subsequent
tests.
+ Thread.interrupted()
+ }
+
+ assertEquals(1, listener.callsToAssigned)
+ assertEquals(1, listener.callsToRevoked)
+ TestUtils.waitUntilTrue(() => !hasMembers, s"Consumer did not leave the
consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of interrupt/close")
Review Comment:
I explored the alternate approach you referred to with changing the
configuration. It seemed to work, inasmuch as the tests pass locally for both
`GroupProtocol` types. However, per the other comment about reproducibility
locally vs. on CI, I'm not sure if I made any real difference.
However, I later updated the test to simply wait half of the configured
session timeout, which I believe achieves the same effect. In this test, the
session timeout is never explicitly set anywhere; it defaults to 45000 ms per
the `ConsumerConfig` default. Waiting half of the session timeout means that
the call to `waitUntilTrue` will fail before the broker gets around to kicking
the consumer out of the group.
Correct me if I'm wrong, as I probably am because my AC is broken and my
brain is sweating 😄
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]