hachikuji commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1046262250


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
-          throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
+          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")

Review Comment:
   It would be helpful to have a comment here since it's probably not obvious 
why we use this error instead of `REQUEST_TIMED_OUT`.  



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -236,7 +236,7 @@ class RPCProducerIdManager(brokerId: Int,
   private[transaction] def handleTimeout(): Unit = {
     warn("Timed out when requesting AllocateProducerIds from the controller.")
     requestInFlight.set(false)
-    nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception))
+    nextProducerIdBlock.put(Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception))

Review Comment:
   I wonder why we do this. If we don't put anything on the queue, the caller 
of `poll` still times out after `maxWaitMs`. The odd thing is that this request 
timeout can fire in the middle of that wait, so the caller may actually wait 
less than `maxWaitMs`. I wonder if we can just get rid of this?
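
   To make the timing concern concrete, here is a minimal standalone sketch, not 
the actual `RPCProducerIdManager` code. It assumes the queue behaves like a 
`java.util.concurrent.ArrayBlockingQueue`, uses `Try[Long]` as a stand-in for 
the block type, and the names `PollTimingSketch`, `timedPoll`, and 
`rpcTimeoutMs` are hypothetical. It compares a `poll` that sees nothing with a 
`poll` that is woken early by a `Failure` enqueued mid-wait.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.util.{Failure, Try}

object PollTimingSketch {
  // Polls the queue for up to maxWaitMs and returns (result, elapsed millis).
  // A null from poll (nothing arrived in time) is mapped to None.
  def timedPoll(queue: ArrayBlockingQueue[Try[Long]],
                maxWaitMs: Long): (Option[Try[Long]], Long) = {
    val start = System.nanoTime()
    val result = Option(queue.poll(maxWaitMs, TimeUnit.MILLISECONDS))
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    val maxWaitMs = 500L
    val rpcTimeoutMs = 100L // hypothetical request timeout, shorter than maxWaitMs

    // Case 1: nothing is enqueued, so poll waits the full maxWaitMs
    // and the caller sees "no block" (null in the original code).
    val quiet = new ArrayBlockingQueue[Try[Long]](1)
    val (r1, t1) = timedPoll(quiet, maxWaitMs)
    println(s"no failure enqueued: result=$r1, waited about ${t1}ms")

    // Case 2: a timeout handler enqueues a Failure partway through the wait,
    // so poll returns early with the Failure instead of waiting maxWaitMs.
    val noisy = new ArrayBlockingQueue[Try[Long]](1)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.schedule(new Runnable {
      def run(): Unit = noisy.put(Failure(new RuntimeException("request timed out")))
    }, rpcTimeoutMs, TimeUnit.MILLISECONDS)
    val (r2, t2) = timedPoll(noisy, maxWaitMs)
    scheduler.shutdown()
    println(s"failure enqueued mid-wait: result=$r2, waited about ${t2}ms")
  }
}
```

   In this sketch the second caller returns after roughly `rpcTimeoutMs` rather 
than `maxWaitMs`, which is the shortened wait described above. Either way the 
caller ends up with an error, so dropping the `put` would only change how long 
it waits.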


