[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1222556934

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
}

public synchronized void transitionToUninitialized(RuntimeException exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);

Review Comment:
I think not: it will be ignored because the target state is UNINITIALIZED. Snippet from transitionTo:
```
else if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
    if (error == null)
        throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
    lastError = error;
}
```
If it's ignored, do we need to log it somewhere?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
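A minimal sketch of the behavior described in this comment, under a heavily simplified model of transitionTo. The class name TransitionSketch, the reduced State enum, and the FINE-level log call are illustrative assumptions, not Kafka code; transition-validity checks are left out, and the else branch assumes (as a later comment in this thread states) that transitionTo clears lastError on non-error transitions. An exception passed along with a non-error target such as UNINITIALIZED is simply dropped; logging it is one possible answer to the question above.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, reduced model of TransactionManager.transitionTo (validity checks omitted).
class TransitionSketch {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private static final Logger LOG = Logger.getLogger(TransitionSketch.class.getName());

    private State currentState = State.READY;
    private RuntimeException lastError;

    void transitionTo(State target, RuntimeException error) {
        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
            // Only error targets keep the exception, mirroring the quoted snippet.
            if (error == null)
                throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
            lastError = error;
        } else {
            // Non-error target: the exception would otherwise vanish silently.
            if (error != null)
                LOG.log(Level.FINE, "Ignoring exception on transition to " + target, error);
            lastError = null;
        }
        currentState = target;
    }

    State currentState() { return currentState; }

    RuntimeException lastError() { return lastError; }
}
```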
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1217640626

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
}

public synchronized void transitionToUninitialized(RuntimeException exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);

Review Comment:
I have this last open question; other than that, LGTM.
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1205410637

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
}

public synchronized void transitionToUninitialized(RuntimeException exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);

Review Comment:
What will happen to the exception we pass to transitionTo here? My understanding is that transitionTo only uses the exception if the target state is one of the error states.

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
}

public synchronized void transitionToUninitialized(RuntimeException exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);
if (pendingTransition != null) {
pendingTransition.result.fail(exception);
}
lastError = null;
}

-public synchronized void maybeTransitionToErrorState(RuntimeException exception) {
+public synchronized void maybeTransitionToErrorState(RuntimeException exception,

Review Comment:
Just to double-check: the transitionToFatalError call on line 702 does not use the invalidStateDetectionStrategy parameter because all of the exceptions in the condition come from a server response, which makes it a BACKGROUND-type call?
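The FOREGROUND/BACKGROUND distinction this thread relies on can be sketched as follows. This is a hypothetical model, not the code in the PR: StrategySketch and its transition table are assumptions, with far fewer states and edges than the real TransactionManager, and any state is allowed to move to FATAL_ERROR. An invalid transition requested with FOREGROUND (a call on the user's thread) throws IllegalStateException to the caller, while BACKGROUND (for example, handling a server response on the Sender thread) moves the manager to FATAL_ERROR instead of crashing that thread. Under this reading, a transitionToFatalError call driven purely by server responses is naturally a BACKGROUND-type call.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, reduced model of strategy-dependent invalid-transition handling.
class StrategySketch {
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    enum InvalidStateDetectionStrategy { FOREGROUND, BACKGROUND }

    // Illustrative transition table only; the real state machine has more edges.
    private static final Map<State, Set<State>> VALID = Map.of(
        State.READY, EnumSet.of(State.IN_TRANSACTION),
        State.IN_TRANSACTION, EnumSet.of(State.ABORTING_TRANSACTION, State.ABORTABLE_ERROR),
        State.ABORTABLE_ERROR, EnumSet.of(State.ABORTING_TRANSACTION, State.ABORTABLE_ERROR),
        State.ABORTING_TRANSACTION, EnumSet.of(State.READY, State.ABORTABLE_ERROR),
        State.FATAL_ERROR, EnumSet.noneOf(State.class));

    private State currentState = State.READY;
    private RuntimeException lastError;

    synchronized void transitionTo(State target, RuntimeException error,
                                   InvalidStateDetectionStrategy strategy) {
        boolean valid = target == State.FATAL_ERROR || VALID.get(currentState).contains(target);
        if (!valid) {
            IllegalStateException ise = new IllegalStateException(
                "Invalid transition attempted from state " + currentState + " to state " + target, error);
            if (strategy == InvalidStateDetectionStrategy.FOREGROUND) {
                // User thread: surface the bug directly to the caller; state is unchanged.
                throw ise;
            }
            // Background thread: do not crash it; make the manager fatal instead.
            lastError = ise;
            currentState = State.FATAL_ERROR;
            return;
        }
        boolean errorTarget = target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR;
        lastError = errorTarget ? error : null;
        currentState = target;
    }

    synchronized State currentState() { return currentState; }

    synchronized RuntimeException lastError() { return lastError; }
}
```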
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1204028973

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -626,12 +704,14 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
if (canBumpEpoch() && !isCompleting()) {
epochBumpRequired = true;
}
-transitionToAbortableError(exception);
+transitionToAbortableError(exception, invalidStateDetectionStrategy);
}
}
-
-synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
-maybeTransitionToErrorState(exception);
+synchronized void handleFailedBatch(ProducerBatch batch,

Review Comment:
Is this ever called with InvalidStateDetectionStrategy.FOREGROUND? If not, we might not need the extra parameter and could always use BACKGROUND.
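If handleFailedBatch is only ever reached from the Sender thread, the simplification suggested here might look like the sketch below. FailedBatchSketch is hypothetical: Object stands in for ProducerBatch, and maybeTransitionToErrorState only records the strategy it receives so that the choice is observable.

```java
// Hypothetical sketch: the strategy is hard-coded rather than threaded through as a parameter.
class FailedBatchSketch {
    enum InvalidStateDetectionStrategy { FOREGROUND, BACKGROUND }

    // Records the strategy that reached the error-state logic.
    InvalidStateDetectionStrategy lastStrategyUsed;

    // Stand-in for the real maybeTransitionToErrorState(exception, strategy).
    synchronized void maybeTransitionToErrorState(RuntimeException exception,
                                                  InvalidStateDetectionStrategy strategy) {
        lastStrategyUsed = strategy;
    }

    // Every caller is assumed to run on the Sender (background) thread,
    // so BACKGROUND is passed unconditionally.
    synchronized void handleFailedBatch(Object batch, RuntimeException exception,
                                        boolean adjustSequenceNumbers) {
        maybeTransitionToErrorState(exception, InvalidStateDetectionStrategy.BACKGROUND);
        // ... sequence-number adjustment for the failed batch would follow here
    }
}
```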
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1185797643

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -424,6 +477,11 @@ ProducerIdAndEpoch producerIdAndEpoch() {
}

synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition topicPartition) {
+if (hasFatalError()) {

Review Comment:
@kirktrue I agree, it's better not to mutate the state after we have encountered a fatal error.
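A minimal sketch of the guard agreed on here, under stated assumptions: FatalGuardSketch is hypothetical, and its long/short parameters stand in for whatever maybeUpdateProducerIdAndEpoch(TopicPartition) actually derives from the partition. Once a fatal error has been recorded, the method returns before mutating any producer state.

```java
// Hypothetical sketch of an early-return guard on fatal state.
class FatalGuardSketch {
    enum State { READY, FATAL_ERROR }

    private State currentState = State.READY;
    private long producerId = -1L;
    private short epoch = -1;

    synchronized boolean hasFatalError() { return currentState == State.FATAL_ERROR; }

    synchronized void transitionToFatalError() { currentState = State.FATAL_ERROR; }

    // Stand-in for maybeUpdateProducerIdAndEpoch(TopicPartition): the new values are
    // passed in directly instead of being derived from a partition.
    synchronized void maybeUpdateProducerIdAndEpoch(long newProducerId, short newEpoch) {
        if (hasFatalError()) {
            return; // do not mutate producer state after a fatal error
        }
        producerId = newProducerId;
        epoch = newEpoch;
    }

    synchronized long producerId() { return producerId; }

    synchronized short epoch() { return epoch; }
}
```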
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1174902758

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
}

private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, false);

Review Comment:
The last point is probably out of scope: it needs a more generic approach covering all transactional reenqueue calls.

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() {
return handleCachedTransactionRequestResult(() -> {
if (currentState != State.ABORTABLE_ERROR)
maybeFailWithError();
-transitionTo(State.ABORTING_TRANSACTION);
+transitionTo(State.ABORTING_TRANSACTION, null, true);

Review Comment:
I think the throwError parameter is unused; you need to propagate it to the transitionTo call.
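The unused-parameter issue in the second comment can be illustrated with a reduced model. PropagateFlagSketch is hypothetical: it assumes the enclosing method receives a throwError flag, and it models only IN_TRANSACTION to ABORTING_TRANSACTION as a valid transition. The first method accepts the flag but hard-codes true, so callers can never opt out of throwing; the second passes the flag through.

```java
// Hypothetical sketch contrasting an ignored flag with a propagated one.
class PropagateFlagSketch {
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION, FATAL_ERROR }

    private State currentState = State.READY;

    State currentState() { return currentState; }

    void forceState(State s) { currentState = s; } // test/setup helper only

    // Reduced three-argument transitionTo.
    private void transitionTo(State target, RuntimeException error, boolean throwError) {
        boolean valid = currentState == State.IN_TRANSACTION && target == State.ABORTING_TRANSACTION;
        if (!valid) {
            IllegalStateException ise = new IllegalStateException(
                "Invalid transition attempted from state " + currentState + " to state " + target, error);
            if (throwError)
                throw ise;
            currentState = State.FATAL_ERROR; // non-throwing path: record the problem as fatal
            return;
        }
        currentState = target;
    }

    // Shape flagged in the review: throwError is accepted, but a literal is passed on.
    void beginAbortIgnoringFlag(boolean throwError) {
        transitionTo(State.ABORTING_TRANSACTION, null, true);
    }

    // Suggested shape: the caller's choice reaches transitionTo.
    void beginAbort(boolean throwError) {
        transitionTo(State.ABORTING_TRANSACTION, null, throwError);
    }
}
```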
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1173405147

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
}

private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, false);

Review Comment:
> > This is a call chain where we transition on direct user action, shouldn't this be throwing? KafkaProducer.send -> KafkaProducer.doSend -> maybeTransitionToErrorState -> transitionToAbortableError -> transitionTo
>
> Since maybeTransitionToErrorState is being called from inside a catch block, if we did throw an exception, it would mask the root issue (ApiException), right?

I think masking the ApiException is better than silently transitioning into the fatal state. If transitionToAbortableError tries to go into the abortable state, that ApiException is probably something the calling code can handle, recovering by aborting. If we still throw that exception while the internal state is in reality already fatal, that is a violation of the API, isn't it?

> After the abortableError call, if the state transition was invalid, then currentState would be FATAL_ERROR. That has the same effect as the last two branches of the if statement that call the fatalError method, right?
>
> Would simply skipping the reenqueue on fatal errors be sufficient?

I think you are right: the last 2 branches of that if do the same. At a later point, Sender.runOnce will handle the fatal state and stop sending, but it would be cleaner not to reenqueue at this point anymore, yes.
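The guard agreed on in the last paragraph (do not re-enqueue once the state is fatal) can be sketched as below. ReenqueueGuardSketch is hypothetical: it assumes a non-throwing abortableError that leaves the manager in FATAL_ERROR when the ABORTABLE_ERROR transition is invalid, it assumes purely for illustration that the handler re-enqueues after a non-fatal error, and a plain queue stands in for the real request re-enqueue mechanism.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of skipping the re-enqueue once the manager is fatal.
class ReenqueueGuardSketch {
    enum State { IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private State currentState = State.IN_TRANSACTION;
    private final Deque<String> requestQueue = new ArrayDeque<>();

    boolean hasFatalError() { return currentState == State.FATAL_ERROR; }

    void transitionToFatalError() { currentState = State.FATAL_ERROR; }

    // Non-throwing abortableError: FATAL_ERROR -> ABORTABLE_ERROR is treated as invalid,
    // so a manager that is already fatal simply stays fatal.
    void abortableError(RuntimeException cause) {
        if (currentState != State.FATAL_ERROR)
            currentState = State.ABORTABLE_ERROR;
    }

    // Record the error, then re-enqueue only while the manager is not fatal.
    void handleError(String request, RuntimeException cause) {
        abortableError(cause);
        if (hasFatalError())
            return; // the Sender would stop later anyway; skipping avoids a pointless retry
        requestQueue.add(request);
    }

    int queuedRequests() { return requestQueue.size(); }
}
```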
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1170149228

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() {
return handleCachedTransactionRequestResult(() -> {
if (currentState != State.ABORTABLE_ERROR)
maybeFailWithError();
-transitionTo(State.ABORTING_TRANSACTION);
+transitionTo(State.ABORTING_TRANSACTION, null, true);

Review Comment:
There is a call chain where the Sender calls beginAbort on producer close. Do we want to throw there? I'm not sure that one qualifies as a "user-direct" action.
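One way to frame the question is to give each entry point its own strategy. In this hypothetical sketch (AbortEntryPointsSketch and both public method names are illustrative, not Kafka APIs), an abort requested from the application thread uses FOREGROUND and may throw, while the Sender aborting an open transaction during close uses BACKGROUND and would not throw.

```java
// Hypothetical sketch: one shared abort path, two callers with different strategies.
class AbortEntryPointsSketch {
    enum InvalidStateDetectionStrategy { FOREGROUND, BACKGROUND }

    // Records which strategy the shared abort path received, so the routing is observable.
    InvalidStateDetectionStrategy lastStrategy;

    // Shared path; a fuller version might call transitionTo(ABORTING_TRANSACTION, null, strategy).
    private synchronized void beginAbort(InvalidStateDetectionStrategy strategy) {
        lastStrategy = strategy;
    }

    // User-direct: the application thread asks to abort the transaction.
    void abortTransactionFromUser() {
        beginAbort(InvalidStateDetectionStrategy.FOREGROUND);
    }

    // Not user-direct: the Sender aborts an open transaction while the producer is closing.
    void abortOnSenderClose() {
        beginAbort(InvalidStateDetectionStrategy.BACKGROUND);
    }
}
```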
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1170141463

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
}

private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, false);

Review Comment:
I think changing the default behavior to not throw can cause issues in some calls:
1. TransactionManager.InitProducerIdHandler#handleResponse on line 1303: lastError is explicitly set to null (which shouldn't be done at all, as transitionTo already does that if the state transition is valid), which will clear the latest error. I think to make this work, that lastError = null should be removed from line 1303.
2. This is a call chain where we transition on direct user action, shouldn't this be throwing? KafkaProducer.send -> KafkaProducer.doSend -> maybeTransitionToErrorState -> transitionToAbortableError -> transitionTo
3. In TransactionManager.TxnOffsetCommitHandler#handleResponse, there are multiple
```
abortableError(...);
break;
```
blocks. If abortableError no longer throws on an invalid state transition, the txn commit will be retried even when in a failed state, which doesn't seem correct.
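Point 1 can be illustrated with a reduced model in which a non-throwing transitionTo records an invalid transition as FATAL_ERROR together with an IllegalStateException in lastError. ClearErrorSketch, completeInitWithExplicitReset, and completeInit are hypothetical names; the State enum is reduced to three values, and forceState exists only to set up the scenario. An extra lastError = null after the transition wipes that recorded error, leaving the manager fatal with no cause to report.

```java
// Hypothetical sketch of how an explicit reset after transitionTo can erase a fatal cause.
class ClearErrorSketch {
    enum State { INITIALIZING, READY, FATAL_ERROR }

    private State currentState = State.INITIALIZING;
    private RuntimeException lastError;

    // Non-throwing transitionTo: only INITIALIZING -> READY is modelled as valid.
    private void transitionTo(State target) {
        boolean valid = currentState == State.INITIALIZING && target == State.READY;
        if (!valid) {
            lastError = new IllegalStateException(
                "Invalid transition attempted from state " + currentState + " to state " + target);
            currentState = State.FATAL_ERROR;
            return;
        }
        lastError = null; // a valid, non-error transition already clears lastError here
        currentState = target;
    }

    // Problematic pattern: an extra reset after the transition.
    void completeInitWithExplicitReset() {
        transitionTo(State.READY);
        lastError = null; // wipes the IllegalStateException if the transition was invalid
    }

    // Suggested shape: let transitionTo manage lastError.
    void completeInit() {
        transitionTo(State.READY);
    }

    void forceState(State s) { currentState = s; } // setup helper only

    State currentState() { return currentState; }

    RuntimeException lastError() { return lastError; }
}
```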