Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-11-02 Thread via GitHub


lucasbru merged PR #14639:
URL: https://github.com/apache/kafka/pull/14639


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-11-02 Thread via GitHub


lucasbru commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1790316683

   Test failures are unrelated.



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-11-01 Thread via GitHub


philipnee commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1789989071

   Test failures seem unrelated, but I'm retriggering the tests since there was a failed build.



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-30 Thread via GitHub


philipnee commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1785408525

   @lucasbru - Thanks for taking the time to review this PR. I addressed your 
previous comment. Let me know if anything in this PR is unclear.



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-30 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1376339956


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +296,117 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 .setMemberId(generation.memberId)
 .setGroupInstanceId(groupInstanceId)
 .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-return new NetworkClientDelegate.UnsentRequest(
+NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator(),
+coordinatorRequestManager.coordinator());
+resp.future().whenComplete(
 (response, throwable) -> {
-if (throwable == null) {
-future.complete(null);
-} else {
-future.completeExceptionally(throwable);
+try {
+if (throwable == null) {
+onResponse(response);
+} else {
+onError(throwable, resp.handler().completionTimeMs());
+}
+} catch (Throwable t) {
+log.error("Unexpected error when completing offset commit: {}", this, t);
+future.completeExceptionally(t);
 }
 });
+return resp;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+public void onError(final Throwable exception, final long currentTimeMs) {
+if (exception instanceof RetriableException) {
+handleCoordinatorDisconnect(exception, currentTimeMs);
+retry(currentTimeMs);
+}
+}
+
+public void onResponse(final ClientResponse response) {
+long responseTime = response.receivedTimeMs();
+OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+Set unauthorizedTopics = new HashSet<>();
+for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+long offset = offsetAndMetadata.offset();
+Errors error = Errors.forCode(partition.errorCode());
+if (error == Errors.NONE) {
+log.debug("OffsetCommit {} for partition {}", offset, tp);
+continue;
+}
+
+if (error.exception() instanceof RetriableException) {

Review Comment:
   Thanks, and the same goes for the `error == Errors.NONE` block.




Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-30 Thread via GitHub


lucasbru commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1375757133


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +296,117 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 .setMemberId(generation.memberId)
 .setGroupInstanceId(groupInstanceId)
 .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-return new NetworkClientDelegate.UnsentRequest(
+NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator(),
+coordinatorRequestManager.coordinator());
+resp.future().whenComplete(
 (response, throwable) -> {
-if (throwable == null) {
-future.complete(null);
-} else {
-future.completeExceptionally(throwable);
+try {
+if (throwable == null) {
+onResponse(response);
+} else {
+onError(throwable, resp.handler().completionTimeMs());
+}
+} catch (Throwable t) {
+log.error("Unexpected error when completing offset commit: {}", this, t);
+future.completeExceptionally(t);
 }
 });
+return resp;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+public void onError(final Throwable exception, final long currentTimeMs) {
+if (exception instanceof RetriableException) {
+handleCoordinatorDisconnect(exception, currentTimeMs);
+retry(currentTimeMs);
+}
+}
+
+public void onResponse(final ClientResponse response) {
+long responseTime = response.receivedTimeMs();
+OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+Set unauthorizedTopics = new HashSet<>();
+for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+long offset = offsetAndMetadata.offset();
+Errors error = Errors.forCode(partition.errorCode());
+if (error == Errors.NONE) {
+log.debug("OffsetCommit {} for partition {}", offset, tp);
+continue;
+}
+
+if (error.exception() instanceof RetriableException) {

Review Comment:
   I think you want to remove this if/else block completely now




Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-29 Thread via GitHub


philipnee commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1784489022

   Hi @lucasbru - Thanks for the response. To your first question: yes, what is 
left is correcting the error handling at the user API level. When I 
implemented this, I wrapped all errors in KafkaException (I don't remember 
why); however, we should throw the original exception instead. I will follow 
up with a second PR to complete this Jira issue.
   
   I see what you mean about `onResponse`, and I think your suggestion makes a 
lot of sense. I'll patch it accordingly.
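A minimal sketch of surfacing the exact exception instead of a generic wrapper, assuming the commit result is exposed to the API layer as a `CompletableFuture<Void>`. The class and `awaitCommit` are hypothetical, and `IllegalStateException` stands in for a Kafka-specific exception so the example stays self-contained; this is not the consumer's actual API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CommitExceptionUnwrapping {
    // Hypothetical helper: block on a commit future and rethrow the original
    // RuntimeException instead of wrapping it in a generic exception type.
    static void awaitCommit(CompletableFuture<Void> commitFuture) {
        try {
            commitFuture.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                // Surface the exact exception the future was completed with.
                throw (RuntimeException) cause;
            }
            // Checked exceptions still need some unchecked wrapper.
            throw new RuntimeException(cause);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

A caller that completes the future with `completeExceptionally(new IllegalStateException(...))` then catches that same `IllegalStateException`, not an `ExecutionException` or a wrapper.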



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-27 Thread via GitHub


lucasbru commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782544764

   Changes look good to me. Did I understand David correctly that we want to 
add more changes to this PR?
   
   > 1. Mockito: Could you be more specific on how you expect to mock the 
response object?
   
   That was just out of interest: since you're already using Mockito to mock 
things, why not use it for the response object as well? But I realize the 
object is a fairly plain data container, so I guess the answer is that 
constructing it directly is easier.
   
   > 2. Error handling: Essentially, all errors in 
`continueHandlePartitionErrors` can only come from the response. I understand 
there is some redundancy there, which can be a bit confusing. But a request 
can fail either with a hard failure (response = null and throwable = non-null) 
or with a server-side error (response = non-null). That is why they were kept 
separate. If you find it unclear, how could I make it more readable?
   
   That's not what I meant. I just found the flow in `onResponse` a bit hard to 
read, so I suggested structuring it a bit differently. In particular, 
`continueHandlePartitionErrors` sounds like it determines whether to continue 
handling errors, but it actually completes the future inside.
   
   I think the control flow may also produce logging that isn't intended. For 
fencing, we get one generic `log.error` message and one very similar 
`log.info` message with some specific details. For topic authorization errors, 
we get one `log.error` message for each partition, and then another aggregated 
`log.error` message listing the topics again. However, if we have three 
unauthorized topic errors followed by one other error, we get three generic 
error messages for the topics, but not the aggregated error message. Is that 
trying to be consistent with the old consumer?
   
   I imagined something like:
   
   ```
   if (error == Errors.NONE) {
       log.debug("OffsetCommit {} for partition {}", offset, tp);
   } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
       // Collect all unauthorized topics before failing
       unauthorizedTopics.add(tp.topic());
   } else if (error.exception() instanceof RetriableException) {
       log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
       handleRetriableError(error);
       retry(responseTime);
       return;
   } else {
       log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
       handleFatalError(error);
       return;
   }
   ```
   
   But I'm not super familiar with the code style in the new consumer, and 
consistency with the rest of the code and the old consumer is also important. 
So I just wanted to give this to you as inspiration; you are probably in the 
best position to come up with the best way to implement it.
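The control flow suggested above can be sketched as a standalone program. It uses hypothetical stand-ins (`PartitionError`, `PartitionResult`, `Outcome`) instead of Kafka's `Errors` and `OffsetCommitResponseData` types, and it returns an outcome rather than completing a future inside the loop, so the classification step does not hide a side effect. As in the suggestion, a retriable or fatal error returns immediately, so topic authorization errors are only reported in aggregate when no other error follows them.

```java
import java.util.List;
import java.util.Set;

class CommitResponseSketch {
    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors;
    // only the retriable flag matters for this sketch.
    enum PartitionError {
        NONE(false),
        TOPIC_AUTHORIZATION_FAILED(false),
        COORDINATOR_LOAD_IN_PROGRESS(true),
        OFFSET_METADATA_TOO_LARGE(false);

        final boolean retriable;

        PartitionError(boolean retriable) {
            this.retriable = retriable;
        }
    }

    // What the caller should do with the commit future after one response.
    enum Outcome { SUCCESS, RETRY, FATAL, TOPIC_AUTHORIZATION_FAILED }

    // Hypothetical per-partition result extracted from the response.
    static final class PartitionResult {
        final String topic;
        final int partition;
        final PartitionError error;

        PartitionResult(String topic, int partition, PartitionError error) {
            this.topic = topic;
            this.partition = partition;
            this.error = error;
        }
    }

    // Classifies a response without completing any future; unauthorized
    // topics are collected into the caller-supplied set.
    static Outcome classify(List<PartitionResult> results, Set<String> unauthorizedTopics) {
        for (PartitionResult r : results) {
            if (r.error == PartitionError.NONE) {
                continue; // committed successfully
            } else if (r.error == PartitionError.TOPIC_AUTHORIZATION_FAILED) {
                unauthorizedTopics.add(r.topic); // reported once, after the loop
            } else if (r.error.retriable) {
                return Outcome.RETRY; // caller re-enqueues the request
            } else {
                return Outcome.FATAL; // caller fails the future
            }
        }
        return unauthorizedTopics.isEmpty() ? Outcome.SUCCESS : Outcome.TOPIC_AUTHORIZATION_FAILED;
    }
}
```

The caller then maps each `Outcome` to exactly one action (complete, retry, or complete exceptionally), which keeps logging and future completion in one place.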



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-27 Thread via GitHub


lucasbru commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1374244715


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -271,14 +445,13 @@ public void onResponse(
 
 private void onFailure(final long currentTimeMs,
final Errors responseError) {
+handleCoordinatorDisconnect(responseError.exception(), currentTimeMs);
 log.debug("Offset fetch failed: {}", responseError.message());
-
 // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
 if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {

Review Comment:
   nit: merge first two branches now?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 .setMemberId(generation.memberId)
 .setGroupInstanceId(groupInstanceId)
 .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-return new NetworkClientDelegate.UnsentRequest(
+NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator(),
+coordinatorRequestManager.coordinator());
+resp.future().whenComplete(
 (response, throwable) -> {
-if (throwable == null) {
-future.complete(null);
-} else {
-future.completeExceptionally(throwable);
+try {
+if (throwable == null) {
+onResponse(response);
+} else {
+onError(throwable, resp.handler().completionTimeMs());
+}
+} catch (Throwable t) {
+log.error("Unexpected error when completing offset commit: {}", this, t);
+future.completeExceptionally(t);
 }
 });
+return resp;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+public void onError(final Throwable exception, final long currentTimeMs) {
+if (exception instanceof RetriableException) {
+handleCoordinatorDisconnect(exception, currentTimeMs);
+retry(currentTimeMs);
+}
+}
+
+public void onResponse(final ClientResponse response) {
+long responseTime = response.receivedTimeMs();
+OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+Set unauthorizedTopics = new HashSet<>();
+for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+long offset = offsetAndMetadata.offset();
+Errors error = Errors.forCode(partition.errorCode());
+if (error == Errors.NONE) {
+log.debug("OffsetCommit {} for partition {}", offset, tp);
+continue;
+}
+
+if (error.exception() instanceof RetriableException) {
+log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset,
+error.message());
+} else {
+log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
+}
+
+if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) {
+return;
+}
+}
+}
+
+if (!unauthorizedTopics.isEmpty()) {
+log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
+future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+} else {
+future.complete(null);
+}
+}
+
+private void retry(final long currentTimeMs) {
+System.out.println("timeout" + currentTimeMs);
+onFailedAttempt(currentTimeMs);
+pendingRequests.addOffsetCommitRequest(this);
+}
+
+private boolean continueHandlePartitionErrors(Errors error, TopicPartition tp, long offset,
+

Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782275615

   Hi @lucasbru - Thanks for taking the time to review my PR.  I addressed all 
but 2 comments:
   1. Mockito: Could you be more specific on how you expect to mock the 
response object?
   2. Error handling: Essentially, all errors in `continueHandlePartitionErrors` 
can only come from the response. I understand there is some redundancy there, 
which can be a bit confusing. But a request can fail either with a hard 
failure (response = null and throwable = non-null) or with a server-side error 
(response = non-null). That is why they were kept separate. If you find it 
unclear, how could I make it more readable?
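The two failure paths described in item 2 can be illustrated with a small, self-contained sketch of a `whenComplete` callback: a hard failure arrives as a non-null throwable with a null response, while a server-side error arrives as a non-null response carrying an error code. `Response`, `errorCode`, and the string outcomes are hypothetical simplifications, not the `ClientResponse` API.

```java
import java.util.concurrent.CompletableFuture;

class CompletionSketch {
    // Hypothetical response: errorCode 0 means success, anything else is a server-side error.
    static final class Response {
        final int errorCode;

        Response(int errorCode) {
            this.errorCode = errorCode;
        }
    }

    // Maps a request future onto a result future, keeping the two failure paths separate.
    static CompletableFuture<String> handle(CompletableFuture<Response> requestFuture) {
        CompletableFuture<String> result = new CompletableFuture<>();
        requestFuture.whenComplete((response, throwable) -> {
            if (throwable != null) {
                // Hard failure (e.g. a disconnect): there is no response to inspect.
                result.complete("hard failure: " + throwable.getMessage());
            } else if (response.errorCode != 0) {
                // The broker answered, but with an error code.
                result.complete("server error: " + response.errorCode);
            } else {
                result.complete("ok");
            }
        });
        return result;
    }
}
```

Because the callback is registered on the request future itself, the throwable it receives is the exception passed to `completeExceptionally`, not a `CompletionException` wrapper.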



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1374057359


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -127,7 +146,7 @@ public void maybeAutoCommit(final Map offsets
  * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
  * {@link OffsetCommitRequestState} and enqueue it to send later.
  */
-public CompletableFuture addOffsetCommitRequest(final Map offsets) {
+public OffsetCommitRequestState addOffsetCommitRequest(final Map offsets) {

Review Comment:
   Got it, I think the main reason was to test retryBackoff.  I made some 
changes according to your suggestions.
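Testing retry backoff is easier when the backoff decision depends only on a timestamp passed in by the caller. A minimal sketch of such a state object, assuming a deterministic exponential backoff with a cap and no jitter; the class, `canSendRequest`, and `backoffMs` are hypothetical, and only the role of `onFailedAttempt(currentTimeMs)` loosely mirrors the call in the PR's `retry` method.

```java
class RetryBackoffSketch {
    private final long retryBackoffMs;
    private final long maxBackoffMs;
    private int failedAttempts = 0;
    private long lastFailedAttemptMs = -1L;

    RetryBackoffSketch(long retryBackoffMs, long maxBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // Record a failed attempt; the caller supplies the clock so tests control time.
    void onFailedAttempt(long currentTimeMs) {
        failedAttempts++;
        lastFailedAttemptMs = currentTimeMs;
    }

    // Exponential backoff: retryBackoffMs * 2^(failedAttempts - 1), capped at maxBackoffMs.
    long backoffMs() {
        if (failedAttempts == 0) {
            return 0L;
        }
        int exponent = Math.min(failedAttempts - 1, 30); // bound the shift to avoid overflow
        return Math.min(retryBackoffMs << exponent, maxBackoffMs);
    }

    // A request may be (re)sent once the backoff since the last failure has elapsed.
    boolean canSendRequest(long currentTimeMs) {
        return failedAttempts == 0 || currentTimeMs - lastFailedAttemptMs >= backoffMs();
    }
}
```

With the clock injected this way, a test can assert that a request is held back for 100 ms after the first failure and 200 ms after the second (for a 100 ms base), without sleeping.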




Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1374040965


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -382,9 +522,49 @@ private ClientResponse buildOffsetFetchClientResponse(
 return buildOffsetFetchClientResponse(request, topicPartitionData, error);
 }
 
+private ClientResponse buildOffsetCommitClientResponse(final OffsetCommitResponse commitResponse,
+   final Errors error) {
+OffsetCommitResponseData data = new OffsetCommitResponseData();
+OffsetCommitResponse response = new OffsetCommitResponse(data);
+short apiVersion = 1;
+return new ClientResponse(
+new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1),
+null,
+"-1",
+time.milliseconds(),
+time.milliseconds(),
+false,
+null,
+null,
+commitResponse
+);
+}
+
+public ClientResponse mockOffsetCommitResponse(String topic, int partition, short apiKeyVersion, Errors error) {

Review Comment:
   Could you be more specific? Do you mean something like this?
   
   ```
   ClientResponse response = mock(ClientResponse.class);
   when(response.receivedTimeMs()).thenReturn(...);
   when(response.responseBody()).thenReturn(new OffsetCommitResponse(responseData));
   return response;
   ```





Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1373587460


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 .setMemberId(generation.memberId)
 .setGroupInstanceId(groupInstanceId)
 .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-return new NetworkClientDelegate.UnsentRequest(
+NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator(),
+coordinatorRequestManager.coordinator());
+resp.future().whenComplete(
 (response, throwable) -> {
-if (throwable == null) {
-future.complete(null);
-} else {
-future.completeExceptionally(throwable);
+try {
+if (throwable == null) {
+onResponse(response);
+} else {
+onError(throwable, resp.handler().completionTimeMs());
+}
+} catch (Throwable t) {
+log.error("Unexpected error when completing offset commit: {}", this, t);
+future.completeExceptionally(t);
 }
 });
+return resp;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+public void onError(final Throwable exception, final long currentTimeMs) {
+if (exception instanceof RetriableException) {
+handleCoordinatorDisconnect(exception, currentTimeMs);
+retry(currentTimeMs);
+}
+}
+
+public void onResponse(final ClientResponse response) {
+long responseTime = response.receivedTimeMs();
+OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+Set unauthorizedTopics = new HashSet<>();
+for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+long offset = offsetAndMetadata.offset();
+Errors error = Errors.forCode(partition.errorCode());
+if (error == Errors.NONE) {
+log.debug("OffsetCommit {} for partition {}", offset, tp);
+continue;
+}
+
+if (error.exception() instanceof RetriableException) {
+log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset,
+error.message());
+} else {
+log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
+}
+
+if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) {
+return;
+}
+}
+}
+
+if (!unauthorizedTopics.isEmpty()) {
+log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
+future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+} else {
+future.complete(null);
+}
+}
+
+private void retry(final long currentTimeMs) {
+System.out.println("timeout" + currentTimeMs);
+onFailedAttempt(currentTimeMs);
+pendingRequests.addOffsetCommitRequest(this);
+}
+
+private boolean continueHandlePartitionErrors(Errors error, TopicPartition tp, long offset,
+  Set unauthorizedTopics, long responseTime) {
+switch (error) {
+case GROUP_AUTHORIZATION_FAILED:
+future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+return false;
+case TOPIC_AUTHORIZATION_FAILED:
+// Collect all unauthorized topics before failing
+unauthorizedTopics.add(tp.topic());
+return true;

Review Comment:
   Ah, I see what you mean.




Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1373585188


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 .setMemberId(generation.memberId)
 .setGroupInstanceId(groupInstanceId)
 .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-return new NetworkClientDelegate.UnsentRequest(
+NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator(),
+coordinatorRequestManager.coordinator());
+resp.future().whenComplete(
 (response, throwable) -> {
-if (throwable == null) {
-future.complete(null);
-} else {
-future.completeExceptionally(throwable);
+try {
+if (throwable == null) {
+onResponse(response);
+} else {
+onError(throwable, resp.handler().completionTimeMs());
+}
+} catch (Throwable t) {
+log.error("Unexpected error when completing offset commit: {}", this, t);
+future.completeExceptionally(t);
 }
 });
+return resp;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+public void onError(final Throwable exception, final long currentTimeMs) {
+if (exception instanceof RetriableException) {
+handleCoordinatorDisconnect(exception, currentTimeMs);
+retry(currentTimeMs);
+}
+}
+
+public void onResponse(final ClientResponse response) {
+long responseTime = response.receivedTimeMs();
+OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+Set unauthorizedTopics = new HashSet<>();
+for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+long offset = offsetAndMetadata.offset();
+Errors error = Errors.forCode(partition.errorCode());
+if (error == Errors.NONE) {
+log.debug("OffsetCommit {} for partition {}", offset, tp);
+continue;
+}
+
+if (error.exception() instanceof RetriableException) {
+log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset,
+error.message());
+} else {
+log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
+}
+
+if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) {
+return;
+}
+}
+}
+
+if (!unauthorizedTopics.isEmpty()) {
+log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
+future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+} else {
+future.complete(null);
+}
+}
+
+private void retry(final long currentTimeMs) {
+System.out.println("timeout" + currentTimeMs);
+onFailedAttempt(currentTimeMs);
+pendingRequests.addOffsetCommitRequest(this);
+}
+
+private boolean continueHandlePartitionErrors(Errors error, TopicPartition tp, long offset,
+  Set unauthorizedTopics, long responseTime) {
+switch (error) {
+case GROUP_AUTHORIZATION_FAILED:
+future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+return false;
+case TOPIC_AUTHORIZATION_FAILED:
+// Collect all unauthorized topics before failing
+unauthorizedTopics.add(tp.topic());
+return true;
+case OFFSET_METADATA_TOO_LARGE:
+case INVALID_COMMIT_OFFSET_SIZE:
+

Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1373567351


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
 .setMemberId(generation.memberId)
 .setGroupInstanceId(groupInstanceId)
 .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-return new NetworkClientDelegate.UnsentRequest(
+NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
 builder,
-coordinatorRequestManager.coordinator(),
+coordinatorRequestManager.coordinator());
+resp.future().whenComplete(
 (response, throwable) -> {
-if (throwable == null) {
-future.complete(null);
-} else {
-future.completeExceptionally(throwable);
+try {
+if (throwable == null) {
+onResponse(response);
+} else {
+onError(throwable, resp.handler().completionTimeMs());
+}
+} catch (Throwable t) {
+log.error("Unexpected error when completing offset commit: {}", this, t);
+future.completeExceptionally(t);
 }
 });
+return resp;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+public void onError(final Throwable exception, final long currentTimeMs) {
+if (exception instanceof RetriableException) {
+handleCoordinatorDisconnect(exception, currentTimeMs);
+retry(currentTimeMs);
+}
+}
+
+public void onResponse(final ClientResponse response) {
+long responseTime = response.receivedTimeMs();
+OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+Set unauthorizedTopics = new HashSet<>();
+for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+long offset = offsetAndMetadata.offset();
+Errors error = Errors.forCode(partition.errorCode());
+if (error == Errors.NONE) {
+log.debug("OffsetCommit {} for partition {}", offset, tp);
+continue;
+}
+
+if (error.exception() instanceof RetriableException) {
+log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset,
+error.message());
+} else {
+log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
+}
+
+if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) {
+return;
+}
+}
+}
+
+if (!unauthorizedTopics.isEmpty()) {
+log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
+future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+} else {
+future.complete(null);
+}
+}
+
+private void retry(final long currentTimeMs) {
+System.out.println("timeout" + currentTimeMs);

Review Comment:
   Accidentally exposed my debug skills.




Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


lucasbru commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1781118953

   Thanks @philipnee, I left some comments



Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


lucasbru commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1373051849


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -148,28 +167,35 @@ Queue unsentOffsetCommitRequests() {
         return pendingRequests.unsentOffsetCommits;
     }
 
+    private List unsentOffsetFetchRequests() {
+        return pendingRequests.unsentOffsetFetches;
+    }
+
     // Visible for testing
     CompletableFuture sendAutoCommit(final Map allConsumedOffsets) {
         log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
-        return this.addOffsetCommitRequest(allConsumedOffsets)
-                .whenComplete((response, throwable) -> {
-                    this.autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
+        return addOffsetCommitRequest(allConsumedOffsets).future().whenComplete((response, throwable) -> {
+            autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
+            if (throwable == null) {
+                log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
+            } else {
+                if (throwable instanceof RetriableCommitFailedException) {

Review Comment:
   nit: could become `else if` and remove one indentation level.
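A minimal sketch of the suggested flattening, assuming a local stand-in for Kafka's `RetriableCommitFailedException` so it compiles without Kafka on the classpath. The class name, the `classify`/`observe` helpers and the `"success"`/`"retriable"`/`"fatal"` labels are hypothetical; the real callback logs instead of returning strings, and its failure-branch messages are not shown in the quoted diff.

```java
import java.util.concurrent.CompletableFuture;

public class AutoCommitCallbackSketch {

    // Local stand-in for Kafka's RetriableCommitFailedException so this sketch compiles without Kafka.
    static class RetriableCommitFailedException extends RuntimeException {
        RetriableCommitFailedException(String message) {
            super(message);
        }
    }

    // Flattened branching: "else { if (...) { } else { } }" becomes a single
    // if / else if / else chain, so every branch sits at the same indentation level.
    static String classify(Throwable throwable) {
        if (throwable == null) {
            return "success";
        } else if (throwable instanceof RetriableCommitFailedException) {
            return "retriable";
        } else {
            return "fatal";
        }
    }

    // Attaches the callback with whenComplete, as sendAutoCommit does, and exposes the
    // classification through a second future so the outcome can be observed.
    static CompletableFuture<String> observe(CompletableFuture<Void> commitFuture) {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        commitFuture.whenComplete((response, throwable) -> outcome.complete(classify(throwable)));
        return outcome;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> commit = new CompletableFuture<>();
        CompletableFuture<String> outcome = observe(commit);
        commit.completeExceptionally(new RetriableCommitFailedException("coordinator not available"));
        System.out.println(outcome.join()); // prints "retriable"
    }
}
```

Behaviour is unchanged by the flattening; only the nesting depth of the failure branches drops by one level.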



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -127,7 +146,7 @@ public void maybeAutoCommit(final Map offsets
      * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
      * {@link OffsetCommitRequestState} and enqueue it to send later.
      */
-    public CompletableFuture addOffsetCommitRequest(final Map offsets) {
+    public OffsetCommitRequestState addOffsetCommitRequest(final Map offsets) {

Review Comment:
   You seem to expose the whole internal `OffsetCommitRequestState` here. It took me a minute to trace all the references and realize that you are only exposing it for testing. Is that right?
   
   If so, I'd suggest either avoiding exposing internal state for unit testing (that's a smell), or at least introducing a separate method that is package-private and clearly marked as "visible for testing".
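A minimal sketch of the second option, assuming a heavily simplified request state keyed by topic name (`String`) rather than `TopicPartition`. `CommitManagerSketch`, `enqueueOffsetCommitRequest` and `unsentOffsetCommitCount` are hypothetical names, not part of the PR. The public method keeps returning only the future, while a package-private accessor, marked with the same `// Visible for testing` comment style used elsewhere in this class, hands the internal state to tests in the same package.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class CommitManagerSketch {

    // Internal per-request state (hypothetical, simplified). Package-private, not part of the public API.
    static final class OffsetCommitRequestState {
        final Map<String, Long> offsets;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        OffsetCommitRequestState(Map<String, Long> offsets) {
            this.offsets = new HashMap<>(offsets);
        }
    }

    private final Queue<OffsetCommitRequestState> unsentOffsetCommits = new ArrayDeque<>();

    // Public entry point: callers only receive the future, never the internal state object.
    public CompletableFuture<Void> addOffsetCommitRequest(Map<String, Long> offsets) {
        return enqueueOffsetCommitRequest(offsets).future;
    }

    // Visible for testing: package-private, so tests in the same package can inspect
    // the enqueued state without widening the public surface of the class.
    OffsetCommitRequestState enqueueOffsetCommitRequest(Map<String, Long> offsets) {
        OffsetCommitRequestState state = new OffsetCommitRequestState(offsets);
        unsentOffsetCommits.add(state);
        return state;
    }

    // Visible for testing
    int unsentOffsetCommitCount() {
        return unsentOffsetCommits.size();
    }
}
```

Production code only ever sees `CompletableFuture<Void>`, so the internal class can change without touching callers.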



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-            return new NetworkClientDelegate.UnsentRequest(
+            NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
                     builder,
-                    coordinatorRequestManager.coordinator(),
+                    coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                     (response, throwable) -> {
-                        if (throwable == null) {
-                            future.complete(null);
-                        } else {
-                            future.completeExceptionally(throwable);
+                        try {
+                            if (throwable == null) {
+                                onResponse(response);
+                            } else {
+                                onError(throwable, resp.handler().completionTimeMs());
+                            }
+                        } catch (Throwable t) {
+                            log.error("Unexpected error when completing offset commit: {}", this, t);
+                            future.completeExceptionally(t);
                         }
                     });
+            return resp;
+        }
+
+        public CompletableFuture future() {
+            return future;
+        }
+
+        public void onError(final Throwable exception, final long currentTimeMs) {
+            if (exception instanceof RetriableException) {
+                handleCoordinatorDisconnect(exception, currentTimeMs);
+                retry(currentTimeMs);
+            }
+        }
+
+        public void onResponse(final ClientResponse response) {
+            long responseTime = response.receivedTimeMs();
+            OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+            Set<String> unauthorizedTopics = new HashSet<>();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                    long offset = 

Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-26 Thread via GitHub


scyber commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1372704905


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -215,16 +227,113 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values()));
-            return new NetworkClientDelegate.UnsentRequest(
+            // TODO: KAFKA-15592
+            NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
                     builder,
-                    coordinatorRequestManager.coordinator(),
+                    coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                     (response, throwable) -> {
-                        if (throwable == null) {
-                            future.complete(null);
-                        } else {
-                            future.completeExceptionally(throwable);
+                        try {
+                            if (throwable == null) {
+                                onResponse(response);
+                            } else {
+                                onError(throwable, resp.handler().completionTimeMs());
+                            }
+                        } catch (Throwable t) {

Review Comment:
   I'm not sure we would ever get to that point, i.e. do we really need this try/catch?
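One consideration when weighing the guard: per the `CompletableFuture.whenComplete` contract, if the action throws, the exception only completes the stage *returned by* `whenComplete` exceptionally. In the diff that returned stage is discarded, so without the try/catch the caller-facing `future` could stay incomplete. A minimal sketch of that effect, assuming a hypothetical `onResponse` that always throws and a simplified `handle` that fails instead of retrying (all names here are illustrative, not the PR's):

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteGuardSketch {

    // Hypothetical response handler that fails unexpectedly, e.g. on a missing map entry.
    static void onResponse(String response) {
        throw new IllegalStateException("unexpected response: " + response);
    }

    // Simplified completion logic. Unlike the PR's onError, failures are not retried here.
    static void handle(String response, Throwable throwable, CompletableFuture<Void> callerFuture) {
        if (throwable != null) {
            callerFuture.completeExceptionally(throwable);
            return;
        }
        onResponse(response); // may throw
        callerFuture.complete(null);
    }

    // Chains the network future to the caller-facing future, with or without a try/catch guard.
    static CompletableFuture<Void> wire(CompletableFuture<String> networkFuture, boolean guarded) {
        CompletableFuture<Void> callerFuture = new CompletableFuture<>();
        networkFuture.whenComplete((response, throwable) -> {
            if (!guarded) {
                // An exception thrown here only fails the stage returned by whenComplete,
                // which nobody holds, so callerFuture is never completed.
                handle(response, throwable, callerFuture);
                return;
            }
            try {
                handle(response, throwable, callerFuture);
            } catch (Throwable t) {
                callerFuture.completeExceptionally(t);
            }
        });
        return callerFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<String> unguardedNetwork = new CompletableFuture<>();
        CompletableFuture<Void> unguarded = wire(unguardedNetwork, false);
        unguardedNetwork.complete("response");
        System.out.println(unguarded.isDone()); // prints false

        CompletableFuture<String> guardedNetwork = new CompletableFuture<>();
        CompletableFuture<Void> guarded = wire(guardedNetwork, true);
        guardedNetwork.complete("response");
        System.out.println(guarded.isCompletedExceptionally()); // prints true
    }
}
```

Whether such an unexpected exception can actually occur in `onResponse` is the open question; the sketch only shows what happens if one does.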






[PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-25 Thread via GitHub


philipnee opened a new pull request, #14639:
URL: https://github.com/apache/kafka/pull/14639

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

