hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1207028405


##########
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##########
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for ( _ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while(latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   I would keep the `latch.await` timeout a bit higher so that the test does not become
   flaky. Have you run the test a few times locally to make sure it is not already flaky?
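
   One way to check this locally is to repeat the latch-based wait several times with a generous timeout and count how often it completes. The following is only a minimal sketch under stated assumptions: `LatchTimeoutSketch`, `runOnce`, the 30-second timeout, and the run count are hypothetical and not part of the PR, and the workers stand in for the `generateProducerId` calls rather than exercising `MockProducerIdManager`.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

object LatchTimeoutSketch {
  // Hypothetical stand-in for the test body: `numThreads` workers count a
  // latch down from `total`. Returns true if the latch reached zero within
  // `timeoutMs`, false if the wait timed out.
  def runOnce(numThreads: Int, total: Int, timeoutMs: Long): Boolean = {
    val latch = new CountDownLatch(total)
    val nextId = new AtomicLong(0) // placeholder for generated producer ids
    val pool = Executors.newFixedThreadPool(numThreads)
    try {
      for (_ <- 0 until numThreads) {
        // An explicit Runnable avoids the submit(Runnable)/submit(Callable)
        // overload ambiguity that a bare Scala lambda can run into.
        pool.submit(new Runnable {
          override def run(): Unit = {
            while (latch.getCount > 0) {
              nextId.getAndIncrement()
              latch.countDown() // counting down past zero is a no-op
              Thread.sleep(1)
            }
          }
        })
      }
      // A generous timeout only costs time when the run is already failing.
      latch.await(timeoutMs, TimeUnit.MILLISECONDS)
    } finally {
      pool.shutdownNow() // interrupt any worker still sleeping
    }
  }

  def main(args: Array[String]): Unit = {
    // Repeat the scenario to surface intermittent timeouts.
    val results = (1 to 20).map(_ => runOnce(5, 30, 30000L))
    println(s"passed ${results.count(identity)} of ${results.size} runs")
  }
}
```

   Because `await` returns as soon as the latch reaches zero, a larger timeout does not slow down passing runs. It only delays the failure report when something is actually wrong.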



