[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-03 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1184283663


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+Producer producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time);
+assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+// retry initTransactions after the ClusterAuthorizationException not being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Should we close the producer here? Just looking through the failed tests and 
wanted to close any gaps we may have.
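The question about closing the producer can be illustrated with try-with-resources. This is a minimal sketch, not code from the PR: `FakeProducer` is a hypothetical stand-in that only records whether `close()` ran, and `runTestBody` is an invented helper. It assumes the real producer can be closed the same way.

```java
// Sketch only: FakeProducer is a hypothetical stand-in, not Kafka's producer.
final class CloseSketch {
    static final class FakeProducer implements AutoCloseable {
        boolean closed = false;

        void initTransactions() {
            if (closed) {
                throw new IllegalStateException("producer is closed");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Runs a test-like body and returns the producer so the caller can inspect it.
    static FakeProducer runTestBody(boolean failMidway) {
        FakeProducer producer = new FakeProducer();
        try (FakeProducer p = producer) {
            p.initTransactions();
            if (failMidway) {
                throw new IllegalStateException("simulated assertion failure");
            }
        } catch (IllegalStateException e) {
            // Swallowed only so this sketch can report the final state.
        }
        return producer;
    }

    public static void main(String[] args) {
        System.out.println(runTestBody(true).closed);  // prints true
        System.out.println(runTestBody(false).closed); // prints true
    }
}
```

With this shape the producer is released whether the body passes or fails partway through, which is the gap the comment is trying to close.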



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672745


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   Ah I see we had the abortable error check. 😅 Ok well now we are doubly 
covered.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672146


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   Yup -- my concern is we unnecessarily reset the producer to initializing in 
the fatal error case.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180619843


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   I thought I left this comment, but seems like it didn't take -- should we 
move the fatal error check above the auth check? Since for other request types, 
we will go through the above path, but it really should just be a fatal error.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178215576


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -328,6 +333,21 @@ void runOnce() {
 client.poll(pollTimeout, currentTimeMs);
 }
 
+// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+// instantiate the producer again.
+private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   Just curious -- if we get an auth error on another request (i.e., not 
initProducerId) do we expect to start over by initializing with a new ID? 
   
   Also what is the goal with the poll call? Is it just replacing line 308? 
Would the code work without it?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171947129


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
 sender.runOnce();
 assertFutureFailure(future, ClusterAuthorizationException.class);
 
-// cluster authorization errors are fatal, so we should continue seeing it on future sends
+// expecting to continue to see authorization error until user permission is fixed
 assertTrue(transactionManager.hasFatalError());

Review Comment:
   Ah, I missed that too. Thanks for correcting.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171913050


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2189,7 +2196,7 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
 prepareAndReceiveInitProducerId(producerId, Errors.NONE);
 assertTrue(transactionManager.hasProducerId());
 
-// cluster authorization is a fatal error for the producer
+// expecting authorization error on send

Review Comment:
   ditto here -- should we change this test to use a fatal error?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171912717


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
 sender.runOnce();
 assertFutureFailure(future, ClusterAuthorizationException.class);
 
-// cluster authorization errors are fatal, so we should continue seeing it on future sends
+// expecting to continue to see authorization error until user permission is fixed
 assertTrue(transactionManager.hasFatalError());

Review Comment:
   Should this still be true? Do we remove it from fatal error state?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171872077


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   Was about to ask about this and realized we removed the bump. I guess we 
could still have a test unless we think the other testing covers this end to 
end.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171818667


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   Can we remove fatal here still?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171816496


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   Thanks Philip -- I think I forgot that this was the initProducerId call -- 
so we don't really have an epoch yet. 😅 We set to 0 after getting the producer 
ID. 






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-14 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167247268


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   can we confirm this works as intended via test?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-14 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167246564


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
 removeInFlightBatch(batch);
 }
 
+public synchronized void transitionToUninitialized(RuntimeException exception) {
+transitionTo(State.UNINITIALIZED);
+lastError = null;
+if (pendingTransition != null) {

Review Comment:
   I'd prefer you confirm that it's fine. Looking at the code -- it seems to 
set the TransactionalRequestResult state to the error and count down the latch. 
Given that it is for the request -- looks to be fine.
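The behavior described in this comment (store the error on the result, then count down the latch so the waiting caller wakes up and sees it) can be sketched as follows. This is a simplified stand-in written for illustration, not Kafka's `TransactionalRequestResult`; the class name `RequestResultSketch` and its method names are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch only: a hypothetical, simplified result object guarded by a latch.
final class RequestResultSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile RuntimeException error;

    // Record the failure first, then release any thread blocked in await().
    void fail(RuntimeException e) {
        error = e;
        latch.countDown();
    }

    // Release waiters without an error.
    void done() {
        latch.countDown();
    }

    // Blocks until the result completes, then rethrows the stored error, if any.
    void await(long timeoutMs) {
        try {
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out waiting for the result");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", ie);
        }
        if (error != null) {
            throw error;
        }
    }

    public static void main(String[] args) {
        RequestResultSketch result = new RequestResultSketch();
        result.fail(new IllegalArgumentException("cluster authorization failed"));
        try {
            result.await(100);
        } catch (IllegalArgumentException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```

In this model the failure is scoped to the single pending request: the caller of `await` receives the error, and nothing about the object prevents a fresh result from being created for a later attempt.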






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-13 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166062401


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+Producer producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time);
+assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+// retry initTransactions after the ClusterAuthorizationException not being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Are we fully validating the request is successful? Is that what the 
TestUtils method does here?
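To make the question concrete, here is a minimal sketch of what a retry-until-no-exception helper generally does. It is an assumption about the general shape of such a helper, not the actual implementation of `TestUtils.retryOnExceptionWithTimeout`, which should be checked in the Kafka source; `RetrySketch`, `ThrowingAction`, and `retryOnException` are hypothetical names.

```java
// Sketch only: a hypothetical retry helper, not Kafka's TestUtils.
final class RetrySketch {
    /** An action that may throw, such as a call that fails until a permission is fixed. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /**
     * Runs the action until it completes without throwing or the timeout elapses.
     * Returns the number of attempts made. "Success" here means only "did not throw".
     */
    static int retryOnException(long timeoutMs, long pauseMs, ThrowingAction action) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                action.run();
                return attempts;
            } catch (Exception e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw new AssertionError("Still failing after " + attempts + " attempts", e);
                }
                sleepQuietly(pauseMs);
            }
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int attempts = retryOnException(1000, 10, () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("not authorized yet");
            }
        });
        System.out.println(attempts); // prints 3
    }
}
```

A helper of this shape establishes only that the last attempt returned without throwing. If the test should also confirm the resulting producer state, that would need a separate assertion after the retry.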






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-13 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166059935


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
 removeInFlightBatch(batch);
 }
 
+public synchronized void transitionToUninitialized(RuntimeException exception) {
+transitionTo(State.UNINITIALIZED);
+lastError = null;
+if (pendingTransition != null) {

Review Comment:
   Do you have more background on what pendingTransitions we typically have? I 
don't think this hurts unless it will cause a fatal error. If it does, we may 
need to take a closer look.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-13 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166059149


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Ack -- I wonder if there are any other locations where we would not want this 
transition. I can't think of any off the top of my head, but I do think we need to 
be careful to clean the state properly in these cases.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-13 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1166058138


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   Did we determine that we will end up bumping the epoch? It doesn't look like 
it is done here so is it somewhere else?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-02-21 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113612843


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Can you point me to where you saw this? I think abortable error was 
something we had before that didn't need this extra step.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-01-25 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086992753


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Maybe I'm missing something here, but from "abortable_error" don't we 
typically abort the transaction and then return to initializing? Sorry if 
you've explained this before, but why did we choose uninitialized here?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-01-25 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086981869


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   (Assuming this is an idempotent producer and not a transactional one)






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-01-25 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1086981346


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   Sorry -- I think if we have an error we'd still want to bump the epoch 
right? 






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007207887


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Hmmm. I see. I'm just wondering if we are hijacking the current state 
machine in an unexpected way and if there are implications there. I suppose we 
are only following this path on these specific error types, but I wonder if we 
are missing anything existing by changing the valid transitions and/or opening 
up the potential for something else in the future. 
   
   






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004958467


##
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##
@@ -2236,6 +2236,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 assertFalse(response.clusterId.isEmpty, "Cluster id not returned")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testRetryProducerInitializationAfterPermissionFix(quorum: String): Unit = {
+createTopicWithBrokerPrincipal(topic)
+val wildcard = new ResourcePattern(TOPIC, ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED)
+val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+val allowWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)
+val denyWriteAce = new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, DENY)
+val producer = buildIdempotentProducer()
+
+addAndVerifyAcls(Set(denyWriteAce), wildcard)
+assertThrows(classOf[Exception], () => {
+  val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+  future.get()
+})
+removeAndVerifyAcls(Set(denyWriteAce), wildcard)
+addAndVerifyAcls(Set(allowWriteAce), prefixed)
+addAndVerifyAcls(Set(allowWriteAce), literal)
+val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))

Review Comment:
   Ah this test makes the benefit a bit more clear to me -- a subsequent send 
call works just fine.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004955636


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+Producer producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time);
+assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+// retry initTransactions after the ClusterAuthorizationException not being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   To be clear, since I couldn't fully tell from the description: before this 
change, calling initTransactions here would fail because the producer hits a 
fatal state. With this change, we don't retry initProducerId; instead we keep 
the producer alive and effectively re-initialize it so that subsequent 
initTransactions calls can be made (and succeed).
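
   The caller-side flow described in this comment can be sketched without any 
Kafka dependency. This is only an illustration under stated assumptions: 
`FakeProducer` and `retryUntilSuccess` are hypothetical stand-ins (the latter 
loosely mirrors the role of the retry helper used in the test), and an 
IllegalStateException stands in for the authorization exception.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class InitRetrySketch {
    /** Hypothetical stand-in for a producer whose first initTransactions() call is rejected. */
    static final class FakeProducer {
        final AtomicInteger attempts = new AtomicInteger();

        void initTransactions() {
            // Simulate: the first attempt hits an authorization failure, later ones succeed.
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("cluster authorization failed (simulated)");
            }
        }
    }

    /** Re-runs the action until it succeeds or the timeout elapses; returns the number of calls made. */
    static int retryUntilSuccess(long timeoutMs, long pauseMs, Runnable action) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int calls = 0;
        while (true) {
            calls++;
            try {
                action.run();
                return calls;
            } catch (RuntimeException e) {
                // Give up once another pause would overshoot the deadline.
                if (System.currentTimeMillis() + pauseMs > deadline) {
                    throw e;
                }
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        int calls = retryUntilSuccess(1000, 100, producer::initTransactions);
        System.out.println("initTransactions succeeded after " + calls + " calls");
    }
}
```

   The retry only makes sense if the producer stays usable after the first 
failure, which is the behavior this comment is asking the author to confirm.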



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004948547


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Do we know if there are other implications for this state machine change? It 
seems like we just want to make the authorization error retriable, but instead 
we are making it abortable and making this specific abortable error reset to 
"uninitialized".
   
   I'm not an expert in this area, but are there any other implications with 
re-initializing the producer?
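
   To make the concern concrete, here is a toy model of the one rule the diff 
above touches. It is not Kafka's TransactionManager: the class name, the 
reduced set of states, and the `canReset` helper are assumptions made for 
illustration. It only shows which source states may move to UNINITIALIZED 
before and after the change.

```java
import java.util.EnumSet;
import java.util.Set;

/** Toy model of the UNINITIALIZED transition rule from the diff; not the real class. */
final class ResetTransitionSketch {
    // Reduced, assumed subset of states; the real enum has more.
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR, FATAL_ERROR }

    // Sources allowed to move to UNINITIALIZED before the diff: READY only.
    static final Set<State> BEFORE = EnumSet.of(State.READY);
    // After the diff: READY or ABORTABLE_ERROR.
    static final Set<State> AFTER = EnumSet.of(State.READY, State.ABORTABLE_ERROR);

    static boolean canReset(Set<State> allowedSources, State source) {
        return allowedSources.contains(source);
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.printf("%-16s before=%-5b after=%b%n",
                    s, canReset(BEFORE, s), canReset(AFTER, s));
        }
    }
}
```

   As far as this check alone is concerned, the new rule is keyed on the 
ABORTABLE_ERROR state rather than on the authorization error specifically, so 
any path that lands in that state would pass it.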






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004948547


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Do we know if there are other implications for this state machine change? It 
seems like we just want to make the authorization error retriable, but instead 
we are making it abortable and making this specific abortable error reset the 
producer state.
   
   I'm not an expert in this area, but are there any other implications with 
re-initializing the producer?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004944699


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   Should we also change this to not say "fatal error"?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004941219


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   What do we mean by "the idempotent producer id is unavailable"? Is it that 
the broker hosting the transaction coordinator is unavailable? And do we return 
this error if the broker is not available?






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004941219


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   What do we mean by "the idempotent producer id is unavailable"? Is it that 
the broker is unavailable? And do we return this error if the broker is not 
available?


