[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1184283663

## clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java: ##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
Should we close the producer here? Just looking through the failed tests and wanted to close any gaps we may have.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
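Since `Producer` extends `Closeable`, one way to guarantee the producer in this test is closed, even when an assertion fails partway through, is try-with-resources. The sketch below uses a hypothetical `FakeProducer` in place of `KafkaProducer` so it runs without Kafka on the classpath; its `closed` flag exists only to show that `close()` ran.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a producer; it only records whether close() ran.
class FakeProducer implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    // Simulates initTransactions() failing when the client is not authorized.
    void initTransactions(boolean authorized) {
        if (!authorized) {
            throw new IllegalStateException("simulated authorization failure");
        }
    }

    @Override
    public void close() {
        closed.set(true);
    }
}

class CloseInTestSketch {
    // The test body runs inside try-with-resources, so close() runs
    // whether the body returns normally or throws.
    static FakeProducer runTestBody(boolean authorized) {
        FakeProducer producer = new FakeProducer();
        try (FakeProducer p = producer) {
            p.initTransactions(authorized);
        } catch (IllegalStateException expected) {
            // The resource has already been closed by the time this block runs.
        }
        return producer;
    }
}
```

A `finally { producer.close(); }` block would work equally well; try-with-resources just makes it harder to forget.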
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672745

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -300,9 +301,13 @@ void runOnce() {
         try {
             transactionManager.maybeResolveSequences();
+            RuntimeException lastError = transactionManager.lastError();
+            if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                return;
+            }
+

Review Comment:
Ah, I see we had the abortable error check. 😅 OK, well, now we are doubly covered.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672146

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -300,9 +301,13 @@ void runOnce() {
         try {
             transactionManager.maybeResolveSequences();
+            RuntimeException lastError = transactionManager.lastError();
+            if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                return;
+            }
+

Review Comment:
Yup -- my concern is that we unnecessarily reset the producer to initializing in the fatal error case.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180619843

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -300,9 +301,13 @@ void runOnce() {
         try {
             transactionManager.maybeResolveSequences();
+            RuntimeException lastError = transactionManager.lastError();
+            if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                return;
+            }
+

Review Comment:
I thought I left this comment, but it seems like it didn't take -- should we move the fatal error check above the auth check? For other request types we will go through the above path, but it really should just be a fatal error.
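The suggested ordering can be sketched as a pure decision function: check for a fatal error first, and only then consider the abortable-authorization reset path, so a fatal error can never send the producer back to an initializing state. The enums and method below are hypothetical simplifications of the `Sender.runOnce()` logic in the diff, not the actual Kafka code.

```java
// Hypothetical, simplified view of the transaction manager's error state.
enum ErrorState { NONE, ABORTABLE, FATAL }

// Hypothetical actions the sender loop could take on one iteration.
enum SenderAction { FAIL_FATALLY, RESET_AFTER_AUTH_ERROR, CONTINUE }

class RunOnceOrderingSketch {
    static SenderAction decide(ErrorState state, boolean lastErrorIsAuthorization) {
        // 1. Fatal errors are checked first and are never "recovered" by a reset.
        if (state == ErrorState.FATAL) {
            return SenderAction.FAIL_FATALLY;
        }
        // 2. Only an abortable authorization error takes the reset path.
        if (state == ErrorState.ABORTABLE && lastErrorIsAuthorization) {
            return SenderAction.RESET_AFTER_AUTH_ERROR;
        }
        // 3. Anything else proceeds with normal sending.
        return SenderAction.CONTINUE;
    }
}
```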
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178215576

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -328,6 +333,21 @@ void runOnce() {
         client.poll(pollTimeout, currentTimeMs);
     }
+    // We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+    // failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+    // instantiate the producer again.
+    private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
Just curious -- if we get an auth error on another request (i.e., not initProducerId), do we expect to start over by initializing with a new ID? Also, what is the goal of the poll call? Is it just replacing line 308? Would the code work without it?
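Going only by the code comment in this diff (fail the in-flight requests first, then transition to `UNINITIALIZED`), the helper's contract can be sketched as below. The exception classes are local stand-ins for `TransactionalIdAuthorizationException` and `ClusterAuthorizationException`, and the in-flight list and string state are hypothetical; the real method works against the sender's in-flight batches and `TransactionManager`.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins for the two Kafka authorization exceptions named in the diff.
class TxnIdAuthException extends RuntimeException {}
class ClusterAuthException extends RuntimeException {}

class AuthErrorHandlerSketch {
    final List<String> inFlight = new ArrayList<>();
    final List<String> failed = new ArrayList<>();
    String state = "ABORTABLE_ERROR";

    // Returns true only if the error was one of the two authorization errors.
    boolean shouldHandleAuthorizationError(RuntimeException exception) {
        if (exception instanceof TxnIdAuthException || exception instanceof ClusterAuthException) {
            // First fail everything that is in flight...
            failed.addAll(inFlight);
            inFlight.clear();
            // ...then reset so the same producer instance can be initialized again.
            state = "UNINITIALIZED";
            return true;
        }
        return false;
    }
}
```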
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171947129

## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ##
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
         sender.runOnce();
         assertFutureFailure(future, ClusterAuthorizationException.class);
-        // cluster authorization errors are fatal, so we should continue seeing it on future sends
+        // expecting to continue to see authorization error until user permission is fixed
         assertTrue(transactionManager.hasFatalError());

Review Comment:
Ah, I also missed that. Thanks for correcting it.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171913050

## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ##
@@ -2189,7 +2196,7 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-        // cluster authorization is a fatal error for the producer
+        // expecting authorization error on send

Review Comment:
Ditto here -- should we change this test to use a fatal error?
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171912717

## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ##
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
         sender.runOnce();
         assertFutureFailure(future, ClusterAuthorizationException.class);
-        // cluster authorization errors are fatal, so we should continue seeing it on future sends
+        // expecting to continue to see authorization error until user permission is fixed
         assertTrue(transactionManager.hasFatalError());

Review Comment:
Should this still be true? Do we remove it from the fatal error state?
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171872077

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}. Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
I was about to ask about this and realized we removed the bump. I guess we could still add a test, unless we think the other testing covers this end to end.
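The revised branch records the authorization error as the last error and moves the producer to `ABORTABLE_ERROR` instead of calling `fatalError(...)`. Per the later discussion the epoch bump was dropped, since no epoch exists yet at `InitProducerId` time. A minimal model of that branch, with hypothetical names throughout, also shows the kind of state assertions a focused unit test could make.

```java
// Hypothetical subset of error codes relevant to this branch.
enum InitPidError { NONE, TRANSACTIONAL_ID_AUTHORIZATION_FAILED, CLUSTER_AUTHORIZATION_FAILED }

// Hypothetical subset of producer states touched by this branch.
enum InitPidState { INITIALIZING, READY, ABORTABLE_ERROR }

class InitPidHandlerSketch {
    InitPidState state = InitPidState.INITIALIZING;
    RuntimeException lastError;

    void handleResponse(InitPidError error) {
        if (error == InitPidError.NONE) {
            state = InitPidState.READY;
        } else if (error == InitPidError.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
                || error == InitPidError.CLUSTER_AUTHORIZATION_FAILED) {
            // Previously fatalError(...); now recorded as an abortable error.
            // No epoch bump: there is no epoch before InitProducerId succeeds.
            lastError = new RuntimeException(error.name());
            state = InitPidState.ABORTABLE_ERROR;
        }
    }
}
```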
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171818667

## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
Can we still remove "fatal" here?
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171816496

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -300,9 +301,13 @@ void runOnce() {
         try {
             transactionManager.maybeResolveSequences();
+            RuntimeException lastError = transactionManager.lastError();
+            if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                return;

Review Comment:
Thanks, Philip -- I think I forgot that this was the initProducerId call, so we don't really have an epoch yet. 😅 We set it to 0 after getting the producer ID.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167247268

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-                fatalError(error.exception());
+                log.info("Abortable authorization error: {}. Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+                lastError = error.exception();
+                epochBumpRequired = true;

Review Comment:
Can we confirm this works as intended via a test?
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167246564

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
         removeInFlightBatch(batch);
     }
+    public synchronized void transitionToUninitialized(RuntimeException exception) {
+        transitionTo(State.UNINITIALIZED);
+        lastError = null;
+        if (pendingTransition != null) {

Review Comment:
I'd prefer that you confirm it's fine. Looking at the code, it seems to set the TransactionalRequestResult state to the error and count down the latch. Given that it is scoped to the request, it looks to be fine.
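The behavior described here, where the pending result records the error and counts down its latch so the blocked caller wakes up and sees the exception, can be sketched with a `CountDownLatch`. `PendingResultSketch` is a hypothetical simplification of `TransactionalRequestResult`, not Kafka's class; it only illustrates why failing the pending transition is safe for the waiting caller.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PendingResultSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile RuntimeException error;

    // Record the error first, then release any thread blocked in await().
    void fail(RuntimeException e) {
        this.error = e;
        latch.countDown();
    }

    // Blocks like a caller of initTransactions(); rethrows the recorded error.
    void await(long timeoutMs) throws InterruptedException {
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("timed out waiting for result");
        }
        RuntimeException e = error;
        if (e != null) {
            throw e;
        }
    }
}
```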
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166062401

## clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java: ##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
Are we fully validating that the request is successful? Is that what the TestUtils method does here?
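For reference, a retry-until-no-exception helper of the kind used in this test can be sketched as follows. This is a hypothetical reimplementation, not Kafka's `TestUtils.retryOnExceptionWithTimeout`, and its parameter names and order are assumptions. What it illustrates is that such a helper only proves the call eventually returned without throwing; any check of the producer's resulting state would need a separate assertion.

```java
// Hypothetical functional interface for a call that may throw.
interface ThrowingCall {
    void call() throws Exception;
}

class RetrySketch {
    // Retries `call` every pauseMs until it completes without throwing,
    // or rethrows the last failure once timeoutMs has elapsed.
    // Returns the number of attempts made.
    static int retryUntilNoException(long timeoutMs, long pauseMs, ThrowingCall call) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                call.call();
                return attempts;
            } catch (Exception e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e;
                }
                Thread.sleep(pauseMs);
            }
        }
    }
}
```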
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166059935

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
         removeInFlightBatch(batch);
     }
+    public synchronized void transitionToUninitialized(RuntimeException exception) {
+        transitionTo(State.UNINITIALIZED);
+        lastError = null;
+        if (pendingTransition != null) {

Review Comment:
Do you have more background on what pendingTransitions we typically have? I don't think this hurts unless it causes a fatal error. If it does, we may need to take a closer look.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166059149

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
Ack -- I wonder if there are any other locations where we would not want this transition. I can't think of any off the top of my head, but I do think we need to be careful to clean up the state properly in these cases.
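To make the state-machine change concrete, here is a reduced, hypothetical model of the transition check. Only the `UNINITIALIZED` case mirrors the diff (`READY`, plus the newly allowed `ABORTABLE_ERROR`); the `ABORTING_TRANSACTION` and `FATAL_ERROR` rules are simplified assumptions included to contrast the new path with the usual abort path, and every other target is treated as invalid in this sketch.

```java
// Reduced, hypothetical subset of the producer's transaction states.
enum TxnState { UNINITIALIZED, READY, IN_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

class TransitionSketch {
    // withPatch = true models the rule proposed in this PR.
    static boolean isTransitionValid(TxnState source, TxnState target, boolean withPatch) {
        switch (target) {
            case UNINITIALIZED:
                // Before the PR: only READY -> UNINITIALIZED.
                // With the PR: ABORTABLE_ERROR -> UNINITIALIZED is also allowed.
                return source == TxnState.READY || (withPatch && source == TxnState.ABORTABLE_ERROR);
            case ABORTING_TRANSACTION:
                // Assumed usual way out of ABORTABLE_ERROR: abort the transaction.
                return source == TxnState.IN_TRANSACTION || source == TxnState.ABORTABLE_ERROR;
            case FATAL_ERROR:
                // Simplified: a fatal error can be entered from any state.
                return true;
            default:
                // Other targets are not modelled in this sketch.
                return false;
        }
    }
}
```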
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166058138

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -300,9 +301,13 @@ void runOnce() {
         try {
             transactionManager.maybeResolveSequences();
+            RuntimeException lastError = transactionManager.lastError();
+            if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                return;

Review Comment:
Did we determine that we will end up bumping the epoch? It doesn't look like it is done here, so is it done somewhere else?
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113612843

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
Can you point me to where you saw this? I think abortable error was something we had before that didn't need this extra step.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086992753

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
Maybe I'm missing something here, but from "abortable_error" don't we typically abort the transaction and then return to initializing? Sorry if you've explained this before, but why did we choose uninitialized here?
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086981869

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -300,9 +301,13 @@ void runOnce() {
         try {
             transactionManager.maybeResolveSequences();
+            RuntimeException lastError = transactionManager.lastError();
+            if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                return;

Review Comment:
(Assuming this is an idempotent producer and not a transactional one)
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086981346

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -300,9 +301,13 @@ void runOnce() {
         try {
             transactionManager.maybeResolveSequences();
+            RuntimeException lastError = transactionManager.lastError();
+            if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+                return;

Review Comment:
Sorry -- I think if we have an error we'd still want to bump the epoch, right?
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007207887

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
Hmmm. I see. I'm just wondering if we are hijacking the current state machine in an unexpected way and whether there are implications there. I suppose we only follow this path for these specific error types, but I wonder if we are missing anything existing by changing the valid transitions and/or opening up the potential for something else in the future.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004958467

## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ##
@@ -2236,6 +2236,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertFalse(response.clusterId.isEmpty, "Cluster id not returned")
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testRetryProducerInitializationAfterPermissionFix(quorum: String): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+    val wildcard = new ResourcePattern(TOPIC, ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+    val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED)
+    val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+    val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)
+    val denyWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, DENY)
+    val producer = buildIdempotentProducer()
+
+    addAndVerifyAcls(Set(denyWriteAce), wildcard)
+    assertThrows(classOf[Exception], () => {
+      val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      future.get()
+    })
+    removeAndVerifyAcls(Set(denyWriteAce), wildcard)
+    addAndVerifyAcls(Set(allowWriteAce), prefixed)
+    addAndVerifyAcls(Set(allowWriteAce), literal)
+    val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))

Review Comment:
Ah, this test makes the benefit a bit clearer to me -- a subsequent send call works just fine.
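The order of ACL operations in this test matters because, in Kafka's authorizer, a matching DENY entry takes precedence over any matching ALLOW entry: the wildcard DENY must be removed before the prefixed and literal ALLOW entries can take effect. A minimal, hypothetical model of that rule (literal, prefixed, and wildcard matching only; host and operation ignored) is below.

```java
import java.util.ArrayList;
import java.util.List;

class AclSketch {
    enum PatternType { LITERAL, PREFIXED, WILDCARD }

    // One ACL entry: resource pattern plus allow/deny (host and operation omitted).
    static final class Entry {
        final PatternType type;
        final String name;
        final boolean allow;

        Entry(PatternType type, String name, boolean allow) {
            this.type = type;
            this.name = name;
            this.allow = allow;
        }

        boolean matches(String topic) {
            switch (type) {
                case WILDCARD: return true;
                case PREFIXED: return topic.startsWith(name);
                default: return topic.equals(name);
            }
        }
    }

    final List<Entry> entries = new ArrayList<>();

    // Any matching DENY wins; otherwise at least one matching ALLOW is required.
    boolean canWrite(String topic) {
        boolean allowed = false;
        for (Entry e : entries) {
            if (!e.matches(topic)) continue;
            if (!e.allow) return false;
            allowed = true;
        }
        return allowed;
    }
}
```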
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004955636

## clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java: ##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
         }
     }
+    @Test
+    public void testClusterAuthorizationFailure() throws Exception {
+        int maxBlockMs = 500;
+
+        Map configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+        Producer producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time);
+        assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+        // retry initTransactions after the ClusterAuthorizationException not being thrown
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
To be clear -- since I couldn't fully tell from the description -- before this change, calling initTransactions here would fail because the producer hits a fatal state. With this change, we don't retry initProducerId; instead we keep the producer alive and basically re-initialize it, so subsequent initTransactions calls can be made (and succeed).
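The before/after behavior summarized here can be modelled with a small hypothetical producer whose `initTransactions()` either treats an authorization failure as fatal (old behavior) or resets to an uninitialized state (new behavior). This sketches the lifecycle only; the class, its boolean switch, and the exception types are illustrative and not Kafka's API.

```java
class ReinitLifecycleSketch {
    enum State { UNINITIALIZED, READY, FATAL_ERROR }

    private final boolean resetOnAuthFailure;
    State state = State.UNINITIALIZED;

    // resetOnAuthFailure = true models the behavior proposed in this PR.
    ReinitLifecycleSketch(boolean resetOnAuthFailure) {
        this.resetOnAuthFailure = resetOnAuthFailure;
    }

    // `authorized` simulates whether the broker grants the InitProducerId request.
    void initTransactions(boolean authorized) {
        if (state == State.FATAL_ERROR) {
            throw new IllegalStateException("producer is in a fatal state; it must be closed");
        }
        if (!authorized) {
            // Old behavior: fatal. New behavior: back to UNINITIALIZED so a later call can succeed.
            state = resetOnAuthFailure ? State.UNINITIALIZED : State.FATAL_ERROR;
            throw new SecurityException("simulated authorization failure");
        }
        state = State.READY;
    }
}
```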
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004948547

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -155,7 +155,7 @@ private enum State {
         private boolean isTransitionValid(State source, State target) {
             switch (target) {
                 case UNINITIALIZED:
-                    return source == READY;
+                    return source == READY || source == ABORTABLE_ERROR;

Review Comment:
Do we know if there are other implications of this state machine change? It seems like we just want to make the authorization error retriable, but instead we are making it abortable and making this specific abortable error reset to "uninitialized". I'm not an expert in this area, but are there any other implications of re-initializing the producer?
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004948547

##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
     private boolean isTransitionValid(State source, State target) {
         switch (target) {
             case UNINITIALIZED:
-                return source == READY;
+                return source == READY || source == ABORTABLE_ERROR;

Review Comment:
Do we know whether this state machine change has other implications? It seems like we just want to make the authorization error retriable, but instead we are making it abortable and making this specific abortable error reset the producer state. I'm not an expert in this area, but are there any other implications of re-initializing the producer?
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004944699

##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
Should we also change this so it no longer says "fatal error"?
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004941219

##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
What do we mean by "the idempotent producer id is unavailable"? Is it that the broker hosting the transaction coordinator is unavailable? And do we return this error if the broker is not available?
jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004941219

##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized. See the exception for more details
+     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
What do we mean by "the idempotent producer id is unavailable"? Is it that the broker is unavailable? And do we return this error if the broker is not available?