dajac commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1928396045
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1839,250 +1839,209 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Utils.closeQuietly(producer, "producer")
}
- val EMPTY_GROUP_INSTANCE_ID = ""
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id"
- def createProperties(groupInstanceId: String): Properties = {
- val newConsumerConfig = new Properties(consumerConfig)
- // We need to disable the auto commit because after the members got
removed from group, the offset commit
- // will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
-
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
- newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
- if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
-
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
groupInstanceId)
- }
- newConsumerConfig
- }
-
// contains two static members and one dynamic member
- val groupInstanceSet = Set(testInstanceId1, testInstanceId2,
EMPTY_GROUP_INSTANCE_ID)
- val consumerSet = groupInstanceSet.map { groupInstanceId =>
createConsumer(configOverrides = createProperties(groupInstanceId))}
+ val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
+ val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId,
testClientId, new Properties(consumerConfig))
Review Comment:
nit: Could we set the client id via the properties passed? In the end, the
client id is just a property like the others.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1839,250 +1839,209 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Utils.closeQuietly(producer, "producer")
}
- val EMPTY_GROUP_INSTANCE_ID = ""
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id"
- def createProperties(groupInstanceId: String): Properties = {
- val newConsumerConfig = new Properties(consumerConfig)
- // We need to disable the auto commit because after the members got
removed from group, the offset commit
- // will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
-
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
- newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
- if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
-
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
groupInstanceId)
- }
- newConsumerConfig
- }
-
// contains two static members and one dynamic member
- val groupInstanceSet = Set(testInstanceId1, testInstanceId2,
EMPTY_GROUP_INSTANCE_ID)
- val consumerSet = groupInstanceSet.map { groupInstanceId =>
createConsumer(configOverrides = createProperties(groupInstanceId))}
+ val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
+ val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId,
testClientId, new Properties(consumerConfig))
- val latch = new CountDownLatch(consumerSet.size)
- try {
- def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String):
Thread = {
- new Thread {
- override def run : Unit = {
- consumer.subscribe(Collections.singleton(topic))
- try {
- while (true) {
- consumer.poll(JDuration.ofSeconds(5))
- if (!consumer.assignment.isEmpty && latch.getCount > 0L)
- latch.countDown()
- try {
- consumer.commitSync()
- } catch {
- case _: CommitFailedException => // Ignore and retry on
next iteration.
- }
- }
- } catch {
- case _: InterruptException => // Suppress the output to stderr
- }
- }
- }
- }
+ // We need to disable the auto commit because after the members got
removed from group, the offset commit
+ // will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
+ val configOverrides = new Properties()
+ configOverrides.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
Review Comment:
nit: Would it make sense to add this one as part of the default properties
passed at L1851?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false,
Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
}
+
+ class BackgroundConsumerSet(testGroupId: String, testClientId: String,
defaultConsumerConfig: Properties) {
+ private val consumerSet:
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] =
scala.collection.mutable.Set.empty
+ private val consumerThreads: scala.collection.mutable.Set[Thread] =
scala.collection.mutable.Set.empty
+ private var startLatch: CountDownLatch = new CountDownLatch(0)
+ private var stopLatch: CountDownLatch = new CountDownLatch(0)
+ private var consumerThreadRunning = new AtomicBoolean(false)
+
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+
+ def addConsumer(groupInstanceId: String, topic: String, configOverrides:
Properties = new Properties()): Unit = {
+ val newConsumerConfig =
defaultConsumerConfig.clone().asInstanceOf[Properties]
Review Comment:
nit: It may be easier to use `new Properties(defaultConsumerConfig)`. Would
it work?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1839,250 +1839,209 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Utils.closeQuietly(producer, "producer")
}
- val EMPTY_GROUP_INSTANCE_ID = ""
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id"
- def createProperties(groupInstanceId: String): Properties = {
- val newConsumerConfig = new Properties(consumerConfig)
- // We need to disable the auto commit because after the members got
removed from group, the offset commit
- // will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
-
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
- newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
- if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
-
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
groupInstanceId)
- }
- newConsumerConfig
- }
-
// contains two static members and one dynamic member
- val groupInstanceSet = Set(testInstanceId1, testInstanceId2,
EMPTY_GROUP_INSTANCE_ID)
- val consumerSet = groupInstanceSet.map { groupInstanceId =>
createConsumer(configOverrides = createProperties(groupInstanceId))}
+ val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
+ val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId,
testClientId, new Properties(consumerConfig))
- val latch = new CountDownLatch(consumerSet.size)
- try {
- def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String):
Thread = {
- new Thread {
- override def run : Unit = {
- consumer.subscribe(Collections.singleton(topic))
- try {
- while (true) {
- consumer.poll(JDuration.ofSeconds(5))
- if (!consumer.assignment.isEmpty && latch.getCount > 0L)
- latch.countDown()
- try {
- consumer.commitSync()
- } catch {
- case _: CommitFailedException => // Ignore and retry on
next iteration.
- }
- }
- } catch {
- case _: InterruptException => // Suppress the output to stderr
- }
- }
- }
- }
+ // We need to disable the auto commit because after the members got
removed from group, the offset commit
+ // will cause the member rejoining and the test will be flaky (check
ConsumerCoordinator#OffsetCommitResponseHandler)
+ val configOverrides = new Properties()
+ configOverrides.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
+ groupInstanceSet.zip(topicSet).foreach(zipped =>
backgroundConsumerSet.addConsumer(zipped._1, zipped._2, configOverrides))
Review Comment:
nit: Would it make sense to pass the instance if as part of the properties?
Like client id, it is just another property.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false,
Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
}
+
+ class BackgroundConsumerSet(testGroupId: String, testClientId: String,
defaultConsumerConfig: Properties) {
+ private val consumerSet:
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] =
scala.collection.mutable.Set.empty
+ private val consumerThreads: scala.collection.mutable.Set[Thread] =
scala.collection.mutable.Set.empty
+ private var startLatch: CountDownLatch = new CountDownLatch(0)
+ private var stopLatch: CountDownLatch = new CountDownLatch(0)
+ private var consumerThreadRunning = new AtomicBoolean(false)
+
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+
+ def addConsumer(groupInstanceId: String, topic: String, configOverrides:
Properties = new Properties()): Unit = {
+ val newConsumerConfig =
defaultConsumerConfig.clone().asInstanceOf[Properties]
+ if (groupInstanceId != "") {
+ // static member
+ newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
groupInstanceId)
+ }
+ newConsumerConfig.putAll(configOverrides)
+
+ val consumer = createConsumer(configOverrides = newConsumerConfig)
+ val consumerThread = createConsumerThread(consumer, topic)
+ consumerSet.add(consumer)
Review Comment:
The consumerSet does not seem to be used. Should we remove it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]