hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1169036716
########## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ########## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { - allocateNewProducerIdBlock() + try { + allocateNewProducerIdBlock() + } catch { + case t: Throwable => + return Failure(t) + } nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) + } + } + + override def hasValidBlock: Boolean = { + this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. + */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val blockCount = new AtomicLong(0) Review Comment: I did not follow why we need this. 
It seems like it's being used to prevent concurrent sends? ########## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ########## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { - allocateNewProducerIdBlock() + try { + allocateNewProducerIdBlock() + } catch { + case t: Throwable => + return Failure(t) + } nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) + } + } + + override def hasValidBlock: Boolean = { + this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val blockCount = new AtomicLong(0) - override def generateProducerId(): Long = { - this synchronized { - if (nextProducerId == -1L) { - // Send an initial request to get the first block - maybeRequestNextBlock() - nextProducerId = 0L - } else { - nextProducerId += 1 - - // Check if we need to fetch the next block - if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) { - maybeRequestNextBlock() - } - } + override def hasValidBlock: Boolean = { + nextProducerIdBlock.get != null + } - // If we've exhausted the current block, grab the next block (waiting if necessary) - if (nextProducerId > currentProducerIdBlock.lastProducerId) { - val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) + override def generateProducerId(): Try[Long] = { + val currentBlockCount = blockCount.get + currentProducerIdBlock.get.claimNextId().asScala match { + case None => + // Check the next block if current block is full + val block = nextProducerIdBlock.getAndSet(null) if (block == null) { // Return COORDINATOR_LOAD_IN_PROGRESS rather 
than REQUEST_TIMED_OUT since older clients treat the error as fatal // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. - throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block") + maybeRequestNextBlock(currentBlockCount) + Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block")) } else { - block match { - case Success(nextBlock) => - currentProducerIdBlock = nextBlock - nextProducerId = currentProducerIdBlock.firstProducerId - case Failure(t) => throw t + // Fence other threads from sending another AllocateProducerIdsRequest + blockCount.incrementAndGet() + currentProducerIdBlock.set(block) Review Comment: This part seems unsafe. As soon as we set this, other threads can begin accessing the block. It seems possible, if unlikely, that `claimNextId` fails to allocate. I think it would be simpler to set `currentProducerIdBlock` and loop. Or if we don't like the loop, then just return the coordinator loading error. 
########## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ########## @@ -207,37 +235,39 @@ class RPCProducerIdManager(brokerId: Int, }) } + // Visible for testing private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = { - requestInFlight.set(false) val data = response.data + var successfulResponse = false Errors.forCode(data.errorCode()) match { case Errors.NONE => debug(s"Got next producer ID block from controller $data") // Do some sanity checks on the response - if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) { - nextProducerIdBlock.put(Failure(new KafkaException( - s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data"))) + if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) { + error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data") } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) { - nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data"))) + error(s"Producer ID block includes invalid ID range: $data") } else { - nextProducerIdBlock.put( - Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()))) + nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen())) + successfulResponse = true } case Errors.STALE_BROKER_EPOCH => - warn("Our broker epoch was stale, trying again.") - maybeRequestNextBlock() + warn("Our broker currentBlockCount was stale, trying again.") case Errors.BROKER_ID_NOT_REGISTERED => warn("Our broker ID is not yet known by the controller, trying again.") - maybeRequestNextBlock() case e: Errors => - warn("Had an unknown error from the controller, giving up.") - nextProducerIdBlock.put(Failure(e.exception())) + error(s"Had 
an unknown error from the controller: ${e.exception}") + } + requestInFlight.set(false) + if (!successfulResponse) { + time.sleep(RetryBackoffMs) Review Comment: I am not sure the sleep here is a good idea. I think this gets invoked from the `ControllerChannelManager`, which is currently shared with the forwarding manager and others. Sleeping will block other requests as well. I think we need to enforce the backoff in `maybeRequestNextBlock`. ########## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ########## @@ -41,6 +43,7 @@ import scala.util.{Failure, Success, Try} object ProducerIdManager { // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block val PidPrefetchThreshold = 0.90 + val RetryBackoffMs = 100 Review Comment: Would 50ms be enough? It seems like we are missing some configuration for backoff behavior for requests from brokers to the controller. It would be nice to support exponential backoff as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org