This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 7ae882623a0 KAFKA-19940 Reduce the log4j2 cpu usage in
PersisterStateManager (#21027)
7ae882623a0 is described below
commit 7ae882623a0b006c99e58c756f7cddbc5a89ff49
Author: Ken Huang <[email protected]>
AuthorDate: Thu Dec 18 01:35:13 2025 +0800
KAFKA-19940 Reduce the log4j2 cpu usage in PersisterStateManager (#21027)
FYI: https://github.com/apache/kafka/pull/20175/files#r2572693459
This PR optimizes the logging mechanism in PersisterStateManager to
reduce CPU overhead and fix issues preventing dynamic log level
configuration for internal handlers.
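It replaces the per-instance logger lookup based on getCanonicalName()
with one static, class-based logger per handler class (e.g.,
LoggerFactory.getLogger(InitializeStateHandler.class)), which each handler
exposes through an abstract log() accessor.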
Reviewers: Sushant Mahajan <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../share/persister/DefaultStatePersister.java | 2 +-
.../share/persister/PersisterStateManager.java | 146 +++++++++++++--------
.../share/persister/PersisterStateManagerTest.java | 8 ++
3 files changed, 99 insertions(+), 57 deletions(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 4d593a3bd98..7ac49e6e413 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -577,7 +577,7 @@ public class DefaultStatePersister implements Persister {
validateGroupTopicPartitionData(prefix,
params.groupTopicPartitionData());
}
-
+
private static void validate(WriteShareGroupStateParameters params) {
String prefix = "Write share group parameters";
if (params == null) {
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 6c151686e9b..6e15ab268e7 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -190,7 +190,6 @@ public class PersisterStateManager {
public abstract class PersisterStateManagerHandler implements
RequestCompletionHandler {
protected Node coordinatorNode;
private final ExponentialBackoffManager findCoordBackoff;
- protected final Logger log;
private Consumer<ClientResponse> onCompleteCallback;
protected final SharePartitionKey partitionKey;
@@ -211,13 +210,16 @@ public class PersisterStateManager {
this.onCompleteCallback = response -> {
}; // noop
partitionKey = SharePartitionKey.getInstance(groupId, topicId,
partition);
- String canonicalName = getClass().getCanonicalName();
- if (canonicalName == null) {
- canonicalName = getClass().getName();
- }
- log = LoggerFactory.getLogger(canonicalName);
}
+ /**
+ * Child classes must provide their own static logger instance. This
avoids unnecessary resource usage
+ * caused by creating a large number of logger instances.
+ *
+ * @return child class logger
+ */
+ protected abstract Logger log();
+
/**
* Child class must create appropriate builder object for the handled
RPC
*
@@ -314,10 +316,10 @@ public class PersisterStateManager {
return false;
}
if (cacheHelper.containsTopic(Topic.SHARE_GROUP_STATE_TOPIC_NAME))
{
- log.debug("{} internal topic already exists.",
Topic.SHARE_GROUP_STATE_TOPIC_NAME);
+ log().debug("{} internal topic already exists.",
Topic.SHARE_GROUP_STATE_TOPIC_NAME);
Node node = cacheHelper.getShareCoordinator(partitionKey(),
Topic.SHARE_GROUP_STATE_TOPIC_NAME);
if (node != Node.noNode()) {
- log.debug("Found coordinator node in cache: {}", node);
+ log().debug("Found coordinator node in cache: {}", node);
coordinatorNode = node;
addRequestToNodeMap(node, this);
return false;
@@ -372,20 +374,20 @@ public class PersisterStateManager {
return Optional.empty();
}
- log.debug("Response for RPC {} with key {} is invalid - {}",
name(), this.partitionKey, response);
+ log().debug("Response for RPC {} with key {} is invalid - {}",
name(), this.partitionKey, response);
if (response.authenticationException() != null) {
- log.error("Authentication exception",
response.authenticationException());
+ log().error("Authentication exception",
response.authenticationException());
Errors error =
Errors.forException(response.authenticationException());
return Optional.of(error);
} else if (response.versionMismatch() != null) {
- log.error("Version mismatch exception",
response.versionMismatch());
+ log().error("Version mismatch exception",
response.versionMismatch());
Errors error = Errors.forException(response.versionMismatch());
return Optional.of(error);
} else if (response.wasDisconnected()) { // Retriable
return Optional.of(Errors.NETWORK_EXCEPTION);
} else if (response.wasTimedOut()) { // Retriable
- log.debug("Response for RPC {} with key {} timed out - {}.",
name(), this.partitionKey, response);
+ log().debug("Response for RPC {} with key {} timed out - {}.",
name(), this.partitionKey, response);
return Optional.of(Errors.REQUEST_TIMED_OUT);
} else {
return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
@@ -402,7 +404,7 @@ public class PersisterStateManager {
* @param response - Client response for find coordinator RPC
*/
protected void handleFindCoordinatorResponse(ClientResponse response) {
- log.debug("Find coordinator response received - {}", response);
+ log().debug("Find coordinator response received - {}", response);
// Incrementing the number of find coordinator attempts
findCoordBackoff.incrementAttempt();
@@ -413,7 +415,7 @@ public class PersisterStateManager {
case NONE:
List<FindCoordinatorResponseData.Coordinator> coordinators
= ((FindCoordinatorResponse) response.responseBody()).coordinators();
if (coordinators.size() != 1) {
- log.error("Find coordinator response for {} is
invalid. Number of coordinators = {}", partitionKey(), coordinators.size());
+ log().error("Find coordinator response for {} is
invalid. Number of coordinators = {}", partitionKey(), coordinators.size());
findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new
IllegalStateException("Invalid response with multiple coordinators."));
return;
}
@@ -426,7 +428,7 @@ public class PersisterStateManager {
}
switch (error) {
case NONE:
- log.trace("Find coordinator response valid.
Enqueuing actual request.");
+ log().trace("Find coordinator response valid.
Enqueuing actual request.");
findCoordBackoff.resetAttempts();
coordinatorNode = new
Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port());
// now we want the actual share state RPC call to
happen
@@ -441,9 +443,9 @@ public class PersisterStateManager {
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in find
coordinator for {} using key {}: {}", name(), partitionKey(), errorMessage);
+ log().debug("Received retriable error in find
coordinator for {} using key {}: {}", name(), partitionKey(), errorMessage);
if (!findCoordBackoff.canAttempt()) {
- log.error("Exhausted max retries to find
coordinator for {} using key {} without success.", name(), partitionKey());
+ log().error("Exhausted max retries to find
coordinator for {} using key {} without success.", name(), partitionKey());
findCoordinatorErrorResponse(error, new
Exception("Exhausted max retries to find coordinator without success."));
break;
}
@@ -452,16 +454,16 @@ public class PersisterStateManager {
break;
default:
- log.error("Unable to find coordinator for {} using
key {}: {}.", name(), partitionKey(), errorMessage);
+ log().error("Unable to find coordinator for {}
using key {}: {}.", name(), partitionKey(), errorMessage);
findCoordinatorErrorResponse(error, new
Exception(errorMessage));
}
return;
case NETWORK_EXCEPTION: // Retriable client response error
codes.
case REQUEST_TIMED_OUT:
- log.debug("Received retriable error in find coordinator
client response for {} using key {} due to {}.", name(), partitionKey(),
clientResponseErrorMessage);
+ log().debug("Received retriable error in find coordinator
client response for {} using key {} due to {}.", name(), partitionKey(),
clientResponseErrorMessage);
if (!findCoordBackoff.canAttempt()) {
- log.error("Exhausted max retries to find coordinator
due to error in client response for {} using key {}.", name(), partitionKey());
+ log().error("Exhausted max retries to find coordinator
due to error in client response for {} using key {}.", name(), partitionKey());
findCoordinatorErrorResponse(clientResponseError, new
Exception("Exhausted max retries to find coordinator without success."));
break;
}
@@ -470,7 +472,7 @@ public class PersisterStateManager {
break;
default:
- log.error("Unable to find coordinator due to error in
client response for {} using key {}: {}", name(), partitionKey(),
clientResponseError.code());
+ log().error("Unable to find coordinator due to error in
client response for {} using key {}: {}", name(), partitionKey(),
clientResponseError.code());
findCoordinatorErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
}
}
@@ -494,6 +496,9 @@ public class PersisterStateManager {
}
public class InitializeStateHandler extends PersisterStateManagerHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(InitializeStateHandler.class);
+
private final int stateEpoch;
private final long startOffset;
private final CompletableFuture<InitializeShareGroupStateResponse>
result;
@@ -544,6 +549,11 @@ public class PersisterStateManager {
);
}
+ @Override
+ protected Logger log() {
+ return LOG;
+ }
+
@Override
protected String name() {
return "InitializeStateHandler";
@@ -561,7 +571,7 @@ public class PersisterStateManager {
@Override
protected void handleRequestResponse(ClientResponse response) {
- log.debug("Initialize state response received - {}", response);
+ log().debug("Initialize state response received - {}", response);
initializeStateBackoff.incrementAttempt();
Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
String clientResponseErrorMessage = clientResponseError.message();
@@ -601,9 +611,9 @@ public class PersisterStateManager {
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().debug("Received retriable error
in initialize state RPC for key {}: {}", partitionKey(), errorMessage);
if
(!initializeStateBackoff.canAttempt()) {
- log.error("Exhausted max retries
for initialize state RPC for key {} without success.", partitionKey());
+ log().error("Exhausted max retries
for initialize state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete initialize state RPC without
success."));
return;
}
@@ -612,7 +622,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().error("Unable to perform
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
requestErrorResponse(error, new
Exception(errorMessage));
return;
}
@@ -629,9 +639,9 @@ public class PersisterStateManager {
case NETWORK_EXCEPTION: // Retriable client response error
codes.
case REQUEST_TIMED_OUT:
- log.debug("Received retriable error in initialize state
RPC client response for key {}: {}", partitionKey(),
clientResponseErrorMessage);
+ log().debug("Received retriable error in initialize state
RPC client response for key {}: {}", partitionKey(),
clientResponseErrorMessage);
if (!initializeStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for initialize state
RPC due to error in client response for key {}.", partitionKey());
+ log().error("Exhausted max retries for initialize
state RPC due to error in client response for key {}.", partitionKey());
requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete initialize state RPC without
success."));
return;
}
@@ -640,7 +650,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform initialize state RPC due to
error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
+ log().error("Unable to perform initialize state RPC due to
error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
}
}
@@ -675,6 +685,7 @@ public class PersisterStateManager {
}
public class WriteStateHandler extends PersisterStateManagerHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(WriteStateHandler.class);
private final int stateEpoch;
private final int leaderEpoch;
private final long startOffset;
@@ -740,6 +751,11 @@ public class PersisterStateManager {
);
}
+ @Override
+ protected Logger log() {
+ return LOG;
+ }
+
@Override
protected String name() {
return "WriteStateHandler";
@@ -757,7 +773,7 @@ public class PersisterStateManager {
@Override
protected void handleRequestResponse(ClientResponse response) {
- log.debug("Write state response received - {}", response);
+ log().debug("Write state response received - {}", response);
writeStateBackoff.incrementAttempt();
Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
String clientResponseErrorMessage = clientResponseError.message();
@@ -796,9 +812,9 @@ public class PersisterStateManager {
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in
write state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().debug("Received retriable error
in write state RPC for key {}: {}", partitionKey(), errorMessage);
if (!writeStateBackoff.canAttempt()) {
- log.error("Exhausted max retries
for write state RPC for key {} without success.", partitionKey());
+ log().error("Exhausted max retries
for write state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete write state RPC without
success."));
return;
}
@@ -807,7 +823,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform write
state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().error("Unable to perform write
state RPC for key {}: {}", partitionKey(), errorMessage);
requestErrorResponse(error, new
Exception(errorMessage));
return;
}
@@ -824,9 +840,9 @@ public class PersisterStateManager {
case NETWORK_EXCEPTION: // Retriable client response error
codes.
case REQUEST_TIMED_OUT:
- log.debug("Received retriable error in write state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+ log().debug("Received retriable error in write state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
if (!writeStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for write state RPC
due to error in client response for key {}.", partitionKey());
+ log().error("Exhausted max retries for write state RPC
due to error in client response for key {}.", partitionKey());
requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete write state RPC without
success."));
return;
}
@@ -835,7 +851,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform write state RPC due to error
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+ log().error("Unable to perform write state RPC due to
error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
}
}
@@ -870,6 +886,7 @@ public class PersisterStateManager {
}
public class ReadStateHandler extends PersisterStateManagerHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadStateHandler.class);
private final int leaderEpoch;
private final CompletableFuture<ReadShareGroupStateResponse> result;
private final ExponentialBackoffManager readStateBackoff;
@@ -917,6 +934,11 @@ public class PersisterStateManager {
);
}
+ @Override
+ protected Logger log() {
+ return LOG;
+ }
+
@Override
protected String name() {
return "ReadStateHandler";
@@ -934,7 +956,7 @@ public class PersisterStateManager {
@Override
protected void handleRequestResponse(ClientResponse response) {
- log.debug("Read state response received - {}", response);
+ log().debug("Read state response received - {}", response);
readStateBackoff.incrementAttempt();
Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
String clientResponseErrorMessage = clientResponseError.message();
@@ -971,9 +993,9 @@ public class PersisterStateManager {
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in
read state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().debug("Received retriable error
in read state RPC for key {}: {}", partitionKey(), errorMessage);
if (!readStateBackoff.canAttempt()) {
- log.error("Exhausted max retries
for read state RPC for key {} without success.", partitionKey());
+ log().error("Exhausted max retries
for read state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state RPC without success."));
return;
}
@@ -982,7 +1004,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform read
state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().error("Unable to perform read
state RPC for key {}: {}", partitionKey(), errorMessage);
requestErrorResponse(error, new
Exception(errorMessage));
return;
}
@@ -999,9 +1021,9 @@ public class PersisterStateManager {
case NETWORK_EXCEPTION: // Retriable client response error
codes.
case REQUEST_TIMED_OUT:
- log.debug("Received retriable error in read state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+ log().debug("Received retriable error in read state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
if (!readStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for read state RPC
due to error in client response for key {}.", partitionKey());
+ log().error("Exhausted max retries for read state RPC
due to error in client response for key {}.", partitionKey());
requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete read state RPC without success."));
return;
}
@@ -1010,7 +1032,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform read state RPC due to error
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+ log().error("Unable to perform read state RPC due to error
in client response for key {}: {}", partitionKey(), clientResponseError.code());
requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
}
}
@@ -1045,6 +1067,7 @@ public class PersisterStateManager {
}
public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadStateSummaryHandler.class);
private final int leaderEpoch;
private final CompletableFuture<ReadShareGroupStateSummaryResponse>
result;
private final ExponentialBackoffManager readStateSummaryBackoff;
@@ -1092,6 +1115,11 @@ public class PersisterStateManager {
);
}
+ @Override
+ protected Logger log() {
+ return LOG;
+ }
+
@Override
protected String name() {
return "ReadStateSummaryHandler";
@@ -1109,7 +1137,7 @@ public class PersisterStateManager {
@Override
protected void handleRequestResponse(ClientResponse response) {
- log.debug("Read state summary response received - {}", response);
+ log().debug("Read state summary response received - {}", response);
readStateSummaryBackoff.incrementAttempt();
Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
String clientResponseErrorMessage = clientResponseError.message();
@@ -1146,9 +1174,9 @@ public class PersisterStateManager {
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in
read state summary RPC for key {}: {}", partitionKey(), errorMessage);
+ log().debug("Received retriable error
in read state summary RPC for key {}: {}", partitionKey(), errorMessage);
if
(!readStateSummaryBackoff.canAttempt()) {
- log.error("Exhausted max retries
for read state summary RPC for key {} without success.", partitionKey());
+ log().error("Exhausted max retries
for read state summary RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state summary RPC without
success."));
return;
}
@@ -1157,7 +1185,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform read
state summary RPC for key {}: {}", partitionKey(), errorMessage);
+ log().error("Unable to perform read
state summary RPC for key {}: {}", partitionKey(), errorMessage);
requestErrorResponse(error, new
Exception(errorMessage));
return;
}
@@ -1174,9 +1202,9 @@ public class PersisterStateManager {
case NETWORK_EXCEPTION: // Retriable client response error
codes.
case REQUEST_TIMED_OUT:
- log.debug("Received retriable error in read state summary
RPC client response for key {}: {}", partitionKey(),
clientResponseErrorMessage);
+ log().debug("Received retriable error in read state
summary RPC client response for key {}: {}", partitionKey(),
clientResponseErrorMessage);
if (!readStateSummaryBackoff.canAttempt()) {
- log.error("Exhausted max retries for read state
summary RPC due to error in client response for key {}.", partitionKey());
+ log().error("Exhausted max retries for read state
summary RPC due to error in client response for key {}.", partitionKey());
requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete read state summary RPC without
success."));
return;
}
@@ -1185,7 +1213,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform read state summary RPC due to
error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
+ log().error("Unable to perform read state summary RPC due
to error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
}
}
@@ -1220,6 +1248,7 @@ public class PersisterStateManager {
}
public class DeleteStateHandler extends PersisterStateManagerHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(DeleteStateHandler.class);
private final CompletableFuture<DeleteShareGroupStateResponse> result;
private final ExponentialBackoffManager deleteStateBackoff;
@@ -1260,6 +1289,11 @@ public class PersisterStateManager {
);
}
+ @Override
+ protected Logger log() {
+ return LOG;
+ }
+
@Override
protected String name() {
return "DeleteStateHandler";
@@ -1277,7 +1311,7 @@ public class PersisterStateManager {
@Override
protected void handleRequestResponse(ClientResponse response) {
- log.debug("Delete state response received - {}", response);
+ log().debug("Delete state response received - {}", response);
deleteStateBackoff.incrementAttempt();
Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
String clientResponseErrorMessage = clientResponseError.message();
@@ -1318,9 +1352,9 @@ public class PersisterStateManager {
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
case UNKNOWN_TOPIC_OR_PARTITION:
- log.debug("Received retriable error in
delete state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().debug("Received retriable error
in delete state RPC for key {}: {}", partitionKey(), errorMessage);
if (!deleteStateBackoff.canAttempt()) {
- log.error("Exhausted max retries
for delete state RPC for key {} without success.", partitionKey());
+ log().error("Exhausted max retries
for delete state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete delete state RPC without
success."));
return;
}
@@ -1329,7 +1363,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform delete
state RPC for key {}: {}", partitionKey(), errorMessage);
+ log().error("Unable to perform delete
state RPC for key {}: {}", partitionKey(), errorMessage);
requestErrorResponse(error, new
Exception(errorMessage));
return;
}
@@ -1346,9 +1380,9 @@ public class PersisterStateManager {
case NETWORK_EXCEPTION: // Retriable client response error
codes.
case REQUEST_TIMED_OUT:
- log.debug("Received retriable error in delete state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
+ log().debug("Received retriable error in delete state RPC
client response for key {}: {}", partitionKey(), clientResponseErrorMessage);
if (!deleteStateBackoff.canAttempt()) {
- log.error("Exhausted max retries for delete state RPC
due to error in client response for key {}.", partitionKey());
+ log().error("Exhausted max retries for delete state
RPC due to error in client response for key {}.", partitionKey());
requestErrorResponse(clientResponseError, new
Exception("Exhausted max retries to complete delete state RPC without
success."));
return;
}
@@ -1357,7 +1391,7 @@ public class PersisterStateManager {
return;
default:
- log.error("Unable to perform delete state RPC due to error
in client response for key {}: {}", partitionKey(), clientResponseError.code());
+ log().error("Unable to perform delete state RPC due to
error in client response for key {}: {}", partitionKey(),
clientResponseError.code());
requestErrorResponse(clientResponseError, new
Exception(clientResponseErrorMessage));
}
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
index bd4f87ae19d..ffc89dcfdcf 100644
---
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java
@@ -60,6 +60,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -130,6 +132,7 @@ class PersisterStateManagerTest {
}
private abstract class TestStateHandler extends
PersisterStateManager.PersisterStateManagerHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestStateHandler.class);
private final CompletableFuture<TestHandlerResponse> result;
private class TestHandlerResponseData extends
WriteShareGroupStateResponseData {
@@ -154,6 +157,11 @@ class PersisterStateManagerTest {
this.result = result;
}
+ @Override
+ protected Logger log() {
+ return LOG;
+ }
+
@Override
protected void handleRequestResponse(ClientResponse response) {
this.result.complete(new TestHandlerResponse(new
TestHandlerResponseData()