Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-11 Thread via GitHub


dajac merged PR #15142:
URL: https://github.com/apache/kafka/pull/15142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1448416633


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception if the

Review Comment:
   done.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


artemlivshits commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r144830


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   My understanding is that by the time we come to this function it already 
runs on a group coordinator (GC) thread (it has to, because that's how we 
guarantee atomicity), so there is no request local here anyway and we must use 
NoCaching (it is always safe to use NoCaching, just not as optimal as thread local).
   
   The future's callback will be called in the thread that completed it, but 
like I said, it doesn't matter for this function as it gets rescheduled on the 
GC thread pool.
   
   I gave a suggestion to hoist wrapping to the caller, so new GC doesn't have 
to do double-schedule 
https://github.com/apache/kafka/pull/14774#discussion_r1420931474.
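
The threading point in the comment above can be illustrated with plain `java.util.concurrent`. This is only a sketch, not Kafka code: the class name `CompletionThreadDemo`, the thread names, and the single-threaded executor standing in for a coordinator thread are all assumptions made for illustration. It shows that a dependent registered before completion runs on the completing thread, while an `...Async` variant with an explicit executor hops to that executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionThreadDemo {
    /** Returns {thread that ran the plain dependent, thread that ran the async dependent}. */
    public static String[] run() {
        // Hypothetical single-threaded pool standing in for a coordinator thread.
        ExecutorService coordinator =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator-thread"));
        try {
            CompletableFuture<String> verified = new CompletableFuture<>();
            // Registered before completion: runs on whichever thread completes `verified`.
            CompletableFuture<String> direct =
                verified.thenApply(guard -> Thread.currentThread().getName());
            // Explicitly rescheduled onto the executor, regardless of the completing thread.
            CompletableFuture<String> hopped =
                verified.thenApplyAsync(guard -> Thread.currentThread().getName(), coordinator);

            // Hypothetical "request thread" that completes the future.
            Thread completer = new Thread(() -> verified.complete("guard"), "request-thread");
            completer.start();
            try {
                completer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return new String[] { direct.join(), hopped.join() };
        } finally {
            coordinator.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("direct dependent ran on: " + names[0]);
        System.out.println("async dependent ran on:  " + names[1]);
    }
}
```

The extra hop discussed in this thread corresponds to the second case: the work is moved to another pool after the future has already been completed elsewhere.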






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1448027564


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   I guess we just complete the future in the callback and do the write in the 
thenCompose. That made me realize I don't know everything about how futures are 
scheduled, but looking at the docs I think it is safe to say this is correct.
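
A rough sketch of the pattern described above, completing a future from a callback and doing the write in `thenCompose`, could look like the following. It is not the PR's code: the `Verifier` interface, the string-typed error and guard, and the fixed offset `100L` are hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class VerifyThenWrite {
    /** Hypothetical callback-style verifier: reports (errorOrNull, guardOrNull). */
    public interface Verifier {
        void verify(BiConsumer<String, String> callback);
    }

    /** Adapts the callback-style call into a future. */
    public static CompletableFuture<String> startVerification(Verifier verifier) {
        CompletableFuture<String> future = new CompletableFuture<>();
        verifier.verify((error, guard) -> {
            if (error != null) {
                future.completeExceptionally(new IllegalStateException(error));
            } else {
                future.complete(guard);
            }
        });
        return future;
    }

    /** Chains the write after verification; the write runs only if verification succeeded. */
    public static CompletableFuture<Long> verifyThenWrite(Verifier verifier, AtomicInteger writes) {
        return startVerification(verifier).thenCompose(guard -> {
            writes.incrementAndGet();                       // counts how often the write step ran
            return CompletableFuture.completedFuture(100L); // stand-in for the appended offset
        });
    }

    public static void main(String[] args) {
        AtomicInteger writes = new AtomicInteger();
        long offset = verifyThenWrite(cb -> cb.accept(null, "guard"), writes).join();
        System.out.println("offset=" + offset + ", writes=" + writes.get());
    }
}
```

When the verifier reports an error, the composed future completes exceptionally and the write step is skipped.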






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447988893


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Ok -- sorry I just meant I didn't know if it was passed through a callback 
(or future in this case) or if it was only invoked in the callback. For 
example, there were issues in the past since we used the buffer defined for the 
callback on a new handler thread.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447918075


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception if the

Review Comment:
   Sorry, missed it. I think that I will just replace those docs with links to 
the interface doc in order to avoid this in the future.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447916841


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Yes, we reschedule to request thread to only push an event to the 
coordinator thread. It is not optimal. The request local is not used at all as 
the writer uses its own buffers.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447883700


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception if the

Review Comment:
   nit: did we want to update this comment to be consistent with the 
PartitionWriter one?






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447880085


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   If it is the case that it's only an optimization question rather than a 
correctness one, I am good to tackle it in a follow-up.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447879037


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Sorry -- you are correct. I was confused because we have a check for if we 
execute on the same request thread -- there we don't reschedule/wrap the 
request. (This was added when we error before we send the request to the txn 
coordinator)
   
   Just for my understanding, right now we reschedule to the request thread, 
and that works fine -- the only concern is not using the allocated coordinator 
threads and taking up space on the request handler threads?
   
   Just triple checking we won't run into an issue with request locals if the 
callback expects the buffer supplier from the coordinator thread.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447873603


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) {
  * @return The Errors instance associated with the given exception.
  */
 private static Errors normalizeException(Throwable exception) {
+exception = Errors.maybeUnwrapException(exception);

Review Comment:
   I see. It was too elegant before 😂 Thanks for looking into it and fixing.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on PR #15142:
URL: https://github.com/apache/kafka/pull/15142#issuecomment-1884851533

   @jolshan Thanks for your comments. I have addressed all of them except the 
one about the thread re-scheduling. I propose to tackle this separately in 
order to keep this patch simple. I have also enabled the transaction system 
tests and ran them successfully. The results are 
[here](https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1704886118--dajac--KAFKA-14505-4--8535113ae0/2024-01-10--001./2024-01-10--001./report.html).





Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447125608


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) {
  * @return The Errors instance associated with the given exception.
  */
 private static Errors normalizeException(Throwable exception) {
+exception = Errors.maybeUnwrapException(exception);

Review Comment:
   I found out that the `thenCompose` used in the runtime wraps the exception 
so we need to unwrap it here. I added a test for this too.
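
The wrapping behaviour mentioned above can be reproduced with the JDK alone. In this sketch the `maybeUnwrap` helper is a hypothetical stand-in written for the example, not Kafka's `Errors.maybeUnwrapException`, and the class name is likewise an assumption. A stage derived via `thenCompose` from a failed future reports a `CompletionException` whose cause is the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

public class UnwrapExample {
    /** Hypothetical helper: strips one CompletionException layer if present. */
    public static Throwable maybeUnwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    /** Returns the throwable observed by a stage chained after thenCompose. */
    public static Throwable observedAfterCompose(RuntimeException failure) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> composed =
            source.thenCompose(v -> CompletableFuture.completedFuture(v + "!"));
        AtomicReference<Throwable> seen = new AtomicReference<>();
        composed.whenComplete((value, error) -> seen.set(error));
        // Fail the source; the dependent stages are completed on this same thread.
        source.completeExceptionally(failure);
        return seen.get();
    }

    public static void main(String[] args) {
        RuntimeException failure = new IllegalStateException("verification failed");
        Throwable seen = observedAfterCompose(failure);
        System.out.println("observed type: " + seen.getClass().getSimpleName());
        System.out.println("unwrapped is original: " + (maybeUnwrap(seen) == failure));
    }
}
```

Code that maps exceptions to error codes by type therefore needs to look at the cause, which is why an unwrap step before the mapping is useful.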






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447124222


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   > Right now though, wrap only schedules to the request thread if we are 
already on a request thread. Otherwise we execute directly. If we start 
verification from a non-request handler thread, maybe this already works as you 
intend.
   
   My understanding is that `wrapAsyncCallback` will throw an exception if the 
caller is not a request thread and `bypassThreadCheck` is not set.
   
   In my case, the verification is called from the request thread. I only 
re-schedule to the coordinator thread when the verification returns. I could 
also have scheduled the verification from the coordinator thread but it does 
not seem necessary.
   
   > Alternatively, I could pass in a parameter to optionally wrap the callback 
(send it to the request thread) or not.
   
   Yeah, that's an option. I played a bit with this idea this morning and the 
outcome was a bit confusing. Another option would be to do the re-scheduling on 
the caller side. I will continue to play... I think that we could address this 
in a subsequent PR to get rid of the extra hop. This PR works as-is but is 
suboptimal.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447050169


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -116,4 +120,21 @@ long appendEndTransactionMarker(
 int coordinatorEpoch,
 TransactionResult result
 ) throws KafkaException;
+
+/**
+ * Verify the transaction.
+ *
+ * @param tpThe partition to write records to.
+ * @param transactionalId   The transactional id.
+ * @param producerIdThe producer id.
+ * @param producerEpoch The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.

Review Comment:
   updated.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446688518


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Yes -- that part has not changed. We "wrap" any callback from the 
transaction coordinator to the request handler thread.
   
   Right now though, wrap only schedules to the request thread if we are 
already on a request thread. Otherwise we execute directly. If we start 
verification from a non-request handler thread, maybe this already works as you 
intend.
   
   Alternatively, I could pass in a parameter to optionally wrap the callback 
(send it to the request thread) or not.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446663978


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   btw, when the validation completes, we schedule an event on the 
coordinator’s thread pool, so we don’t really need to execute the callback on 
the request handler thread. with your refactor, do you plan to keep the 
re-scheduling within the replica manager?






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446657520


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Thanks! I will try to run through it today too.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446654869


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   ah, right. i forgot about that one. let me check this tomorrow.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446538966


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -116,4 +120,21 @@ long appendEndTransactionMarker(
 int coordinatorEpoch,
 TransactionResult result
 ) throws KafkaException;
+
+/**
+ * Verify the transaction.
+ *
+ * @param tp                The partition to write records to.
+ * @param transactionalId   The transactional id.
+ * @param producerId        The producer id.
+ * @param producerEpoch     The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.

Review Comment:
   There is one case where I think we still throw an error even if the 
partition is already verified. When we look up the partition to see if it needs 
verification, we could throw an error if the partition isn't on the broker. Not 
a huge deal though, and we probably don't need to include it.
   
   But an alternate description could be something like: it returns any error 
encountered, the verification guard if the partition needed verification, or 
the sentinel if it did not.
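
A rough sketch of the three outcomes in that alternate description follows. It is only an illustration under assumptions: `VerificationOutcomeSketch`, its nested `Guard` class, and the two boolean inputs are hypothetical stand-ins, not the real `VerificationGuard` or `ReplicaManager` API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; not the real Kafka classes.
final class VerificationOutcomeSketch {
    // Stand-in for a verification guard. SENTINEL means no verification was needed.
    static final class Guard {
        static final Guard SENTINEL = new Guard();
    }

    static CompletableFuture<Guard> maybeStartVerification(
        boolean partitionOnBroker,
        boolean alreadyVerified
    ) {
        CompletableFuture<Guard> future = new CompletableFuture<>();
        if (!partitionOnBroker) {
            // The partition lookup happens first, so it can fail even when the
            // partition was already verified for the transaction.
            future.completeExceptionally(
                new IllegalStateException("partition is not hosted on this broker"));
        } else if (alreadyVerified) {
            // Nothing to verify: hand back the sentinel guard.
            future.complete(Guard.SENTINEL);
        } else {
            // Verification was needed and succeeded: hand back a fresh guard.
            future.complete(new Guard());
        }
        return future;
    }
}
```

A caller can then treat the sentinel and a real guard uniformly and only special-case the exceptional completion.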






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446536290


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Right right, I remember this now. 
   I'm wondering, though, whether that is ok with the async verification 
callback. Is it the case that we don't evaluate which one to use until we 
execute the callback?
   
   As an aside, did we decide that the callback is ok to execute on the request 
handler thread? I didn't go through and trace the threads being used yet.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446091865


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(

Review Comment:
   that's how i understand it too.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446091535


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -116,4 +120,21 @@ long appendEndTransactionMarker(
 int coordinatorEpoch,
 TransactionResult result
 ) throws KafkaException;
+
+/**
+ * Verify the transaction.
+ *
+ * @param tp                The partition to write records to.
+ * @param transactionalId   The transactional id.
+ * @param producerId        The producer id.
+ * @param producerEpoch     The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.

Review Comment:
   i extended the description. let me know if it works for you.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446088963


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   no, we don't have request locals here. the writer uses its own buffer 
supplier (`threadLocalBufferSupplier`) so the `requestLocal` is not used at all.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446089431


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1466,17 +1477,25 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
 ) {
 throwIfNotRunning();
 log.debug("Scheduled execution of transactional write operation {}.", name);
-CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
-name,
+return partitionWriter.maybeStartTransactionVerification(
 tp,
 transactionalId,
 producerId,
-producerEpoch,
-timeout,
-op
-);
-enqueue(event);
-return event.future;
+producerEpoch
+).thenCompose(verificationGuard -> {

Review Comment:
   yeah, i am also happy with this. it is simple and elegant.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-08 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1445444996


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(

Review Comment:
   One thing to note is that we don't always verify the partition: we skip it 
if it has already been verified for the transaction. I guess in that case we 
return the sentinel guard, which works 👍 






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-08 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1445431424


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -116,4 +120,21 @@ long appendEndTransactionMarker(
 int coordinatorEpoch,
 TransactionResult result
 ) throws KafkaException;
+
+/**
+ * Verify the transaction.
+ *
+ * @param tp                The partition to write records to.
+ * @param transactionalId   The transactional id.
+ * @param producerId        The producer id.
+ * @param producerEpoch     The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.

Review Comment:
   nit: we could get a verification guard without verifying the transaction 
(that is how the method in ReplicaManager works), so maybe we should mention 
that we only return the verification guard, and not an error, if it was 
verified by the transaction coordinator.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-08 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1445426296


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1466,17 +1477,25 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
 ) {
 throwIfNotRunning();
 log.debug("Scheduled execution of transactional write operation {}.", name);
-CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
-name,
+return partitionWriter.maybeStartTransactionVerification(
 tp,
 transactionalId,
 producerId,
-producerEpoch,
-timeout,
-op
-);
-enqueue(event);
-return event.future;
+producerEpoch
+).thenCompose(verificationGuard -> {

Review Comment:
   I'm glad this ended up being kind of elegant :) 






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-08 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1445415307


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   do we not typically use request locals for this type of append?






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-08 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1445412684


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -989,6 +989,7 @@ class ReplicaManager(val config: KafkaConfig,
    * @param requestLocal        container for the stateful instances scoped to this request -- this must correspond to the
    *                            thread calling this method
    * @param verificationGuards  the mapping from topic partition to verification guards if transaction verification is used
+   * @param actionQueue   the action queue to use

Review Comment:
   good news is the refactor will allow specifying the action queue as normal 👍 






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-08 Thread via GitHub


jolshan commented on PR #15142:
URL: https://github.com/apache/kafka/pull/15142#issuecomment-1881648674

   Yup -- sorry was hoping to do all the clean ups before you got to this -- 
but shouldn't be too hard to update.





[PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-08 Thread via GitHub


dajac opened a new pull request, #15142:
URL: https://github.com/apache/kafka/pull/15142

   This patch wires the transaction verification in the new group coordinator. 
It basically calls the verification path before scheduling the write operation. 
If the verification fails, the error is returned to the caller.
   
   Note that the patch uses `appendForGroup`. I suppose that we will move away 
from using it when https://github.com/apache/kafka/pull/15087 is merged.
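
The verify-then-schedule flow described in the first paragraph can be sketched with plain `CompletableFuture` composition. This is a simplified illustration, not the code in this patch: `VerifyThenWriteSketch` and `verifyThenWrite` are hypothetical names, and the guard and result types are left generic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration only; not the CoordinatorRuntime implementation.
final class VerifyThenWriteSketch {
    // Runs scheduleWrite only after verification has completed successfully.
    // If verification completes exceptionally, scheduleWrite is never invoked
    // and the returned future completes exceptionally as well.
    static <G, R> CompletableFuture<R> verifyThenWrite(
        CompletableFuture<G> verification,
        Function<G, CompletableFuture<R>> scheduleWrite
    ) {
        return verification.thenCompose(scheduleWrite);
    }
}
```

Note that with `thenCompose` the caller sees a failed verification as a `CompletionException` wrapping the original cause when it calls `join()`.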
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

