[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-03 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1184293956


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map<String, Object> configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time);
+assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+// retry initTransactions after the ClusterAuthorizationException not being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Probably - it's just done very consistently in the tests  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180625792


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   Sounds good. Perhaps you meant checking for the fatal error first, then the abortable error? Though I don't think it changes the logic there.
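To make the ordering question concrete, here is a minimal Java sketch, assuming (as a simplification) that the transaction manager holds exactly one current state, so "has a fatal error" and "has an abortable error" cannot both be true. `CheckOrderSketch`, `Action`, and both `*First` methods are hypothetical, and `SecurityException` is only a placeholder for the authorization exceptions:

```java
public class CheckOrderSketch {
    // Hypothetical, simplified producer states; not Kafka's full State enum.
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR, FATAL_ERROR }

    enum Action { FAIL_FATAL, RESET_FOR_RETRY, PROCEED }

    // Placeholder for shouldHandleAuthorizationError(lastError); SecurityException
    // stands in for Kafka's authorization exceptions.
    static boolean isAuthError(RuntimeException lastError) {
        return lastError instanceof SecurityException;
    }

    // Check the abortable authorization case first, then the fatal case.
    static Action abortableFirst(State state, RuntimeException lastError) {
        if (state == State.ABORTABLE_ERROR && isAuthError(lastError)) return Action.RESET_FOR_RETRY;
        if (state == State.FATAL_ERROR) return Action.FAIL_FATAL;
        return Action.PROCEED;
    }

    // Check the fatal case first, then the abortable authorization case.
    static Action fatalFirst(State state, RuntimeException lastError) {
        if (state == State.FATAL_ERROR) return Action.FAIL_FATAL;
        if (state == State.ABORTABLE_ERROR && isAuthError(lastError)) return Action.RESET_FOR_RETRY;
        return Action.PROCEED;
    }

    public static void main(String[] args) {
        RuntimeException[] errors = { null, new SecurityException("denied"), new IllegalStateException("other") };
        for (State s : State.values()) {
            for (RuntimeException e : errors) {
                // A single current state means at most one of the two branches can match.
                System.out.println(s + ", " + e + " -> " + fatalFirst(s, e) + " / " + abortableFirst(s, e));
            }
        }
    }
}
```

Under that assumption, both orderings return the same action for every state; the order would only matter if the two conditions could be true at the same time.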




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178260025


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -328,6 +333,21 @@ void runOnce() {
 client.poll(pollTimeout, currentTimeMs);
 }
 
+// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+// instantiate the producer again.
+private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   I might have added it there because a few lines down, in the fatal error handling, the client is polled before returning.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-26 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1178259026


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -328,6 +333,21 @@ void runOnce() {
 client.poll(pollTimeout, currentTimeMs);
 }
 
+// We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first
+// failing the inflight requests, then transition the state to UNINITIALIZED so that the user doesn't need to
+// instantiate the producer again.
+private boolean shouldHandleAuthorizationError(RuntimeException exception) {

Review Comment:
   It seems like all of the TransactionalIdAuthorizationException and ClusterAuthorizationException errors outside of initProducerId are fatal.
   
   For the poll: I think we don't need it because there's no outbound request; it should already have been polled in the previous `runOnce`. The tests seem to work without it, so I'll remove it.
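A minimal sketch of the classification described above, assuming that an authorization failure is recoverable only on the InitProducerId request and stays fatal on every other request (for example Produce or EndTxn). The enum values mirror names in Kafka's `Errors`, but everything here, including `classify`, is a hypothetical stand-in rather than the actual `TransactionManager` code:

```java
public class AuthErrorClassificationSketch {
    enum RequestKind { INIT_PRODUCER_ID, PRODUCE, END_TXN }

    // Local stand-ins for a few org.apache.kafka.common.protocol.Errors values.
    enum ErrorCode { NONE, COORDINATOR_LOAD_IN_PROGRESS, TRANSACTIONAL_ID_AUTHORIZATION_FAILED, CLUSTER_AUTHORIZATION_FAILED }

    enum Severity { OK, RETRIABLE, ABORTABLE, FATAL }

    static boolean isAuthorizationError(ErrorCode error) {
        return error == ErrorCode.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
            || error == ErrorCode.CLUSTER_AUTHORIZATION_FAILED;
    }

    static Severity classify(RequestKind kind, ErrorCode error) {
        if (error == ErrorCode.NONE) return Severity.OK;
        if (isAuthorizationError(error)) {
            // Only an InitProducerId authorization failure is treated as recoverable:
            // the producer can retry once the ACLs are fixed.
            return kind == RequestKind.INIT_PRODUCER_ID ? Severity.ABORTABLE : Severity.FATAL;
        }
        // The only other error in this toy enum is simply re-enqueued and retried.
        return Severity.RETRIABLE;
    }

    public static void main(String[] args) {
        for (RequestKind kind : RequestKind.values()) {
            System.out.println(kind + ": " + classify(kind, ErrorCode.CLUSTER_AUTHORIZATION_FAILED));
        }
    }
}
```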




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171946214


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2175,7 +2182,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
 sender.runOnce();
 assertFutureFailure(future, ClusterAuthorizationException.class);
 
-// cluster authorization errors are fatal, so we should continue seeing it on future sends
+// expecting to continue to see authorization error until user permission is fixed
 assertTrue(transactionManager.hasFatalError());

Review Comment:
   I think that's actually my mistake. It's a produce error, not an initProducerId error. I'll correct these issues. Sorry.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171904037


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   @jolshan - See the last commit. I cleaned up the comments because authorization failures are no longer fatal after the patch. I also patched a test to demonstrate that resending after fixing the permission brings the producer back to normal, and checked the epoch there to ensure it is 0, not -1, after the permission is fixed.
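A toy model of the scenario the patched test checks, assuming the coordinator rejects InitProducerId until the ACL is fixed and then assigns a fresh producer id starting at epoch 0. `ToyCoordinator`, `ToyProducer`, and the `-1` sentinels are illustrative assumptions, not Kafka classes:

```java
public class EpochAfterAclFixSketch {
    // Sentinels playing the role of "no producer id / no epoch yet".
    static final long NO_PRODUCER_ID = -1L;
    static final short NO_PRODUCER_EPOCH = -1;

    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR }

    static class AuthorizationFailed extends RuntimeException {
        AuthorizationFailed(String message) { super(message); }
    }

    // Hypothetical coordinator: refuses until authorized, then hands out a fresh id.
    static class ToyCoordinator {
        boolean authorized = false;
        private long nextProducerId = 1000L;

        long allocateProducerId() {
            if (!authorized) throw new AuthorizationFailed("CLUSTER_AUTHORIZATION_FAILED");
            return nextProducerId++;
        }
    }

    static class ToyProducer {
        State state = State.UNINITIALIZED;
        long producerId = NO_PRODUCER_ID;
        short epoch = NO_PRODUCER_EPOCH;

        void initTransactions(ToyCoordinator coordinator) {
            if (state == State.READY) return;
            // ABORTABLE_ERROR -> UNINITIALIZED -> INITIALIZING, collapsed into one step here.
            state = State.INITIALIZING;
            try {
                producerId = coordinator.allocateProducerId();
                epoch = 0; // a brand-new producer id starts at epoch 0
                state = State.READY;
            } catch (AuthorizationFailed e) {
                state = State.ABORTABLE_ERROR; // not fatal: a later call may succeed
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        ToyProducer producer = new ToyProducer();
        try {
            producer.initTransactions(coordinator);
        } catch (AuthorizationFailed e) {
            System.out.println("first attempt: " + producer.state + ", epoch " + producer.epoch);
        }
        coordinator.authorized = true; // the operator fixes the ACL
        producer.initTransactions(coordinator);
        System.out.println("after fix: " + producer.state + ", epoch " + producer.epoch);
    }
}
```

The first call leaves the producer in `ABORTABLE_ERROR` with epoch -1; after `authorized` is flipped, the same producer instance reaches `READY` with epoch 0 without being recreated.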




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171873787


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   Even though I think the initProducerId path in the sender has been tested, it's probably worth adding a sender test to make sure the epoch is bumped from -1 to 0. I think it should be an easy modification.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171855134


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   Sorry, and done.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171764467


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   FWIW: I think in the context of this ticket, we are trying to avoid poisoning the client when the client is unable to re-authenticate upon startup. The fix should keep retrying the request until the permission is fixed, so the handler should handle the epoch bump.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171757259


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   Hey @jolshan - I actually think we don't need to manually bump the epoch here; it is already handled by the existing logic. Here's the explanation. I think initProducerId only happens in two places: 1. `initializeTransactions` and 2. `bumpIdempotentEpochAndResetIdIfNeeded` in the sender loop for the idempotent producer.
   
   For 1: it will bump the epoch if the epoch != None, which means the producer has already been initialized and needs to bump the epoch when re-requesting the id, per your comment.
   
   For 2: this is when we first initialize a producer (so it doesn't have an id yet), and the `InitProducerIdRequest` should bump the epoch to 0 upon the first successful attempt. This is the case we are addressing in this PR, I think.
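The two cases above can be sketched as follows, assuming (per point 1) that a producer which already has an epoch sends its current producer id and epoch so the coordinator bumps them, and (per point 2) that a producer with no id sends -1/-1 and receives a new id at epoch 0. The -1 sentinels play the role of Kafka's "no producer id / no epoch" values; `IdAndEpoch`, `buildRequest`, and `coordinatorResponse` are hypothetical, and epoch overflow is ignored:

```java
public class InitProducerIdRequestSketch {
    static final long NO_PRODUCER_ID = -1L;
    static final short NO_PRODUCER_EPOCH = -1;

    static final class IdAndEpoch {
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }

        boolean hasEpoch() { return epoch != NO_PRODUCER_EPOCH; }
    }

    // Case 1 (already initialized): send the current pair so the coordinator bumps it.
    // Case 2 (first initialization): send sentinels and let the coordinator allocate.
    static IdAndEpoch buildRequest(IdAndEpoch current) {
        return current.hasEpoch() ? current : new IdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
    }

    // Toy coordinator: a bump keeps the id and increments the epoch;
    // a fresh request gets a new id at epoch 0.
    static IdAndEpoch coordinatorResponse(IdAndEpoch request, long freshProducerId) {
        if (!request.hasEpoch()) return new IdAndEpoch(freshProducerId, (short) 0);
        return new IdAndEpoch(request.producerId, (short) (request.epoch + 1));
    }

    public static void main(String[] args) {
        IdAndEpoch first = coordinatorResponse(buildRequest(new IdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH)), 7L);
        IdAndEpoch bumped = coordinatorResponse(buildRequest(first), 8L);
        System.out.println(first.producerId + "/" + first.epoch + " then " + bumped.producerId + "/" + bumped.epoch);
    }
}
```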




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-17 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1168861947


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   Hey, sorry - I'm in the process of writing a test here.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-14 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167231804


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   Bumping the epoch here; I think the sender should bump it on the next iteration.
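A minimal sketch of the deferred-flag pattern this comment describes: the response handler only records that a bump is needed, and the sender loop acts on it at the start of its next iteration. Only the field name `epochBumpRequired` comes from the diff; `DeferredEpochBumpSketch`, its methods, and the string "requests" are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredEpochBumpSketch {
    // Set by the response handler, consumed by the sender loop.
    private boolean epochBumpRequired = false;
    final List<String> sent = new ArrayList<>();

    // Hypothetical handler for an InitProducerId authorization failure:
    // it only records the intent and never sends anything itself.
    synchronized void onInitProducerIdAuthorizationFailure() {
        epochBumpRequired = true;
    }

    // One iteration of a toy sender loop.
    synchronized void runOnce() {
        if (epochBumpRequired) {
            epochBumpRequired = false;
            sent.add("InitProducerId"); // re-request the id/epoch before anything else
            return;
        }
        sent.add("Produce");
    }

    public static void main(String[] args) {
        DeferredEpochBumpSketch sender = new DeferredEpochBumpSketch();
        sender.runOnce();
        sender.onInitProducerIdAuthorizationFailure();
        sender.runOnce();
        sender.runOnce();
        System.out.println(sender.sent); // [Produce, InitProducerId, Produce]
    }
}
```

Keeping the handler side-effect free (apart from the flag) means all network activity stays on the sender thread's single loop.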




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-14 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167230503


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
 removeInFlightBatch(batch);
 }
 
+public synchronized void transitionToUninitialized(RuntimeException exception) {
+transitionTo(State.UNINITIALIZED);
+lastError = null;
+if (pendingTransition != null) {

Review Comment:
   Thanks, I think it's fine to leave it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-14 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1167110290


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map<String, Object> configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time);
+assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+// retry initTransactions after the ClusterAuthorizationException not being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   Right, it retries initTransactions until it succeeds or times out.
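For readers unfamiliar with the helper, this is a minimal sketch of the retry-until-success-or-timeout behavior described above. It is not Kafka's `TestUtils.retryOnExceptionWithTimeout`; `retryWithTimeout` and `ThrowingRunnable` are hypothetical, and the parameter order here is this sketch's own choice:

```java
import java.util.concurrent.TimeUnit;

public class RetryWithTimeoutSketch {
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Runs `action` until it completes without throwing, pausing `pauseMs` between attempts.
    // Rethrows the last failure once another pause would pass the deadline. Returns the attempt count.
    static int retryWithTimeout(long timeoutMs, long pauseMs, ThrowingRunnable action) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long pauseNanos = TimeUnit.MILLISECONDS.toNanos(pauseMs);
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                action.run();
                return attempts;
            } catch (Exception e) {
                // Overflow-safe comparison of nanoTime values.
                if (System.nanoTime() + pauseNanos - deadline > 0) {
                    throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
                }
                sleep(pauseMs);
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while retrying", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int attempts = retryWithTimeout(1000, 10, () -> {
            if (++calls[0] < 3) throw new IllegalStateException("not authorized yet");
        });
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

Catching the exception on each attempt is what lets a caller keep invoking an operation such as `initTransactions()` until it stops throwing, while still surfacing the last error if it never does.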




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-02-21 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113694079


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   We need to manually transition the state back to initializing, no? Otherwise I think it gets stuck in an error state here:
   
   
https://github.com/apache/kafka/blob/a2c9f421af40e0c8ace722be94aedf8dec4f2b31/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L998
   




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-02-21 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113642114


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   Right, this is an idempotent producer. Thanks for catching the epoch bump; I was wondering about that too.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-02-21 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1113611801


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   I think you will need to reinstantiate the producer if it is in an aborting state. Here I'm trying to bring it back to the initializing state so that it can issue another producerId request.
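A sketch of the transition check with the change from the hunk above applied: `UNINITIALIZED` is reachable from `ABORTABLE_ERROR` as well as `READY`, so the producer can go back through `INITIALIZING` and issue another InitProducerId request instead of being reinstantiated. Only the `UNINITIALIZED` rule comes from the diff; the other rules are simplified assumptions, not Kafka's full transition table:

```java
public class TransitionSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR, FATAL_ERROR }

    static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case UNINITIALIZED:
                // From the diff: ABORTABLE_ERROR is now an allowed source.
                return source == State.READY || source == State.ABORTABLE_ERROR;
            case INITIALIZING:
                return source == State.UNINITIALIZED; // simplified assumption
            case READY:
                return source == State.INITIALIZING; // simplified assumption
            case ABORTABLE_ERROR:
                return source == State.INITIALIZING || source == State.READY; // simplified assumption
            case FATAL_ERROR:
                return true; // simplified: any state may become fatal
            default:
                return false;
        }
    }

    // Returns true if every consecutive pair in the path is a valid transition.
    static boolean isValidPath(State... path) {
        for (int i = 1; i < path.length; i++) {
            if (!isTransitionValid(path[i - 1], path[i])) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Authorization failure on InitProducerId, then a retry after the ACL is fixed.
        System.out.println(isValidPath(State.UNINITIALIZED, State.INITIALIZING, State.ABORTABLE_ERROR,
                State.UNINITIALIZED, State.INITIALIZING, State.READY)); // true
        System.out.println(isTransitionValid(State.FATAL_ERROR, State.UNINITIALIZED)); // false
    }
}
```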




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-01-17 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1072654169


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   I'm thinking not, because we aren't adding a new producer. @jolshan, thoughts?



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   Hmm, good point. I guess upon re-initializing (transitioning from UNINITIALIZED to INITIALIZING), should we check the previous error to ensure a valid transition? Maybe `initializeTransactions` could examine the previous error and make the next transition?
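One way to read that suggestion, as a minimal sketch: `initializeTransactions` resets from `ABORTABLE_ERROR` only when the previous error was an authorization failure, and refuses otherwise. `ReinitGateSketch` and `AuthFailure` are hypothetical; `AuthFailure` stands in for `TransactionalIdAuthorizationException` / `ClusterAuthorizationException`:

```java
public class ReinitGateSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR }

    static class AuthFailure extends RuntimeException {
        AuthFailure(String message) { super(message); }
    }

    State state = State.UNINITIALIZED;
    RuntimeException lastError = null;

    void initializeTransactions() {
        if (state == State.ABORTABLE_ERROR) {
            if (!(lastError instanceof AuthFailure)) {
                // Other abortable errors still need to be handled (e.g. by aborting), not reset.
                throw new IllegalStateException("Cannot re-initialize after: " + lastError, lastError);
            }
            // The previous InitProducerId failed authorization: clear it and start over.
            state = State.UNINITIALIZED;
            lastError = null;
        }
        if (state != State.UNINITIALIZED) {
            throw new IllegalStateException("Invalid transition from " + state + " to INITIALIZING");
        }
        state = State.INITIALIZING;
        // ...the InitProducerId request would be enqueued here...
    }

    public static void main(String[] args) {
        ReinitGateSketch tm = new ReinitGateSketch();
        tm.state = State.ABORTABLE_ERROR;
        tm.lastError = new AuthFailure("CLUSTER_AUTHORIZATION_FAILED");
        tm.initializeTransactions();
        System.out.println(tm.state); // INITIALIZING
    }
}
```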




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007187187


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,34 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map<String, Object> configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(500, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time);
+assertThrows(ClusterAuthorizationException.class, producer::initTransactions);
+
+// retry initTransactions after the ClusterAuthorizationException not being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+TestUtils.retryOnExceptionWithTimeout(1000, 100, producer::initTransactions);

Review Comment:
   yes!




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007185990


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   It's one way (that I could think of) to propagate the authorization error from the sender loop; otherwise, the sender will continue to retry, which causes timeouts in some cases.
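A minimal sketch of propagating the error out of the background loop, assuming the sender completes a pending result exceptionally so that the blocked caller fails fast instead of waiting out `max.block.ms` while the sender keeps retrying. `CompletableFuture` is used here as a stand-in for the producer's internal result object; `handleInitProducerIdResponse`, `awaitInit`, and `AuthFailure` are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PropagateErrorSketch {
    static class AuthFailure extends RuntimeException {
        AuthFailure(String message) { super(message); }
    }

    // Sender side: finish the pending result instead of silently retrying forever.
    static void handleInitProducerIdResponse(CompletableFuture<Void> pending, boolean authorized) {
        if (authorized) {
            pending.complete(null);
        } else {
            pending.completeExceptionally(new AuthFailure("CLUSTER_AUTHORIZATION_FAILED"));
        }
    }

    // Caller side (think initTransactions()): wait up to maxBlockMs and surface the sender's error.
    static void awaitInit(CompletableFuture<Void> pending, long maxBlockMs) {
        try {
            pending.get(maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) throw (RuntimeException) cause;
            throw new RuntimeException(cause);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out after " + maxBlockMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        new Thread(() -> handleInitProducerIdResponse(pending, false)).start();
        try {
            awaitInit(pending, 500);
        } catch (AuthFailure e) {
            System.out.println("initTransactions failed fast: " + e.getMessage());
        }
    }
}
```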




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-27 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1007181238


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for

Review Comment:
   One could get an authorization error when requesting a producerId, not necessarily caused by an unavailable broker. Yes, maybe remove the word "fatal". Thanks!




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-10-25 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004752898


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -996,20 +1010,21 @@ private void ensureTransactional() {
 }
 
 private void maybeFailWithError() {

Review Comment:
   No change; I just refactored it a bit.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-09-26 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r980725833


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1237,6 +1238,35 @@ public void testInitTransactionWhileThrottled() {
 }
 }
 
+@Test
+public void testClusterAuthorizationFailure() throws Exception {
+int maxBlockMs = 500;
+
+Map<String, Object> configs = new HashMap<>();
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.CLUSTER_AUTHORIZATION_FAILED));
+
+try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time)) {
+assertThrows(ClusterAuthorizationException.class, () -> producer.initTransactions());
+// retry initTransactions after the ClusterAuthorizationException not being thrown
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+producer.initTransactions();

Review Comment:
   Hmm... I'm consistently having trouble getting an Errors.NONE response; I sometimes get a ClusterAuthorizationException instead. Not sure why.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -607,6 +608,14 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
 removeInFlightBatch(batch);
 }
 
+public synchronized void transitionToUninitialized(RuntimeException exception) {
+transitionTo(State.UNINITIALIZED);
+lastError = null;
+if (pendingTransition != null) {

Review Comment:
   I'm actually not sure if this is needed.




[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-06-17 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r900530199


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1292,8 +1296,12 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
 reenqueue();
-} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+} else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
+log.error("Cluster authorization failure, transition the producer state back to uninitialized");
+fail(error.exception());
+// transition back to uninitialized state to enable client side retry
+transitionTo(State.UNINITIALIZED);
+} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {

Review Comment:
   Yeah, that sounds right. It seems like they should just configure the ACL.


