hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206088849


##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##########
@@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest {
     clusterInstance.stop()
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+  @Timeout(20)
+  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: 
ClusterInstance): Unit = {
+    
clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, 
"1")
+    clusterInstance.start()
+    verifyUniqueIds(clusterInstance)
+    clusterInstance.stop()
+  }
+
+  @Disabled // TODO: Enable once producer id block size is configurable

Review Comment:
   Can we replace the TODO with a jira? 



##########
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##########
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated 
producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests 
should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for ( _ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while(latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], 
exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   How long does this test take?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -126,17 +132,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     } else {
       val coordinatorEpochAndMetadata = 
txnManager.getTransactionState(transactionalId).flatMap {
         case None =>
-          val producerId = producerIdManager.generateProducerId()
-          val createdMetadata = new TransactionMetadata(transactionalId = 
transactionalId,
-            producerId = producerId,
-            lastProducerId = RecordBatch.NO_PRODUCER_ID,
-            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-            lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-            txnTimeoutMs = transactionTimeoutMs,
-            state = Empty,
-            topicPartitions = collection.mutable.Set.empty[TopicPartition],
-            txnLastUpdateTimestamp = time.milliseconds())
-          txnManager.putTransactionStateIfNotExists(createdMetadata)
+          val result = producerIdManager.generateProducerId()

Review Comment:
   nit: maybe we don't need `result`. Perhaps a little more concise to match on 
`producerIdManager.generateProducerId()`? Same comment below.



##########
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##########
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;
 
     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int 
blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in 
the block.
+     */
+    public Optional<Long> claimNextId() {
+        long nextId = producerIdCounter.getAndIncrement();
+        if (nextId > lastProducerId()) {

Review Comment:
   This first check is duplicated below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to