artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1194600838
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
producers.clear();
producerIdCount = 0;
}
+
+ public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
Review Comment:
"Tentative" state is generally useful for both idempotent and transactional
producers, independently of the verification functionality: as soon as we've got
batch1, it's good to remember that we've got it, so that if we fail down the
road with a retriable error, batch2 won't get in before we have a chance to
retry batch1.
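To illustrate the idea, here is a minimal sketch, not the PR's code. `TentativeSequenceSketch` and its methods are hypothetical names, and the class is a simplified stand-in for a per-producer entry. It remembers the lowest first sequence seen before anything is written, so a later batch cannot be appended ahead of it.

```java
import java.util.OptionalInt;

// Hypothetical, simplified stand-in for a per-producer entry (not Kafka's ProducerStateEntry).
final class TentativeSequenceSketch {
    // Lowest first sequence seen so far for batches that are not yet in the log.
    private OptionalInt tentativeSequence = OptionalInt.empty();

    // Remember a batch as soon as it arrives, keeping only the lowest first sequence.
    void observe(int firstSequence) {
        if (!tentativeSequence.isPresent() || firstSequence < tentativeSequence.getAsInt()) {
            tentativeSequence = OptionalInt.of(firstSequence);
        }
    }

    // A batch may be appended only if it does not jump ahead of the lowest sequence seen.
    boolean mayAppend(int firstSequence) {
        return !tentativeSequence.isPresent() || firstSequence <= tentativeSequence.getAsInt();
    }

    // Once the remembered batch is written, regular sequence tracking can take over.
    void clear() {
        tentativeSequence = OptionalInt.empty();
    }
}
```

With batch1 starting at sequence 0 and batch2 at sequence 5, observing both leaves the tentative sequence at 0, so `mayAppend(5)` is false until batch1 has been written and `clear()` is called.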
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
}
}
+ public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+ if (this.producerEpoch < producerEpoch) {
+ batchMetadata.clear();
+ this.producerEpoch = producerEpoch;
+ return true;
Review Comment:
Do we use the return value?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
}
}
+ public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+ if (this.producerEpoch < producerEpoch) {
+ batchMetadata.clear();
+ this.producerEpoch = producerEpoch;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+ // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+ // a) There is no tentative sequence yet
+ // b) A lower sequence for the same epoch is seen and should thereby block records after that
+ // c) A higher producer epoch is found that will reset the lowest seen sequence
+ public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+ if (batchMetadata.isEmpty() &&
Review Comment:
I don't think this is the right condition: we only need a tentative entry
when we literally have no entry (i.e. we don't know what state the producer
was in when the producer id expired), not just when the batch metadata is empty
(which can happen on an epoch bump). If we got an epoch bump, we know that the
next sequence is 0 and we can reject messages that have a gap.
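A rough sketch of the distinction, under simplifying assumptions: `SequenceGateSketch` and its methods are hypothetical, state is reduced to a map from producer id to the next expected sequence, and sequence wraparound is ignored. A missing key models an expired or unknown producer id, while an entry reset by an epoch bump already knows that it expects sequence 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal model: producerId -> next expected sequence.
final class SequenceGateSketch {
    // A missing key means we have no entry at all for that producer id.
    private final Map<Long, Integer> nextExpected = new HashMap<>();

    // After an epoch bump the entry exists and the next sequence is known to be 0.
    void onEpochBump(long producerId) {
        nextExpected.put(producerId, 0);
    }

    // Record a successful append of recordCount records starting at baseSequence.
    void onAppend(long producerId, int baseSequence, int recordCount) {
        nextExpected.put(producerId, baseSequence + recordCount);
    }

    // A tentative entry is only needed when nothing is known about the producer.
    boolean needsTentativeEntry(long producerId) {
        return !nextExpected.containsKey(producerId);
    }

    // With a known entry, a batch that skips ahead of the expected sequence has a gap.
    boolean hasGap(long producerId, int baseSequence) {
        Integer expected = nextExpected.get(producerId);
        return expected != null && baseSequence > expected;
    }
}
```

In this model, an entry with no batches after an epoch bump never needs a tentative sequence: `hasGap(producerId, 3)` is already true because the expected sequence is 0.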
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
private int coordinatorEpoch;
private long lastTimestamp;
private OptionalLong currentTxnFirstOffset;
+
+ private VerificationState verificationState;
+
+ // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+ private OptionalInt tentativeSequence;
+
+ public enum VerificationState {
+ EMPTY,
+ VERIFYING,
+ VERIFIED
+ }
Review Comment:
Using a finite state machine to handle race conditions is susceptible to the
ABA problem, i.e. the logic may not distinguish between the original state not
changing concurrently and changing back to the same state. In this case it
looks like the following sequence is possible:
1. Got a message, started verifying the transaction (transitioned to VERIFYING).
2. The transaction is ongoing, so replied with success.
3. The transaction got aborted (transitioned to EMPTY).
4. Got another message, started verifying the transaction (transitioned to
VERIFYING).
5. The first message's verification successfully completes, sees that the state
is VERIFYING, and allows the message to get written.
This is why we (and pretty much any distributed system) use
ever-incrementing states (e.g. epochs) to handle concurrency, so that we don't
run into the ABA problem.
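As a sketch of the ever-incrementing alternative (an illustration only, not what the PR implements): `VerificationGuardSketch` and its methods are hypothetical names. Each verification attempt gets a token from a counter that is never reused, so a stale completion cannot be mistaken for a current one. This simple version also invalidates an earlier attempt whenever a new one starts, which a real design would likely need to refine.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard that replaces EMPTY/VERIFYING/VERIFIED with a monotonic counter.
final class VerificationGuardSketch {
    // Only ever incremented, so an old value can never reappear (no ABA).
    private final AtomicLong verificationEpoch = new AtomicLong();

    // Start a verification attempt; the returned token identifies this attempt.
    long startVerification() {
        return verificationEpoch.incrementAndGet();
    }

    // Called when the transaction is aborted or completed; invalidates in-flight attempts.
    void invalidate() {
        verificationEpoch.incrementAndGet();
    }

    // A finished verification may write only if nothing changed since it started.
    boolean isStillValid(long token) {
        return verificationEpoch.get() == token;
    }
}
```

Replaying the five steps above: the first message obtains token 1, the abort moves the counter to 2, the second message obtains token 3, and when the first verification completes, `isStillValid(1)` is false, so its write is rejected.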
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (duplicateBatch.isPresent) {
return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
}
+
+ // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+ // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+ if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+ if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+ throw new InvalidRecordException("Record was not part of an ongoing transaction")
+ } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)
Review Comment:
I think we do the sequencing checks in ProducerAppendInfo. Should we move this
one there as well, so that all the checks are in one place?