chia7712 commented on code in PR #20998:
URL: https://github.com/apache/kafka/pull/20998#discussion_r2576300692
##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -411,22 +402,17 @@ Optional<Errors> checkResponseError(ClientResponse
response, BiConsumer<Errors,
if (response.authenticationException() != null) {
log.error("Authentication exception",
response.authenticationException());
Errors error =
Errors.forException(response.authenticationException());
- errorConsumer.accept(error, new
AuthenticationException(String.format("Server response for %s indicates
authentication exception.", this.partitionKey)));
return Optional.of(error);
} else if (response.versionMismatch() != null) {
log.error("Version mismatch exception",
response.versionMismatch());
Errors error = Errors.forException(response.versionMismatch());
- errorConsumer.accept(error, new
UnsupportedVersionException(String.format("Server response for %s indicates
version mismatch.", this.partitionKey)));
return Optional.of(error);
- } else if (response.wasDisconnected()) {
- errorConsumer.accept(Errors.NETWORK_EXCEPTION, new
NetworkException(String.format("Server response for %s indicates disconnect.",
this.partitionKey)));
+ } else if (response.wasDisconnected()) { // Retriable
return Optional.of(Errors.NETWORK_EXCEPTION);
- } else if (response.wasTimedOut()) {
- log.error("Response for RPC {} with key {} timed out - {}.",
name(), this.partitionKey, response);
- errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new
NetworkException(String.format("Server response for %s indicates timeout.",
this.partitionKey)));
+ } else if (response.wasTimedOut()) { // Retriable
+ log.debug("Response for RPC {} with key {} timed out - {}.",
name(), this.partitionKey, response);
Review Comment:
This debug message appears to be redundant, as the information is logged
later in the downstream handler.
##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -1184,65 +1279,87 @@ protected boolean isResponseForRequest(ClientResponse
response) {
protected void handleRequestResponse(ClientResponse response) {
log.debug("Delete state response received - {}", response);
deleteStateBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
- // response can be a combined one for large number of requests
- // we need to deconstruct it
- DeleteShareGroupStateResponse combinedResponse =
(DeleteShareGroupStateResponse) response.responseBody();
-
- for (DeleteShareGroupStateResponseData.DeleteStateResult
deleteStateResult : combinedResponse.data().results()) {
- if
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
-
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
- deleteStateResult.partitions().stream()
- .filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
- .findFirst();
-
- if (partitionStateData.isPresent()) {
- Errors error =
Errors.forCode(partitionStateData.get().errorCode());
- String errorMessage =
partitionStateData.get().errorMessage();
- if (errorMessage == null || errorMessage.isEmpty()) {
- errorMessage = error.message();
- }
+ switch (clientResponseError) {
+ case NONE:
+ // response can be a combined one for large number of
requests
+ // we need to deconstruct it
+ DeleteShareGroupStateResponse combinedResponse =
(DeleteShareGroupStateResponse) response.responseBody();
+
+ for (DeleteShareGroupStateResponseData.DeleteStateResult
deleteStateResult : combinedResponse.data().results()) {
+ if
(deleteStateResult.topicId().equals(partitionKey().topicId())) {
+
Optional<DeleteShareGroupStateResponseData.PartitionResult> partitionStateData =
+ deleteStateResult.partitions().stream()
+ .filter(partitionResult ->
partitionResult.partition() == partitionKey().partition())
+ .findFirst();
+
+ if (partitionStateData.isPresent()) {
+ Errors error =
Errors.forCode(partitionStateData.get().errorCode());
+ String errorMessage =
partitionStateData.get().errorMessage();
+ if (errorMessage == null ||
errorMessage.isEmpty()) {
+ errorMessage = error.message();
+ }
- switch (error) {
- case NONE:
- deleteStateBackoff.resetAttempts();
-
DeleteShareGroupStateResponseData.DeleteStateResult result =
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
- partitionKey().topicId(),
- List.of(partitionStateData.get())
- );
- this.result.complete(new
DeleteShareGroupStateResponse(
- new
DeleteShareGroupStateResponseData().setResults(List.of(result))));
- return;
-
- // check retriable errors
- case COORDINATOR_NOT_AVAILABLE:
- case COORDINATOR_LOAD_IN_PROGRESS:
- case NOT_COORDINATOR:
- case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in delete
state RPC for key {}: {}", partitionKey(), errorMessage);
- if (!deleteStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for
delete state RPC for key {} without success.", partitionKey());
- requestErrorResponse(error, new
Exception("Exhausted max retries to complete delete state RPC without
success."));
- return;
+ switch (error) {
+ case NONE:
+ deleteStateBackoff.resetAttempts();
+
DeleteShareGroupStateResponseData.DeleteStateResult result =
DeleteShareGroupStateResponse.toResponseDeleteStateResult(
+ partitionKey().topicId(),
+ List.of(partitionStateData.get())
+ );
+ this.result.complete(new
DeleteShareGroupStateResponse(
+ new
DeleteShareGroupStateResponseData().setResults(List.of(result))));
+ return;
+
+ // check retriable errors
+ case COORDINATOR_NOT_AVAILABLE:
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case NOT_COORDINATOR:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.debug("Received retriable error in
delete state RPC for key {}: {}", partitionKey(), errorMessage);
+ if (!deleteStateBackoff.canAttempt()) {
+ log.error("Exhausted max retries
for delete state RPC for key {} without success.", partitionKey());
+ requestErrorResponse(error, new
Exception("Exhausted max retries to complete delete state RPC without
success."));
+ return;
+ }
+ super.resetCoordinatorNode();
+ timer.add(new
PersisterTimerTask(deleteStateBackoff.backOff(), this));
+ return;
+
+ default:
+ log.error("Unable to perform delete
state RPC for key {}: {}", partitionKey(), errorMessage);
+ requestErrorResponse(error, new
Exception(errorMessage));
+ return;
}
- super.resetCoordinatorNode();
- timer.add(new
PersisterTimerTask(deleteStateBackoff.backOff(), this));
- return;
-
- default:
- log.error("Unable to perform delete state RPC
for key {}: {}", partitionKey(), errorMessage);
- requestErrorResponse(error, new
Exception(errorMessage));
- return;
+ }
}
}
- }
- }
- // no response found specific topic partition
- IllegalStateException exception = new IllegalStateException(
- "Failed to delete state for share partition: " + partitionKey()
- );
- requestErrorResponse(Errors.forException(exception), exception);
+ // no response found specific topic partition
+ IllegalStateException exception = new
IllegalStateException(
+ "Failed to delete state for share partition: " +
partitionKey()
+ );
+ requestErrorResponse(Errors.forException(exception),
exception);
+ return;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ log.debug("Received retriable error in delete state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
Review Comment:
Have you considered using the
`NetworkPartitionMetadataClient#maybeHandleErrorResponse` pattern to avoid
generating the 'Received retriable error ... Exhausted max retries' log sequence?
https://github.com/apache/kafka/blob/7cea0595f7ce8cdd7458ede053557cedd1f7ec95/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java#L286
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]