artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1213645281


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verification state for the given producer ID 
if the transaction is not ongoing. Otherwise return null.
+   */
+  def transactionNeedsVerifying(producerId: Long): Object = lock synchronized {

Review Comment:
   The name implies that it's a function with no side effects, but it actually 
starts verification by installing some state.  Maybe we should use 
"maybeStartTransactionVerification" or something like that.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -184,6 +186,27 @@ private void clearProducerIds() {
         producerIdCount = 0;
     }
 
+    /**
+     * Maybe create the VerificationStateEntry for a given producer ID. Return 
it if it exists, otherwise return null.
+     */
+    public VerificationStateEntry verificationStateEntry(long producerId, 
boolean createIfAbsent) {
+        return verificationStates.computeIfAbsent(producerId, pid -> {
+            if (createIfAbsent)
+                return new VerificationStateEntry(pid, time.milliseconds());
+            else {
+                log.warn("The given producer ID did not have an entry in the producer state manager, so its state will be returned as null");

Review Comment:
   Isn't it expected that we find no entry if we got a race condition with a 
concurrent abort?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1073,7 +1076,8 @@ class ReplicaManager(val config: KafkaConfig,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
                                requiredAcks: Short,
-                               requestLocal: RequestLocal): 
Map[TopicPartition, LogAppendResult] = {
+                               requestLocal: RequestLocal,
+                               verificationState: Object): Map[TopicPartition, 
LogAppendResult] = {

Review Comment:
   This should be per partition.  Either this should be a Map[TopicPartition, 
Object] or it should be added to the entriesPerPartition map.
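   For illustration, a rough self-contained sketch of the per-partition shape, where each partition's append only sees its own sentinel (PerPartitionSentinelSketch and appendForPartition are illustrative stand-ins, not the PR's appendEntries):

   import org.apache.kafka.common.TopicPartition

   object PerPartitionSentinelSketch {
     // Stand-in for an append path that receives the per-partition sentinels.
     def appendForPartition(tp: TopicPartition, verificationStates: Map[TopicPartition, Object]): Unit = {
       val sentinel = verificationStates.getOrElse(tp, null)
       println(s"appending to $tp with sentinel $sentinel")
     }

     def main(args: Array[String]): Unit = {
       val tp0 = new TopicPartition("topic", 0)
       val tp1 = new TopicPartition("topic", 1)
       val sentinels = Map[TopicPartition, Object](tp0 -> new Object(), tp1 -> new Object())
       Seq(tp0, tp1).foreach(appendForPartition(_, sentinels))
     }
   }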



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify 
the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This 
prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is 
used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction 
or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer 
state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private final long producerId;
+    private long timestamp;
+    private Object verificationState;

Review Comment:
   The purpose of this is to protect the atomicity of the verification operation from concurrent modifications (a sort of 'optimistic locking'); maybe verificationSentinel, verificationGuard, or verificationTripwire would be a better name?
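   For illustration, a tiny self-contained sketch of the 'optimistic locking' role this field plays, under whichever name is chosen (VerificationGuardSketch and appendAllowed are illustrative, not the PR's code):

   object VerificationGuardSketch {
     // The append is only allowed if the sentinel the request verified against is still,
     // by reference, the one currently installed for the producer ID.
     def appendAllowed(requestGuard: Object, installedGuard: Object): Boolean =
       requestGuard != null && (requestGuard eq installedGuard)

     def main(args: Array[String]): Unit = {
       val guard = new Object()
       println(appendAllowed(guard, guard))        // true: nothing changed since verification
       println(appendAllowed(guard, new Object())) // false: the entry was reset and re-created
     }
   }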



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1004,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.

Review Comment:
   I think we need to add some detailed comments about this verification; there were so many discussions that led to these 3 lines of code that it's hard to reverse engineer all the reasons behind this logic without comments. I would write something along the following lines (a really big and detailed comment):
   
   The purpose of this code is to make sure we never add a new transactional message for a transaction that is not open in the transaction manager (a.k.a. a "hanging transaction").  The main scenario is addressed by sending a verification RPC to the transaction coordinator (TC) that verifies that the transaction is ongoing for this partition, but there are still some race conditions that need to be handled under the log's lock so that validation is atomic with the append:
   1. Producer adds partition to transaction (talks to TC).
   2. Producer sends message to partition leader (this code path).
   3. If this is a new transaction, we create a verificationState object that serves as a sentinel.
   4. We send verification RPC to TC.
   5. TC verifies that the transaction is ongoing and sends successful reply.
   6. Producer aborts the transaction.
   7. TC sends abort marker to partition leaders.
   8. Abort marker gets written to the log, resets verificationState object.
   9. We try to write the producer message to the log, and we can see that the verificationState object is reset, so we error out.
   
   If we didn't have the verificationState object validation, then we could write a message at step 9 after the abort marker got written at step 8, creating a "hanging" transaction.
   
   The reason to have a verificationState object and not just an isBeingVerified boolean is the following:
   1. Producer adds partition to transaction (talks to TC).
   2. Producer sends message to partition leader (this code path).
   3. Producer retries the same message to partition leader (message arrives on 
a new connection).
   4. First message arrives, transaction is not started, create a verificationState object.
   5. We send verification RPC to TC.
   6. TC verifies that the transaction is ongoing and sends successful reply.
   7. Producer aborts the transaction.
   8. TC sends abort marker to partition leaders.
   9. Abort marker gets written to the log, resets verificationState object.
   10. Second produce request (retry) arrives, transaction is not started, create a new verificationState object.
   11. We try to write the first producer message to the log, and we can see that the verificationState object is different, so we error out.
   
   If we just used a Boolean flag, we could think at step 11 that there was no state transition, while in fact it was a transition back to the same state (the ABA problem; see the sketch below).
   
   If a transaction is already started for this partition (i.e. some messages got written), we don't send an RPC to the TC, but we need to re-validate that the transaction is still started under the log's lock before insertion, so if we didn't run the validation (i.e. requestVerificationState is null), then the transaction must still be running.
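   For illustration, a small self-contained sketch of the retry scenario above, showing why the object comparison catches the race where a boolean would not (all names are illustrative only):

   object VerificationAbaSketch {
     def main(args: Array[String]): Unit = {
       // Step 4: the first request installs a sentinel.
       val firstSentinel = new Object()
       var installed: Object = firstSentinel

       // Steps 7-9: the abort marker is written and the sentinel is cleared.
       installed = null

       // Step 10: the retried request installs a fresh sentinel.
       installed = new Object()

       // Step 11: the first request re-validates under the log lock before appending.
       val booleanCheck = installed != null            // true  -> a flag would wrongly allow the append
       val sentinelCheck = firstSentinel eq installed  // false -> the object comparison catches the race
       println(s"boolean check passes: $booleanCheck, sentinel check passes: $sentinelCheck")
     }
   }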



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +704,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = null,

Review Comment:
   Do we have a case when we need to use verificationState on a follower?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify 
the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This 
prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is 
used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction 
or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer 
state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private final long producerId;

Review Comment:
   Do we need this?  I think we already have this info in the map key.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       
       val transactionalProducerIds = mutable.HashSet[Long]()
+      var verificationState: Object = null

Review Comment:
   This should be per partition, here we're just taking a random 
verificationState that happened to be in the whatever partition got iterated 
last.  Do we have a unit test that adds batches to multiple partitions in the 
same transaction?  I would expect it to fail, because verificationState objects 
of different partitions wouldn't match.
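   For illustration, a small self-contained sketch contrasting the shared var with a per-partition map (names are illustrative, not the PR's code):

   import org.apache.kafka.common.TopicPartition
   import scala.collection.mutable

   object SharedVarVsPerPartitionSketch {
     def main(args: Array[String]): Unit = {
       val partitions = Seq(new TopicPartition("topic", 0), new TopicPartition("topic", 1))

       // Single shared var: only whichever partition happens to be processed last keeps its sentinel.
       var verificationState: Object = null
       partitions.foreach { _ => verificationState = new Object() }

       // Per-partition map: each partition keeps the sentinel that was created for it.
       val perPartition = mutable.Map.empty[TopicPartition, Object]
       partitions.foreach { tp => perPartition(tp) = new Object() }

       println(s"shared sentinel (last writer wins): $verificationState")
       partitions.foreach { tp => println(s"$tp -> ${perPartition(tp)}") }
     }
   }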


