This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 804bdf371 [hotfix] Add bucket info for bucket related logs (#1750)
804bdf371 is described below
commit 804bdf371ec09c1df3f308dd8c4f74ddeafd0737
Author: Liebing <[email protected]>
AuthorDate: Wed Sep 24 16:36:28 2025 +0800
[hotfix] Add bucket info for bucket related logs (#1750)
---
.../java/org/apache/fluss/server/log/LocalLog.java | 38 +++++------
.../org/apache/fluss/server/log/LogLoader.java | 3 +-
.../org/apache/fluss/server/log/LogTablet.java | 42 ++++++++-----
.../fluss/server/log/remote/LogTieringTask.java | 17 +++--
.../fluss/server/log/remote/RemoteLogManager.java | 5 +-
.../org/apache/fluss/server/replica/Replica.java | 73 +++++++++++++++-------
.../fluss/server/replica/ReplicaManager.java | 4 +-
7 files changed, 119 insertions(+), 63 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
index 061c83ebb..130b3855d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
@@ -362,10 +362,11 @@ public final class LocalLog {
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(
- "Reading maximum {} bytes at offset {} from log with total
length {} bytes",
+ "Reading maximum {} bytes at offset {} from log with total
length {} bytes for bucket {}",
maxLength,
readOffset,
- segments.sizeInBytes());
+ segments.sizeInBytes(),
+ tableBucket);
}
long startOffset = localLogStartOffset;
@@ -471,21 +472,22 @@ public final class LocalLog {
// true for an active segment of size zero because one of the
indexes is
// "full" (due to _maxEntries == 0).
LOG.warn(
- "Trying to roll a new log segment with start offset "
- + newOffset
- + " =max(provided offset = "
- + expectedNextOffset
- + ", LEO = "
- + getLocalLogEndOffset()
- + ") while it already exists and is active
with size 0."
- + ", size of offset index: "
- + activeSegment.offsetIndex().entries()
- + ".");
+ "Trying to roll a new log segment for bucket {} with
start offset {} "
+ + "=max(provided offset = {}, LEO = {}) while
it already exists "
+ + "and is active with size 0, size of offset
index: {}.",
+ tableBucket,
+ newOffset,
+ expectedNextOffset,
+ getLocalLogEndOffset(),
+ activeSegment.offsetIndex().entries());
LogSegment newSegment =
createAndDeleteSegment(
newOffset, activeSegment,
SegmentDeletionReason.LOG_ROLL);
updateLogEndOffset(getLocalLogEndOffset());
- LOG.info("Rolled new log segment at offset " + newOffset);
+ LOG.info(
+ "Rolled new log segment for bucket {} at offset {}",
+ tableBucket,
+ newOffset);
return newSegment;
} else {
throw new FlussRuntimeException(
@@ -520,9 +522,9 @@ public final class LocalLog {
for (File file : Arrays.asList(logFile, offsetIdxFile,
timeIndexFile)) {
if (file.exists()) {
LOG.warn(
- "Newly rolled segment file "
- + file.getAbsolutePath()
- + " already exists; deleting it first");
+ "Newly rolled segment file {} for bucket {}
already exists; deleting it first",
+ file.getAbsolutePath(),
+ tableBucket);
Files.delete(file.toPath());
}
}
@@ -536,7 +538,7 @@ public final class LocalLog {
// metadata when log rolls.
// The next offset should not change.
updateLogEndOffset(getLocalLogEndOffset());
- LOG.info("Rolled new log segment at offset " + newOffset);
+ LOG.info("Rolled new log segment for bucket {} at offset {}",
tableBucket, newOffset);
return newSegment;
}
@@ -547,7 +549,7 @@ public final class LocalLog {
* @return the list of segments that were scheduled for deletion
*/
List<LogSegment> truncateFullyAndStartAt(long newOffset) throws
IOException {
- LOG.debug("Truncate and start at offset " + newOffset);
+ LOG.debug("Truncate and start at offset {} for bucket {}", newOffset,
tableBucket);
checkIfMemoryMappedBufferClosed();
List<LogSegment> segmentsToDelete = segments.values();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
index fac372305..62d38581c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -149,7 +149,8 @@ final class LogLoader {
File logFile = FlussPaths.logFile(logTabletDir,
offset);
if (!logFile.exists()) {
LOG.warn(
- "Found an orphaned index file {}, with no
corresponding log file.",
+ "Found an orphaned index file {} for
bucket {}, with no corresponding log file.",
- file.getAbsolutePath());
+ file.getAbsolutePath(),
+ logSegments.getTableBucket());
Files.deleteIfExists(file.toPath());
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index b4c8f1c9a..bf75410e4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -430,14 +430,18 @@ public final class LogTablet {
synchronized (lock) {
if (newHighWatermark.getMessageOffset() <
highWatermarkMetadata.getMessageOffset()) {
LOG.warn(
- "Non-monotonic update of high watermark from {} to {}",
+ "Non-monotonic update of high watermark from {} to {}
for bucket {}",
highWatermarkMetadata,
- newHighWatermark);
+ newHighWatermark,
+ localLog.getTableBucket());
}
highWatermarkMetadata = newHighWatermark;
// TODO log offset listener to update log offset.
}
- LOG.trace("Setting high watermark {}", newHighWatermark);
+ LOG.trace(
+ "Setting high watermark {} for bucket {}",
+ newHighWatermark,
+ localLog.getTableBucket());
}
/**
@@ -567,8 +571,9 @@ public final class LogTablet {
long localLogStartOffset = localLog.getLocalLogStartOffset();
if (cleanUpToOffset < localLogStartOffset) {
LOG.debug(
- "Ignore the delete segments action while the input
cleanUpToOffset {} "
+ "Ignore the delete segments action for bucket {} while the
input cleanUpToOffset {} "
+ "is smaller than the current localLogStartOffset
{}",
+ getTableBucket(),
cleanUpToOffset,
localLogStartOffset);
return;
@@ -576,8 +581,9 @@ public final class LogTablet {
if (cleanUpToOffset > getHighWatermark()) {
LOG.warn(
- "Ignore the delete segments action while the input
cleanUpToOffset {} "
+ "Ignore the delete segments action for bucket {} while the
input cleanUpToOffset {} "
+ "is larger than the current highWatermark {}",
+ getTableBucket(),
cleanUpToOffset,
getHighWatermark());
return;
@@ -716,11 +722,13 @@ public final class LogTablet {
// todo update the first unstable offset (which is used to
compute lso)
LOG.trace(
- "Appended message set with last offset: {}, first
offset {}, next offset: {} and messages {}",
+ "Appended message set with last offset: {}, first
offset {}, next offset: {} "
+ + "and messages {} for bucket {}",
appendInfo.lastOffset(),
appendInfo.firstOffset(),
localLog.getLocalLogEndOffset(),
- validRecords);
+ validRecords,
+ getTableBucket());
if (localLog.unflushedMessages() >= logFlushIntervalMessages) {
flush(false);
@@ -787,11 +795,12 @@ public final class LogTablet {
if (flushOffset > localLog.getRecoveryPoint()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Flushing log up to offset {} ({}) with recovery point
{}, unflushed: {}",
+ "Flushing log up to offset {} ({}) with recovery point
{}, unflushed: {}, for bucket {}",
offset,
includingOffsetStr,
flushOffset,
- localLog.unflushedMessages());
+ localLog.unflushedMessages(),
+ getTableBucket());
}
localLog.flush(flushOffset);
@@ -810,7 +819,9 @@ public final class LogTablet {
new RollParams(maxSegmentFileSize,
appendInfo.lastOffset(), messageSize))) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Rolling new log segment (log_size = {}/{}),
offset_index_size = {}/{}, time_index_size = {}/{}",
+ "Rolling new log segment for bucket {} (log_size =
{}/{}), offset_index_size = {}/{}, "
+ + "time_index_size = {}/{}",
+ getTableBucket(),
segment.getSizeInBytes(),
maxSegmentFileSize,
segment.offsetIndex().entries(),
@@ -863,12 +874,13 @@ public final class LogTablet {
if (targetOffset >= localLog.getLocalLogEndOffset()) {
LOG.info(
- "Truncate to {} has no effect as the largest offset in the
log is {}.",
+ "Truncate to {} for bucket {} has no effect as the largest
offset in the log is {}.",
targetOffset,
+ getTableBucket(),
localLog.getLocalLogEndOffset() - 1);
return false;
} else {
- LOG.info("Truncating to offset {}", targetOffset);
+ LOG.info("Truncating to offset {} for bucket {}", targetOffset,
getTableBucket());
synchronized (lock) {
try {
localLog.checkIfMemoryMappedBufferClosed();
@@ -902,7 +914,7 @@ public final class LogTablet {
/** Delete all data in the log and start at the new offset. */
void truncateFullyAndStartAt(long newOffset) throws LogStorageException {
- LOG.debug("Truncate and start at offset {}", newOffset);
+ LOG.debug("Truncate and start at offset {} for bucket {}", newOffset,
getTableBucket());
synchronized (lock) {
try {
localLog.truncateFullyAndStartAt(newOffset);
@@ -950,14 +962,14 @@ public final class LogTablet {
}
public void close() {
- LOG.debug("close log tablet");
+ LOG.debug("close log tablet for bucket {}", getTableBucket());
synchronized (lock) {
localLog.checkIfMemoryMappedBufferClosed();
writerExpireCheck.cancel(true);
try {
writerStateManager.takeSnapshot();
} catch (IOException e) {
- LOG.error("Error while taking writer snapshot.", e);
+ LOG.error("Error while taking writer snapshot for bucket {}.",
getTableBucket(), e);
}
localLog.close();
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 6df83faa4..cdde3842d 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -200,8 +200,9 @@ public class LogTieringTask implements Runnable {
long fromOffset = Math.max(copiedOffset + 1,
log.localLogStartOffset());
candidateLogSegments = candidateLogSegments(log, fromOffset,
highWatermark);
LOG.debug(
- "Candidate log segments: logLocalStartOffset: {},
copiedOffset: {}, "
+ "Candidate log segments for bucket {}:
logLocalStartOffset: {}, copiedOffset: {}, "
+ "fromOffset: {}, highWatermark: {} and
candidateLogSegments: {}",
+ tableBucket,
log.localLogStartOffset(),
copiedOffset,
fromOffset,
@@ -216,7 +217,8 @@ public class LogTieringTask implements Runnable {
}
} else {
LOG.debug(
- "Skipping copying segments to remote, current
read-offset:{}, and highWatermark:{}",
+ "Skipping copying segments for bucket {} to remote,
current read-offset:{}, and highWatermark:{}",
+ tableBucket,
copiedOffset,
highWatermark);
}
@@ -314,7 +316,10 @@ public class LogTieringTask implements Runnable {
remoteLogManifestPath =
remoteLogStorage.writeRemoteLogManifestSnapshot(newRemoteLogManifest);
} catch (Exception e) {
- LOG.error("Write remote log manifest file to remote storage
failed.", e);
+ LOG.error(
+ "Write remote log manifest file to remote storage failed
for bucket {}.",
+ tableBucket,
+ e);
return false;
}
@@ -364,8 +369,9 @@ public class LogTieringTask implements Runnable {
// the commit failed with unexpected exception, like network
error, we will
// retry send.
LOG.error(
- "The {} time try to commit remote log manifest
failed.",
+ "The {} time try to commit remote log manifest failed
for bucket {}.",
retrySendCommitTimes,
+ tableBucket,
e);
retrySendCommitTimes++;
}
@@ -458,8 +464,9 @@ public class LogTieringTask implements Runnable {
metricGroup.remoteLogDeleteRequests().inc();
} catch (Exception e) {
LOG.error(
- "Error occurred while deleting remote log segment
files: {}, "
+ "Error occurred while deleting remote log segment
files: {} for bucket {}, "
+ "the delete files operation will be
skipped.",
remoteLogSegment,
+ tableBucket,
e);
metricGroup.remoteLogDeleteErrors().inc();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index d963f4fcf..85f77e2b3 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -289,7 +289,10 @@ public class RemoteLogManager implements Closeable {
remoteLogStorage,
coordinatorGateway,
clock);
- LOG.info("Created a new remote log task: {} and getting
scheduled", task);
+ LOG.info(
+ "Created a new remote log task for table-bucket {}:
{} and getting scheduled",
+ tableBucket,
+ task);
ScheduledFuture<?> future =
rlManagerScheduledThreadPool.scheduleWithFixedDelay(
task,
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e4e25de80..091673085 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -408,7 +408,10 @@ public final class Replica {
"the leader epoch %s in notify
leader and isr data is smaller than the "
+ "current leader
epoch %s for table bucket %s",
requestLeaderEpoch,
leaderEpoch, tableBucket);
- LOG.warn("Ignore make leader because {}",
errorMessage);
+ LOG.warn(
+ "Ignore make leader for bucket {}
because {}",
+ tableBucket,
+ errorMessage);
throw new
FencedLeaderEpochException(errorMessage);
}
@@ -456,7 +459,10 @@ public final class Replica {
"the leader epoch %s in notify leader
and isr data is smaller than the "
+ "current leader epoch %s for
table bucket %s",
requestLeaderEpoch, leaderEpoch,
tableBucket);
- LOG.warn("Ignore make follower because {}",
errorMessage);
+ LOG.warn(
+ "Ignore make follower for bucket {} because
{}",
+ tableBucket,
+ errorMessage);
throw new FencedLeaderEpochException(errorMessage);
}
@@ -559,7 +565,10 @@ public final class Replica {
// resister the closeable registry for kv
closeableRegistry.registerCloseable(closeableRegistryForKv);
} catch (IOException e) {
- LOG.warn("Fail to registry closeable registry for kv, it may cause
resource leak.", e);
+ LOG.warn(
+ "Fail to registry closeable registry for kv for bucket {},
it may cause resource leak.",
+ tableBucket,
+ e);
}
// init kv tablet and get the snapshot it uses to init if have any
@@ -569,7 +578,11 @@ public final class Replica {
snapshotUsed = initKvTablet();
break;
} catch (Exception e) {
- LOG.warn("Fail to init kv tablet, retrying for {} times", i,
e);
+ LOG.warn(
+ "Fail to init kv tablet for bucket {}, retrying for {}
times",
+ tableBucket,
+ i,
+ e);
}
}
// start periodic kv snapshot
@@ -642,7 +655,10 @@ public final class Replica {
checkNotNull(kvTablet, "kv tablet should not be null.");
restoreStartOffset = completedSnapshot.getLogOffset();
} else {
- LOG.info("No snapshot found, restore from log.");
+ LOG.info(
+ "No snapshot found for {} of {}, restore from log.",
+ tableBucket,
+ physicalPath);
// actually, kv manager always create a kv tablet since we
will drop the kv
// if it exists before init kv tablet
kvTablet =
@@ -830,7 +846,7 @@ public final class Replica {
kvSnapshotManager.start();
closeableRegistryForKv.registerCloseable(kvSnapshotManager);
} catch (Exception e) {
- LOG.error("init kv periodic snapshot failed.", e);
+ LOG.error("init kv periodic snapshot for {} failed.", tableBucket,
e);
}
}
@@ -1003,7 +1019,11 @@ public final class Replica {
Optional<LogOffsetMetadata> oldWatermark =
leaderLog.maybeIncrementHighWatermark(newHighWatermark);
if (oldWatermark.isPresent()) {
- LOG.debug("High watermark update from {} to {}.",
oldWatermark.get(), newHighWatermark);
+ LOG.debug(
+ "High watermark update from {} to {} for bucket {}.",
+ oldWatermark.get(),
+ newHighWatermark,
+ tableBucket);
return true;
} else {
return false;
@@ -1080,9 +1100,10 @@ public final class Replica {
}
LOG.debug(
- "Recorded replica {} log end offset (LEO) position {}.",
+ "Recorded replica {} log end offset (LEO) position {} for
bucket {}.",
localTabletServerId,
- followerFetchOffsetMetadata.getMessageOffset());
+ followerFetchOffsetMetadata.getMessageOffset(),
+ tableBucket);
}
private FollowerReplica getFollowerReplicaOrThrown(int followerId) {
@@ -1534,7 +1555,7 @@ public final class Replica {
private CompletableFuture<LeaderAndIsr> submitAdjustIsr(
IsrState.PendingIsrState proposedIsrState,
CompletableFuture<LeaderAndIsr> result) {
- LOG.debug("Submitting ISR state change {}.", proposedIsrState);
+ LOG.debug("Submitting ISR state change {} for bucket {}.",
proposedIsrState, tableBucket);
adjustIsrManager
.submit(tableBucket, proposedIsrState.sentLeaderAndIsr())
.whenComplete(
@@ -1553,8 +1574,9 @@ public final class Replica {
// exactly, but we do know this
response is out of date,
// so we ignore it.
LOG.debug(
- "Ignoring failed ISR
update to {} since we have already updated state to {}",
+ "Ignoring failed ISR
update to {} for bucket {} since we have already updated state to {}",
proposedIsrState,
+ tableBucket,
isrState);
} else if (leaderAndIsr != null) {
hwIncremented.set(
@@ -1600,8 +1622,9 @@ public final class Replica {
// Success from coordinator, still need to check a few things.
if (leaderAndIsr.bucketEpoch() < bucketEpoch) {
LOG.debug(
- "Ignoring new ISR {} since we have a newer replica epoch
{}",
+ "Ignoring new ISR {} for bucket {} since we have a newer
replica epoch {}",
leaderAndIsr,
+ tableBucket,
bucketEpoch);
return false;
} else {
@@ -1630,7 +1653,7 @@ public final class Replica {
try {
return maybeIncrementLeaderHW(logTablet, clock.milliseconds());
} catch (IOException e) {
- LOG.error("Failed to increment leader HW", e);
+ LOG.error("Failed to increment leader HW for bucket {}",
tableBucket, e);
return false;
}
}
@@ -1656,34 +1679,39 @@ public final class Replica {
// response.
isrState = proposedIsrState.lastCommittedState();
LOG.info(
- "Failed to adjust isr to {} since the adjust isr
manager rejected the request with error {}. "
+ "Failed to adjust isr to {} for bucket {} since the
adjust isr manager rejected the request with error {}. "
+ "Replica state has been reset to the latest
committed state {}",
proposedIsrState,
+ tableBucket,
error,
isrState);
return false;
case UNKNOWN_TABLE_OR_BUCKET_EXCEPTION:
LOG.debug(
- "Failed to adjust isr to {} since the coordinator
doesn't know about this table or bucket. "
+ "Failed to adjust isr to {} for bucket {} since the
coordinator doesn't know about this table or bucket. "
+ "Replica state may be out of sync, awaiting
new the latest metadata.",
- proposedIsrState);
+ proposedIsrState,
+ tableBucket);
return false;
case INVALID_UPDATE_VERSION_EXCEPTION:
LOG.debug(
- "Failed to adjust isr to {} because the request is
invalid. Replica state may be out of sync, "
+ "Failed to adjust isr to {} for bucket {} because the
request is invalid. Replica state may be out of sync, "
+ "awaiting new the latest metadata.",
- proposedIsrState);
+ proposedIsrState,
+ tableBucket);
return false;
case FENCED_LEADER_EPOCH_EXCEPTION:
LOG.debug(
- "Failed to adjust isr to {} because the leader epoch
is fenced which indicate this replica "
+ "Failed to adjust isr to {} for bucket {} because the
leader epoch is fenced which indicate this replica "
+ "maybe no long leader. Replica state may be
out of sync, awaiting new the latest metadata.",
- proposedIsrState);
+ proposedIsrState,
+ tableBucket);
return false;
default:
LOG.warn(
- "Failed to adjust isr to {} due to unexpected error
{}. Retrying.",
+ "Failed to adjust isr to {} for bucket {} due to
unexpected error {}. Retrying.",
proposedIsrState,
+ tableBucket,
error);
return true;
}
@@ -1863,7 +1891,8 @@ public final class Replica {
});
LOG.trace(
- "Progress awaiting ISR acks for offset {}, acked replicas: {},
awaiting replicas: {}",
+ "Progress awaiting ISR acks for bucket {} for offset {}, acked
replicas: {}, awaiting replicas: {}",
+ tableBucket,
requiredOffset,
ackedReplicas.stream()
.map(tuple -> "server-" + tuple.f0 + ":" + tuple.f1)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index f11fe1399..8a5f85615 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -653,7 +653,9 @@ public class ReplicaManager {
+ "The latest known
leader epoch is %s for table bucket %s.",
requestLeaderEpoch,
currentLeaderEpoch, tb);
LOG.warn(
- "Ignore the stop replica request
because {}", errorMessage);
+ "Ignore the stop replica request for
bucket {} because {}",
+ tb,
+ errorMessage);
result.add(
new StopReplicaResultForBucket(
tb,