artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1213645281

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
 }

+ /**
+ * Maybe create and return the verification state for the given producer ID if the transaction is not ongoing. Otherwise return null.
+ */
+ def transactionNeedsVerifying(producerId: Long): Object = lock synchronized {

Review Comment:
   The name implies that it's a function with no side effects, but it actually starts verification by installing some state. Maybe we should use "maybeStartTransactionVerification" or something like that.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -184,6 +186,27 @@ private void clearProducerIds() {
 producerIdCount = 0;
 }

+ /**
+ * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null.
+ */
+ public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) {
+ return verificationStates.computeIfAbsent(producerId, pid -> {
+ if (createIfAbsent)
+ return new VerificationStateEntry(pid, time.milliseconds());
+ else {
+ log.warn("The given producer ID did not have an entry in the producer state manager, so it's state will be returned as null");

Review Comment:
   Isn't it expected that we find no entry if we got a race condition with a concurrent abort?

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1073,7 +1076,8 @@ class ReplicaManager(val config: KafkaConfig,
 origin: AppendOrigin,
 entriesPerPartition: Map[TopicPartition, MemoryRecords],
 requiredAcks: Short,
- requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = {
+ requestLocal: RequestLocal,
+ verificationState: Object): Map[TopicPartition, LogAppendResult] = {

Review Comment:
   This should be per partition. Either this should be a Map[TopicPartition, Object] or it should be added to the entriesPerPartition map.

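To illustrate the per-partition point, here is a rough Java sketch, not the PR's code. `PerPartitionGuardSketch`, `startVerification` and `append` are hypothetical names, plain strings stand in for `TopicPartition`, and a bare `Object` stands in for the verification state. It assumes each partition's log installs its own guard object and compares by identity on append.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: one guard object per partition log.
final class PerPartitionGuardSketch {
    // Guard each partition's log installed when verification started.
    private final Map<String, Object> installed = new HashMap<>();

    /** Installs (or returns the existing) guard for the given partition. */
    Object startVerification(String partition) {
        return installed.computeIfAbsent(partition, p -> new Object());
    }

    /** The append succeeds only when the request carries that partition's own guard. */
    boolean append(String partition, Object requestGuard) {
        return requestGuard != null && installed.get(partition) == requestGuard;
    }

    public static void main(String[] args) {
        PerPartitionGuardSketch logs = new PerPartitionGuardSketch();
        Map<String, Object> guardsByPartition = new LinkedHashMap<>();
        Object lastSeen = null; // mirrors a single shared verificationState variable

        for (String tp : List.of("topic-0", "topic-1")) {
            Object guard = logs.startVerification(tp);
            guardsByPartition.put(tp, guard); // per-partition bookkeeping
            lastSeen = guard;                 // overwritten on every iteration
        }

        // A single shared guard only matches the partition iterated last.
        System.out.println(logs.append("topic-0", lastSeen)); // false
        System.out.println(logs.append("topic-1", lastSeen)); // true

        // Looking the guard up per partition matches every partition.
        System.out.println(logs.append("topic-0", guardsByPartition.get("topic-0"))); // true
    }
}
```

With a single shared value, a request touching two partitions can only ever match one of them, which is why a map keyed by partition (or carrying the guard inside the per-partition entries) seems necessary.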
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+ private final long producerId;
+ private long timestamp;
+ private Object verificationState;

Review Comment:
   The purpose of this is to protect the atomicity of the verification operation from concurrent modifications (sort of 'optimistic locking'). Maybe verificationSentinel, verificationGuard or verificationTripwire would be a better name?

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1004,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
 }
+
+ // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.

Review Comment:
   I think we need to add some detailed comments about this verification. So many discussions led to these 3 lines of code that it's hard to reverse engineer all the reasons behind this logic without comments. I would write something along the following lines (a really big and detailed comment):

   The purpose of this code is to make sure we never add a new transactional message for a transaction that is not open in the transaction manager (a.k.a. a "hanging transaction"). The main scenario is addressed by sending a verification RPC to the transaction coordinator (TC) that verifies that the transaction is ongoing for this partition, but there are still some race conditions that need to be handled under the log's lock so that validation is atomic with append:

   1. Producer adds partition to transaction (talks to TC).
   2. Producer sends message to partition leader (this code path).
   3. If this is a new transaction, we create a verificationState object that serves as a sentinel.
   4. We send verification RPC to TC.
   5. TC verifies that the transaction is ongoing and sends successful reply.
   6. Producer aborts the transaction.
   7. TC sends abort marker to partition leaders.
   8. Abort marker gets written to the log, resets verificationState object.
   9. We try to write the producer message to the log, and we can see that the verificationState object is reset, so we error out.

   If we didn't have the verificationState object validation, then we could write a message at step 9 after the abort marker got written at step 8, creating a "hanging" transaction.

   The reason to have a verificationState object and not just an isBeingVerified boolean is the following:

   1. Producer adds partition to transaction (talks to TC).
   2. Producer sends message to partition leader (this code path).
   3. Producer retries the same message to partition leader (message arrives on a new connection).
   4. First message arrives, transaction is not started, create verificationState.
   5. We send verification RPC to TC.
   6. TC verifies that the transaction is ongoing and sends successful reply.
   7. Producer aborts the transaction.
   8. TC sends abort marker to partition leaders.
   9. Abort marker gets written to the log, resets verificationState object.
   10. Second produce request (retry) arrives, transaction is not started, create a new verificationState.
   11. We try to write the first producer message to the log, and we can see that the verificationState object is different, so we error out.

   If we just used a Boolean flag, we could think at step 11 that there was no state transition, while in fact it was a transition back to the same state (the ABA problem).

   If a transaction is already started for this partition (i.e. some messages got written), we don't send an RPC to the TC, but we still need to re-validate under the log's lock, before insertion, that the transaction is still ongoing. So if we didn't run the validation (i.e. requestVerificationState is null), then the transaction must still be running.

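The second scenario above (the retry that recreates the state) can be modeled with a small Java sketch. This is only an illustration under assumptions: `VerificationGuardSketch` and its methods are hypothetical names, not the classes in this PR, the TC round trip is left out, and the guard is a bare `Object` compared by identity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of one partition's log; not the Kafka classes.
final class VerificationGuardSketch {
    private final Object lock = new Object();
    // producerId -> guard installed when verification starts
    private final Map<Long, Object> guards = new HashMap<>();
    // producerIds with an ongoing transaction on this partition
    private final Set<Long> ongoing = new HashSet<>();

    /** Returns the guard for a transaction that needs verifying, or null if it is already ongoing. */
    Object maybeStartTransactionVerification(long producerId) {
        synchronized (lock) {
            if (ongoing.contains(producerId)) {
                return null;
            }
            return guards.computeIfAbsent(producerId, pid -> new Object());
        }
    }

    /** Writing an end marker ends the transaction and clears the guard. */
    void writeEndMarker(long producerId) {
        synchronized (lock) {
            ongoing.remove(producerId);
            guards.remove(producerId);
        }
    }

    /** Appends only if the transaction is ongoing or the request holds the current guard. */
    boolean tryAppend(long producerId, Object requestGuard) {
        synchronized (lock) {
            if (ongoing.contains(producerId)) {
                return true;
            }
            Object current = guards.get(producerId);
            if (requestGuard == null || current != requestGuard) {
                return false; // guard was reset or replaced since verification started
            }
            guards.remove(producerId); // writing data clears the guard
            ongoing.add(producerId);
            return true;
        }
    }

    public static void main(String[] args) {
        VerificationGuardSketch log = new VerificationGuardSketch();
        Object first = log.maybeStartTransactionVerification(42L); // first produce request
        log.writeEndMarker(42L);                                    // abort marker is written
        Object retry = log.maybeStartTransactionVerification(42L); // retry installs a new guard

        System.out.println(first != retry);            // true
        System.out.println(log.tryAppend(42L, first)); // false
    }
}
```

A boolean flag would read "being verified" both before the abort marker and after the retry, so the stale first request would pass. Comparing object identity is what tells the two states apart. In the real flow the retry would still have to pass its own verification with the TC before appending.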
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +704,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 validateAndAssignOffsets = false,
 leaderEpoch = -1,
 requestLocal = None,
+ verificationState = null,

Review Comment:
   Do we have a case when we need to use verificationState on a follower?

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+ private final long producerId;

Review Comment:
   Do we need this? I think we already have this info in the map key.

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
 val sTime = time.milliseconds

 val transactionalProducerIds = mutable.HashSet[Long]()
+ var verificationState: Object = null

Review Comment:
   This should be per partition. Here we're just taking an arbitrary verificationState that happened to be in whatever partition got iterated last. Do we have a unit test that adds batches to multiple partitions in the same transaction? I would expect it to fail, because the verificationState objects of different partitions wouldn't match.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org