jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447988893


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
     ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp              The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId      The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+    tp: TopicPartition,
+    transactionalId: String,
+    producerId: Long,
+    producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+    val future = new CompletableFuture[VerificationGuard]()
+    replicaManager.maybeStartTransactionVerificationForPartition(
+      topicPartition = tp,
+      transactionalId = transactionalId,
+      producerId = producerId,
+      producerEpoch = producerEpoch,
+      baseSequence = RecordBatch.NO_SEQUENCE,
+      requestLocal = RequestLocal.NoCaching,
+      callback = (error, _, verificationGuard) => {
+        if (error != Errors.NONE) {
+          future.completeExceptionally(error.exception)
+        } else {
+          future.complete(verificationGuard)
+        }
+      }
+    )
+    future
+  }
+
   private def internalAppend(
     tp: TopicPartition,
-    memoryRecords: MemoryRecords
+    memoryRecords: MemoryRecords,
+    verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
     var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-    replicaManager.appendRecords(
+    replicaManager.appendForGroup(
       timeout = 0L,
       requiredAcks = 1,
-      internalTopicsAllowed = true,
-      origin = AppendOrigin.COORDINATOR,
       entriesPerPartition = Map(tp -> memoryRecords),
       responseCallback = results => appendResults = results,
+      requestLocal = RequestLocal.NoCaching,

Review Comment:
   Ok -- sorry I just meant I didn't know if it was passed through a callback 
(or future in this case) or if it was only invoked in the callback. For 
example, there were issues in the past since we used the buffer defined for the 
callback on a new handler thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to