This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5e2ecae95b MINOR: Reduce logging in persister. (#19998)
d5e2ecae95b is described below

commit d5e2ecae95b8fd039ef40da8666c5b27b385ee6a
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Jun 20 18:23:46 2025 +0530

    MINOR: Reduce logging in persister. (#19998)
    
    * A few logs in `PersisterStateManager` were noisy and did not add
    much value.
    * To reduce log pollution, they have been moved to debug
    level.
    * Added a debug log in `DefaultStatePersister` to track epochs.
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 23 ++++++++++++++++------
 .../share/persister/DefaultStatePersister.java     |  3 +++
 .../share/persister/PersisterStateManager.java     | 23 +++++++++++-----------
 3 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 91a11d488f4..e9b4be83b6e 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -501,8 +502,8 @@ public class SharePartition {
 
                 if (partitionData.errorCode() != Errors.NONE.code()) {
                     KafkaException ex = 
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
-                    log.error("Failed to initialize the share partition: 
{}-{}. Exception occurred: {}.",
-                        groupId, topicIdPartition, partitionData);
+                    logError(String.format("Failed to initialize the share 
partition: %s-%s. Exception occurred: %s.",
+                        groupId, topicIdPartition, partitionData), ex);
                     throwable = ex;
                     return;
                 }
@@ -2076,7 +2077,7 @@ public class SharePartition {
             lock.writeLock().lock();
             try {
                 if (exception != null) {
-                    log.error("Failed to write state to persister for the 
share partition: {}-{}",
+                    log.debug("Failed to write state to persister for the 
share partition: {}-{}",
                         groupId, topicIdPartition, exception);
                     updatedStates.forEach(state -> 
state.completeStateTransition(false));
                     future.completeExceptionally(exception);
@@ -2324,8 +2325,8 @@ public class SharePartition {
                 PartitionErrorData partitionData = state.partitions().get(0);
                 if (partitionData.errorCode() != Errors.NONE.code()) {
                     KafkaException ex = 
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
-                    log.error("Failed to write the share group state for share 
partition: {}-{} due to exception",
-                        groupId, topicIdPartition, ex);
+                    logError(String.format("Failed to write the share group 
state for share partition: %s-%s due to exception",
+                        groupId, topicIdPartition), ex);
                     future.completeExceptionally(ex);
                     return;
                 }
@@ -2347,6 +2348,8 @@ public class SharePartition {
                 new FencedStateEpochException(errorMessage);
             case FENCED_LEADER_EPOCH ->
                 new NotLeaderOrFollowerException(errorMessage);
+            case NETWORK_EXCEPTION ->
+                new NetworkException(errorMessage);
             default ->
                 new UnknownServerException(errorMessage);
         };
@@ -2423,7 +2426,7 @@ public class SharePartition {
             if (!stateBatches.isEmpty()) {
                 writeShareGroupState(stateBatches).whenComplete((result, 
exception) -> {
                     if (exception != null) {
-                        log.error("Failed to write the share group state on 
acquisition lock timeout for share partition: {}-{} memberId: {}",
+                        log.debug("Failed to write the share group state on 
acquisition lock timeout for share partition: {}-{} memberId: {}",
                             groupId, topicIdPartition, memberId, exception);
                     }
                     // Even if write share group state RPC call fails, we will 
still go ahead with the state transition.
@@ -2575,6 +2578,14 @@ public class SharePartition {
         return filterRecordBatchesFromAcquiredRecords(acquiredRecords, 
recordsToArchive);
     }
 
+    private void logError(String message, Throwable e) {
+        if (e instanceof NetworkException) {
+            log.debug(message, e);
+        } else {
+            log.error(message, e);
+        }
+    }
+
     /**
      * This function filters out the offsets present in the acquired records 
list that are also a part of batches that need to be archived.
      * It follows an iterative refinement of acquired records to eliminate 
batches to be archived.
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index ae8b8c317c3..fe66efa7f7f 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -139,6 +139,9 @@ public class DefaultStatePersister implements Persister {
                     .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
                     .computeIfAbsent(partitionData.partition(), k -> new 
CompletableFuture<>());
 
+                log.debug("{}-{}-{}: stateEpoch - {}, leaderEpoch - {}.",
+                    groupId, topicData.topicId(), partitionData.partition(), 
partitionData.stateEpoch(), partitionData.leaderEpoch());
+
                 handlers.add(
                     stateManager.new WriteStateHandler(
                         groupId,
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 9e451b4b1cd..eedf66c962e 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -374,7 +374,7 @@ public class PersisterStateManager {
 
             // We don't know if FIND_COORD or actual REQUEST. Let's err on 
side of request.
             if (response == null) {
-                requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new 
NetworkException("Did not receive any response"));
+                requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new 
NetworkException("Did not receive any response (response = null)"));
                 sender.wakeup();
                 return;
             }
@@ -399,16 +399,17 @@ public class PersisterStateManager {
                 return Optional.empty();
             }
 
-            log.error("Response for RPC {} with key {} is invalid - {}.", 
name(), this.partitionKey, response);
+            log.debug("Response for RPC {} with key {} is invalid - {}.", 
name(), this.partitionKey, response);
 
             if (response.wasDisconnected()) {
-                errorConsumer.accept(Errors.NETWORK_EXCEPTION, null);
+                errorConsumer.accept(Errors.NETWORK_EXCEPTION, new 
NetworkException("Server response indicates disconnect."));
                 return Optional.of(Errors.NETWORK_EXCEPTION);
             } else if (response.wasTimedOut()) {
-                errorConsumer.accept(Errors.REQUEST_TIMED_OUT, null);
+                log.error("Response for RPC {} with key {} timed out - {}.", 
name(), this.partitionKey, response);
+                errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new 
NetworkException("Server response indicates timeout."));
                 return Optional.of(Errors.REQUEST_TIMED_OUT);
             } else {
-                errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new 
NetworkException("Did not receive any response"));
+                errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new 
NetworkException("Server did not provide any response."));
                 return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
             }
         }
@@ -453,7 +454,7 @@ public class PersisterStateManager {
                 case COORDINATOR_NOT_AVAILABLE: // retriable error codes
                 case COORDINATOR_LOAD_IN_PROGRESS:
                 case NOT_COORDINATOR:
-                    log.warn("Received retriable error in find coordinator for 
{} using key {}: {}", name(), partitionKey(), error.message());
+                    log.debug("Received retriable error in find coordinator 
for {} using key {}: {}", name(), partitionKey(), error.message());
                     if (!findCoordBackoff.canAttempt()) {
                         log.error("Exhausted max retries to find coordinator 
for {} using key {} without success.", name(), partitionKey());
                         findCoordinatorErrorResponse(error, new 
Exception("Exhausted max retries to find coordinator without success."));
@@ -580,7 +581,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
-                                log.warn("Received retriable error in 
initialize state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in 
initialize state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!initializeStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for 
initialize state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete initialize state RPC without 
success."));
@@ -738,7 +739,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
-                                log.warn("Received retriable error in write 
state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in write 
state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!writeStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for write 
state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete write state RPC without 
success."));
@@ -880,7 +881,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
-                                log.warn("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!readStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for read 
state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state RPC without success."));
@@ -1022,7 +1023,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
-                                log.warn("Received retriable error in read 
state summary RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in read 
state summary RPC for key {}: {}", partitionKey(), error.message());
                                 if (!readStateSummaryBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for read 
state summary RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state summary RPC without 
success."));
@@ -1161,7 +1162,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
-                                log.warn("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!deleteStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for 
delete state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));

Reply via email to