artemlivshits commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1206199912
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -579,10 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } + def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Optional[VerificationState] = lock synchronized { + val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence) + if (entry.currentTxnFirstOffset.isPresent) { Review Comment: Maybe add a comment explaining why we don't need verification if currentTxnFirstOffset.isPresent. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ########## @@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { return false; } } + + public void maybeAddVerificationState() { + // If we already have a verification state, we can reuse it. This is because we know this is the same transaction + // as the state is cleared upon writing a control marker. + if (!this.verificationState.isPresent()) + this.verificationState = Optional.of(new VerificationState()); Review Comment: This creates an Optional object that points to a VerificationState object, so we get an extra object for every producer entry. We just need a plain Object reference to avoid the extra overhead. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validateAndAssignOffsets = false, leaderEpoch = -1, requestLocal = None, + verificationState = Optional.empty(), Review Comment: Do we ever go through the verification logic for followers? 
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state Review Comment: Maybe add more comments on how we validate transaction state. Also maybe not here but at least somewhere we should have a detailed comment about the race condition we're addressing and specifically how the verificationState solves the ABA problem. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationState.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +/** + * This class serves as a unique object to ensure the same transaction is being verified. + * When verification starts, this object is created and checked before append to ensure the producer state entry + * is not modified (via ending the transaction) before the record is appended. 
+ */ +public class VerificationState { Review Comment: Do we need a separate class for that? I think we could just use Object, because we just compare references, not values. ########## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ########## @@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { nodesToTransactions.synchronized { // Check if we have already have either node or individual transaction. Add the Node if it isn't there. - val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, + val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, new TransactionDataAndCallbacks( new AddPartitionsToTxnTransactionCollection(1), mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) - val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId) - // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and - // reconnected so return the retriable network exception. - if (currentTransactionData != null) { - val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) - Errors.INVALID_PRODUCER_EPOCH - else - Errors.NETWORK_EXCEPTION - val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() - currentTransactionData.topics().forEach { topic => - topic.partitions().forEach { partition => - topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) - } + // There are 3 cases if we already have existing data + // 1. 
Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced + // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception + // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify + if (existingTransactionData != null) { + if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) { + val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH + else + Errors.NETWORK_EXCEPTION + val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId()) + existingNodeAndTransactionData.transactionData.remove(transactionData) + oldCallback(topicPartitionsToError(existingTransactionData, error)) + } else { + // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately. + callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH)) + return } - val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) - currentNodeAndTransactionData.transactionData.remove(transactionData) - oldCallback(topicPartitionsToError.toMap) } - currentNodeAndTransactionData.transactionData.add(transactionData) - currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + Review Comment: Just to clarify -- this change isn't about "transient" state or handling the race with abort, but is just an unrelated issue? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ########## @@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { return false; } } + + public void maybeAddVerificationState() { + // If we already have a verification state, we can reuse it.
This is because we know this is the same transaction + // as the state is cleared upon writing a control marker. + if (!this.verificationState.isPresent()) + this.verificationState = Optional.of(new VerificationState()); + } + + // We only set tentative sequence if this entry has never had any batch metadata written to it. (We do not need to store if we've just bumped epoch.) + // It is used to avoid OutOfOrderSequenceExceptions when we saw a lower sequence during transaction verification. Review Comment: Maybe elaborate on the problem we're trying to address (the sequence of steps that leads to it). ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig, val sTime = time.milliseconds val transactionalProducerIds = mutable.HashSet[Long]() + var verificationState: Optional[VerificationState] = Optional.empty() Review Comment: I think this should be per partition -- each log has its own producer state and its own verificationState. Do we have a unit test that tries to append to multiple partitions? I would expect it to fail, because each producer state would have its own verificationState object, but we pass only the one that happened to be set last, so for the other partitions it would look like a race. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state. + // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+ if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) { + if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) { + throw new InvalidRecordException("Record was not part of an ongoing transaction") + } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence) Review Comment: I think the verification check belongs here, and the sequence check would be better done in ProducerAppendInfo. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state + // Also check that we are not appending a record with a higher sequence than one previously seen through verification. + if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) { + if (!hasOngoingTransaction(batch.producerId) && (verificationStateOpt != verificationState(batch.producerId) || !verificationStateOpt.isPresent)) { + throw new InvalidRecordException("Record was not part of an ongoing transaction") Review Comment: It looks like we already have the producer state in maybeLastEntry, so we don't need additional lookups and locks; we can just check the state of maybeLastEntry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org