Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
lucasbru merged PR #14639: URL: https://github.com/apache/kafka/pull/14639 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
lucasbru commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1790316683 Test failures unrelated
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1789989071 Test failures seem unrelated, but I'm retriggering the tests because there is a failed build.
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1785408525 @lucasbru - Thanks for taking the time to review this PR. I addressed your previous comment. Let me know if anything in this PR is unclear.
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1376339956 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -215,16 +296,117 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values(; -return new NetworkClientDelegate.UnsentRequest( +NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator(), +coordinatorRequestManager.coordinator()); +resp.future().whenComplete( (response, throwable) -> { -if (throwable == null) { -future.complete(null); -} else { -future.completeExceptionally(throwable); +try { +if (throwable == null) { +onResponse(response); +} else { +onError(throwable, resp.handler().completionTimeMs()); +} +} catch (Throwable t) { +log.error("Unexpected error when completing offset commit: {}", this, t); +future.completeExceptionally(t); } }); +return resp; +} + +public CompletableFuture future() { +return future; +} + +public void onError(final Throwable exception, final long currentTimeMs) { +if (exception instanceof RetriableException) { +handleCoordinatorDisconnect(exception, currentTimeMs); +retry(currentTimeMs); +} +} + +public void onResponse(final ClientResponse response) { +long responseTime = response.receivedTimeMs(); +OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody(); +Set unauthorizedTopics = new HashSet<>(); +for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { +TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); +OffsetAndMetadata offsetAndMetadata = offsets.get(tp); +long offset = offsetAndMetadata.offset(); +Errors error = 
Errors.forCode(partition.errorCode()); +if (error == Errors.NONE) { +log.debug("OffsetCommit {} for partition {}", offset, tp); +continue; +} + +if (error.exception() instanceof RetriableException) { Review Comment: Thanks, as well as the error == Errors.NONE
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
lucasbru commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1375757133 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -215,16 +296,117 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values(; -return new NetworkClientDelegate.UnsentRequest( +NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator(), +coordinatorRequestManager.coordinator()); +resp.future().whenComplete( (response, throwable) -> { -if (throwable == null) { -future.complete(null); -} else { -future.completeExceptionally(throwable); +try { +if (throwable == null) { +onResponse(response); +} else { +onError(throwable, resp.handler().completionTimeMs()); +} +} catch (Throwable t) { +log.error("Unexpected error when completing offset commit: {}", this, t); +future.completeExceptionally(t); } }); +return resp; +} + +public CompletableFuture future() { +return future; +} + +public void onError(final Throwable exception, final long currentTimeMs) { +if (exception instanceof RetriableException) { +handleCoordinatorDisconnect(exception, currentTimeMs); +retry(currentTimeMs); +} +} + +public void onResponse(final ClientResponse response) { +long responseTime = response.receivedTimeMs(); +OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody(); +Set unauthorizedTopics = new HashSet<>(); +for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { +TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); +OffsetAndMetadata offsetAndMetadata = offsets.get(tp); +long offset = offsetAndMetadata.offset(); +Errors error = 
Errors.forCode(partition.errorCode()); +if (error == Errors.NONE) { +log.debug("OffsetCommit {} for partition {}", offset, tp); +continue; +} + +if (error.exception() instanceof RetriableException) { Review Comment: I think you want to remove this if/else block completely now
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1784489022 Hi @lucasbru - Thanks for the response. To your first question: yes, what is left is correcting the error handling at the user API level. When I implemented this, I wrapped all errors in KafkaException (I forget why); however, we should throw the exact exception instead. So I will follow up with a second PR to complete this Jira issue. I see what you meant about `onResponse`, and I think your suggestion makes a lot of sense. I'll patch it accordingly.
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
lucasbru commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782544764

Changes look good to me. Did I understand David correctly that we want to add more changes to this PR?

> 1. Mockito: Could you be more specific on how you expect to mock the response object?

That was just "out of interest": since you are already using Mockito to mock things, why not use it for the response object as well? But I realize that the object is a pretty plain data container, so I guess the answer is that building it directly is easier.

> 2. Error handling: Essentially, all errors in `continueHandlePartitionErrors` can only happen in the response. I understand there is some redundancy there and it can be a bit confusing. But the request can end in either a hard failure (response = null and throwable = non-null) or a server-side error (response = non-null). That is why the two were kept separate. If you find it unclear, how can I make it more readable?

That's not what I meant. I just found the flow of `onResponse` a bit hard to read, so I suggested structuring it a bit differently. In particular, the name `continueHandlePartitionErrors` sounds like the method decides whether to continue handling errors, but it actually completes the future inside.

I think the control flow may also lead to logging that is not intended. For fencing, we get one generic `log.error` message and one very similar `log.info` message with some specific details. For topic authorization errors, we get one `log.error` message for each partition, and then another aggregated `log.error` message that lists the partitions again. However, if we have 3 unauthorized topic errors and then one other error, we get three generic error messages for the topics, but we do not get the aggregated error message. Is that trying to be consistent with the old consumer?

I imagined something like:

```
if (error == Errors.NONE) {
    log.debug("OffsetCommit {} for partition {}", offset, tp);
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
    // Collect all unauthorized topics before failing
    unauthorizedTopics.add(tp.topic());
} else if (error.exception() instanceof RetriableException) {
    log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
    handleRetriableError(error);
    retry(responseTime);
    return;
} else {
    log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
    handleFatalError(error);
    return;
}
```

But I'm not super familiar with the code style in the new consumer, and consistency with the rest of the code and the old consumer is also important. So I just wanted to give this to you as inspiration; you are probably in the best position to come up with the best way to implement it.
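A minimal, self-contained sketch of the flow suggested above may help when comparing the two structures. It does not use Kafka's classes: `CommitError`, `PartitionResult`, and `handle` are hypothetical stand-ins invented for illustration, and the returned strings replace the logging and future completion of the real code. It only shows that topic authorization failures are aggregated, while the first retriable or fatal error short-circuits the loop.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PartitionErrorFlowSketch {
    // Hypothetical stand-in for Kafka's Errors enum; only the retriable flag matters here.
    enum CommitError {
        NONE(false),
        TOPIC_AUTHORIZATION_FAILED(false),
        COORDINATOR_LOAD_IN_PROGRESS(true),
        GROUP_AUTHORIZATION_FAILED(false);

        final boolean retriable;

        CommitError(boolean retriable) {
            this.retriable = retriable;
        }
    }

    // Hypothetical per-partition result: the topic name plus the reported error.
    static final class PartitionResult {
        final String topic;
        final CommitError error;

        PartitionResult(String topic, CommitError error) {
            this.topic = topic;
            this.error = error;
        }
    }

    // Returns a short description of the outcome for the whole response.
    static String handle(List<PartitionResult> results) {
        Set<String> unauthorizedTopics = new LinkedHashSet<>();
        for (PartitionResult r : results) {
            if (r.error == CommitError.NONE) {
                continue; // this partition committed successfully
            } else if (r.error == CommitError.TOPIC_AUTHORIZATION_FAILED) {
                // Collect all unauthorized topics before failing
                unauthorizedTopics.add(r.topic);
            } else if (r.error.retriable) {
                return "RETRY:" + r.error; // first retriable error ends the loop
            } else {
                return "FATAL:" + r.error; // first fatal error ends the loop
            }
        }
        if (!unauthorizedTopics.isEmpty()) {
            return "UNAUTHORIZED:" + unauthorizedTopics;
        }
        return "OK";
    }
}
```

In this sketch, an early return on a retriable or fatal error drops the aggregated unauthorized-topics result, which is the same trade-off raised in the comment about the missing aggregated log message.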
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
lucasbru commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1374244715 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -271,14 +445,13 @@ public void onResponse( private void onFailure(final long currentTimeMs, final Errors responseError) { +handleCoordinatorDisconnect(responseError.exception(), currentTimeMs); log.debug("Offset fetch failed: {}", responseError.message()); - // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ? if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { Review Comment: nit: merge first two branches now? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values(; -return new NetworkClientDelegate.UnsentRequest( +NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator(), +coordinatorRequestManager.coordinator()); +resp.future().whenComplete( (response, throwable) -> { -if (throwable == null) { -future.complete(null); -} else { -future.completeExceptionally(throwable); +try { +if (throwable == null) { +onResponse(response); +} else { +onError(throwable, resp.handler().completionTimeMs()); +} +} catch (Throwable t) { +log.error("Unexpected error when completing offset commit: {}", this, t); +future.completeExceptionally(t); } }); +return resp; +} + +public CompletableFuture future() { +return future; +} + +public void onError(final Throwable exception, final long currentTimeMs) { +if (exception instanceof RetriableException) { +handleCoordinatorDisconnect(exception, currentTimeMs); +retry(currentTimeMs); +} +} + +public void onResponse(final ClientResponse response) { +long responseTime = 
response.receivedTimeMs(); +OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody(); +Set unauthorizedTopics = new HashSet<>(); +for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { +TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); +OffsetAndMetadata offsetAndMetadata = offsets.get(tp); +long offset = offsetAndMetadata.offset(); +Errors error = Errors.forCode(partition.errorCode()); +if (error == Errors.NONE) { +log.debug("OffsetCommit {} for partition {}", offset, tp); +continue; +} + +if (error.exception() instanceof RetriableException) { +log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, +error.message()); +} else { +log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message()); +} + +if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) { +return; +} +} +} + +if (!unauthorizedTopics.isEmpty()) { +log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics); +future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics)); +} else { +future.complete(null); +} +} + +private void retry(final long currentTimeMs) { +System.out.println("timeout" + currentTimeMs); +onFailedAttempt(currentTimeMs); +pendingRequests.addOffsetCommitRequest(this); +} + +private boolean continueHandlePartitionErrors(Errors error, TopicPartition tp, long offset, +
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782275615

Hi @lucasbru - Thanks for taking the time to review my PR. I addressed all but 2 comments:

1. Mockito: Could you be more specific on how you expect to mock the response object?
2. Error handling: Essentially, all errors in `continueHandlePartitionErrors` can only happen in the response. I understand there is some redundancy there and it can be a bit confusing. But the request can end in either a hard failure (response = null and throwable = non-null) or a server-side error (response = non-null). That is why the two were kept separate. If you find it unclear, how can I make it more readable?
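The two failure paths described in point 2 can be sketched with the JDK's `CompletableFuture` alone. This is an illustration under stated assumptions, not the PR's code: `Response`, its `errorCode` field, and `wire` are hypothetical names, and the string results stand in for the retry and exception handling done in `onError` and `onResponse`.

```java
import java.util.concurrent.CompletableFuture;

public class CommitCompletionSketch {
    // Hypothetical stand-in for a broker response; errorCode 0 means success.
    static final class Response {
        final int errorCode;

        Response(int errorCode) {
            this.errorCode = errorCode;
        }
    }

    // Connects a network-level future to a caller-facing future,
    // keeping the hard-failure path and the server-error path separate.
    static CompletableFuture<String> wire(CompletableFuture<Response> network) {
        CompletableFuture<String> result = new CompletableFuture<>();
        network.whenComplete((response, throwable) -> {
            if (throwable != null) {
                // Hard failure: no response arrived (response is null here).
                result.complete("hard-failure:" + throwable.getMessage());
            } else if (response.errorCode != 0) {
                // Server-side error: a response arrived but carries an error code.
                result.complete("server-error:" + response.errorCode);
            } else {
                result.complete("ok");
            }
        });
        return result;
    }
}
```

Because the callback is registered directly on the future that fails, the throwable is passed through unwrapped; a callback attached further down a chain of stages would instead see it wrapped in a `CompletionException`.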
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1374057359

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -127,7 +146,7 @@ public void maybeAutoCommit(final Map offsets
      * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
      * {@link OffsetCommitRequestState} and enqueue it to send later.
      */
-    public CompletableFuture addOffsetCommitRequest(final Map offsets) {
+    public OffsetCommitRequestState addOffsetCommitRequest(final Map offsets) {

Review Comment: Got it, I think the main reason was to test retryBackoff. I made some changes according to your suggestions.
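One way to read the alternative raised in review for this method is to keep the public method returning only the future and add a package-private accessor, marked as visible for testing, for anything the tests need to inspect. The sketch below is a guess at that shape, not the change that was merged: `RequestManagerSketch`, `RequestState`, `addRequest`, and `unsentRequests` are all hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class RequestManagerSketch {
    // Hypothetical internal state; stands in for something like OffsetCommitRequestState.
    static final class RequestState {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        int attempts = 0;
    }

    private final Queue<RequestState> unsent = new ArrayDeque<>();

    // Public API: callers only see the future, never the internal state object.
    public CompletableFuture<Void> addRequest() {
        RequestState state = new RequestState();
        unsent.add(state);
        return state.future;
    }

    // Visible for testing: package-private, so code outside the package cannot call it.
    Queue<RequestState> unsentRequests() {
        return unsent;
    }
}
```

A test in the same package can then reach the queued state (for example to check retry backoff) without widening the public method's return type.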
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1374040965

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##
@@ -382,9 +522,49 @@ private ClientResponse buildOffsetFetchClientResponse(
         return buildOffsetFetchClientResponse(request, topicPartitionData, error);
     }
 
+    private ClientResponse buildOffsetCommitClientResponse(final OffsetCommitResponse commitResponse,
+                                                           final Errors error) {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        OffsetCommitResponse response = new OffsetCommitResponse(data);
+        short apiVersion = 1;
+        return new ClientResponse(
+            new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1),
+            null,
+            "-1",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            null,
+            null,
+            commitResponse
+        );
+    }
+
+    public ClientResponse mockOffsetCommitResponse(String topic, int partition, short apiKeyVersion, Errors error) {

Review Comment: Could you be specific? Do you mean doing something like this?

```
ClientResponse response = mock(ClientResponse.class);
when(response.receivedTimeMs()).thenReturn(...);
when(response.responseBody()).thenReturn(responseData);
return response;
```
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1373587460 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values(; -return new NetworkClientDelegate.UnsentRequest( +NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator(), +coordinatorRequestManager.coordinator()); +resp.future().whenComplete( (response, throwable) -> { -if (throwable == null) { -future.complete(null); -} else { -future.completeExceptionally(throwable); +try { +if (throwable == null) { +onResponse(response); +} else { +onError(throwable, resp.handler().completionTimeMs()); +} +} catch (Throwable t) { +log.error("Unexpected error when completing offset commit: {}", this, t); +future.completeExceptionally(t); } }); +return resp; +} + +public CompletableFuture future() { +return future; +} + +public void onError(final Throwable exception, final long currentTimeMs) { +if (exception instanceof RetriableException) { +handleCoordinatorDisconnect(exception, currentTimeMs); +retry(currentTimeMs); +} +} + +public void onResponse(final ClientResponse response) { +long responseTime = response.receivedTimeMs(); +OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody(); +Set unauthorizedTopics = new HashSet<>(); +for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { +TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); +OffsetAndMetadata offsetAndMetadata = offsets.get(tp); +long offset = offsetAndMetadata.offset(); +Errors error = 
Errors.forCode(partition.errorCode()); +if (error == Errors.NONE) { +log.debug("OffsetCommit {} for partition {}", offset, tp); +continue; +} + +if (error.exception() instanceof RetriableException) { +log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, +error.message()); +} else { +log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message()); +} + +if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) { +return; +} +} +} + +if (!unauthorizedTopics.isEmpty()) { +log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics); +future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics)); +} else { +future.complete(null); +} +} + +private void retry(final long currentTimeMs) { +System.out.println("timeout" + currentTimeMs); +onFailedAttempt(currentTimeMs); +pendingRequests.addOffsetCommitRequest(this); +} + +private boolean continueHandlePartitionErrors(Errors error, TopicPartition tp, long offset, + Set unauthorizedTopics, long responseTime) { +switch (error) { +case GROUP_AUTHORIZATION_FAILED: + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); +return false; +case TOPIC_AUTHORIZATION_FAILED: +// Collect all unauthorized topics before failing +unauthorizedTopics.add(tp.topic()); +return true; Review Comment: Ah, I see what you mean.
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1373585188 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values(; -return new NetworkClientDelegate.UnsentRequest( +NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator(), +coordinatorRequestManager.coordinator()); +resp.future().whenComplete( (response, throwable) -> { -if (throwable == null) { -future.complete(null); -} else { -future.completeExceptionally(throwable); +try { +if (throwable == null) { +onResponse(response); +} else { +onError(throwable, resp.handler().completionTimeMs()); +} +} catch (Throwable t) { +log.error("Unexpected error when completing offset commit: {}", this, t); +future.completeExceptionally(t); } }); +return resp; +} + +public CompletableFuture future() { +return future; +} + +public void onError(final Throwable exception, final long currentTimeMs) { +if (exception instanceof RetriableException) { +handleCoordinatorDisconnect(exception, currentTimeMs); +retry(currentTimeMs); +} +} + +public void onResponse(final ClientResponse response) { +long responseTime = response.receivedTimeMs(); +OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody(); +Set unauthorizedTopics = new HashSet<>(); +for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { +TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); +OffsetAndMetadata offsetAndMetadata = offsets.get(tp); +long offset = offsetAndMetadata.offset(); +Errors error = 
Errors.forCode(partition.errorCode()); +if (error == Errors.NONE) { +log.debug("OffsetCommit {} for partition {}", offset, tp); +continue; +} + +if (error.exception() instanceof RetriableException) { +log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, +error.message()); +} else { +log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message()); +} + +if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) { +return; +} +} +} + +if (!unauthorizedTopics.isEmpty()) { +log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics); +future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics)); +} else { +future.complete(null); +} +} + +private void retry(final long currentTimeMs) { +System.out.println("timeout" + currentTimeMs); +onFailedAttempt(currentTimeMs); +pendingRequests.addOffsetCommitRequest(this); +} + +private boolean continueHandlePartitionErrors(Errors error, TopicPartition tp, long offset, + Set unauthorizedTopics, long responseTime) { +switch (error) { +case GROUP_AUTHORIZATION_FAILED: + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); +return false; +case TOPIC_AUTHORIZATION_FAILED: +// Collect all unauthorized topics before failing +unauthorizedTopics.add(tp.topic()); +return true; +case OFFSET_METADATA_TOO_LARGE: +case INVALID_COMMIT_OFFSET_SIZE: +
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1373567351 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values(; -return new NetworkClientDelegate.UnsentRequest( +NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator(), +coordinatorRequestManager.coordinator()); +resp.future().whenComplete( (response, throwable) -> { -if (throwable == null) { -future.complete(null); -} else { -future.completeExceptionally(throwable); +try { +if (throwable == null) { +onResponse(response); +} else { +onError(throwable, resp.handler().completionTimeMs()); +} +} catch (Throwable t) { +log.error("Unexpected error when completing offset commit: {}", this, t); +future.completeExceptionally(t); } }); +return resp; +} + +public CompletableFuture future() { +return future; +} + +public void onError(final Throwable exception, final long currentTimeMs) { +if (exception instanceof RetriableException) { +handleCoordinatorDisconnect(exception, currentTimeMs); +retry(currentTimeMs); +} +} + +public void onResponse(final ClientResponse response) { +long responseTime = response.receivedTimeMs(); +OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody(); +Set unauthorizedTopics = new HashSet<>(); +for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { +TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); +OffsetAndMetadata offsetAndMetadata = offsets.get(tp); +long offset = offsetAndMetadata.offset(); +Errors error = 
Errors.forCode(partition.errorCode()); +if (error == Errors.NONE) { +log.debug("OffsetCommit {} for partition {}", offset, tp); +continue; +} + +if (error.exception() instanceof RetriableException) { +log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, +error.message()); +} else { +log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message()); +} + +if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) { +return; +} +} +} + +if (!unauthorizedTopics.isEmpty()) { +log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics); +future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics)); +} else { +future.complete(null); +} +} + +private void retry(final long currentTimeMs) { +System.out.println("timeout" + currentTimeMs); Review Comment: Accidentally exposed my debug skills.
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
lucasbru commented on PR #14639: URL: https://github.com/apache/kafka/pull/14639#issuecomment-1781118953 Thanks @philipnee, I left some comments
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
lucasbru commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1373051849 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -148,28 +167,35 @@ Queue unsentOffsetCommitRequests() { return pendingRequests.unsentOffsetCommits; } +private List unsentOffsetFetchRequests() { +return pendingRequests.unsentOffsetFetches; +} + // Visible for testing CompletableFuture sendAutoCommit(final Map allConsumedOffsets) { log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets); -return this.addOffsetCommitRequest(allConsumedOffsets) -.whenComplete((response, throwable) -> { -this.autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); +return addOffsetCommitRequest(allConsumedOffsets).future().whenComplete((response, throwable) -> { +autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); +if (throwable == null) { +log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); +} else { +if (throwable instanceof RetriableCommitFailedException) { Review Comment: nit: could become `else if` and remove one indentation level. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -127,7 +146,7 @@ public void maybeAutoCommit(final Map offsets * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an * {@link OffsetCommitRequestState} and enqueue it to send later. */ -public CompletableFuture addOffsetCommitRequest(final Map offsets) { +public OffsetCommitRequestState addOffsetCommitRequest(final Map offsets) { Review Comment: You seem to expose the whole internal `OffsetCommitRequestState` here. It took me a minute to track all the references to realize that you are only exposing this for testing?
If so, I'd suggest to either avoid exposing internal state for unit testing (that's a smell), or at least introduce a separate method that is package-private and clearly marked as "visible for testing". ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values(; -return new NetworkClientDelegate.UnsentRequest( +NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest( builder, -coordinatorRequestManager.coordinator(), +coordinatorRequestManager.coordinator()); +resp.future().whenComplete( (response, throwable) -> { -if (throwable == null) { -future.complete(null); -} else { -future.completeExceptionally(throwable); +try { +if (throwable == null) { +onResponse(response); +} else { +onError(throwable, resp.handler().completionTimeMs()); +} +} catch (Throwable t) { +log.error("Unexpected error when completing offset commit: {}", this, t); +future.completeExceptionally(t); } }); +return resp; +} + +public CompletableFuture future() { +return future; +} + +public void onError(final Throwable exception, final long currentTimeMs) { +if (exception instanceof RetriableException) { +handleCoordinatorDisconnect(exception, currentTimeMs); +retry(currentTimeMs); +} +} + +public void onResponse(final ClientResponse response) { +long responseTime = response.receivedTimeMs(); +OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody(); +Set unauthorizedTopics = new HashSet<>(); +for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) { +TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); +OffsetAndMetadata 
offsetAndMetadata = offsets.get(tp); +long offset =
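
To illustrate the second review comment above (keep the public return type narrow and expose internals only through a package-private accessor), here is a minimal sketch under simplifying assumptions. `CommitManagerSketch`, `RequestState`, `addCommitRequest`, and `unsentRequests` are hypothetical names, not the classes in this PR, and a single `long` offset stands in for the real offsets map.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class CommitManagerSketch {

    // Hypothetical internal request state; callers should not depend on it.
    static final class RequestState {
        final long offset;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        RequestState(long offset) {
            this.offset = offset;
        }
    }

    private final Queue<RequestState> unsent = new ArrayDeque<>();

    // Public API: enqueue the request and hand back only its future.
    public CompletableFuture<Void> addCommitRequest(long offset) {
        RequestState state = new RequestState(offset);
        unsent.add(state);
        return state.future;
    }

    // Visible for testing: package-private, so only tests in the same package can reach it.
    Queue<RequestState> unsentRequests() {
        return unsent;
    }

    public static void main(String[] args) {
        CommitManagerSketch manager = new CommitManagerSketch();
        CompletableFuture<Void> result = manager.addCommitRequest(42L);

        // A test can inspect the queued state and complete it by hand.
        RequestState queued = manager.unsentRequests().peek();
        queued.future.complete(null);
        System.out.println("offset=" + queued.offset + " done=" + result.isDone());
    }
}
```

With this shape, production callers only ever see a `CompletableFuture`, while a unit test in the same package can still drive the queued request directly.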
Re: [PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
scyber commented on code in PR #14639: URL: https://github.com/apache/kafka/pull/14639#discussion_r1372704905

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -215,16 +227,113 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values(;
-            return new NetworkClientDelegate.UnsentRequest(
+            // TODO: KAFKA-15592
+            NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {

Review Comment:
I'm not sure we can ever get to that point, i.e. do we really need this try/catch?
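
One way to look at the question above is what `CompletableFuture.whenComplete` does when its callback throws: the exception completes the stage that `whenComplete` returns, and if that stage is discarded, a separately tracked future is never completed. The following is a minimal standalone sketch of that behavior, not code from this PR; `CallbackGuardSketch`, `handleResponse`, `unguarded`, and `guarded` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackGuardSketch {

    // Hypothetical stand-in for response handling that hits an unexpected bug.
    static void handleResponse(String response) {
        throw new IllegalStateException("unexpected response: " + response);
    }

    // No guard: the exception ends up in the stage returned by whenComplete,
    // which is dropped here, so `result` is never completed.
    static CompletableFuture<Void> unguarded(CompletableFuture<String> request) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        request.whenComplete((response, throwable) -> {
            if (throwable == null) {
                handleResponse(response);
                result.complete(null);
            } else {
                result.completeExceptionally(throwable);
            }
        });
        return result;
    }

    // With a guard: any unexpected failure is forwarded to `result`.
    static CompletableFuture<Void> guarded(CompletableFuture<String> request) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        request.whenComplete((response, throwable) -> {
            try {
                if (throwable == null) {
                    handleResponse(response);
                    result.complete(null);
                } else {
                    result.completeExceptionally(throwable);
                }
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<Void> unguardedResult = unguarded(first);
        first.complete("ok");
        System.out.println("unguarded done: " + unguardedResult.isDone());

        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<Void> guardedResult = guarded(second);
        second.complete("ok");
        System.out.println("guarded failed: " + guardedResult.isCompletedExceptionally());
    }
}
```

In the unguarded variant a caller waiting on the returned future would wait indefinitely, which is the kind of failure a catch-all in the callback protects against.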
[PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]
philipnee opened a new pull request, #14639: URL: https://github.com/apache/kafka/pull/14639

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)