jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447879037
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
))
}
+ /**
+ * Verify the transaction.
+ *
+ * @param tp The partition to write records to.
+ * @param transactionalId The transactional id.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.
+ * @throws KafkaException Any KafkaException caught during the operation.
+ */
+ override def maybeStartTransactionVerification(
+ tp: TopicPartition,
+ transactionalId: String,
+ producerId: Long,
+ producerEpoch: Short
+ ): CompletableFuture[VerificationGuard] = {
+ val future = new CompletableFuture[VerificationGuard]()
+ replicaManager.maybeStartTransactionVerificationForPartition(
+ topicPartition = tp,
+ transactionalId = transactionalId,
+ producerId = producerId,
+ producerEpoch = producerEpoch,
+ baseSequence = RecordBatch.NO_SEQUENCE,
+ requestLocal = RequestLocal.NoCaching,
+ callback = (error, _, verificationGuard) => {
+ if (error != Errors.NONE) {
+ future.completeExceptionally(error.exception)
+ } else {
+ future.complete(verificationGuard)
+ }
+ }
+ )
+ future
+ }
+
private def internalAppend(
tp: TopicPartition,
- memoryRecords: MemoryRecords
+ memoryRecords: MemoryRecords,
+ verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
): Long = {
var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
- replicaManager.appendRecords(
+ replicaManager.appendForGroup(
timeout = 0L,
requiredAcks = 1,
- internalTopicsAllowed = true,
- origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(tp -> memoryRecords),
responseCallback = results => appendResults = results,
+ requestLocal = RequestLocal.NoCaching,
Review Comment:
Sorry -- you are correct. I was confused because we have a check for if we
execute on the same request thread -- there we don't reschedule/wrap the
request. (This was added when we error before we send the request to the txn
coordinator)
Just for my understanding, right now we reschedule to the request thread,
and that works fine -- the only concern is not using the allocated coordinator
threads and taking up space on the request handler threads?
Just triple checking we won't run into an issue with request locals if the
callback expects the buffer supplier from the coordinator thread.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]