[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   We don't do verification on requests that don't originate from clients.
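A minimal Java sketch of the gating described in this comment. It assumes a simplified `Origin` enum and a hypothetical `VerificationGate` helper, neither of which is Kafka's actual API. Only transactional, client-origin appends need a verification object, so replicated or coordinator appends can be passed `Optional.empty()` without harm.

```java
import java.util.Optional;

// Simplified stand-in for the append origin; only the values needed here.
enum Origin { CLIENT, REPLICATION, COORDINATOR }

// Hypothetical placeholder for the per-transaction verification object.
final class VerificationToken { }

final class VerificationGate {
    // Only transactional batches that come directly from clients are verified.
    static boolean requiresVerification(Origin origin, boolean isTransactional) {
        return isTransactional && origin == Origin.CLIENT;
    }

    // Decides whether an append may proceed given a possibly empty verification object.
    static boolean mayAppend(Origin origin, boolean isTransactional,
                             Optional<VerificationToken> verification) {
        if (!requiresVerification(origin, isTransactional)) {
            return true; // An empty verification state is expected here.
        }
        return verification.isPresent();
    }
}
```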



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   I suppose I should confirm that we won't have an issue when the verification state is empty. I will make sure of this.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   We currently don't.
   ~I guess there is an argument that we could have a marker come in before a fetch response.~
   EDIT: if we don't replicate in order, we have a problem, so I don't think we need to cover this.
   
   I'm not sure how this would be implemented.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207368803


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+          val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+            Errors.INVALID_PRODUCER_EPOCH
+          else
+            Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+

Review Comment:
   I'll separate this out into a new PR as I'm already splitting this up.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207333185


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
   val sTime = time.milliseconds
   
   val transactionalProducerIds = mutable.HashSet[Long]()
+  var verificationState: Optional[VerificationState] = Optional.empty()

Review Comment:
   I don't think we would get a new one. We only need one per transaction, right?
   So either we succeed and no longer need to worry about verification, or the partition is retried and gets the same verification object as the first time.
   
   We do have a test with multiple partitions, but I would have to check whether it also checks the verification state.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207330097


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+          val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+            Errors.INVALID_PRODUCER_EPOCH
+          else
+            Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+

Review Comment:
   Yes, we didn't really cover these epoch cases before, and I thought it would be good to include them for completeness.
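The three epoch cases from the diff can be condensed into a small decision function. This is a Java sketch (the manager itself is Scala) with hypothetical `EpochConflict`, `Resolution`, and enum types; it models only which request gets which error, not the callbacks or the `nodesToTransactions` bookkeeping.

```java
// Simplified stand-ins for the two error codes used in the diff.
enum TxnError { INVALID_PRODUCER_EPOCH, NETWORK_EXCEPTION }

// Which request receives the error: the one already queued, or the incoming one.
enum ErrorTarget { EXISTING, INCOMING }

final class Resolution {
    final ErrorTarget target;
    final TxnError error;
    final boolean enqueueIncoming; // whether the incoming request is queued for verification

    Resolution(ErrorTarget target, TxnError error, boolean enqueueIncoming) {
        this.target = target;
        this.error = error;
        this.enqueueIncoming = enqueueIncoming;
    }
}

final class EpochConflict {
    // Hypothetical helper: resolves a queued request vs. an incoming request
    // for the same transactional ID, following the three cases in the diff.
    static Resolution resolve(short existingEpoch, short incomingEpoch) {
        if (existingEpoch < incomingEpoch) {
            // Case 1: the queued request is fenced by the higher incoming epoch.
            return new Resolution(ErrorTarget.EXISTING, TxnError.INVALID_PRODUCER_EPOCH, true);
        } else if (existingEpoch == incomingEpoch) {
            // Case 2: same epoch, most likely a client retry; fail the old request retriably.
            return new Resolution(ErrorTarget.EXISTING, TxnError.NETWORK_EXCEPTION, true);
        } else {
            // Case 3: the incoming request is fenced and is not queued at all.
            return new Resolution(ErrorTarget.INCOMING, TxnError.INVALID_PRODUCER_EPOCH, false);
        }
    }
}
```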






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-26 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = Optional.empty(),

Review Comment:
   We currently don't.
   I guess there is an argument that we could have a marker come in before a fetch response.
   
   I'm not sure how this would be implemented.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205982602


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+
+    private VerificationState verificationState;
+
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   I've addressed this issue by creating a verification object that is created on the first attempt to verify and removed when a marker is written.
   
   When verification is needed, we pass this object through and check it under the log lock in the append path.
   In the steps above, step 6 will clear this object and step 7 will set a new one, so step 8 will not succeed and will error out.
   
   When verification is not needed (already verified), we rely on firstTxnOffset being present before appending to the log.
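A minimal sketch of the identity check described here, assuming a single lock stands in for the log lock and using hypothetical `VerificationGuard` and `ProducerEntrySketch` classes rather than Kafka's real types. The object is created on the first verification attempt, reused on retries, cleared when a marker is written, and an append succeeds only if the caller still holds the current object.

```java
// Hypothetical per-transaction verification object; compared by identity only.
final class VerificationGuard { }

// Sketch of a producer entry holding the current guard. All access goes
// through one lock, standing in for the log lock in the append path.
final class ProducerEntrySketch {
    private final Object lock = new Object();
    private VerificationGuard guard; // null when no verification is in progress

    // The first attempt to verify creates the guard; a retry gets the same one back.
    VerificationGuard maybeStartVerification() {
        synchronized (lock) {
            if (guard == null) {
                guard = new VerificationGuard();
            }
            return guard;
        }
    }

    // Writing a transaction marker ends the transaction and clears the guard.
    void onMarkerWritten() {
        synchronized (lock) {
            guard = null;
        }
    }

    // Append path: succeeds only if the caller's guard is still the current one.
    boolean tryAppend(VerificationGuard callerGuard) {
        synchronized (lock) {
            return callerGuard != null && callerGuard == guard;
        }
    }
}
```

In the interleaving from the comment, an append that captured the guard before the marker cleared it ends up comparing against the newer guard, so it is rejected.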






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205980757


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&

Review Comment:
   I dug in further, and interestingly we do a different check on bumped epochs after markers. We do have a second check here:
   ```
   if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq) || currentEntry.tentativeSequence().isPresent())) {
       throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
               "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
               " (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
   }
   ```
   
   I've added the tentative sequence change because this PR actually changes this behavior on first verification. We now put the producer epoch in the entry, so that first case no longer applies.
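A sketch of the quoted condition as a standalone predicate, under stated assumptions: `NO_PRODUCER_EPOCH` is taken to be -1 as in `RecordBatch`, and `inSequence` is a simplified version that treats a wrap from `Integer.MAX_VALUE` back to 0 as in order. `SequenceCheck` is a hypothetical name. It shows why the tentative-sequence clause matters: once verification writes the producer epoch into a new entry, the `NO_PRODUCER_EPOCH` clause no longer lets the first batch through.

```java
import java.util.OptionalInt;

final class SequenceCheck {
    // Assumed to mirror RecordBatch.NO_PRODUCER_EPOCH.
    static final short NO_PRODUCER_EPOCH = -1;

    // Sequence numbers wrap from Integer.MAX_VALUE back to 0.
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    // True when the append would be rejected with OutOfOrderSequenceException.
    static boolean isOutOfOrder(short entryEpoch, int currentLastSeq, int appendFirstSeq,
                                OptionalInt tentativeSequence) {
        return !(entryEpoch == NO_PRODUCER_EPOCH
                || inSequence(currentLastSeq, appendFirstSeq)
                || tentativeSequence.isPresent());
    }
}
```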






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205794685


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       if (duplicateBatch.isPresent) {
         return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
       }
+
+      // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+      // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+      if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+        if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+          throw new InvalidRecordException("Record was not part of an ongoing transaction")
+        } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   One thing that is tricky is that the producer state entry used in the sequence check is actually not the






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205792946


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       if (duplicateBatch.isPresent) {
         return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
       }
+
+      // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+      // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+      if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+        if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+          throw new InvalidRecordException("Record was not part of an ongoing transaction")
+        } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   Did we want to move the verification check there? Or just tentative sequence?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205777679


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&

Review Comment:
   I dug into this a bit more. We actually do check the sequence on a bumped epoch!
   
   ```
   if (producerEpoch != updatedEntry.producerEpoch()) {
       if (appendFirstSeq != 0) {
           if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) {
               throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId +
                       "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), "
                       + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)");
   ```
   
   Basically, we check when the updated entry's epoch is different from the epoch we want to use. The tricky part is that we update the epoch when creating the verification state, so we just need to do that check there somehow. I will look into it.
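A sketch of the quoted new-epoch rule as a predicate. `NewEpochCheck` is hypothetical, and `NO_PRODUCER_EPOCH` is assumed to be -1 as in `RecordBatch`. The last case in the usage note illustrates the tricky part: if creating the verification state has already moved the entry to the batch's epoch, the two epochs compare equal and the check never fires.

```java
final class NewEpochCheck {
    // Assumed to mirror RecordBatch.NO_PRODUCER_EPOCH.
    static final short NO_PRODUCER_EPOCH = -1;

    // Modeled on the quoted snippet: a batch with a different epoch than the
    // entry must start at sequence 0, unless the entry has no epoch yet.
    static boolean rejectsNewEpoch(short batchEpoch, short entryEpoch, int appendFirstSeq) {
        return batchEpoch != entryEpoch
                && appendFirstSeq != 0
                && entryEpoch != NO_PRODUCER_EPOCH;
    }
}
```

For example, `rejectsNewEpoch((short) 3, (short) 2, 5)` is true, but once verification has already bumped the entry to epoch 3, `rejectsNewEpoch((short) 3, (short) 3, 5)` is false, so the same batch slips past this check.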






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-19 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199479050


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   Actually, hmm, I guess we would also need to update it when we are in the VERIFYING state. I'll think on this a bit more, but it may be possible.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-19 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199478295


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&

Review Comment:
   I guess we can simply call this in the case where the entry did not yet exist. See the comment above.
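The condition in the quoted `maybeUpdateTentativeSequence` is cut off, so this is only a reconstruction of the three rules listed in its comment, not the PR's code. `TentativeEntry` is a hypothetical stand-in whose `batchCount` replaces `batchMetadata`. Epoch bookkeeping is assumed to happen elsewhere (for example in `maybeUpdateProducerHigherEpoch`), so this method only moves the tentative sequence.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for the relevant fields of a producer state entry.
final class TentativeEntry {
    short producerEpoch;
    int batchCount; // stands in for batchMetadata.size()
    OptionalInt tentativeSequence = OptionalInt.empty();

    TentativeEntry(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    // Only applies while no batches have been written for this entry.
    void maybeUpdateTentativeSequence(int sequence, short epoch) {
        if (batchCount != 0) {
            return;
        }
        boolean noneYet = !tentativeSequence.isPresent();           // rule (a)
        boolean lowerSameEpoch = epoch == producerEpoch
                && tentativeSequence.isPresent()
                && sequence < tentativeSequence.getAsInt();          // rule (b)
        boolean higherEpoch = epoch > producerEpoch;                 // rule (c)
        if (noneYet || lowerSameEpoch || higherEpoch) {
            tentativeSequence = OptionalInt.of(sequence);
        }
    }
}
```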






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-19 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199477543


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   I guess to address the previous concern about tentative sequences on epoch bumps, maybe we can just update the sequence when no entry exists.
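A minimal sketch of that idea, reusing the `entryForVerification(long producerId, short producerEpoch, int firstSequence)` signature from the diff but with hypothetical, trimmed-down `SketchEntry` and `SketchProducerStateManager` classes. The tentative sequence is recorded only when the entry is created, which sidesteps having to tell an epoch bump apart from a brand-new entry. It does not cover the VERIFYING-state update also raised in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical, trimmed-down producer entry used only for this sketch.
final class SketchEntry {
    final long producerId;
    short producerEpoch;
    OptionalInt tentativeSequence = OptionalInt.empty();

    SketchEntry(long producerId, short producerEpoch) {
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
    }
}

final class SketchProducerStateManager {
    private final Map<Long, SketchEntry> producers = new HashMap<>();

    // Returns the entry to verify against; only a newly created entry gets
    // its tentative sequence set from the first sequence seen.
    SketchEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
        SketchEntry existing = producers.get(producerId);
        if (existing != null) {
            return existing;
        }
        SketchEntry created = new SketchEntry(producerId, producerEpoch);
        created.tentativeSequence = OptionalInt.of(firstSequence);
        producers.put(producerId, created);
        return created;
    }
}
```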






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-18 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197989841


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&

Review Comment:
   There is also currently no great way to check whether this entry is undergoing verification or is the first entry placed in the cache without any batch data yet. Do you have a suggestion for what I could do there?
   
   We would somehow need to know whether the epoch was bumped or whether this is just a new entry.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-18 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197988031


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;

Review Comment:
   Currently, no. I was modeling it on the method above, but I can change it to void.
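If the return value is unused, the method could simply return `void`. A sketch of that variant on a hypothetical `EpochEntry` class, with `batchMetadata` simplified to a list of integers; the bump-and-clear logic is the same as in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trimmed-down entry showing a void variant of the method.
final class EpochEntry {
    short producerEpoch;
    final List<Integer> batchMetadata = new ArrayList<>(); // simplified placeholder

    EpochEntry(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    // Bumps the epoch only if the new one is higher; batch metadata from the
    // previous epoch is cleared at the same time.
    void maybeUpdateProducerHigherEpoch(short newEpoch) {
        if (producerEpoch < newEpoch) {
            batchMetadata.clear();
            producerEpoch = newEpoch;
        }
    }
}
```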






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-18 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197986988


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -183,6 +184,19 @@ private void clearProducerIds() {
 producers.clear();
 producerIdCount = 0;
 }
+
+public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   I think we agreed offline that there were some pitfalls in applying this 
change to non-verification cases, so we would hold off until later.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-18 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197984489


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
 }
 }
 
+public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+if (this.producerEpoch < producerEpoch) {
+batchMetadata.clear();
+this.producerEpoch = producerEpoch;
+return true;
+} else {
+return false;
+}
+}
+
+// We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+// when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+//  a) There is no tentative sequence yet
+//  b) A lower sequence for the same epoch is seen and should thereby block records after that
+//  c) A higher producer epoch is found that will reset the lowest seen sequence
+public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+if (batchMetadata.isEmpty() && 

Review Comment:
   It does, because as far as I can tell we don't currently check for sequence 0 on a new epoch. So we would have the same problem unless we implement something new there.
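The three update rules in the code comment can be sketched as one predicate. This is an illustration under assumptions, not the PR's code: `TentativeSequenceSketch` is hypothetical, batch metadata is modelled as a list of sequences, and recording the newer epoch in case (c) is an assumption, since the real condition is cut off in the hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical simplified entry; not the real ProducerStateEntry.
class TentativeSequenceSketch {
    short producerEpoch;
    OptionalInt tentativeSequence = OptionalInt.empty();
    final List<Integer> batchSequences = new ArrayList<>(); // stand-in for batchMetadata

    TentativeSequenceSketch(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
        // The tentative sequence only matters before any batch is written.
        if (!batchSequences.isEmpty()) {
            return;
        }
        boolean noTentativeYet = !tentativeSequence.isPresent();      // rule (a)
        boolean lowerSameEpoch = producerEpoch == this.producerEpoch
                && tentativeSequence.isPresent()
                && sequence < tentativeSequence.getAsInt();           // rule (b)
        boolean higherEpoch = producerEpoch > this.producerEpoch;     // rule (c)
        if (noTentativeYet || lowerSameEpoch || higherEpoch) {
            tentativeSequence = OptionalInt.of(sequence);
            // Assumption: remember the newer epoch so later calls compare against it.
            if (higherEpoch) {
                this.producerEpoch = producerEpoch;
            }
        }
    }
}
```

In this model a new entry and a bumped epoch both end up setting the tentative sequence, which mirrors the ambiguity discussed in this thread.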






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-16 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1195788236


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
 private int coordinatorEpoch;
 private long lastTimestamp;
 private OptionalLong currentTxnFirstOffset;
+
+private VerificationState verificationState;
+
+// Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+private OptionalInt tentativeSequence;
+
+public enum VerificationState {
+EMPTY,
+VERIFYING,
+VERIFIED
+}

Review Comment:
   This falls under the known gap for part 1: we can't prevent old batches from joining new transactions, but we can prevent hanging ones.
   
   Once we get epoch bumps, this will work correctly.
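A small sketch of the EMPTY to VERIFYING to VERIFIED lifecycle, assuming an entry-level compare-and-set that also checks the producer epoch (the Scala code passes `producerEpoch` to `compareAndSetVerificationState`, but the Java body is not shown here). `VerifiableEntrySketch` is hypothetical. In this model a late batch that reuses the same epoch still passes the epoch check, which is one way to picture why the gap only closes once epochs are bumped.

```java
// Hypothetical sketch of the verification lifecycle; not the PR's code.
enum VerificationState { EMPTY, VERIFYING, VERIFIED }

class VerifiableEntrySketch {
    private final short producerEpoch;
    private VerificationState verificationState = VerificationState.EMPTY;

    VerifiableEntrySketch(short producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    synchronized VerificationState verificationState() {
        return verificationState;
    }

    // Assumption: the transition applies only when the caller's epoch matches
    // the entry's epoch and the current state is the expected one.
    synchronized boolean compareAndSetVerificationState(short producerEpoch,
                                                        VerificationState expected,
                                                        VerificationState next) {
        if (this.producerEpoch != producerEpoch || verificationState != expected) {
            return false;
        }
        verificationState = next;
        return true;
    }
}
```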







[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-16 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1195786198


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
 }
 }
 
+public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+if (this.producerEpoch < producerEpoch) {
+batchMetadata.clear();
+this.producerEpoch = producerEpoch;
+return true;
+} else {
+return false;
+}
+}
+
+// We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+// when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+//  a) There is no tentative sequence yet
+//  b) A lower sequence for the same epoch is seen and should thereby block records after that
+//  c) A higher producer epoch is found that will reset the lowest seen sequence
+public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+if (batchMetadata.isEmpty() && 

Review Comment:
   I don't think we rely on this now. I also don't think this is currently 
incorrect.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-08 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187719600


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-val entry = producerStateManager.activeProducers.get(producerId)
-entry != null && entry.currentTxnFirstOffset.isPresent
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Boolean = lock synchronized {
+val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+(!entry.currentTxnFirstOffset.isPresent) &&
+  (entry.compareAndSetVerificationState(producerEpoch, ProducerStateEntry.VerificationState.EMPTY, ProducerStateEntry.VerificationState.VERIFYING) ||
+entry.verificationState() == ProducerStateEntry.VerificationState.VERIFYING)
+  }
+  
+  def compareAndSetVerificationState(producerId: Long,
+ producerEpoch: Short,
+ baseSequence: Int,
+ expectedVerificationState: ProducerStateEntry.VerificationState,
+ newVerificationState: ProducerStateEntry.VerificationState): Unit = { lock synchronized {

Review Comment:
   Those are the method braces. Should I move them to a new line so it is clearer?
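A Java rendering of the `transactionNeedsVerifying` predicate from the Scala hunk, under simplifying assumptions: the entry is passed in directly instead of being looked up through `entryForVerification`, and `NeedsVerifyingSketch`, `Entry`, and `compareAndSet` are hypothetical names. The epoch check inside `compareAndSet` is also an assumption.

```java
import java.util.OptionalLong;

// Hypothetical sketch; not the UnifiedLog implementation.
class NeedsVerifyingSketch {
    enum State { EMPTY, VERIFYING, VERIFIED }

    // Minimal stand-in for a producer state entry.
    static final class Entry {
        final short producerEpoch;
        OptionalLong currentTxnFirstOffset = OptionalLong.empty();
        State state = State.EMPTY;

        Entry(short producerEpoch) {
            this.producerEpoch = producerEpoch;
        }

        // Assumed semantics: succeed only on matching epoch and expected state.
        boolean compareAndSet(short epoch, State expected, State next) {
            if (producerEpoch != epoch || state != expected) {
                return false;
            }
            state = next;
            return true;
        }
    }

    private final Object lock = new Object();

    // Verification is needed only when no transaction is ongoing, and either
    // this call moves EMPTY -> VERIFYING or an earlier call left it VERIFYING.
    boolean transactionNeedsVerifying(Entry entry, short producerEpoch) {
        synchronized (lock) {
            return !entry.currentTxnFirstOffset.isPresent()
                    && (entry.compareAndSet(producerEpoch, State.EMPTY, State.VERIFYING)
                        || entry.state == State.VERIFYING);
        }
    }
}
```

Because `||` short-circuits, a successful compare-and-set already leaves the entry in `VERIFYING`; the second check only matters for a retry that finds verification still in progress.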






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-04 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1185544285


##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -3273,17 +3274,46 @@ class PartitionTest extends AbstractPartitionTest {
   baseOffset = 0L,
   producerId = producerId)
 partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-assertFalse(partition.hasOngoingTransaction(producerId))
+assertEquals(OptionalLong.empty(), txnFirstOffset(producerId))

Review Comment:
   Will fix the duplicate code.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1171868072


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1051,6 +1060,8 @@ class ReplicaManager(val config: KafkaConfig,
   } else {
 try {
   val partition = getPartitionOrException(topicPartition)
+  val producerId = records.firstBatch().producerId()
+  partition.compareAndSetVerificationState(producerId, ProducerStateEntry.VerificationState.VERIFYING, ProducerStateEntry.VerificationState.VERIFIED)

Review Comment:
   This will be removed.


