dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447124222


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
     ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId The producer id.
+   * @param producerEpoch The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+    tp: TopicPartition,
+    transactionalId: String,
+    producerId: Long,
+    producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+    val future = new CompletableFuture[VerificationGuard]()
+    replicaManager.maybeStartTransactionVerificationForPartition(
+      topicPartition = tp,
+      transactionalId = transactionalId,
+      producerId = producerId,
+      producerEpoch = producerEpoch,
+      baseSequence = RecordBatch.NO_SEQUENCE,
+      requestLocal = RequestLocal.NoCaching,
+      callback = (error, _, verificationGuard) => {
+        if (error != Errors.NONE) {
+          future.completeExceptionally(error.exception)
+        } else {
+          future.complete(verificationGuard)
+        }
+      }
+    )
+    future
+  }
+
   private def internalAppend(
     tp: TopicPartition,
-    memoryRecords: MemoryRecords
+    memoryRecords: MemoryRecords,
+    verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
     var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-    replicaManager.appendRecords(
+    replicaManager.appendForGroup(
       timeout = 0L,
       requiredAcks = 1,
-      internalTopicsAllowed = true,
-      origin = AppendOrigin.COORDINATOR,
       entriesPerPartition = Map(tp -> memoryRecords),
       responseCallback = results => appendResults = results,
+      requestLocal = RequestLocal.NoCaching,

Review Comment:
> Right now though, wrap only schedules to the request thread if we are already on a request thread. Otherwise we execute directly.
> If we start verification from a non-request handler thread, maybe this already works as you intend.

My understanding is that `wrapAsyncCallback` will throw an exception if the caller is not a request thread and `bypassThreadCheck` is not set. In my case, the verification is called from the request thread. I only re-schedule to the coordinator thread when the verification returns. I could also have scheduled the verification from the coordinator thread but it does not seem necessary.

> Alternatively, I could pass in a parameter to optionally wrap the callback (send it to the request thread) or not.

Yeah, that's an option. I played a bit with this idea this morning and the outcome was a bit confusing. Another option would be to do the re-scheduling on the caller side. I will continue to play... I think that we could address this in a subsequent PR to get rid of the extra hop. This PR works as-is but is suboptimal.
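
To make the "re-scheduling on the caller side" option a bit more concrete, here is a minimal, self-contained sketch. It is not the code in this PR and it does not use any Kafka classes: `Guard`, `startVerification`, `verify` and `verifyThenAppend` are hypothetical stand-ins, and the single-threaded executor only plays the role of the coordinator thread. The idea is that the callback merely completes a `CompletableFuture` (as in the diff above), and the caller decides where the continuation runs by passing an executor to `thenApplyAsync`.

```scala
import java.util.concurrent.{CompletableFuture, Executor}
import java.util.function.{Function => JFunction}

object ReschedulingSketch {
  // Hypothetical stand-in for VerificationGuard; not the Kafka class.
  final case class Guard(id: Long)

  // What the continuation reports: the thread it ran on and the guard it saw.
  final case class AppendResult(threadName: String, guardId: Long)

  // Hypothetical callback-based API, loosely standing in for the
  // verification call in the diff. It invokes the callback synchronously
  // on whichever thread calls it.
  def startVerification(fail: Boolean)(callback: Either[Throwable, Guard] => Unit): Unit = {
    if (fail) callback(Left(new IllegalStateException("verification failed")))
    else callback(Right(Guard(42L)))
  }

  // Bridge the callback into a future: the callback does nothing except
  // complete the future, successfully or exceptionally.
  def verify(fail: Boolean): CompletableFuture[Guard] = {
    val future = new CompletableFuture[Guard]()
    startVerification(fail) {
      case Left(error)  => future.completeExceptionally(error)
      case Right(guard) => future.complete(guard)
    }
    future
  }

  // Caller-side re-scheduling: the continuation is handed to the executor
  // supplied by the caller (the "coordinator" thread in this sketch), so the
  // callback-based API itself needs no knowledge of the caller's threading.
  def verifyThenAppend(fail: Boolean, coordinator: Executor): CompletableFuture[AppendResult] = {
    val continuation = new JFunction[Guard, AppendResult] {
      override def apply(guard: Guard): AppendResult =
        AppendResult(Thread.currentThread().getName, guard.id)
    }
    verify(fail).thenApplyAsync[AppendResult](continuation, coordinator)
  }
}
```

With this shape, a caller would invoke `verifyThenAppend(fail = false, coordinatorExecutor)` from the request thread and the append step would run on the executor's thread. Whether this actually removes the extra hop in the real code depends on how `wrapAsyncCallback` behaves, which the sketch does not model.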