This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 804bdf371 [hotfix] Add bucket info for bucket related logs (#1750)
804bdf371 is described below

commit 804bdf371ec09c1df3f308dd8c4f74ddeafd0737
Author: Liebing <[email protected]>
AuthorDate: Wed Sep 24 16:36:28 2025 +0800

    [hotfix] Add bucket info for bucket related logs (#1750)
---
 .../java/org/apache/fluss/server/log/LocalLog.java | 38 +++++------
 .../org/apache/fluss/server/log/LogLoader.java     |  3 +-
 .../org/apache/fluss/server/log/LogTablet.java     | 42 ++++++++-----
 .../fluss/server/log/remote/LogTieringTask.java    | 17 +++--
 .../fluss/server/log/remote/RemoteLogManager.java  |  5 +-
 .../org/apache/fluss/server/replica/Replica.java   | 73 +++++++++++++++-------
 .../fluss/server/replica/ReplicaManager.java       |  4 +-
 7 files changed, 119 insertions(+), 63 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
index 061c83ebb..130b3855d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
@@ -362,10 +362,11 @@ public final class LocalLog {
             throws IOException {
         if (LOG.isTraceEnabled()) {
             LOG.trace(
-                    "Reading maximum {} bytes at offset {} from log with total 
length {} bytes",
+                    "Reading maximum {} bytes at offset {} from log with total 
length {} bytes for bucket {}",
                     maxLength,
                     readOffset,
-                    segments.sizeInBytes());
+                    segments.sizeInBytes(),
+                    tableBucket);
         }
 
         long startOffset = localLogStartOffset;
@@ -471,21 +472,22 @@ public final class LocalLog {
                 // true for an active segment of size zero because one of the 
indexes is
                 // "full" (due to _maxEntries == 0).
                 LOG.warn(
-                        "Trying to roll a new log segment with start offset "
-                                + newOffset
-                                + " =max(provided offset = "
-                                + expectedNextOffset
-                                + ", LEO = "
-                                + getLocalLogEndOffset()
-                                + ") while it already exists and is active 
with size 0."
-                                + ", size of offset index: "
-                                + activeSegment.offsetIndex().entries()
-                                + ".");
+                        "Trying to roll a new log segment for bucket {} with 
start offset {} "
+                                + "=max(provided offset = {}, LEO = {}) while 
it already exists "
+                                + "and is active with size 0, size of offset 
index: {}.",
+                        tableBucket,
+                        newOffset,
+                        expectedNextOffset,
+                        getLocalLogEndOffset(),
+                        activeSegment.offsetIndex().entries());
                 LogSegment newSegment =
                         createAndDeleteSegment(
                                 newOffset, activeSegment, 
SegmentDeletionReason.LOG_ROLL);
                 updateLogEndOffset(getLocalLogEndOffset());
-                LOG.info("Rolled new log segment at offset " + newOffset);
+                LOG.info(
+                        "Rolled new log segment for bucket {} at offset {}",
+                        tableBucket,
+                        newOffset);
                 return newSegment;
             } else {
                 throw new FlussRuntimeException(
@@ -520,9 +522,9 @@ public final class LocalLog {
             for (File file : Arrays.asList(logFile, offsetIdxFile, 
timeIndexFile)) {
                 if (file.exists()) {
                     LOG.warn(
-                            "Newly rolled segment file "
-                                    + file.getAbsolutePath()
-                                    + " already exists; deleting it first");
+                            "Newly rolled segment file {} for bucket {} 
already exists; deleting it first",
+                            file.getAbsolutePath(),
+                            tableBucket);
                     Files.delete(file.toPath());
                 }
             }
@@ -536,7 +538,7 @@ public final class LocalLog {
         // metadata when log rolls.
         // The next offset should not change.
         updateLogEndOffset(getLocalLogEndOffset());
-        LOG.info("Rolled new log segment at offset " + newOffset);
+        LOG.info("Rolled new log segment for bucket {} at offset {}", 
tableBucket, newOffset);
         return newSegment;
     }
 
@@ -547,7 +549,7 @@ public final class LocalLog {
      * @return the list of segments that were scheduled for deletion
      */
     List<LogSegment> truncateFullyAndStartAt(long newOffset) throws 
IOException {
-        LOG.debug("Truncate and start at offset " + newOffset);
+        LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, 
tableBucket);
 
         checkIfMemoryMappedBufferClosed();
         List<LogSegment> segmentsToDelete = segments.values();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
index fac372305..62d38581c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -149,7 +149,8 @@ final class LogLoader {
                         File logFile = FlussPaths.logFile(logTabletDir, 
offset);
                         if (!logFile.exists()) {
                             LOG.warn(
-                                    "Found an orphaned index file {}, with no 
corresponding log file.",
+                                    "Found an orphaned index file for bucket {}, with no corresponding log file: {}.",
+                                    logSegments.getTableBucket(),
                                     file.getAbsolutePath());
                             Files.deleteIfExists(file.toPath());
                         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index b4c8f1c9a..bf75410e4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -430,14 +430,18 @@ public final class LogTablet {
         synchronized (lock) {
             if (newHighWatermark.getMessageOffset() < 
highWatermarkMetadata.getMessageOffset()) {
                 LOG.warn(
-                        "Non-monotonic update of high watermark from {} to {}",
+                        "Non-monotonic update of high watermark from {} to {} 
for bucket {}",
                         highWatermarkMetadata,
-                        newHighWatermark);
+                        newHighWatermark,
+                        localLog.getTableBucket());
             }
             highWatermarkMetadata = newHighWatermark;
             // TODO log offset listener to update log offset.
         }
-        LOG.trace("Setting high watermark {}", newHighWatermark);
+        LOG.trace(
+                "Setting high watermark {} for bucket {}",
+                newHighWatermark,
+                localLog.getTableBucket());
     }
 
     /**
@@ -567,8 +571,9 @@ public final class LogTablet {
         long localLogStartOffset = localLog.getLocalLogStartOffset();
         if (cleanUpToOffset < localLogStartOffset) {
             LOG.debug(
-                    "Ignore the delete segments action while the input 
cleanUpToOffset {} "
+                    "Ignore the delete segments action for bucket {} while the 
input cleanUpToOffset {} "
                             + "is smaller than the current localLogStartOffset 
{}",
+                    getTableBucket(),
                     cleanUpToOffset,
                     localLogStartOffset);
             return;
@@ -576,8 +581,9 @@ public final class LogTablet {
 
         if (cleanUpToOffset > getHighWatermark()) {
             LOG.warn(
-                    "Ignore the delete segments action while the input 
cleanUpToOffset {} "
+                    "Ignore the delete segments action for bucket {} while the 
input cleanUpToOffset {} "
                             + "is larger than the current highWatermark {}",
+                    getTableBucket(),
                     cleanUpToOffset,
                     getHighWatermark());
             return;
@@ -716,11 +722,13 @@ public final class LogTablet {
                 // todo update the first unstable offset (which is used to 
compute lso)
 
                 LOG.trace(
-                        "Appended message set with last offset: {}, first 
offset {}, next offset: {} and messages {}",
+                        "Appended message set with last offset: {}, first 
offset {}, next offset: {} "
+                                + "and messages {} for bucket {}",
                         appendInfo.lastOffset(),
                         appendInfo.firstOffset(),
                         localLog.getLocalLogEndOffset(),
-                        validRecords);
+                        validRecords,
+                        getTableBucket());
 
                 if (localLog.unflushedMessages() >= logFlushIntervalMessages) {
                     flush(false);
@@ -787,11 +795,12 @@ public final class LogTablet {
         if (flushOffset > localLog.getRecoveryPoint()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                        "Flushing log up to offset {} ({}) with recovery point 
{}, unflushed: {}",
+                        "Flushing log up to offset {} ({}) with recovery point 
{}, unflushed: {}, for bucket {}",
                         offset,
                         includingOffsetStr,
                         flushOffset,
-                        localLog.unflushedMessages());
+                        localLog.unflushedMessages(),
+                        getTableBucket());
             }
 
             localLog.flush(flushOffset);
@@ -810,7 +819,9 @@ public final class LogTablet {
                     new RollParams(maxSegmentFileSize, 
appendInfo.lastOffset(), messageSize))) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
-                            "Rolling new log segment (log_size = {}/{}), 
offset_index_size = {}/{}, time_index_size = {}/{}",
+                            "Rolling new log segment for bucket {} (log_size = 
{}/{}), offset_index_size = {}/{}, "
+                                    + "time_index_size = {}/{}",
+                            getTableBucket(),
                             segment.getSizeInBytes(),
                             maxSegmentFileSize,
                             segment.offsetIndex().entries(),
@@ -863,12 +874,13 @@ public final class LogTablet {
 
         if (targetOffset >= localLog.getLocalLogEndOffset()) {
             LOG.info(
-                    "Truncate to {} has no effect as the largest offset in the 
log is {}.",
+                    "Truncate to {} for bucket {} has no effect as the largest 
offset in the log is {}.",
                     targetOffset,
+                    getTableBucket(),
                     localLog.getLocalLogEndOffset() - 1);
             return false;
         } else {
-            LOG.info("Truncating to offset {}", targetOffset);
+            LOG.info("Truncating to offset {} for bucket {}", targetOffset, 
getTableBucket());
             synchronized (lock) {
                 try {
                     localLog.checkIfMemoryMappedBufferClosed();
@@ -902,7 +914,7 @@ public final class LogTablet {
 
     /** Delete all data in the log and start at the new offset. */
     void truncateFullyAndStartAt(long newOffset) throws LogStorageException {
-        LOG.debug("Truncate and start at offset {}", newOffset);
+        LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, 
getTableBucket());
         synchronized (lock) {
             try {
                 localLog.truncateFullyAndStartAt(newOffset);
@@ -950,14 +962,14 @@ public final class LogTablet {
     }
 
     public void close() {
-        LOG.debug("close log tablet");
+        LOG.debug("close log tablet for bucket {}", getTableBucket());
         synchronized (lock) {
             localLog.checkIfMemoryMappedBufferClosed();
             writerExpireCheck.cancel(true);
             try {
                 writerStateManager.takeSnapshot();
             } catch (IOException e) {
-                LOG.error("Error while taking writer snapshot.", e);
+                LOG.error("Error while taking writer snapshot for bucket {}.", 
getTableBucket(), e);
             }
             localLog.close();
         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 6df83faa4..cdde3842d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -200,8 +200,9 @@ public class LogTieringTask implements Runnable {
             long fromOffset = Math.max(copiedOffset + 1, 
log.localLogStartOffset());
             candidateLogSegments = candidateLogSegments(log, fromOffset, 
highWatermark);
             LOG.debug(
-                    "Candidate log segments: logLocalStartOffset: {}, 
copiedOffset: {}, "
+                    "Candidate log segments for bucket {}: 
logLocalStartOffset: {}, copiedOffset: {}, "
                             + "fromOffset: {}, highWatermark: {} and 
candidateLogSegments: {}",
+                    tableBucket,
                     log.localLogStartOffset(),
                     copiedOffset,
                     fromOffset,
@@ -216,7 +217,8 @@ public class LogTieringTask implements Runnable {
             }
         } else {
             LOG.debug(
-                    "Skipping copying segments to remote, current 
read-offset:{}, and highWatermark:{}",
+                    "Skipping copying segments for bucket {} to remote, 
current read-offset:{}, and highWatermark:{}",
+                    tableBucket,
                     copiedOffset,
                     highWatermark);
         }
@@ -314,7 +316,10 @@ public class LogTieringTask implements Runnable {
             remoteLogManifestPath =
                     
remoteLogStorage.writeRemoteLogManifestSnapshot(newRemoteLogManifest);
         } catch (Exception e) {
-            LOG.error("Write remote log manifest file to remote storage 
failed.", e);
+            LOG.error(
+                    "Write remote log manifest file to remote storage failed 
for bucket {}.",
+                    tableBucket,
+                    e);
             return false;
         }
 
@@ -364,8 +369,9 @@ public class LogTieringTask implements Runnable {
                 // the commit failed with unexpected exception, like network 
error, we will
                 // retry send.
                 LOG.error(
-                        "The {} time try to commit remote log manifest 
failed.",
+                        "The {} time try to commit remote log manifest failed 
for bucket {}.",
                         retrySendCommitTimes,
+                        tableBucket,
                         e);
                 retrySendCommitTimes++;
             }
@@ -458,8 +464,9 @@ public class LogTieringTask implements Runnable {
                 metricGroup.remoteLogDeleteRequests().inc();
             } catch (Exception e) {
                 LOG.error(
-                        "Error occurred while deleting remote log segment 
files: {}, "
+                        "Error occurred for bucket {} while deleting remote log segment files: {}, "
                                 + "the delete files operation will be 
skipped.",
+                        tableBucket,
                         remoteLogSegment,
                         e);
                 metricGroup.remoteLogDeleteErrors().inc();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index d963f4fcf..85f77e2b3 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -289,7 +289,10 @@ public class RemoteLogManager implements Closeable {
                                     remoteLogStorage,
                                     coordinatorGateway,
                                     clock);
-                    LOG.info("Created a new remote log task: {} and getting 
scheduled", task);
+                    LOG.info(
+                            "Created a new remote log task for table-bucket {}: {} and getting scheduled",
+                            tableBucket,
+                            task);
                     ScheduledFuture<?> future =
                             
rlManagerScheduledThreadPool.scheduleWithFixedDelay(
                                     task,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e4e25de80..091673085 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -408,7 +408,10 @@ public final class Replica {
                                                 "the leader epoch %s in notify 
leader and isr data is smaller than the "
                                                         + "current leader 
epoch %s for table bucket %s",
                                                 requestLeaderEpoch, 
leaderEpoch, tableBucket);
-                                LOG.warn("Ignore make leader because {}", 
errorMessage);
+                                LOG.warn(
+                                        "Ignore make leader for bucket {} 
because {}",
+                                        tableBucket,
+                                        errorMessage);
                                 throw new 
FencedLeaderEpochException(errorMessage);
                             }
 
@@ -456,7 +459,10 @@ public final class Replica {
                                         "the leader epoch %s in notify leader 
and isr data is smaller than the "
                                                 + "current leader epoch %s for 
table bucket %s",
                                         requestLeaderEpoch, leaderEpoch, 
tableBucket);
-                        LOG.warn("Ignore make follower because {}", 
errorMessage);
+                        LOG.warn(
+                                "Ignore make follower for bucket {} because 
{}",
+                                tableBucket,
+                                errorMessage);
                         throw new FencedLeaderEpochException(errorMessage);
                     }
 
@@ -559,7 +565,10 @@ public final class Replica {
             // resister the closeable registry for kv
             closeableRegistry.registerCloseable(closeableRegistryForKv);
         } catch (IOException e) {
-            LOG.warn("Fail to registry closeable registry for kv, it may cause 
resource leak.", e);
+            LOG.warn(
+                    "Fail to registry closeable registry for kv for bucket {}, 
it may cause resource leak.",
+                    tableBucket,
+                    e);
         }
 
         // init kv tablet and get the snapshot it uses to init if have any
@@ -569,7 +578,11 @@ public final class Replica {
                 snapshotUsed = initKvTablet();
                 break;
             } catch (Exception e) {
-                LOG.warn("Fail to init kv tablet, retrying for {} times", i, 
e);
+                LOG.warn(
+                        "Fail to init kv tablet for bucket {}, retrying for {} 
times",
+                        tableBucket,
+                        i,
+                        e);
             }
         }
         // start periodic kv snapshot
@@ -642,7 +655,10 @@ public final class Replica {
                 checkNotNull(kvTablet, "kv tablet should not be null.");
                 restoreStartOffset = completedSnapshot.getLogOffset();
             } else {
-                LOG.info("No snapshot found, restore from log.");
+                LOG.info(
+                        "No snapshot found for {} of {}, restore from log.",
+                        tableBucket,
+                        physicalPath);
                 // actually, kv manager always create a kv tablet since we 
will drop the kv
                 // if it exists before init kv tablet
                 kvTablet =
@@ -830,7 +846,7 @@ public final class Replica {
             kvSnapshotManager.start();
             closeableRegistryForKv.registerCloseable(kvSnapshotManager);
         } catch (Exception e) {
-            LOG.error("init kv periodic snapshot failed.", e);
+            LOG.error("init kv periodic snapshot for {} failed.", tableBucket, 
e);
         }
     }
 
@@ -1003,7 +1019,11 @@ public final class Replica {
         Optional<LogOffsetMetadata> oldWatermark =
                 leaderLog.maybeIncrementHighWatermark(newHighWatermark);
         if (oldWatermark.isPresent()) {
-            LOG.debug("High watermark update from {} to {}.", 
oldWatermark.get(), newHighWatermark);
+            LOG.debug(
+                    "High watermark update from {} to {} for bucket {}.",
+                    oldWatermark.get(),
+                    newHighWatermark,
+                    tableBucket);
             return true;
         } else {
             return false;
@@ -1080,9 +1100,10 @@ public final class Replica {
         }
 
         LOG.debug(
-                "Recorded replica {} log end offset (LEO) position {}.",
+                "Recorded replica {} log end offset (LEO) position {} for 
bucket {}.",
                 localTabletServerId,
-                followerFetchOffsetMetadata.getMessageOffset());
+                followerFetchOffsetMetadata.getMessageOffset(),
+                tableBucket);
     }
 
     private FollowerReplica getFollowerReplicaOrThrown(int followerId) {
@@ -1534,7 +1555,7 @@ public final class Replica {
 
     private CompletableFuture<LeaderAndIsr> submitAdjustIsr(
             IsrState.PendingIsrState proposedIsrState, 
CompletableFuture<LeaderAndIsr> result) {
-        LOG.debug("Submitting ISR state change {}.", proposedIsrState);
+        LOG.debug("Submitting ISR state change {} for bucket {}.", 
proposedIsrState, tableBucket);
         adjustIsrManager
                 .submit(tableBucket, proposedIsrState.sentLeaderAndIsr())
                 .whenComplete(
@@ -1553,8 +1574,9 @@ public final class Replica {
                                             // exactly, but we do know this 
response is out of date,
                                             // so we ignore it.
                                             LOG.debug(
-                                                    "Ignoring failed ISR 
update to {} since we have already updated state to {}",
+                                                    "Ignoring failed ISR 
update to {} for bucket {} since we have already updated state to {}",
                                                     proposedIsrState,
+                                                    tableBucket,
                                                     isrState);
                                         } else if (leaderAndIsr != null) {
                                             hwIncremented.set(
@@ -1600,8 +1622,9 @@ public final class Replica {
         // Success from coordinator, still need to check a few things.
         if (leaderAndIsr.bucketEpoch() < bucketEpoch) {
             LOG.debug(
-                    "Ignoring new ISR {} since we have a newer replica epoch 
{}",
+                    "Ignoring new ISR {} for bucket {} since we have a newer 
replica epoch {}",
                     leaderAndIsr,
+                    tableBucket,
                     bucketEpoch);
             return false;
         } else {
@@ -1630,7 +1653,7 @@ public final class Replica {
             try {
                 return maybeIncrementLeaderHW(logTablet, clock.milliseconds());
             } catch (IOException e) {
-                LOG.error("Failed to increment leader HW", e);
+                LOG.error("Failed to increment leader HW for bucket {}", 
tableBucket, e);
                 return false;
             }
         }
@@ -1656,34 +1679,39 @@ public final class Replica {
                 // response.
                 isrState = proposedIsrState.lastCommittedState();
                 LOG.info(
-                        "Failed to adjust isr to {} since the adjust isr 
manager rejected the request with error {}. "
+                        "Failed to adjust isr to {} for bucket {} since the 
adjust isr manager rejected the request with error {}. "
                                 + "Replica state has been reset to the latest 
committed state {}",
                         proposedIsrState,
+                        tableBucket,
                         error,
                         isrState);
                 return false;
             case UNKNOWN_TABLE_OR_BUCKET_EXCEPTION:
                 LOG.debug(
-                        "Failed to adjust isr to {} since the coordinator 
doesn't know about this table or bucket. "
+                        "Failed to adjust isr to {} for bucket {} since the 
coordinator doesn't know about this table or bucket. "
                                 + "Replica state may be out of sync, awaiting 
new the latest metadata.",
-                        proposedIsrState);
+                        proposedIsrState,
+                        tableBucket);
                 return false;
             case INVALID_UPDATE_VERSION_EXCEPTION:
                 LOG.debug(
-                        "Failed to adjust isr to {} because the request is 
invalid. Replica state may be out of sync, "
+                        "Failed to adjust isr to {} for bucket {} because the 
request is invalid. Replica state may be out of sync, "
                                 + "awaiting new the latest metadata.",
-                        proposedIsrState);
+                        proposedIsrState,
+                        tableBucket);
                 return false;
             case FENCED_LEADER_EPOCH_EXCEPTION:
                 LOG.debug(
-                        "Failed to adjust isr to {} because the leader epoch 
is fenced which indicate this replica "
+                        "Failed to adjust isr to {} for bucket {} because the 
leader epoch is fenced which indicate this replica "
                                 + "maybe no long leader. Replica state may be 
out of sync, awaiting new the latest metadata.",
-                        proposedIsrState);
+                        proposedIsrState,
+                        tableBucket);
                 return false;
             default:
                 LOG.warn(
-                        "Failed to adjust isr to {} due to unexpected error 
{}. Retrying.",
+                        "Failed to adjust isr to {} for bucket {} due to 
unexpected error {}. Retrying.",
                         proposedIsrState,
+                        tableBucket,
                         error);
                 return true;
         }
@@ -1863,7 +1891,8 @@ public final class Replica {
                 });
 
         LOG.trace(
-                "Progress awaiting ISR acks for offset {}, acked replicas: {}, 
awaiting replicas: {}",
+                "Progress awaiting ISR acks for bucket {} for offset {}, acked 
replicas: {}, awaiting replicas: {}",
+                tableBucket,
                 requiredOffset,
                 ackedReplicas.stream()
                         .map(tuple -> "server-" + tuple.f0 + ":" + tuple.f1)
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index f11fe1399..8a5f85615 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -653,7 +653,9 @@ public class ReplicaManager {
                                                         + "The latest known 
leader epoch is %s for table bucket %s.",
                                                 requestLeaderEpoch, 
currentLeaderEpoch, tb);
                                 LOG.warn(
-                                        "Ignore the stop replica request 
because {}", errorMessage);
+                                        "Ignore the stop replica request for 
bucket {} because {}",
+                                        tb,
+                                        errorMessage);
                                 result.add(
                                         new StopReplicaResultForBucket(
                                                 tb,

Reply via email to