[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1207028405 ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -113,38 +142,113 @@ class ProducerIdManagerTest { } @ParameterizedTest - @ValueSource(ints = Array(1, 2, 10)) - def testContiguousIds(idBlockLen: Int): Unit = { + @ValueSource(ints = Array(1, 2, 10, 100)) + def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = { +// Send concurrent generateProducerId requests. Ensure that the generated producer id is unique. +// For each block (total 3 blocks), only "idBlockLen" number of requests should go through. +// All other requests should fail immediately. + +val numThreads = 5 +val latch = new CountDownLatch(idBlockLen * 3) val manager = new MockProducerIdManager(0, 0, idBlockLen) - -IntStream.range(0, idBlockLen * 3).forEach { i => - assertEquals(i, manager.generateProducerId()) +val pidMap = mutable.Map[Long, Int]() +val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads) + +for ( _ <- 0 until numThreads) { + requestHandlerThreadPool.submit(() => { +while(latch.getCount > 0) { + val result = manager.generateProducerId() + result match { +case Success(pid) => + pidMap synchronized { +if (latch.getCount != 0) { + val counter = pidMap.getOrElse(pid, 0) + pidMap += pid -> (counter + 1) + latch.countDown() +} + } + +case Failure(exception) => + assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass) + } + Thread.sleep(100) +} + }, 0) +} +assertTrue(latch.await(15000, TimeUnit.MILLISECONDS)) Review Comment: I would keep it a bit higher so that it does not become flaky. Have you run it a few times on your own to make sure it is not flaky already? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1206088849 ## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala: ## @@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest { clusterInstance.stop() } + @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO) + @Timeout(20) + def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = { + clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1") +clusterInstance.start() +verifyUniqueIds(clusterInstance) +clusterInstance.stop() + } + + @Disabled // TODO: Enable once producer id block size is configurable Review Comment: Can we replace the TODO with a jira? ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -113,38 +142,113 @@ class ProducerIdManagerTest { } @ParameterizedTest - @ValueSource(ints = Array(1, 2, 10)) - def testContiguousIds(idBlockLen: Int): Unit = { + @ValueSource(ints = Array(1, 2, 10, 100)) + def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = { +// Send concurrent generateProducerId requests. Ensure that the generated producer id is unique. +// For each block (total 3 blocks), only "idBlockLen" number of requests should go through. +// All other requests should fail immediately. 
+ +val numThreads = 5 +val latch = new CountDownLatch(idBlockLen * 3) val manager = new MockProducerIdManager(0, 0, idBlockLen) - -IntStream.range(0, idBlockLen * 3).forEach { i => - assertEquals(i, manager.generateProducerId()) +val pidMap = mutable.Map[Long, Int]() +val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads) + +for ( _ <- 0 until numThreads) { + requestHandlerThreadPool.submit(() => { +while(latch.getCount > 0) { + val result = manager.generateProducerId() + result match { +case Success(pid) => + pidMap synchronized { +if (latch.getCount != 0) { + val counter = pidMap.getOrElse(pid, 0) + pidMap += pid -> (counter + 1) + latch.countDown() +} + } + +case Failure(exception) => + assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass) + } + Thread.sleep(100) +} + }, 0) +} +assertTrue(latch.await(15000, TimeUnit.MILLISECONDS)) Review Comment: How long does this test take? ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -126,17 +132,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig, } else { val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap { case None => - val producerId = producerIdManager.generateProducerId() - val createdMetadata = new TransactionMetadata(transactionalId = transactionalId, -producerId = producerId, -lastProducerId = RecordBatch.NO_PRODUCER_ID, -producerEpoch = RecordBatch.NO_PRODUCER_EPOCH, -lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH, -txnTimeoutMs = transactionTimeoutMs, -state = Empty, -topicPartitions = collection.mutable.Set.empty[TopicPartition], -txnLastUpdateTimestamp = time.milliseconds()) - txnManager.putTransactionStateIfNotExists(createdMetadata) + val result = producerIdManager.generateProducerId() Review Comment: nit: maybe we don't need `result`. Perhaps a little more concise to match on `producerIdManager.generateProducerId()`? Same comment below. 
## server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java: ## @@ -32,11 +34,25 @@ public class ProducerIdsBlock { private final int assignedBrokerId; private final long firstProducerId; private final int blockSize; +private final AtomicLong producerIdCounter; public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) { this.assignedBrokerId = assignedBrokerId; this.firstProducerId = firstProducerId; this.blockSize = blockSize; +producerIdCounter = new AtomicLong(firstProducerId); +} + +/** + * Claim the next available producer id from the block. + * Returns an empty result if there are no more available producer ids in the block. + */ +public Optional<Long> claimNextId() { +long nextId = producerIdCounter.getAndIncrement(); +if (nextId > lastProducerId()) { Review Comment: This first check is duplicated below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1199413525 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. + */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + private val IterationLimit = 3 Review Comment: Since this is a constant, maybe we can move it to the companion object?
## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. + */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + private val IterationLimit = 3 + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val backoffDeadlineMs = new AtomicLong(NoRetry) - override def generateProducerId(): Long = { -this synchronized { 
- if (nextProducerId == -1L) { -// Send an initial request to get the first block -maybeRequestNextBlock() -nextProducerId = 0L - } else { -nextProducerId += 1 + override def hasValidBlock: Boolean = { +nextProducerIdBlock.get != null + } -// Check if we need to fetch the next block -if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) { - maybeRequestNextBlock() -} - } + override def generateProducerId(): Try[Long] = { +var result: Try[Long] = null +var iteration = 0 +while (result == null)
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1170301813 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -207,35 +234,38 @@ class RPCProducerIdManager(brokerId: Int, }) } + // Visible for testing private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = { -requestInFlight.set(false) val data = response.data +var successfulResponse = false Errors.forCode(data.errorCode()) match { case Errors.NONE => debug(s"Got next producer ID block from controller $data") // Do some sanity checks on the response -if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) { - nextProducerIdBlock.put(Failure(new KafkaException( -s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data"))) +if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) { + error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data") } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) { - nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data"))) + error(s"Producer ID block includes invalid ID range: $data") } else { - nextProducerIdBlock.put( -Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen( + nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen())) + successfulResponse = true } case Errors.STALE_BROKER_EPOCH => -warn("Our broker epoch was stale, trying again.") -maybeRequestNextBlock() +warn("Our broker currentBlockCount was stale, trying again.") case Errors.BROKER_ID_NOT_REGISTERED => warn("Our broker ID is not yet known by the controller, trying again.") -maybeRequestNextBlock() case e: Errors => -warn("Had an unknown error from the 
controller, giving up.") -nextProducerIdBlock.put(Failure(e.exception())) +error(s"Had an unknown error from the controller: ${e.exception}") Review Comment: nit: maybe we can rephrase this message a little for clarity ```scala error(s"Received an unexpected error code from the controller: $e") ``` ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val shouldBackoff = new AtomicBoolean(false) - override def generateProducerId(): Long = { -this synchronized { - if (nextProducerId == -1L) { -// Send an initial request to get the first
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1169133482 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val blockCount = new AtomicLong(0) Review Comment: Why is the `requestInFlight` flag not sufficient to prevent this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1169036716 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val blockCount = new AtomicLong(0) Review Comment: I did not follow why we need this. It seems like it's being used to prevent concurrent sends? ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val blockCount = new AtomicLong(0) - override def generateProducerId(): Long = { -this synchronized { - if (nextProducerId == -1L) { -// Send an initial request to get the first block -maybeRequestNextBlock() -nextProducerId = 0L - } else { -nextProducerId += 1 - -// Check if we need to fetch the next block -if
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110464884 ## server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java: ## @@ -32,11 +34,25 @@ public class ProducerIdsBlock { private final int assignedBrokerId; private final long firstProducerId; private final int blockSize; +private final AtomicLong producerIdCounter; public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) { this.assignedBrokerId = assignedBrokerId; this.firstProducerId = firstProducerId; this.blockSize = blockSize; +producerIdCounter = new AtomicLong(firstProducerId); +} + +/** + * Claim the next available producer id from the block. + * Returns an empty result if there are no more available producer ids in the block. + */ +public Optional<Long> claimNextId() { Review Comment: Probably helpful to have a basic unit test for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1110463718 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. + */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) Review Comment: I wonder if we could consolidate these two fields using the queue. We can peek in `generateProducerId` and attempt allocation while the background is responsible for pushing new blocks as needed. If it fails, we can dequeue the entry and loop in case there is another block. 
Once there are no blocks, we could return as in the current patch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org