jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177161037


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+                           controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID 
block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another 
AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
   This no longer happens because we now cannot send a request until
`currentBlock` is set. `t2`, which checks the prefetch criteria in the example
above, will observe either that `currentBlock` is `[10, 10, 19]`, which does not
fit the prefetch criteria, or that `requestInFlight==true`, so it cannot send
another request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to