hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1206088849
########## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala: ########## @@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest { clusterInstance.stop() } + @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO) + @Timeout(20) + def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = { + clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1") + clusterInstance.start() + verifyUniqueIds(clusterInstance) + clusterInstance.stop() + } + + @Disabled // TODO: Enable once producer id block size is configurable Review Comment: Can we replace the TODO with a jira? ########## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ########## @@ -113,38 +142,113 @@ class ProducerIdManagerTest { } @ParameterizedTest - @ValueSource(ints = Array(1, 2, 10)) - def testContiguousIds(idBlockLen: Int): Unit = { + @ValueSource(ints = Array(1, 2, 10, 100)) + def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = { + // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique. + // For each block (total 3 blocks), only "idBlockLen" number of requests should go through. + // All other requests should fail immediately. + + val numThreads = 5 + val latch = new CountDownLatch(idBlockLen * 3) val manager = new MockProducerIdManager(0, 0, idBlockLen) - - IntStream.range(0, idBlockLen * 3).forEach { i => - assertEquals(i, manager.generateProducerId()) + val pidMap = mutable.Map[Long, Int]() + val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads) + + for ( _ <- 0 until numThreads) { + requestHandlerThreadPool.submit(() => { + while(latch.getCount > 0) { + val result = manager.generateProducerId() + result match { + case Success(pid) => + pidMap synchronized { + if (latch.getCount != 0) { + val counter = pidMap.getOrElse(pid, 0) + pidMap += pid -> (counter + 1) + latch.countDown() + } + } + + case Failure(exception) => + assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass) + } + Thread.sleep(100) + } + }, 0) + } + assertTrue(latch.await(15000, TimeUnit.MILLISECONDS)) Review Comment: How long does this test take? ########## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ########## @@ -126,17 +132,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig, } else { val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap { case None => - val producerId = producerIdManager.generateProducerId() - val createdMetadata = new TransactionMetadata(transactionalId = transactionalId, - producerId = producerId, - lastProducerId = RecordBatch.NO_PRODUCER_ID, - producerEpoch = RecordBatch.NO_PRODUCER_EPOCH, - lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH, - txnTimeoutMs = transactionTimeoutMs, - state = Empty, - topicPartitions = collection.mutable.Set.empty[TopicPartition], - txnLastUpdateTimestamp = time.milliseconds()) - txnManager.putTransactionStateIfNotExists(createdMetadata) + val result = producerIdManager.generateProducerId() Review Comment: nit: maybe we don't need `result`. Perhaps a little more concise to match on `producerIdManager.generateProducerId()`? Same comment below. ########## server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java: ########## @@ -32,11 +34,25 @@ public class ProducerIdsBlock { private final int assignedBrokerId; private final long firstProducerId; private final int blockSize; + private final AtomicLong producerIdCounter; public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) { this.assignedBrokerId = assignedBrokerId; this.firstProducerId = firstProducerId; this.blockSize = blockSize; + producerIdCounter = new AtomicLong(firstProducerId); + } + + /** + * Claim the next available producer id from the block. + * Returns an empty result if there are no more available producer ids in the block. + */ + public Optional<Long> claimNextId() { + long nextId = producerIdCounter.getAndIncrement(); + if (nextId > lastProducerId()) { Review Comment: This first check is duplicated below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org