artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161050689


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -35,6 +36,43 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
 }
+object KafkaRequestHandler {
+  // Support for scheduling callbacks on a request thread.
+  private val threadRequestChannel = new ThreadLocal[RequestChannel]
+  private val currentRequest = new ThreadLocal[RequestChannel.Request]
+
+  // For testing
+  private var bypassThreadCheck = false
+  def setBypassThreadCheck(bypassCheck: Boolean): Unit = {
+    bypassThreadCheck = bypassCheck
+  }
+
+  def currentRequestOnThread(): RequestChannel.Request = {
+    currentRequest.get()
+  }
+
+  /**
+   * Wrap callback to schedule it on a request thread.
+   * NOTE: this function must be called on a request thread.
+   * @param fun Callback function to execute
+   * @return Wrapped callback that would execute `fun` on a request thread
+   */
+  def wrap[T](fun: T => Unit)(request: RequestChannel.Request): T => Unit = {

Review Comment:
   We don't need to pass the current request; this could be completely encapsulated within `wrap`. In fact, having a request argument here makes it look like we could pass some arbitrary request, when in fact we need exactly the one that is currently being processed on this thread.
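   To make that concrete, here is a minimal sketch (not the PR's actual code) of a `wrap` with no request parameter, reusing the thread-locals from the object above; it also checks `currentRequest` for null, which ties in with the comment on the null check further down. `scheduleOnRequestThread` is a hypothetical stand-in for however the wrapped callback is really handed back to the request channel:

   // Sketch only: `wrap` takes no request argument and reads both thread-locals itself,
   // so callers cannot pass an arbitrary request.
   def wrap[T](fun: T => Unit): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val request = currentRequest.get()  // exactly the request being processed on this thread
     if (requestChannel == null || request == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("wrap() must be called on a request handler thread")
       fun  // tests that bypass the thread check just run the callback inline
     } else {
       // Hypothetical helper standing in for the real re-enqueue mechanism.
       t => scheduleOnRequestThread(requestChannel, request, () => fun(t))
     }
   }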
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -110,8 +110,23 @@ class KafkaRequestHandler(id: Int,
       case callback: RequestChannel.CallbackRequest =>
         try {
-          callback.originalRequest.callbackRequestDequeTimeNanos = Some(time.nanoseconds())
+          val originalRequest = callback.originalRequest
+
+          // If we've already executed a callback for this request, reset the times and subtract the callback time from the
+          // new dequeue time. This will allow calculation of multiple callback times.
+          // Otherwise, set dequeue time to now.
+          if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {

Review Comment:
   ok


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -35,6 +36,43 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
 }
+object KafkaRequestHandler {
+  // Support for scheduling callbacks on a request thread.
+  private val threadRequestChannel = new ThreadLocal[RequestChannel]
+  private val currentRequest = new ThreadLocal[RequestChannel.Request]
+
+  // For testing
+  private var bypassThreadCheck = false
+  def setBypassThreadCheck(bypassCheck: Boolean): Unit = {
+    bypassThreadCheck = bypassCheck
+  }
+
+  def currentRequestOnThread(): RequestChannel.Request = {
+    currentRequest.get()
+  }
+
+  /**
+   * Wrap callback to schedule it on a request thread.
+   * NOTE: this function must be called on a request thread.
+   * @param fun Callback function to execute
+   * @return Wrapped callback that would execute `fun` on a request thread
+   */
+  def wrap[T](fun: T => Unit)(request: RequestChannel.Request): T => Unit = {
+    val requestChannel = threadRequestChannel.get()
+    if (requestChannel == null) {

Review Comment:
   or currentRequest == null


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -575,6 +575,13 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
+  def hasOngoingTransaction(producerId: Long): Boolean = {
+    leaderLogIfLocal match {
+      case Some(leaderLog) => leaderLog.hasOngoingTransaction(producerId)
+      case _ => false
+    }

Review Comment:
   leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId))
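   For reference, the Partition.scala method with that one-liner folded in would read roughly as below; the behavior is unchanged, since `exists` on an empty Option returns false, matching the `case _ => false` branch:

   def hasOngoingTransaction(producerId: Long): Boolean = {
     // exists returns false when there is no local leader log, same as the pattern match above
     leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId))
   }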
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -572,6 +572,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
+  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
+    val entry = producerStateManager.activeProducers.get(producerId)
+    entry != null && entry.currentTxnFirstOffset.isPresent

Review Comment:
   producerStateManager.activeProducers.get(producerId).exists(entry => entry.currentTxnFirstOffset.isPresent)


##########
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##########

Review Comment:
   Do we have a test that actually verifies we'd get a failure if we try to produce without adding the partition to the transaction first?
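   One note on the UnifiedLog.scala suggestion above: the original code checks `entry != null`, which suggests `activeProducers` is a Java map whose `get` returns null for an unknown producer id, so the suggested `exists` chaining would need the result wrapped in `Option(...)`. A minimal sketch under that assumption:

   def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
     // Option(null) is None, so an unknown producerId yields false, matching the original null check
     Option(producerStateManager.activeProducers.get(producerId))
       .exists(entry => entry.currentTxnFirstOffset.isPresent)
   }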