chia7712 commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1939052536


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig: 
Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+
+    def addConsumer(topic: String, configOverrides: Properties = new 
Properties()): Unit = {

Review Comment:
   it seems the default properties is useless. could you please remove it?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig: 
Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+
+    def addConsumer(topic: String, configOverrides: Properties = new 
Properties()): Unit = {
+      val newConsumerConfig = 
defaultConsumerConfig.clone().asInstanceOf[Properties]
+      newConsumerConfig.putAll(configOverrides)
+
+      val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val consumerThread = createConsumerThread(consumer, topic)
+      consumerSet.add(consumer)
+      consumerThreads.add(consumerThread)
+    }
+
+    def start(): Unit = {
+      startLatch = new CountDownLatch(consumerSet.size)
+      stopLatch = new CountDownLatch(consumerSet.size)
+      consumerThreadRunning = new AtomicBoolean(true)
+      consumerThreads.foreach(_.start())
+      assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
start consumer threads in time")
+    }
+
+    def stop(): Unit = {
+      consumerSet.foreach(_.wakeup())
+      consumerThreadRunning.set(false)
+      assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
+    }
+
+    def close(): Unit = {
+      // stop the consumers and wait for consumer threads stopped
+      stop()
+      consumerThreads.foreach(_.join())
+    }
+
+    private def createConsumerThread[K,V](consumer: Consumer[K,V], topic: 
String): Thread = {
+      new Thread {
+        override def run : Unit = {
+          consumer.subscribe(Collections.singleton(topic))
+          try {
+            while (consumerThreadRunning.get()) {
+              consumer.poll(JDuration.ofSeconds(5))
+              if (!consumer.assignment.isEmpty && startLatch.getCount > 0L)
+                startLatch.countDown()
+              try {
+                consumer.commitSync()
+              } catch {
+                case _: CommitFailedException => // Ignore and retry on next 
iteration.
+              }
+            }
+          } catch {
+            case _: WakeupException => // ignore

Review Comment:
   Since this catch block is not within a while loop, it seems the 
`consumerThreadRunning` flag is redundant. The `wakeup` method can directly 
cause a `WakeupException`, immediately exiting the loop.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig: 
Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)

Review Comment:
   I find it somewhat unusual to pass an argument to a method that is then used 
to initialize another argument. Could you please consider setting the group ID 
externally?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig: 
Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+
+    def addConsumer(topic: String, configOverrides: Properties = new 
Properties()): Unit = {
+      val newConsumerConfig = 
defaultConsumerConfig.clone().asInstanceOf[Properties]
+      newConsumerConfig.putAll(configOverrides)
+
+      val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val consumerThread = createConsumerThread(consumer, topic)
+      consumerSet.add(consumer)
+      consumerThreads.add(consumerThread)
+    }
+
+    def start(): Unit = {
+      startLatch = new CountDownLatch(consumerSet.size)

Review Comment:
   nit: To address the current style, which results in the `startLatch` 
instance being assigned twice, I suggest refactoring `BackgroundConsumerSet` to 
accept consumers as a constructor argument. This refactoring would eliminate 
the need for the `startLatch.getCount > 0L` check within the while loop, as we 
would no longer need to be concerned about calling on an incorrect `startLatch` 
instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to