[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-26 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1207028405


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+// Send concurrent generateProducerId requests. Ensure that the generated 
producer id is unique.
+// For each block (total 3 blocks), only "idBlockLen" number of requests 
should go through.
+// All other requests should fail immediately.
+
+val numThreads = 5
+val latch = new CountDownLatch(idBlockLen * 3)
 val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-IntStream.range(0, idBlockLen * 3).forEach { i =>
-  assertEquals(i, manager.generateProducerId())
+val pidMap = mutable.Map[Long, Int]()
+val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+for ( _ <- 0 until numThreads) {
+  requestHandlerThreadPool.submit(() => {
+while(latch.getCount > 0) {
+  val result = manager.generateProducerId()
+  result match {
+case Success(pid) =>
+  pidMap synchronized {
+if (latch.getCount != 0) {
+  val counter = pidMap.getOrElse(pid, 0)
+  pidMap += pid -> (counter + 1)
+  latch.countDown()
+}
+  }
+
+case Failure(exception) =>
+  assertEquals(classOf[CoordinatorLoadInProgressException], 
exception.getClass)
+  }
+  Thread.sleep(100)
+}
+  }, 0)
+}
+assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   I would keep this timeout a bit higher so that the test does not become flaky. Have you run 
the test a few times locally to make sure it is not already flaky?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-25 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206088849


##
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##
@@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest {
 clusterInstance.stop()
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+  @Timeout(20)
+  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: 
ClusterInstance): Unit = {
+
clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, 
"1")
+clusterInstance.start()
+verifyUniqueIds(clusterInstance)
+clusterInstance.stop()
+  }
+
+  @Disabled // TODO: Enable once producer id block size is configurable

Review Comment:
   Can we replace the TODO with a reference to a JIRA ticket? 



##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+// Send concurrent generateProducerId requests. Ensure that the generated 
producer id is unique.
+// For each block (total 3 blocks), only "idBlockLen" number of requests 
should go through.
+// All other requests should fail immediately.
+
+val numThreads = 5
+val latch = new CountDownLatch(idBlockLen * 3)
 val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-IntStream.range(0, idBlockLen * 3).forEach { i =>
-  assertEquals(i, manager.generateProducerId())
+val pidMap = mutable.Map[Long, Int]()
+val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+for ( _ <- 0 until numThreads) {
+  requestHandlerThreadPool.submit(() => {
+while(latch.getCount > 0) {
+  val result = manager.generateProducerId()
+  result match {
+case Success(pid) =>
+  pidMap synchronized {
+if (latch.getCount != 0) {
+  val counter = pidMap.getOrElse(pid, 0)
+  pidMap += pid -> (counter + 1)
+  latch.countDown()
+}
+  }
+
+case Failure(exception) =>
+  assertEquals(classOf[CoordinatorLoadInProgressException], 
exception.getClass)
+  }
+  Thread.sleep(100)
+}
+  }, 0)
+}
+assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   How long does this test take?



##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -126,17 +132,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 } else {
   val coordinatorEpochAndMetadata = 
txnManager.getTransactionState(transactionalId).flatMap {
 case None =>
-  val producerId = producerIdManager.generateProducerId()
-  val createdMetadata = new TransactionMetadata(transactionalId = 
transactionalId,
-producerId = producerId,
-lastProducerId = RecordBatch.NO_PRODUCER_ID,
-producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-txnTimeoutMs = transactionTimeoutMs,
-state = Empty,
-topicPartitions = collection.mutable.Set.empty[TopicPartition],
-txnLastUpdateTimestamp = time.milliseconds())
-  txnManager.putTransactionStateIfNotExists(createdMetadata)
+  val result = producerIdManager.generateProducerId()

Review Comment:
   nit: maybe we don't need `result`. It would be a little more concise to match directly on 
`producerIdManager.generateProducerId()`. Same comment below.



##
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
 private final int assignedBrokerId;
 private final long firstProducerId;
 private final int blockSize;
+private final AtomicLong producerIdCounter;
 
 public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int 
blockSize) {
 this.assignedBrokerId = assignedBrokerId;
 this.firstProducerId = firstProducerId;
 this.blockSize = blockSize;
+producerIdCounter = new AtomicLong(firstProducerId);
+}
+
+/**
+ * Claim the next available producer id from the block.
+ * Returns an empty result if there are no more available producer ids in 
the block.
+ */
+public Optional claimNextId() {
+long nextId = producerIdCounter.getAndIncrement();
+if (nextId > lastProducerId()) {

Review Comment:
   This first check is duplicated below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to 

[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-19 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1199413525


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val IterationLimit = 3

Review Comment:
   Since this is a constant, maybe we can move it to the companion object?



##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val IterationLimit = 3
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def generateProducerId(): Try[Long] = {
+var result: Try[Long] = null
+var iteration = 0
+while (result == null) 

[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-18 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1170301813


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -207,35 +234,38 @@ class RPCProducerIdManager(brokerId: Int,
 })
   }
 
+  // Visible for testing
   private[transaction] def handleAllocateProducerIdsResponse(response: 
AllocateProducerIdsResponse): Unit = {
-requestInFlight.set(false)
 val data = response.data
+var successfulResponse = false
 Errors.forCode(data.errorCode()) match {
   case Errors.NONE =>
 debug(s"Got next producer ID block from controller $data")
 // Do some sanity checks on the response
-if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
-  nextProducerIdBlock.put(Failure(new KafkaException(
-s"Producer ID block is not monotonic with current block: 
current=$currentProducerIdBlock response=$data")))
+if (data.producerIdStart() < 
currentProducerIdBlock.get.lastProducerId) {
+  error(s"Producer ID block is not monotonic with current block: 
current=$currentProducerIdBlock response=$data")
 } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || 
data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
-  nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID 
block includes invalid ID range: $data")))
+  error(s"Producer ID block includes invalid ID range: $data")
 } else {
-  nextProducerIdBlock.put(
-Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), 
data.producerIdLen(
+  nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, 
data.producerIdStart(), data.producerIdLen()))
+  successfulResponse = true
 }
   case Errors.STALE_BROKER_EPOCH =>
-warn("Our broker epoch was stale, trying again.")
-maybeRequestNextBlock()
+warn("Our broker currentBlockCount was stale, trying again.")
   case Errors.BROKER_ID_NOT_REGISTERED =>
 warn("Our broker ID is not yet known by the controller, trying again.")
-maybeRequestNextBlock()
   case e: Errors =>
-warn("Had an unknown error from the controller, giving up.")
-nextProducerIdBlock.put(Failure(e.exception()))
+error(s"Had an unknown error from the controller: ${e.exception}")

Review Comment:
   nit: maybe we can rephrase this message a little for clarity
   ```scala
   error(s"Received an unexpected error code from the controller: $e")
   ```



##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first 

[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169133482


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   Why is the `requestInFlight` flag not sufficient to prevent this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169036716


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   I did not follow why we need this. It seems like it's being used to prevent 
concurrent sends?



##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if 

[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110464884


##
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
 private final int assignedBrokerId;
 private final long firstProducerId;
 private final int blockSize;
+private final AtomicLong producerIdCounter;
 
 public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int 
blockSize) {
 this.assignedBrokerId = assignedBrokerId;
 this.firstProducerId = firstProducerId;
 this.blockSize = blockSize;
+producerIdCounter = new AtomicLong(firstProducerId);
+}
+
+/**
+ * Claim the next available producer id from the block.
+ * Returns an empty result if there are no more available producer ids in 
the block.
+ */
+public Optional claimNextId() {

Review Comment:
   It would probably be helpful to have a basic unit test for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110463718


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)

Review Comment:
   I wonder if we could consolidate these two fields using the queue. We can 
peek in `generateProducerId` and attempt allocation, while the asynchronous response handling is 
responsible for pushing new blocks as needed. If allocation from the head block fails, we can dequeue that 
entry and loop in case there is another block. Once there are no blocks left, we 
could return as in the current patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org