cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635610364



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {
+    currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)
   }
 
   def generateProducerId(): Long = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
-      if (nextProducerId > currentProducerIdBlock.blockEndId) {
+      if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
         getNewProducerIdBlock()
-        nextProducerId = currentProducerIdBlock.blockStartId + 1
+        nextProducerId = currentProducerIdBlock.producerIdStart + 1
       } else {
         nextProducerId += 1
       }
-
      nextProducerId - 1
     }
   }
+}
+
+class ProducerIdManager(brokerId: Int,
+                        brokerEpochSupplier: () => Long,
+                        controllerChannel: BrokerToControllerChannelManager,
+                        maxWaitMs: Int) extends ProducerIdGenerator with Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val requestInFlight = new AtomicBoolean(false)
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  override def generateProducerId(): Long = {

Review comment:
       To enlarge on the previous comment a bit: the reason I consider a condition variable simpler is that it involves fewer locks. A blocking queue has its own lock, separate from the lock in the manager, which can be awkward.
   
   What kind of behavior do you want on error? Do you want to deliver a given error to all waiters, or just to a single waiter? It seems like you might need to deliver it to all of them, to prevent huge pile-ups when such errors occur.
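   
   For illustration, here is a minimal sketch of the condition-variable alternative described above, assuming a hypothetical `BlockWaiter` helper rather than the classes in this PR: a single monitor guards the state, callers wait on that same monitor, and a completed fetch (block or error) is published to every waiter with `notifyAll()`:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch, not the PR's implementation: one monitor replaces the
// ArrayBlockingQueue, so there is only a single lock, and a failure reaches all waiters.
class BlockWaiter[T](maxWaitMs: Long) {
  private val lock = new Object
  // Guarded by `lock`: the most recent fetch result, either a block or an error.
  private var latestResult: Option[Try[T]] = None

  // Called from the asynchronous controller-response handler.
  def complete(result: Try[T]): Unit = lock.synchronized {
    latestResult = Some(result)
    lock.notifyAll() // wake every waiter, so an error is not delivered to just one
  }

  // Called by threads that need the next block; blocks for at most maxWaitMs.
  def await(): T = lock.synchronized {
    val deadline = System.currentTimeMillis() + maxWaitMs
    while (latestResult.isEmpty) {
      val remaining = deadline - System.currentTimeMillis()
      if (remaining <= 0)
        throw new RuntimeException(s"Timed out after ${maxWaitMs}ms waiting for a producer ID block")
      lock.wait(remaining) // releases the monitor while waiting; rechecks on wakeup
    }
    latestResult.get match {
      case Success(block) => block
      case Failure(e)     => throw e // every current waiter observes the same failure
    }
  }
}
```

   A real manager would also clear `latestResult` when it starts the next fetch; the sketch is only meant to show the single-lock, error-to-all-waiters behavior being discussed.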



