hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1207028405
########## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ########## @@ -113,38 +142,113 @@ class ProducerIdManagerTest { } @ParameterizedTest - @ValueSource(ints = Array(1, 2, 10)) - def testContiguousIds(idBlockLen: Int): Unit = { + @ValueSource(ints = Array(1, 2, 10, 100)) + def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = { + // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique. + // For each block (total 3 blocks), only "idBlockLen" number of requests should go through. + // All other requests should fail immediately. + + val numThreads = 5 + val latch = new CountDownLatch(idBlockLen * 3) val manager = new MockProducerIdManager(0, 0, idBlockLen) - - IntStream.range(0, idBlockLen * 3).forEach { i => - assertEquals(i, manager.generateProducerId()) + val pidMap = mutable.Map[Long, Int]() + val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads) + + for ( _ <- 0 until numThreads) { + requestHandlerThreadPool.submit(() => { + while(latch.getCount > 0) { + val result = manager.generateProducerId() + result match { + case Success(pid) => + pidMap synchronized { + if (latch.getCount != 0) { + val counter = pidMap.getOrElse(pid, 0) + pidMap += pid -> (counter + 1) + latch.countDown() + } + } + + case Failure(exception) => + assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass) + } + Thread.sleep(100) + } + }, 0) + } + assertTrue(latch.await(15000, TimeUnit.MILLISECONDS)) Review Comment: I would keep it a bit higher so that it does not become flaky. Have you run it a few times on your own to make sure it is not flaky already? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org