dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1219799829


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verificationGuard object for the given producer ID if the transaction is not yet ongoing.

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verificationGuard, sending a verification request to the transaction coordinator, and

Review Comment:
   nit: `verification guard` for consistency. There are a few other cases in this comment.



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -1087,6 +1087,46 @@ class ProducerStateManagerTest {
     assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+    val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+    def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
+      val entry = stateManager.verificationStateEntry(producerId, false)
+      assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+      assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+    }
+
+    // If we already have an entry, reuse it.
+    val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+    verifyEntry(producerId, updatedEntry)
+
+    // Before we add transactional data, we can't remove the entry.
+    stateManager.clearVerificationStateEntry(producerId)
+    verifyEntry(producerId, updatedEntry)
+
+    // Add the transactional data and clear the entry
+    append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
+    stateManager.clearVerificationStateEntry(producerId)
+    assertEquals(null, stateManager.verificationStateEntry(producerId, false))
+

Review Comment:
   nit: Empty line could be removed.
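For readers following the test, the behaviour it pins down can be modelled in a few lines. This is a hypothetical, simplified Java model inferred only from the test's comments and assertions (an existing entry is reused, clearing has no effect before transactional data is appended, and clearing removes the entry afterwards). `VerificationStateModel`, `Entry` and `appendTransactional` are illustrative names, not the real `ProducerStateManager` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the behaviour asserted by testEntryForVerification.
// Names are illustrative only; this is not the ProducerStateManager API.
class VerificationStateModel {
    // Each entry owns one guard object, created once and compared by reference.
    static final class Entry {
        final Object verificationGuard = new Object();
    }

    private final Map<Long, Entry> verificationStates = new HashMap<>();
    // Stand-in for currentTxnFirstOffset: present once transactional data is appended.
    private final Map<Long, Long> txnFirstOffsets = new HashMap<>();

    // Returns the existing entry; creates one only when createIfAbsent is true.
    Entry verificationStateEntry(long producerId, boolean createIfAbsent) {
        if (createIfAbsent) {
            return verificationStates.computeIfAbsent(producerId, id -> new Entry());
        }
        return verificationStates.get(producerId);
    }

    // Records a transactional append; the first one starts the transaction.
    void appendTransactional(long producerId, long offset) {
        txnFirstOffsets.putIfAbsent(producerId, offset);
    }

    // Assumption taken from the test: clearing only takes effect once transactional data exists.
    void clearVerificationStateEntry(long producerId) {
        if (txnFirstOffsets.containsKey(producerId)) {
            verificationStates.remove(producerId);
        }
    }
}
```

Because the guard is a plain `Object` with the default `equals`, the test's `assertEquals` on guards is effectively a reference comparison, which the model mirrors with `!=`/`==`.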



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -575,8 +575,12 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = {
-    leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId))
+  // Returns a verificationGuard object if we need to verify. This starts or continues the verification process. Otherwise return null.

Review Comment:
   nit: `verification guard object`? I find `verificationGuard` confusing 
because we actually return an `Object`.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verificationGuard, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verificationGuard, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not st a verification guard. If the transaction is aborted, hasOngoingTransaction is false and

Review Comment:
   nit: s/st/set
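The two phases described in this comment can be sketched as a small state machine. The following is a hypothetical Java model, not Kafka code: `PartitionTxnModel` and its methods are illustrative names, a `HashMap` of producer ID to first offset stands in for `currentTxnFirstOffset`, and a bare `new Object()` plays the role of the verification guard, compared by reference so that a stale guard can never match.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the per-partition state described in the comment.
// None of these names are Kafka APIs; they only illustrate the two phases.
class PartitionTxnModel {
    // Phase 1 state: producerId -> unique guard awaiting the first append.
    private final Map<Long, Object> verificationGuards = new HashMap<>();
    // Phase 2 state: producerId -> first offset of the ongoing transaction.
    private final Map<Long, Long> txnFirstOffsets = new HashMap<>();
    private long nextOffset = 0L;

    boolean hasOngoingTransaction(long producerId) {
        return txnFirstOffsets.containsKey(producerId);
    }

    // Null when the transaction is already ongoing (no verification needed);
    // otherwise creates, or reuses, a unique guard for this producer.
    Object maybeStartVerification(long producerId) {
        if (hasOngoingTransaction(producerId)) return null;
        return verificationGuards.computeIfAbsent(producerId, id -> new Object());
    }

    // A transactional client append succeeds only if a transaction is ongoing (phase 2)
    // or the caller presents the exact guard object that is currently stored (phase 1).
    boolean appendTransactional(long producerId, Object guard) {
        if (!hasOngoingTransaction(producerId)) {
            Object expected = verificationGuards.get(producerId);
            // Reference comparison: a guard from an earlier verification never matches.
            if (expected == null || expected != guard) return false;
            verificationGuards.remove(producerId); // guard is dropped after the first append
            txnFirstOffsets.put(producerId, nextOffset);
        }
        nextOffset++;
        return true;
    }

    // Writing an end marker (commit or abort) ends the transaction and drops any guard.
    void writeEndMarker(long producerId) {
        txnFirstOffsets.remove(producerId);
        verificationGuards.remove(producerId);
    }
}
```

In the ABA sequence from the comment, the guard created for the first verification is discarded when the abort marker is written, so an append that later arrives carrying that stale guard is rejected, whether or not a new verification has since created a different guard.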



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -1087,6 +1087,46 @@ class ProducerStateManagerTest {
     assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+    val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+    def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
+      val entry = stateManager.verificationStateEntry(producerId, false)
+      assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+      assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+    }
+
+    // If we already have an entry, reuse it.
+    val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+    verifyEntry(producerId, updatedEntry)
+
+    // Before we add transactional data, we can't remove the entry.
+    stateManager.clearVerificationStateEntry(producerId)
+    verifyEntry(producerId, updatedEntry)
+
+    // Add the transactional data and clear the entry

Review Comment:
   nit: missing `.` at the end of the comment.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verificationGuard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {
+    val entry = producerStateManager.verificationStateEntry(producerId, createIfAbsent)
+    if (entry != null) entry.verificationGuard else null
+  }
+
+  /**
+   * Return true if the given producer ID has a transaction ongoing.
+   */
   def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-    val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+    producerStateManager.activeProducers.getOrDefault(producerId, ProducerStateEntry.empty(producerId)).currentTxnFirstOffset.isPresent

Review Comment:
   nit: The previous version seems better to me as we don't have to allocate 
`ProducerStateEntry.empty(producerId)` all the time.
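To illustrate the point, here is a hypothetical Java rendering of the three methods in this hunk. `getOrDefault(producerId, ProducerStateEntry.empty(producerId))` evaluates its default argument on every call, so a throwaway entry is allocated even when the producer is present; the null check below avoids that. `LogVerificationSketch`, `ProducerEntry` and `putProducer` are illustrative assumptions, and a plain `HashMap` replaces `producerStateManager`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical simplification of the UnifiedLog methods above; ProducerEntry stands in
// for ProducerStateEntry and only carries currentTxnFirstOffset.
class LogVerificationSketch {
    static final class ProducerEntry {
        final OptionalLong currentTxnFirstOffset;
        ProducerEntry(OptionalLong currentTxnFirstOffset) {
            this.currentTxnFirstOffset = currentTxnFirstOffset;
        }
    }

    private final Object lock = new Object();
    private final Map<Long, ProducerEntry> activeProducers = new HashMap<>();
    private final Map<Long, Object> verificationGuards = new HashMap<>();

    // Null check instead of getOrDefault(id, empty(id)): no throwaway entry per call.
    boolean hasOngoingTransaction(long producerId) {
        synchronized (lock) {
            ProducerEntry entry = activeProducers.get(producerId);
            return entry != null && entry.currentTxnFirstOffset.isPresent();
        }
    }

    Object verificationGuard(long producerId, boolean createIfAbsent) {
        synchronized (lock) {
            if (createIfAbsent) {
                return verificationGuards.computeIfAbsent(producerId, id -> new Object());
            }
            return verificationGuards.get(producerId);
        }
    }

    // Null means "already ongoing, no verification needed"; otherwise the (possibly reused) guard.
    // Java monitors are reentrant, so the nested synchronized calls are safe.
    Object maybeStartTransactionVerification(long producerId) {
        synchronized (lock) {
            return hasOngoingTransaction(producerId) ? null : verificationGuard(producerId, true);
        }
    }

    // Test helper (hypothetical): register a producer with an optional first transaction offset.
    void putProducer(long producerId, OptionalLong firstOffset) {
        synchronized (lock) {
            activeProducers.put(producerId, new ProducerEntry(firstOffset));
        }
    }
}
```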



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer id.
+ * It contains a verificationGuard object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verificationGuard whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private long timestamp;
+    private Object verificationGuard;

Review Comment:
   nit: Those two could be `final`.
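A minimal sketch of what the suggestion could look like, assuming the guard is created in the constructor and never reassigned. `VerificationStateEntrySketch` and its `isExpired` helper are hypothetical; `isExpired` only stands in for the "producer state entry cleanup mechanism" mentioned in the Javadoc, whose real timeout and API are not shown in this diff.

```java
// Hypothetical sketch of the class with both fields final, as suggested in the review.
final class VerificationStateEntrySketch {
    private final long timestamp;
    private final Object verificationGuard;

    VerificationStateEntrySketch(long timestamp) {
        this.timestamp = timestamp;
        // Unique per entry; callers compare it by reference.
        this.verificationGuard = new Object();
    }

    long timestamp() {
        return timestamp;
    }

    Object verificationGuard() {
        return verificationGuard;
    }

    // Assumed cleanup rule: an entry never used for an append may be dropped once it
    // is at least expirationMs old.
    boolean isExpired(long nowMs, long expirationMs) {
        return nowMs - timestamp >= expirationMs;
    }
}
```

Making both fields `final` documents that an entry's guard never changes: a new guard can only come from a new entry, which is what keeps guard identity meaningful.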



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.