This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5e2ecae95b MINOR: Reduce logging in persister. (#19998)
d5e2ecae95b is described below
commit d5e2ecae95b8fd039ef40da8666c5b27b385ee6a
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Jun 20 18:23:46 2025 +0530
MINOR: Reduce logging in persister. (#19998)
* A few logs in `PersisterStateManager` were noisy and did not add much
value.
* To reduce log pollution, they have been moved to debug level (a minimal
sketch of the resulting pattern follows the diffstat below).
* Added a debug log in `DefaultStatePersister` to track state and leader
epochs.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 23 ++++++++++++++++------
.../share/persister/DefaultStatePersister.java | 3 +++
.../share/persister/PersisterStateManager.java | 23 +++++++++++-----------
3 files changed, 32 insertions(+), 17 deletions(-)
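For context, here is a minimal, self-contained sketch of the conditional-level logging pattern this patch adds to SharePartition via its logError helper: transient network failures are demoted to debug, while every other failure stays at error. The class name LogLevelSketch, the slf4j dependency, and the local NetworkException (a stand-in for org.apache.kafka.common.errors.NetworkException) are illustrative assumptions, not part of the patch.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LogLevelSketch {
        private static final Logger log = LoggerFactory.getLogger(LogLevelSketch.class);

        // Stand-in for org.apache.kafka.common.errors.NetworkException.
        static class NetworkException extends RuntimeException {
            NetworkException(String message) {
                super(message);
            }
        }

        // Mirrors the logError helper added in SharePartition: network errors
        // are logged at debug, everything else at error.
        private static void logError(String message, Throwable e) {
            if (e instanceof NetworkException) {
                log.debug(message, e);
            } else {
                log.error(message, e);
            }
        }

        public static void main(String[] args) {
            // Transient network failure: only visible when debug logging is enabled.
            logError("Failed to write the share group state for share partition: grp-0",
                new NetworkException("broker disconnected"));
            // Any other failure: still reported at error level.
            logError("Failed to write the share group state for share partition: grp-0",
                new IllegalStateException("unexpected state"));
        }
    }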
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 91a11d488f4..e9b4be83b6e 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -501,8 +502,8 @@ public class SharePartition {
if (partitionData.errorCode() != Errors.NONE.code()) {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
- log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.",
- groupId, topicIdPartition, partitionData);
+ logError(String.format("Failed to initialize the share partition: %s-%s. Exception occurred: %s.",
+ groupId, topicIdPartition, partitionData), ex);
throwable = ex;
return;
}
@@ -2076,7 +2077,7 @@ public class SharePartition {
lock.writeLock().lock();
try {
if (exception != null) {
- log.error("Failed to write state to persister for the
share partition: {}-{}",
+ log.debug("Failed to write state to persister for the
share partition: {}-{}",
groupId, topicIdPartition, exception);
updatedStates.forEach(state ->
state.completeStateTransition(false));
future.completeExceptionally(exception);
@@ -2324,8 +2325,8 @@ public class SharePartition {
PartitionErrorData partitionData = state.partitions().get(0);
if (partitionData.errorCode() != Errors.NONE.code()) {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
- log.error("Failed to write the share group state for share partition: {}-{} due to exception",
- groupId, topicIdPartition, ex);
+ logError(String.format("Failed to write the share group state for share partition: %s-%s due to exception",
+ groupId, topicIdPartition), ex);
future.completeExceptionally(ex);
return;
}
@@ -2347,6 +2348,8 @@ public class SharePartition {
new FencedStateEpochException(errorMessage);
case FENCED_LEADER_EPOCH ->
new NotLeaderOrFollowerException(errorMessage);
+ case NETWORK_EXCEPTION ->
+ new NetworkException(errorMessage);
default ->
new UnknownServerException(errorMessage);
};
@@ -2423,7 +2426,7 @@ public class SharePartition {
if (!stateBatches.isEmpty()) {
writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
if (exception != null) {
- log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
+ log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
groupId, topicIdPartition, memberId, exception);
}
// Even if write share group state RPC call fails, we will still go ahead with the state transition.
@@ -2575,6 +2578,14 @@ public class SharePartition {
return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
}
+ private void logError(String message, Throwable e) {
+ if (e instanceof NetworkException) {
+ log.debug(message, e);
+ } else {
+ log.error(message, e);
+ }
+ }
+
/**
* This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived.
* It follows an iterative refinement of acquired records to eliminate batches to be archived.
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index ae8b8c317c3..fe66efa7f7f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -139,6 +139,9 @@ public class DefaultStatePersister implements Persister {
.computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
.computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+ log.debug("{}-{}-{}: stateEpoch - {}, leaderEpoch - {}.",
+ groupId, topicData.topicId(), partitionData.partition(), partitionData.stateEpoch(), partitionData.leaderEpoch());
+
handlers.add(
stateManager.new WriteStateHandler(
groupId,
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 9e451b4b1cd..eedf66c962e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -374,7 +374,7 @@ public class PersisterStateManager {
// We don't know if FIND_COORD or actual REQUEST. Let's err on side of request.
if (response == null) {
- requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response"));
+ requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response (response = null)"));
sender.wakeup();
return;
}
@@ -399,16 +399,17 @@ public class PersisterStateManager {
return Optional.empty();
}
- log.error("Response for RPC {} with key {} is invalid - {}.",
name(), this.partitionKey, response);
+ log.debug("Response for RPC {} with key {} is invalid - {}.",
name(), this.partitionKey, response);
if (response.wasDisconnected()) {
- errorConsumer.accept(Errors.NETWORK_EXCEPTION, null);
+ errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException("Server response indicates disconnect."));
return Optional.of(Errors.NETWORK_EXCEPTION);
} else if (response.wasTimedOut()) {
- errorConsumer.accept(Errors.REQUEST_TIMED_OUT, null);
+ log.error("Response for RPC {} with key {} timed out - {}.",
name(), this.partitionKey, response);
+ errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new
NetworkException("Server response indicates timeout."));
return Optional.of(Errors.REQUEST_TIMED_OUT);
} else {
- errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response"));
+ errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Server did not provide any response."));
return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
}
}
@@ -453,7 +454,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE: // retriable error codes
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retriable error in find coordinator for
{} using key {}: {}", name(), partitionKey(), error.message());
+ log.debug("Received retriable error in find coordinator
for {} using key {}: {}", name(), partitionKey(), error.message());
if (!findCoordBackoff.canAttempt()) {
log.error("Exhausted max retries to find coordinator
for {} using key {} without success.", name(), partitionKey());
findCoordinatorErrorResponse(error, new
Exception("Exhausted max retries to find coordinator without success."));
@@ -580,7 +581,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retriable error in
initialize state RPC for key {}: {}", partitionKey(), error.message());
+ log.debug("Received retriable error in
initialize state RPC for key {}: {}", partitionKey(), error.message());
if (!initializeStateBackoff.canAttempt()) {
log.error("Exhausted max retries for
initialize state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete initialize state RPC without
success."));
@@ -738,7 +739,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retriable error in write
state RPC for key {}: {}", partitionKey(), error.message());
+ log.debug("Received retriable error in write
state RPC for key {}: {}", partitionKey(), error.message());
if (!writeStateBackoff.canAttempt()) {
log.error("Exhausted max retries for write
state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete write state RPC without
success."));
@@ -880,7 +881,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retriable error in read
state RPC for key {}: {}", partitionKey(), error.message());
+ log.debug("Received retriable error in read
state RPC for key {}: {}", partitionKey(), error.message());
if (!readStateBackoff.canAttempt()) {
log.error("Exhausted max retries for read
state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state RPC without success."));
@@ -1022,7 +1023,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retriable error in read
state summary RPC for key {}: {}", partitionKey(), error.message());
+ log.debug("Received retriable error in read
state summary RPC for key {}: {}", partitionKey(), error.message());
if (!readStateSummaryBackoff.canAttempt()) {
log.error("Exhausted max retries for read
state summary RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete read state summary RPC without
success."));
@@ -1161,7 +1162,7 @@ public class PersisterStateManager {
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
case NOT_COORDINATOR:
- log.warn("Received retriable error in delete
state RPC for key {}: {}", partitionKey(), error.message());
+ log.debug("Received retriable error in delete
state RPC for key {}: {}", partitionKey(), error.message());
if (!deleteStateBackoff.canAttempt()) {
log.error("Exhausted max retries for
delete state RPC for key {} without success.", partitionKey());
requestErrorResponse(error, new
Exception("Exhausted max retries to complete delete state RPC without
success."));