jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446536290
########## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ########## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( + tp: TopicPartition, + transactionalId: String, + producerId: Long, + producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { + val future = new CompletableFuture[VerificationGuard]() + replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { + if (error != Errors.NONE) { + future.completeExceptionally(error.exception) + } else { + future.complete(verificationGuard) + } + } + ) + future + } + private def internalAppend( tp: TopicPartition, - memoryRecords: MemoryRecords + memoryRecords: MemoryRecords, + verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty - replicaManager.appendRecords( + replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Right right, I remember this now. I'm wondering though is that ok with the async verification callback. 
Is it the case that we don't evaluate which one to use until we execute the callback? As an aside, did we decide that it is OK for the callback to execute on the request handler thread? I haven't gone through and traced the threads being used yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org