[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112

## core/src/main/scala/kafka/log/UnifiedLog.scala:
```
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           validateAndAssignOffsets = false,
           leaderEpoch = -1,
           requestLocal = None,
+          verificationState = Optional.empty(),
```

Review Comment:
We don't do verification on non-client origin requests.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112

## core/src/main/scala/kafka/log/UnifiedLog.scala:
```
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           validateAndAssignOffsets = false,
           leaderEpoch = -1,
           requestLocal = None,
+          verificationState = Optional.empty(),
```

Review Comment:
I suppose I should confirm we won't have an issue when the verification state is blank -- I will make sure of this.
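A minimal sketch of the rule discussed in the two comments above, under a simplified model: when verification is enabled, only transactional appends from clients need a verification state, so the empty state passed on the non-client append path must be accepted everywhere else. The `Origin` enum and `VerificationGate` class are hypothetical names for illustration, not Kafka's actual types.

```java
import java.util.Optional;

// Hypothetical append origins; Kafka's real enum may differ.
enum Origin { CLIENT, COORDINATOR, REPLICATION }

final class VerificationGate {
    // Verification applies only to transactional client appends when the feature is enabled.
    static boolean requiresVerification(Origin origin, boolean isTransactional, boolean verificationEnabled) {
        return verificationEnabled && isTransactional && origin == Origin.CLIENT;
    }

    // An empty verification state is fine whenever verification is not required
    // (e.g. replication or coordinator appends); otherwise a state must be present.
    static boolean mayAppend(Origin origin, boolean isTransactional, boolean verificationEnabled,
                             Optional<?> verificationState) {
        if (!requiresVerification(origin, isTransactional, verificationEnabled)) {
            return true;
        }
        return verificationState.isPresent();
    }
}
```

Treating an empty state as "no verification required" rather than "verification failed" is what keeps replicated appends from being rejected in this model.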
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170

## core/src/main/scala/kafka/log/UnifiedLog.scala:
```
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           validateAndAssignOffsets = false,
           leaderEpoch = -1,
           requestLocal = None,
+          verificationState = Optional.empty(),
```

Review Comment:
We currently don't. ~I guess there is an argument that we could have a marker come in before a fetch response.~ EDIT: If we don't replicate in order, we have a problem, so I don't think we need to cover this. I'm not sure how this would be implemented.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207368803

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
```
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+          val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+            Errors.INVALID_PRODUCER_EPOCH
+          else
+            Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+
```

Review Comment:
I'll separate this out into a new PR as I'm already splitting this up.
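The three epoch cases listed in the new code comment can be modeled as a pure decision function. This is a simplified Java sketch, not the Scala code in `AddPartitionsToTxnManager`; the `Outcome` names are hypothetical labels for which request receives which error.

```java
// Hypothetical labels for the three cases in the hunk's comment.
enum Outcome {
    FENCE_EXISTING_INVALID_EPOCH,      // case 1: incoming epoch is higher
    RETRY_EXISTING_NETWORK_EXCEPTION,  // case 2: same epoch, client is likely retrying
    REJECT_INCOMING_INVALID_EPOCH      // case 3: incoming epoch is lower
}

final class PendingVerificationEpochs {
    // Decide what happens when a request for the same transactional ID is already queued.
    static Outcome resolve(short existingEpoch, short incomingEpoch) {
        if (existingEpoch < incomingEpoch) {
            return Outcome.FENCE_EXISTING_INVALID_EPOCH;
        } else if (existingEpoch == incomingEpoch) {
            return Outcome.RETRY_EXISTING_NETWORK_EXCEPTION;
        } else {
            return Outcome.REJECT_INCOMING_INVALID_EPOCH;
        }
    }

    // In cases 1 and 2 the existing callback is completed with an error and the incoming
    // data takes its place; in case 3 the incoming data is never queued for verification.
    static boolean incomingIsQueued(Outcome outcome) {
        return outcome != Outcome.REJECT_INCOMING_INVALID_EPOCH;
    }
}
```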
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207333185

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
     val sTime = time.milliseconds
     val transactionalProducerIds = mutable.HashSet[Long]()
+    var verificationState: Optional[VerificationState] = Optional.empty()
```

Review Comment:
I don't think we would get a new one. We only need one per transaction, right? So either we succeed and no longer need to worry about the verification, or the partition is retried and gets the same verification object from the first time. We do have a test with multiple partitions, but I would have to check whether it also checks the verification state.
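A sketch of the "one verification object per transaction" argument, assuming objects are keyed by producer ID and compared by identity; `VerificationToken` and `VerificationTokens` are hypothetical names. A retried request gets back the instance created on the first attempt, and only the end of the transaction (a marker) makes room for a new one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical opaque object whose identity represents one transaction's verification.
final class VerificationToken { }

final class VerificationTokens {
    private final Map<Long, VerificationToken> byProducerId = new HashMap<>();

    // The first attempt creates the token; retries of the same transaction reuse it.
    synchronized VerificationToken getOrCreate(long producerId) {
        return byProducerId.computeIfAbsent(producerId, id -> new VerificationToken());
    }

    // Writing a transaction marker ends the transaction, so the token is dropped.
    synchronized void clearOnMarker(long producerId) {
        byProducerId.remove(producerId);
    }
}
```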
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207330097

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
```
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+          val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+            Errors.INVALID_PRODUCER_EPOCH
+          else
+            Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+
```

Review Comment:
Yes - we didn't really cover these epoch cases before, and I thought it would be good to include them for completeness.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170

## core/src/main/scala/kafka/log/UnifiedLog.scala:
```
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           validateAndAssignOffsets = false,
           leaderEpoch = -1,
           requestLocal = None,
+          verificationState = Optional.empty(),
```

Review Comment:
We currently don't. I guess there is an argument that we could have a marker come in before a fetch response. I'm not sure how this would be implemented.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205982602

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
```
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+
+    private VerificationState verificationState;
+
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }
```

Review Comment:
I've addressed this issue by creating a verification object that is created on the first attempt to verify and removed when a marker is written. When verification is needed, we pass this object through and check it under the log lock in the append path. In the steps above, 6 will clear this object and 7 will set a new one, so 8 will not succeed and will error out.

When verification is not needed (already verified), we rely on firstTxnOffset being present before appending to the log.
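The stale-object protection described in this comment can be sketched as follows, assuming a single Java lock stands in for the log lock and the verification object is compared by reference. `VerificationGuardedLog` and its methods are hypothetical; steps 6 to 8 from the comment correspond to `writeMarker()`, a second `startVerification()`, and a failing `tryAppend` with the stale object.

```java
final class VerificationGuardedLog {
    private final Object lock = new Object(); // stands in for the log lock
    private Object currentVerification;       // null when no transaction is being verified

    // The first attempt to verify creates the object; later attempts reuse it.
    Object startVerification() {
        synchronized (lock) {
            if (currentVerification == null) {
                currentVerification = new Object();
            }
            return currentVerification;
        }
    }

    // Writing a transaction marker ends the transaction and clears the object.
    void writeMarker() {
        synchronized (lock) {
            currentVerification = null;
        }
    }

    // Under the lock, the append succeeds only if it carries the current object (reference equality).
    boolean tryAppend(Object verification) {
        synchronized (lock) {
            return verification != null && verification == currentVerification;
        }
    }
}
```

Comparing by identity rather than by a state enum is what lets a late append from an earlier transaction fail even when a new transaction is also in the "verified" state.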
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205980757

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
```
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    // a) There is no tentative sequence yet
+    // b) A lower sequence for the same epoch is seen and should thereby block records after that
+    // c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&
```

Review Comment:
I dug in further, and interestingly, we do a different check on bumped epochs after markers. There is a second check here:
```
if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH
        || inSequence(currentLastSeq, appendFirstSeq)
        || currentEntry.tentativeSequence().isPresent())) {
    throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
            "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
            " (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
}
```
I've added the tentative sequence change because this PR actually changes this behavior on the first verification: we put the producer epoch in the entry, so that first case no longer applies.
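The quoted condition can be isolated into a small predicate to show why the tentative sequence is needed. This sketch assumes the usual wrap-around rule for `inSequence` (the sequence after `Integer.MAX_VALUE` is 0) and uses a local constant in place of `RecordBatch.NO_PRODUCER_EPOCH` (-1); the class and method names are hypothetical. Once verification writes the producer epoch into the entry, the first clause is false, so without the third clause the first append after verification would be checked against a sequence history that does not exist yet.

```java
import java.util.OptionalInt;

final class SequenceCheck {
    // Local stand-in for RecordBatch.NO_PRODUCER_EPOCH.
    static final short NO_PRODUCER_EPOCH = -1;

    // Sequence numbers wrap around from Integer.MAX_VALUE back to 0.
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    // Mirrors the quoted condition: accept if the entry has no epoch yet, if the append
    // continues the sequence, or if a tentative sequence was recorded during verification.
    static boolean accepts(short entryEpoch, int currentLastSeq, int appendFirstSeq,
                           OptionalInt tentativeSequence) {
        return entryEpoch == NO_PRODUCER_EPOCH
                || inSequence(currentLastSeq, appendFirstSeq)
                || tentativeSequence.isPresent();
    }
}
```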
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205794685

## core/src/main/scala/kafka/log/UnifiedLog.scala:
```
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         if (duplicateBatch.isPresent) {
           return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
         }
+
+        // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+        // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+        if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+          if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+            throw new InvalidRecordException("Record was not part of an ongoing transaction")
+          } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)
```

Review Comment:
One thing that is tricky is that the producer state entry used in the sequence check is actually not the
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205792946

## core/src/main/scala/kafka/log/UnifiedLog.scala:
```
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         if (duplicateBatch.isPresent) {
           return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
         }
+
+        // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+        // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+        if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+          if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+            throw new InvalidRecordException("Record was not part of an ongoing transaction")
+          } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)
```

Review Comment:
Did we want to move the verification check there? Or just the tentative sequence?
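A sketch of the two checks in this hunk as a single decision function, assuming the verification state and the last entry's tentative sequence have already been looked up. `VState`, `AppendCheckResult`, and `AppendCheck` are hypothetical; the hunk is cut off before the error raised in the tentative-sequence branch, so the sketch returns a label instead of guessing an exception type.

```java
import java.util.OptionalInt;

// Hypothetical mirror of ProducerStateEntry.VerificationState from the diff.
enum VState { EMPTY, VERIFYING, VERIFIED }

enum AppendCheckResult { OK, NOT_VERIFIED, ABOVE_TENTATIVE_SEQUENCE }

final class AppendCheck {
    static AppendCheckResult check(boolean isTransactional, boolean verificationEnabled,
                                   VState state, OptionalInt tentativeSequence, int baseSequence) {
        if (!isTransactional || !verificationEnabled) {
            return AppendCheckResult.OK; // the hunk only guards transactional batches with verification on
        }
        if (state != VState.VERIFIED) {
            return AppendCheckResult.NOT_VERIFIED; // "Record was not part of an ongoing transaction"
        }
        if (tentativeSequence.isPresent() && tentativeSequence.getAsInt() < baseSequence) {
            return AppendCheckResult.ABOVE_TENTATIVE_SEQUENCE; // error type is cut off in the hunk
        }
        return AppendCheckResult.OK;
    }
}
```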
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205777679

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
```
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    // a) There is no tentative sequence yet
+    // b) A lower sequence for the same epoch is seen and should thereby block records after that
+    // c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&
```

Review Comment:
I dug into this a bit more. We actually do check the sequence on a bumped epoch!
```
if (producerEpoch != updatedEntry.producerEpoch()) {
    if (appendFirstSeq != 0) {
        if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) {
            throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId +
                    "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), "
                    + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)");
```
Basically, we check when the updated entry's epoch differs from the epoch we want to use. The tricky part is that we update the epoch when creating the verification state, so we just need to do that check there somehow. I will look into it.
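The quoted epoch-bump check reduces to a three-part predicate. This sketch uses hypothetical names and a local stand-in for `RecordBatch.NO_PRODUCER_EPOCH` (-1). It also shows the tricky part mentioned in the comment: if verification has already written the request epoch into the entry, the two epochs are equal at append time and the check never fires.

```java
final class EpochBumpCheck {
    // Local stand-in for RecordBatch.NO_PRODUCER_EPOCH.
    static final short NO_PRODUCER_EPOCH = -1;

    // True when the quoted code would throw OutOfOrderSequenceException: the request uses
    // a different epoch, does not start at sequence 0, and the entry already had a real epoch.
    static boolean rejectsNewEpochSequence(short requestEpoch, short entryEpoch, int appendFirstSeq) {
        return requestEpoch != entryEpoch
                && appendFirstSeq != 0
                && entryEpoch != NO_PRODUCER_EPOCH;
    }
}
```

A fix along the lines the comment suggests would have to run this comparison at the point where the verification state bumps the entry's epoch, before the two values become equal.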
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199479050

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
```
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
```

Review Comment:
Actually, hmm, I guess we would also need to update it when we are in the verifying state. I'll think on this a bit more, but it may be possible.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199478295

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
```
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    // a) There is no tentative sequence yet
+    // b) A lower sequence for the same epoch is seen and should thereby block records after that
+    // c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&
```

Review Comment:
I guess we can simply call this in the case where the entry did not yet exist. See the comment above.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199477543

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
```
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
```

Review Comment:
I guess that to address the previous concern about tentative sequences on epoch bumps, maybe we can just update the sequence when no entry exists.
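A sketch of "only update the sequence when no entry exists", assuming producer entries live in a plain map. `SketchEntry`, `SketchProducerState`, and the body of `entryForVerification` are hypothetical; only the method name and parameters come from the hunk. Existing entries are returned untouched, which is exactly the case that the comment above about the verifying state says may still need handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical, stripped-down producer entry.
final class SketchEntry {
    final short producerEpoch;
    OptionalInt tentativeSequence = OptionalInt.empty();

    SketchEntry(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }
}

final class SketchProducerState {
    private final Map<Long, SketchEntry> producers = new HashMap<>();

    SketchEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
        SketchEntry existing = producers.get(producerId);
        if (existing != null) {
            return existing; // existing entries (including epoch bumps) are left alone
        }
        SketchEntry created = new SketchEntry(producerEpoch);
        created.tentativeSequence = OptionalInt.of(firstSequence); // only brand-new entries get one
        producers.put(producerId, created);
        return created;
    }
}
```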
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197989841

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
```
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    // a) There is no tentative sequence yet
+    // b) A lower sequence for the same epoch is seen and should thereby block records after that
+    // c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&
```

Review Comment:
There is also not currently a great way to check whether this entry is undergoing verification, i.e. whether it is the first entry placed in the cache without any batch data yet. Do you have a suggestion for what I could do there? We would somehow need to know that the epoch was bumped versus this just being a new entry.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197988031

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
```
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
```

Review Comment:
Currently, no. I was modeling it on the method above, but I can change it to void.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197986988

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
```
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
```

Review Comment:
I think we agreed offline that there were some pitfalls in applying this change to non-verification cases, so we would hold off until later.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197984489

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
```
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    // a) There is no tentative sequence yet
+    // b) A lower sequence for the same epoch is seen and should thereby block records after that
+    // c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&
```

Review Comment:
It does, because we don't currently check for sequence 0 on a new epoch as far as I can tell. So we would have the same problem unless we implement something new there.
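A hypothetical reading of rules a) to c) from the code comment in this hunk. The actual condition is truncated after `batchMetadata.isEmpty() &&`, so this is not the PR's implementation; `batchMetadata` is modeled as a plain list and the class name is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

final class TentativeSequenceSketch {
    final List<Integer> batchMetadata = new ArrayList<>(); // stands in for batches already written
    short producerEpoch;
    OptionalInt tentativeSequence = OptionalInt.empty();

    TentativeSequenceSketch(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    void maybeUpdateTentativeSequence(int sequence, short epoch) {
        if (!batchMetadata.isEmpty()) {
            return; // only before any batch has been written to the log
        }
        boolean noTentativeYet = !tentativeSequence.isPresent();               // rule a)
        boolean lowerForSameEpoch = epoch == producerEpoch
                && tentativeSequence.isPresent()
                && sequence < tentativeSequence.getAsInt();                    // rule b)
        boolean higherEpoch = epoch > producerEpoch;                           // rule c)
        if (noTentativeYet || lowerForSameEpoch || higherEpoch) {
            tentativeSequence = OptionalInt.of(sequence);
            if (higherEpoch) {
                producerEpoch = epoch; // a higher epoch resets the lowest seen sequence
            }
        }
    }
}
```

A higher sequence at the same epoch never replaces the tentative value, which is what lets it act as a floor that later appends are checked against.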
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1195788236

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ##
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+
+    private VerificationState verificationState;
+
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
This falls under the known gap for part 1. We can't prevent old batches from joining new transactions, but we can prevent hanging ones. Once we get epoch bumps, this will work correctly.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1195786198

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    // a) There is no tentative sequence yet
+    // b) A lower sequence for the same epoch is seen and should thereby block records after that
+    // c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&

Review Comment:
I don't think we rely on this now. I also don't think this is currently incorrect.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187719600

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-    val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Boolean = lock synchronized {
+    val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+    (!entry.currentTxnFirstOffset.isPresent) &&
+      (entry.compareAndSetVerificationState(producerEpoch, ProducerStateEntry.VerificationState.EMPTY, ProducerStateEntry.VerificationState.VERIFYING) ||
+        entry.verificationState() == ProducerStateEntry.VerificationState.VERIFYING)
+  }
+
+  def compareAndSetVerificationState(producerId: Long,
+                                     producerEpoch: Short,
+                                     baseSequence: Int,
+                                     expectedVerificationState: ProducerStateEntry.VerificationState,
+                                     newVerificationState: ProducerStateEntry.VerificationState): Unit = { lock synchronized {

Review Comment:
Those are the method's braces. Should I just add a new line so it is clearer?
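The boolean logic of `transactionNeedsVerifying` in this hunk can be modeled as a small state machine over `EMPTY`, `VERIFYING` and `VERIFIED`. The Java sketch below is hypothetical in several ways:

* `EntrySketch` and `LogSketch` are illustrative names.
* The thread does not show the body of `entryForVerification`, so it is assumed here to be a get-or-create lookup in a map.
* Sequence handling is omitted.
* The epoch check in `compareAndSetVerificationState` is an assumption, based only on the epoch being passed in.

Only the final expression mirrors the Scala code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, simplified model; names mirror the diff but this is not Kafka's implementation.
enum VerificationState { EMPTY, VERIFYING, VERIFIED }

class EntrySketch {
    short producerEpoch;
    OptionalLong currentTxnFirstOffset = OptionalLong.empty();
    VerificationState verificationState = VerificationState.EMPTY;

    EntrySketch(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    // Assumption: a transition only applies when the epoch matches and the current state is the expected one.
    boolean compareAndSetVerificationState(short epoch, VerificationState expected, VerificationState next) {
        if (epoch != producerEpoch || verificationState != expected) {
            return false;
        }
        verificationState = next;
        return true;
    }
}

class LogSketch {
    private final Object lock = new Object();
    final Map<Long, EntrySketch> producers = new HashMap<>();

    // Hypothetical get-or-create; the real entryForVerification body is not shown in the thread.
    EntrySketch entryForVerification(long producerId, short producerEpoch) {
        return producers.computeIfAbsent(producerId, id -> new EntrySketch(producerEpoch));
    }

    // Same boolean shape as transactionNeedsVerifying in the UnifiedLog diff, guarded by a lock.
    boolean transactionNeedsVerifying(long producerId, short producerEpoch) {
        synchronized (lock) {
            EntrySketch entry = entryForVerification(producerId, producerEpoch);
            return !entry.currentTxnFirstOffset.isPresent()
                && (entry.compareAndSetVerificationState(producerEpoch, VerificationState.EMPTY, VerificationState.VERIFYING)
                    || entry.verificationState == VerificationState.VERIFYING);
        }
    }
}
```

With this shape, the first call for a producer moves its entry from EMPTY to VERIFYING and returns true. Repeated calls while the entry is VERIFYING also return true. Once the state is set to VERIFIED, or a transaction is already ongoing, no further verification is requested.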
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1185544285

## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##
@@ -3273,17 +3274,46 @@ class PartitionTest extends AbstractPartitionTest {
       baseOffset = 0L,
       producerId = producerId)
     partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertFalse(partition.hasOngoingTransaction(producerId))
+    assertEquals(OptionalLong.empty(), txnFirstOffset(producerId))

Review Comment:
Will fix the duplicate code.
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1171868072

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1051,6 +1060,8 @@ class ReplicaManager(val config: KafkaConfig,
         } else {
           try {
             val partition = getPartitionOrException(topicPartition)
+            val producerId = records.firstBatch().producerId()
+            partition.compareAndSetVerificationState(producerId, ProducerStateEntry.VerificationState.VERIFYING, ProducerStateEntry.VerificationState.VERIFIED)

Review Comment:
This will be removed.