jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221910510


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1006,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
+            if (!hasOngoingTransaction(batch.producerId) && (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null))

Review Comment:
   This is a holdover from the previous PR, which also included tentative state.
Since we may move that, I can combine these into a single condition. (But I may
make it a helper, since this is quite a lot of logic.)
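
   For illustration only, a rough sketch of what such a helper could look like.
It is a self-contained stand-in, not the actual `UnifiedLog` code: the object
name, the `Guard` class, the helper name `batchMissingRequiredVerification`, and
the use of `Option` in place of nullable guards are all assumptions made for the
example. It folds the two nested `if` conditions from the hunk above into one
predicate.

```scala
// Hypothetical stand-in for the nested check in the diff; none of these names
// come from the Kafka code base.
object VerificationCheckSketch {
  // Guards are compared by reference, so every verification attempt is unique.
  final class Guard

  // Returns true when the batch must be rejected: it is transactional,
  // verification is enabled, no transaction is ongoing for the producer, and
  // the request's guard is absent or is not the guard currently stored for
  // that producer.
  def batchMissingRequiredVerification(
      isTransactional: Boolean,
      verificationEnabled: Boolean,
      hasOngoingTransaction: Boolean,
      requestGuard: Option[Guard],
      currentGuard: Option[Guard]): Boolean =
    isTransactional &&
      verificationEnabled &&
      !hasOngoingTransaction &&
      !requestGuard.exists(g => currentGuard.exists(_ eq g))
}
```

   A stale guard from an earlier verification attempt fails the reference
comparison, which is the ABA case described in the code comment; a missing
guard on the request is rejected the same way.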



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.