dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447124222


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
     ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp              The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId      The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+    tp: TopicPartition,
+    transactionalId: String,
+    producerId: Long,
+    producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+    val future = new CompletableFuture[VerificationGuard]()
+    replicaManager.maybeStartTransactionVerificationForPartition(
+      topicPartition = tp,
+      transactionalId = transactionalId,
+      producerId = producerId,
+      producerEpoch = producerEpoch,
+      baseSequence = RecordBatch.NO_SEQUENCE,
+      requestLocal = RequestLocal.NoCaching,
+      callback = (error, _, verificationGuard) => {
+        if (error != Errors.NONE) {
+          future.completeExceptionally(error.exception)
+        } else {
+          future.complete(verificationGuard)
+        }
+      }
+    )
+    future
+  }
+
   private def internalAppend(
     tp: TopicPartition,
-    memoryRecords: MemoryRecords
+    memoryRecords: MemoryRecords,
+    verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
     var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-    replicaManager.appendRecords(
+    replicaManager.appendForGroup(
       timeout = 0L,
       requiredAcks = 1,
-      internalTopicsAllowed = true,
-      origin = AppendOrigin.COORDINATOR,
       entriesPerPartition = Map(tp -> memoryRecords),
       responseCallback = results => appendResults = results,
+      requestLocal = RequestLocal.NoCaching,

Review Comment:
   > Right now though, wrap only schedules to the request thread if we are 
already on a request thread. Otherwise we execute directly. If we start 
verification from a non-request handler thread, maybe this already works as you 
intend.
   
   My understanding is that `wrapAsyncCallback` will throw an exception if the 
caller is not a request thread and `bypassThreadCheck` is not set.
   
   In my case, the verification is called from the request thread. I only 
re-schedule to the coordinator thread when the verification returns. I could 
also have scheduled the verification from the coordinator thread but it does 
not seem necessary.
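
   To make the thread check described above concrete, here is a minimal toy 
model. It is only a sketch and not the actual Kafka code: `CallbackWrapSketch`, 
`asRequestThread` and `wrap` are hypothetical names, and the "request thread" 
is simulated with a thread-local flag. It models only the check, not the 
re-scheduling of the callback onto a request thread.

```scala
object CallbackWrapSketch {
  // Hypothetical marker: true while the current thread acts as a request handler thread.
  private val onRequestThread = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  // Hypothetical escape hatch, modelled on the `bypassThreadCheck` flag mentioned above.
  @volatile var bypassThreadCheck: Boolean = false

  // Runs `body` as if the current thread were a request handler thread.
  def asRequestThread[A](body: => A): A = {
    onRequestThread.set(true)
    try body finally onRequestThread.set(false)
  }

  // Wraps `callback`. Throws if the caller is not on a (simulated) request
  // thread and the bypass flag is not set; otherwise returns a function that
  // simply invokes the callback.
  def wrap[T](callback: T => Unit): T => Unit = {
    if (!onRequestThread.get() && !bypassThreadCheck)
      throw new IllegalStateException("wrap called from a non-request thread")
    value => callback(value)
  }
}
```

   In this model, calling `wrap` inside `asRequestThread { ... }` succeeds, 
while calling it from any other thread throws unless `bypassThreadCheck` is 
set to `true`.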
   
   > Alternatively, I could pass in a parameter to optionally wrap the callback 
(send it to the request thread) or not.
   
   Yeah, that's an option. I experimented with this idea this morning and the 
result was a bit confusing. Another option would be to do the re-scheduling on 
the caller side. I will keep experimenting. I think that we could address this 
in a follow-up PR to get rid of the extra hop. This PR works as-is but is 
suboptimal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
