chia7712 commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1939052536
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false,
Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
}
+
+ class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig:
Properties) {
+ private val consumerSet:
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] =
scala.collection.mutable.Set.empty
+ private val consumerThreads: scala.collection.mutable.Set[Thread] =
scala.collection.mutable.Set.empty
+ private var startLatch: CountDownLatch = new CountDownLatch(0)
+ private var stopLatch: CountDownLatch = new CountDownLatch(0)
+ private var consumerThreadRunning = new AtomicBoolean(false)
+
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+
+ def addConsumer(topic: String, configOverrides: Properties = new
Properties()): Unit = {
Review Comment:
it seems the default properties is useless. could you please remove it?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false,
Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
}
+
+ class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig:
Properties) {
+ private val consumerSet:
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] =
scala.collection.mutable.Set.empty
+ private val consumerThreads: scala.collection.mutable.Set[Thread] =
scala.collection.mutable.Set.empty
+ private var startLatch: CountDownLatch = new CountDownLatch(0)
+ private var stopLatch: CountDownLatch = new CountDownLatch(0)
+ private var consumerThreadRunning = new AtomicBoolean(false)
+
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+
+ def addConsumer(topic: String, configOverrides: Properties = new
Properties()): Unit = {
+ val newConsumerConfig =
defaultConsumerConfig.clone().asInstanceOf[Properties]
+ newConsumerConfig.putAll(configOverrides)
+
+ val consumer = createConsumer(configOverrides = newConsumerConfig)
+ val consumerThread = createConsumerThread(consumer, topic)
+ consumerSet.add(consumer)
+ consumerThreads.add(consumerThread)
+ }
+
+ def start(): Unit = {
+ startLatch = new CountDownLatch(consumerSet.size)
+ stopLatch = new CountDownLatch(consumerSet.size)
+ consumerThreadRunning = new AtomicBoolean(true)
+ consumerThreads.foreach(_.start())
+ assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to
start consumer threads in time")
+ }
+
+ def stop(): Unit = {
+ consumerSet.foreach(_.wakeup())
+ consumerThreadRunning.set(false)
+ assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to
stop consumer threads in time")
+ }
+
+ def close(): Unit = {
+ // stop the consumers and wait for consumer threads stopped
+ stop()
+ consumerThreads.foreach(_.join())
+ }
+
+ private def createConsumerThread[K,V](consumer: Consumer[K,V], topic:
String): Thread = {
+ new Thread {
+ override def run : Unit = {
+ consumer.subscribe(Collections.singleton(topic))
+ try {
+ while (consumerThreadRunning.get()) {
+ consumer.poll(JDuration.ofSeconds(5))
+ if (!consumer.assignment.isEmpty && startLatch.getCount > 0L)
+ startLatch.countDown()
+ try {
+ consumer.commitSync()
+ } catch {
+ case _: CommitFailedException => // Ignore and retry on next
iteration.
+ }
+ }
+ } catch {
+ case _: WakeupException => // ignore
Review Comment:
Since this catch block is not within a while loop, it seems the
`consumerThreadRunning` flag is redundant. The `wakeup` method can directly
cause a `WakeupException`, immediately exiting the loop.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false,
Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
}
+
+ class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig:
Properties) {
+ private val consumerSet:
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] =
scala.collection.mutable.Set.empty
+ private val consumerThreads: scala.collection.mutable.Set[Thread] =
scala.collection.mutable.Set.empty
+ private var startLatch: CountDownLatch = new CountDownLatch(0)
+ private var stopLatch: CountDownLatch = new CountDownLatch(0)
+ private var consumerThreadRunning = new AtomicBoolean(false)
+
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
Review Comment:
I find it somewhat unusual to pass an argument to a method that is then used
to initialize another argument. Could you please consider setting the group ID
externally?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,72 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false,
Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
}
+
+ class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig:
Properties) {
+ private val consumerSet:
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] =
scala.collection.mutable.Set.empty
+ private val consumerThreads: scala.collection.mutable.Set[Thread] =
scala.collection.mutable.Set.empty
+ private var startLatch: CountDownLatch = new CountDownLatch(0)
+ private var stopLatch: CountDownLatch = new CountDownLatch(0)
+ private var consumerThreadRunning = new AtomicBoolean(false)
+
+ defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+
+ def addConsumer(topic: String, configOverrides: Properties = new
Properties()): Unit = {
+ val newConsumerConfig =
defaultConsumerConfig.clone().asInstanceOf[Properties]
+ newConsumerConfig.putAll(configOverrides)
+
+ val consumer = createConsumer(configOverrides = newConsumerConfig)
+ val consumerThread = createConsumerThread(consumer, topic)
+ consumerSet.add(consumer)
+ consumerThreads.add(consumerThread)
+ }
+
+ def start(): Unit = {
+ startLatch = new CountDownLatch(consumerSet.size)
Review Comment:
nit: To address the current style, which results in the `startLatch`
instance being assigned twice, I suggest refactoring `BackgroundConsumerSet` to
accept consumers as a constructor argument. This refactoring would eliminate
the need for the `startLatch.getCount > 0L` check within the while loop, as we
would no longer need to be concerned about calling on an incorrect `startLatch`
instance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]