This is an automated email from the ASF dual-hosted git repository.

lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 78e680b811 [ISSUE #10462] Standardize log format across tiered storage module (#10474)
78e680b811 is described below

commit 78e680b811f15e7bbb5d5ef40b44ba1b33cf76e0
Author: lizhimins <[email protected]>
AuthorDate: Thu Jun 11 19:23:52 2026 +0800

    [ISSUE #10462] Standardize log format across tiered storage module (#10474)
---
 .../rocketmq/tieredstore/TieredMessageStore.java   | 20 +++++------
 .../core/MessageStoreDispatcherImpl.java           |  6 ++--
 .../rocketmq/tieredstore/file/FlatAppendFile.java  |  6 ++--
 .../rocketmq/tieredstore/file/FlatFileStore.java   | 10 +++---
 .../rocketmq/tieredstore/file/FlatMessageFile.java | 10 +++---
 .../rocketmq/tieredstore/index/IndexStoreFile.java | 41 ++++++++++-----------
 .../tieredstore/index/IndexStoreService.java       | 42 +++++++++++-----------
 .../rocketmq/tieredstore/provider/FileSegment.java | 12 +++----
 .../tieredstore/provider/MemoryFileSegment.java    |  2 +-
 .../tieredstore/provider/PosixFileSegment.java     | 12 +++----
 .../tieredstore/util/MessageFormatUtil.java        | 25 ++++++++-----
 11 files changed, 94 insertions(+), 92 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 38946fd161..ed67badf56 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -247,8 +247,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
 
                     if (next.checkInStoreByConsumeOffset(topic, queueId, 
offset)) {
                         TieredStoreMetricsManager.fallbackTotal.add(1, 
latencyAttributes);
-                        log.debug("GetMessageAsync not found, then back to 
next store, result: {}, " +
-                                "topic: {}, queue: {}, queue offset: {}, 
offset range: {}-{}",
+                        log.debug("TieredMessageStore#getMessageAsync, not 
found, fall back to next store, result={}, topic={}, queue={}, queueOffset={}, 
offsetRange={}-{}",
                             result.getStatus(), topic, queueId, offset, 
result.getMinOffset(), result.getMaxOffset());
                         return next.getMessage(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
                     }
@@ -260,8 +259,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                     result.getStatus() != GetMessageStatus.OFFSET_TOO_SMALL &&
                     result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE 
&&
                     result.getStatus() != 
GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
-                    log.warn("GetMessageAsync not found and message is not in 
next store, result: {}, " +
-                            "topic: {}, queue: {}, queue offset: {}, offset 
range: {}-{}",
+                    log.warn("TieredMessageStore#getMessageAsync, not found 
and not in next store, result={}, topic={}, queue={}, queueOffset={}, 
offsetRange={}-{}",
                         result.getStatus(), topic, queueId, offset, 
result.getMinOffset(), result.getMaxOffset());
                 }
 
@@ -288,8 +286,8 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 // otherwise it will cause repeated consumption after next 
start offset over commit offset.
 
                 if (storeConfig.isRecordGetMessageResult()) {
-                    log.info("GetMessageAsync result, {}, group: {}, topic: 
{}, queueId: {}, offset: {}, count:{}",
-                        result, group, topic, queueId, offset, maxMsgNums);
+                    log.info("TieredMessageStore#getMessageAsync, result={}, 
group={}, topic={}, queueId={}, offset={}, count={}",
+                        result.getStatus(), group, topic, queueId, offset, 
maxMsgNums);
                 }
 
                 return result;
@@ -419,7 +417,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         int maxNum, long begin, long end) {
         long earliestTimeInNextStore = next.getEarliestMessageTime();
         if (earliestTimeInNextStore <= 0) {
-            log.warn("TieredMessageStore#queryMessageAsync: get earliest 
message time in next store failed: {}", earliestTimeInNextStore);
+            log.warn("TieredMessageStore#queryMessageAsync, get earliest 
message time in next store failed, earliestTime={}", earliestTimeInNextStore);
         }
         boolean isForce = storeConfig.getTieredStorageLevel() == 
MessageStoreConfig.TieredStorageLevel.FORCE;
         QueryMessageResult result = end < earliestTimeInNextStore || isForce ?
@@ -442,7 +440,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                         return result;
                     });
             } catch (Exception e) {
-                log.error("TieredMessageStore#queryMessageAsync: query message 
in tiered store failed", e);
+                log.error("TieredMessageStore#queryMessageAsync, query message 
in tiered store failed, topic={}, key={}", topic, key, e);
                 return CompletableFuture.completedFuture(result);
             }
         }
@@ -453,7 +451,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
     public CompletableFuture<QueryMessageResult> queryMessageAsync(String 
topic, String key, int maxNum, long begin, long end, String indexType, String 
lastKey) {
         long earliestTimeInNextStore = next.getEarliestMessageTime();
         if (earliestTimeInNextStore <= 0) {
-            log.warn("TieredMessageStore queryMessageAsync: get earliest 
message time in next store failed: {}", earliestTimeInNextStore);
+            log.warn("TieredMessageStore#queryMessageAsync, get earliest 
message time in next store failed, earliestTime={}", earliestTimeInNextStore);
         }
         boolean isForce = storeConfig.getTieredStorageLevel() == 
MessageStoreConfig.TieredStorageLevel.FORCE;
         QueryMessageResult result = end < earliestTimeInNextStore || isForce ? 
new QueryMessageResult() : next.queryMessage(topic, key, maxNum, begin, end, 
indexType, lastKey);
@@ -474,7 +472,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                         return result;
                     });
             } catch (Exception e) {
-                log.error("TieredMessageStore#queryMessageAsync: query message 
in tiered store failed", e);
+                log.error("TieredMessageStore#queryMessageAsync, query message 
in tiered store failed, topic={}, key={}, indexType={}", topic, key, indexType, 
e);
                 return CompletableFuture.completedFuture(result);
             }
         }
@@ -520,7 +518,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 flatFileStore.destroyFile(queueMetadata.getQueue());
             });
             metadataStore.deleteTopic(topic);
-            log.info("MessageStore delete topic success, topicName={}", topic);
+            log.info("TieredMessageStore#deleteTopics, delete topic success, 
topic={}", topic);
         }
         return next.deleteTopics(deleteTopics);
     }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 52e1c82a1d..2a0dfed7a7 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -176,7 +176,7 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
             if (commitOffset < currentOffset) {
                 this.commitAsync(flatFile).whenComplete((result, throwable) -> 
{
                     if (throwable != null) {
-                        log.error("MessageDispatcher#flatFile commitOffset 
less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", 
topic, queueId, throwable);
+                        log.error("MessageDispatcher#doScheduleDispatch, 
commitOffset less than currentOffset, commitAsync again failed, topic={}, 
queueId={}", topic, queueId, throwable);
                     }
                 });
                 return CompletableFuture.completedFuture(false);
@@ -316,7 +316,7 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
                             //next commit async,execute constructIndexFile.
                             GroupCommitContext oldCommit = 
failedGroupCommitMap.put(flatFile, groupCommitContext);
                             if (oldCommit != null) {
-                                log.warn("MessageDispatcher#commitAsync 
failed,flatFile old failed commit context not release, topic={}, queueId={}  ", 
topic, queueId);
+                                
log.warn("MessageDispatcher#doScheduleDispatch, commitAsync failed, old failed 
commit context not released, topic={}, queueId={}", topic, queueId);
                                 oldCommit.release();
                             }
                         }
@@ -347,7 +347,7 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
                     groupCommitContext.getDispatchRequests().forEach(request 
-> constructIndexFile0(topicId, request));
                 }
                 catch (Throwable e) {
-                    log.error("constructIndexFile error {}", topicId, e);
+                    log.error("MessageDispatcher#constructIndexFile, construct 
index file error, topicId={}", topicId, e);
                 }
             }
             groupCommitContext.release();
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index a7586e0b9f..8188febcc4 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -108,7 +108,7 @@ public class FlatAppendFile {
         if (fileSegment.getCommitPosition() != fileSize) {
             fileSegment.initPosition(fileSize);
             flushFileSegmentMeta(fileSegment);
-            log.warn("FlatAppendFile last file size not correct, filePath: 
{}", this.filePath);
+            log.warn("FlatAppendFile#recoverFileSize, last file size not 
correct, filePath={}", this.filePath);
         }
     }
 
@@ -202,7 +202,7 @@ public class FlatAppendFile {
             result = fileSegment.append(buffer, timestamp);
             if (result == AppendResult.FILE_FULL) {
                 boolean commitResult = fileSegment.commitAsync().join();
-                log.info("FlatAppendFile#append not successful for the file {} 
is full, commit result={}",
+                log.debug("FlatAppendFile#append, file is full, filePath={}, 
commitResult={}",
                     fileSegment.getPath(), commitResult);
                 if (commitResult) {
                     this.flushFileSegmentMeta(fileSegment);
@@ -279,7 +279,7 @@ public class FlatAppendFile {
 
                 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
                     fileSegment.getMaxTimestamp() >= expireTimestamp) {
-                    log.debug("FileSegment has not expired, filePath={}, 
fileType={}, " +
+                    log.debug("FlatAppendFile#destroyExpiredFile, file not 
expired, filePath={}, fileType={}, " +
                             "offset={}, expireTimestamp={}, maxTimestamp={}", 
filePath, fileType,
                         fileSegment.getBaseOffset(), expireTimestamp, 
fileSegment.getMaxTimestamp());
                     break;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
index 700f12d0d6..8e33e43c2d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
@@ -59,12 +59,12 @@ public class FlatFileStore {
         try {
             this.flatFileConcurrentMap.clear();
             this.recover();
-            log.info("FlatFileStore recover finished, total cost={}ms", 
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            log.info("FlatFileStore#load, recover finished, totalCost={}ms", 
stopwatch.elapsed(TimeUnit.MILLISECONDS));
         } catch (Exception e) {
             long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-            log.info("FlatFileStore recover error, total cost={}ms", costTime);
+            log.error("FlatFileStore#load, recover error, totalCost={}ms", 
costTime, e);
             LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME)
-                .error("FlatFileStore recover error, total cost={}ms", 
costTime, e);
+                .error("FlatFileStore#load, recover error, totalCost={}ms", 
costTime, e);
             return false;
         }
         return true;
@@ -78,7 +78,7 @@ public class FlatFileStore {
             futures.add(this.recoverAsync(topicMetadata)
                 .whenComplete((unused, throwable) -> {
                     if (throwable != null) {
-                        log.error("FlatFileStore recover file error, 
topic={}", topicMetadata.getTopic(), throwable);
+                        log.error("FlatFileStore#recoverAsync, recover file 
error, topic={}", topicMetadata.getTopic(), throwable);
                     }
                     semaphore.release();
                 }));
@@ -115,7 +115,7 @@ public class FlatFileStore {
                 flatFile.destroyExpiredFile(System.currentTimeMillis() -
                     TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours()));
             } catch (Exception e) {
-                log.error("FlatFileStore delete expire file error", e);
+                log.error("FlatFileStore#scheduleDeleteExpireFile, delete 
expire file error", e);
             } finally {
                 flatFile.getFileLock().unlock();
             }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 89d6a00abd..1427c6cef9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -287,16 +287,16 @@ public class FlatMessageFile implements FlatFileInterface 
{
         ByteBuffer buffer = getMessageAsync(cqMax).join();
         long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
         if (storeTime < timestamp) {
-            log.info("FlatMessageFile getQueueOffsetByTimeAsync, exceeded 
maximum time, " +
-                "filePath={}, timestamp={}, result={}", filePath, timestamp, 
cqMax + 1);
+            log.debug("FlatMessageFile#getQueueOffsetByTimeAsync, exceeded 
maximum time, filePath={}, timestamp={}, result={}",
+                filePath, timestamp, cqMax + 1);
             return CompletableFuture.completedFuture(cqMax + 1);
         }
 
         buffer = getMessageAsync(cqMin).join();
         storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
         if (storeTime > timestamp) {
-            log.info("FlatMessageFile getQueueOffsetByTimeAsync, less than 
minimum time, " +
-                "filePath={}, timestamp={}, result={}", filePath, timestamp, 
cqMin);
+            log.debug("FlatMessageFile#getQueueOffsetByTimeAsync, less than 
minimum time, filePath={}, timestamp={}, result={}",
+                filePath, timestamp, cqMin);
             return CompletableFuture.completedFuture(cqMin);
         }
 
@@ -351,7 +351,7 @@ public class FlatMessageFile implements FlatFileInterface {
             }
         }
 
-        log.info("FlatMessageFile getQueueOffsetByTimeAsync, filePath={}, 
timestamp={}, result={}, log={}",
+        log.debug("FlatMessageFile#getQueueOffsetByTimeAsync, filePath={}, 
timestamp={}, result={}, log={}",
             filePath, timestamp, offset, JSON.toJSONString(queryLog));
         return CompletableFuture.completedFuture(offset);
     }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index ed624ae620..8fd4b2961b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -275,13 +275,13 @@ public class IndexStoreFile implements IndexFile {
                 }
                 this.flushNewMetadata(byteBuffer, indexItemMaxCount == 
this.indexItemCount.get() + 1);
 
-                log.trace("IndexStoreFile put key, timestamp: {}, topic: {}, 
key: {}, slot: {}, item: {}, previous item: {}, content: {}",
+                log.trace("IndexStoreFile#putKey, put key, timestamp={}, 
topic={}, key={}, slot={}, item={}, previousItem={}, content={}",
                     this.getTimestamp(), topic, key, hashCode % 
this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem);
             }
             return AppendResult.SUCCESS;
         } catch (Throwable e) {
-            log.error("IndexStoreFile put key error, topic: {}, topicId: {}, 
queueId: {}, keySet: {}, offset: {}, " +
-                "size: {}, timestamp: {}", topic, topicId, queueId, keySet, 
offset, size, timestamp, e);
+            log.error("IndexStoreFile#putKey, put key error, topic={}, 
topicId={}, queueId={}, keySet={}, offset={}, size={}, timestamp={}",
+                topic, topicId, queueId, keySet, offset, size, timestamp, e);
         } finally {
             fileReadWriteLock.writeLock().unlock();
         }
@@ -347,12 +347,11 @@ public class IndexStoreFile implements IndexFile {
                 left--;
             }
 
-            log.debug("IndexStoreFile query from unsealed mapped file, 
timestamp: {}, result size: {}, " +
-                    "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+            log.debug("IndexStoreFile#queryAsyncFromUnsealedFile, query from 
unsealed mapped file, timestamp={}, resultSize={}, key={}, hashCode={}, 
maxCount={}, timeRange={}-{}",
                 getTimestamp(), result.size(), key, hashCode, maxCount, 
beginTime, endTime);
         } catch (Exception e) {
-            log.error("IndexStoreFile query from unsealed mapped file error, 
timestamp: {}, " +
-                "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, 
maxCount, beginTime, endTime, e);
+            log.error("IndexStoreFile#queryAsyncFromUnsealedFile, query from 
unsealed mapped file error, timestamp={}, key={}, maxCount={}, timeRange={}-{}",
+                getTimestamp(), key, maxCount, beginTime, endTime, e);
         } finally {
             fileReadWriteLock.readLock().unlock();
             if (held) {
@@ -378,8 +377,8 @@ public class IndexStoreFile implements IndexFile {
         CompletableFuture<List<IndexItem>> future = 
this.fileSegment.readAsync(slotPosition, HASH_SLOT_SIZE)
             .thenCompose(slotBuffer -> {
                 if (slotBuffer.remaining() < HASH_SLOT_SIZE) {
-                    log.error("IndexStoreFile query from tiered storage return 
error slot buffer, " +
-                        "key: {}, maxCount: {}, timestamp={}-{}", key, 
maxCount, beginTime, endTime);
+                    log.error("IndexStoreFile#queryAsyncFromSegmentFile, slot 
buffer too small, key={}, maxCount={}, timeRange={}-{}",
+                        key, maxCount, beginTime, endTime);
                     return CompletableFuture.completedFuture(null);
                 }
                 int indexPosition = slotBuffer.getInt();
@@ -396,8 +395,8 @@ public class IndexStoreFile implements IndexFile {
                 }
 
                 if (itemBuffer.remaining() % COMPACT_INDEX_ITEM_SIZE != 0) {
-                    log.error("IndexStoreFile query from tiered storage return 
error item buffer, " +
-                        "key: {}, maxCount: {}, timestamp={}-{}", key, 
maxCount, beginTime, endTime);
+                    log.error("IndexStoreFile#queryAsyncFromSegmentFile, item 
buffer size mismatch, key={}, maxCount={}, timeRange={}-{}",
+                        key, maxCount, beginTime, endTime);
                     return result;
                 }
 
@@ -419,8 +418,7 @@ public class IndexStoreFile implements IndexFile {
         return future.whenComplete((result, throwable) -> {
             long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
             if (throwable != null) {
-                log.error("IndexStoreFile query from segment file, cost: {}ms, 
timestamp: {}, " +
-                        "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+                log.error("IndexStoreFile#queryAsyncFromSegmentFile, query 
from segment file error, cost={}ms, timestamp={}, key={}, hashCode={}, 
maxCount={}, timeRange={}-{}",
                     costTime, getTimestamp(), key, hashCode, maxCount, 
beginTime, endTime, throwable);
             } else {
                 String details = Optional.ofNullable(result)
@@ -429,8 +427,7 @@ public class IndexStoreFile implements IndexFile {
                         .collect(Collectors.joining(", ")))
                     .orElse("");
 
-                log.debug("IndexStoreFile query from segment file, cost: {}ms, 
timestamp: {}, result size: {}, ({}), " +
-                        "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+                log.debug("IndexStoreFile#queryAsyncFromSegmentFile, query 
from segment file, cost={}ms, timestamp={}, resultSize={}, ({}), key={}, 
hashCode={}, maxCount={}, timeRange={}-{}",
                     costTime, getTimestamp(), result != null ? result.size() : 
0, details, key, hashCode, maxCount, beginTime, endTime);
             }
         });
@@ -442,12 +439,12 @@ public class IndexStoreFile implements IndexFile {
         ByteBuffer buffer;
         try {
             buffer = compactToNewFile();
-            log.debug("IndexStoreFile do compaction, timestamp: {}, file size: 
{}, cost: {}ms",
+            log.debug("IndexStoreFile#doCompaction, compaction done, 
timestamp={}, fileSize={}, cost={}ms",
                 this.getTimestamp(), buffer.capacity(), 
stopwatch.elapsed(TimeUnit.MICROSECONDS));
         } catch (FileNotFoundException e) {
             throw new RuntimeException(e);
         } catch (Throwable e) {
-            log.error("IndexStoreFile do compaction, timestamp: {}, cost: 
{}ms",
+            log.error("IndexStoreFile#doCompaction, compaction failed, 
timestamp={}, cost={}ms",
                 this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), 
e);
             return null;
         }
@@ -457,7 +454,7 @@ public class IndexStoreFile implements IndexFile {
             fileReadWriteLock.writeLock().lock();
             fileStatus.set(IndexStatusEnum.SEALED);
         } catch (Exception e) {
-            log.error("IndexStoreFile change file status to sealed error, 
timestamp={}", this.getTimestamp());
+            log.error("IndexStoreFile#doCompaction, change file status to 
sealed error, timestamp={}", this.getTimestamp());
         } finally {
             fileReadWriteLock.writeLock().unlock();
         }
@@ -554,7 +551,7 @@ public class IndexStoreFile implements IndexFile {
                 this.compactMappedFile.cleanResources();
             }
         } catch (Throwable e) {
-            log.error("IndexStoreFile shutdown failed, timestamp: {}, status: 
{}", this.getTimestamp(), fileStatus.get(), e);
+            log.error("IndexStoreFile#shutdown, shutdown failed, timestamp={}, 
status={}", this.getTimestamp(), fileStatus.get(), e);
         } finally {
             fileReadWriteLock.writeLock().unlock();
         }
@@ -575,13 +572,13 @@ public class IndexStoreFile implements IndexFile {
                     if (this.compactMappedFile != null) {
                         
this.compactMappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
                     }
-                    log.debug("IndexStoreService destroy local file, 
timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get());
+                    log.debug("IndexStoreFile#destroy, destroy local file, 
timestamp={}, status={}", this.getTimestamp(), fileStatus.get());
                     break;
                 case UPLOAD:
-                    log.warn("[BUG] IndexStoreService destroy remote file, 
timestamp: {}", this.getTimestamp());
+                    log.warn("IndexStoreFile#destroy, unexpected destroy for 
upload status, timestamp={}", this.getTimestamp());
             }
         } catch (Exception e) {
-            log.error("IndexStoreService destroy failed, timestamp: {}, 
status: {}", this.getTimestamp(), fileStatus.get(), e);
+            log.error("IndexStoreFile#destroy, destroy failed, timestamp={}, 
status={}", this.getTimestamp(), fileStatus.get(), e);
         } finally {
             fileReadWriteLock.writeLock().unlock();
         }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 5a55adcb27..0a768e3d1d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -105,7 +105,7 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
                 mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
             }
         } catch (Exception e) {
-            log.error("IndexStoreService do convert old format error, file: 
{}", filePath, e);
+            log.error("IndexStoreService#doConvertOldFormatFile, convert old 
format error, file={}", filePath, e);
         }
     }
 
@@ -134,9 +134,9 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
                 try {
                     IndexFile indexFile = new IndexStoreFile(storeConfig, 
Long.parseLong(file.getName()));
                     timeStoreTable.put(indexFile.getTimestamp(), indexFile);
-                    log.info("IndexStoreService recover load local file, 
timestamp: {}", indexFile.getTimestamp());
+                    log.info("IndexStoreService#recover, load local file, 
timestamp={}", indexFile.getTimestamp());
                 } catch (Exception e) {
-                    log.error("IndexStoreService recover, load local file 
error", e);
+                    log.error("IndexStoreService#recover, load local file 
error, destroying={}", file.getName(), e);
                 }
             }
         }
@@ -162,11 +162,11 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
                 localFile.destroy();
             }
             timeStoreTable.put(indexFile.getTimestamp(), indexFile);
-            log.info("IndexStoreService recover load remote file, timestamp: 
{}, end timestamp: {}",
+            log.info("IndexStoreService#recover, load remote file, 
timestamp={}, endTimestamp={}",
                 indexFile.getTimestamp(), indexFile.getEndTimestamp());
         }
 
-        log.info("IndexStoreService recover finished, total: {}, cost: {}ms, 
directory: {}",
+        log.info("IndexStoreService#recover, recover finished, total={}, 
cost={}ms, directory={}",
             timeStoreTable.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS), 
dir.getAbsolutePath());
     }
 
@@ -186,9 +186,9 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
             IndexStoreFile newStoreFile = new IndexStoreFile(storeConfig, 
timestamp);
             this.timeStoreTable.put(timestamp, newStoreFile);
             this.currentWriteFile = newStoreFile;
-            log.info("IndexStoreService construct next file, timestamp: {}", 
timestamp);
+            log.info("IndexStoreService#createNewIndexFile, construct next 
file, timestamp={}", timestamp);
         } catch (Exception e) {
-            log.error("IndexStoreService construct next file, timestamp: {}", 
timestamp, e);
+            log.error("IndexStoreService#createNewIndexFile, construct next 
file error, timestamp={}", timestamp, e);
         } finally {
             this.readWriteLock.writeLock().unlock();
         }
@@ -223,8 +223,8 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
             }
         }
 
-        log.error("IndexStoreService put key three times return error, topic: 
{}, topicId: {}, " +
-            "queueId: {}, keySize: {}, timestamp: {}", topic, topicId, 
queueId, keySet.size(), timestamp);
+        log.error("IndexStoreService#putKey, put key three times return error, 
topic={}, topicId={}, queueId={}, keySize={}, timestamp={}",
+            topic, topicId, queueId, keySet.size(), timestamp);
         return AppendResult.SUCCESS;
     }
 
@@ -297,7 +297,7 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
                 }
             }
         } catch (Exception e) {
-            log.error("IndexStoreService force upload error", e);
+            log.error("IndexStoreService#forceUpload, force upload error", e);
             throw new RuntimeException(e);
         } finally {
             readWriteLock.writeLock().unlock();
@@ -306,7 +306,7 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
 
     public boolean doCompactThenUploadFile(IndexFile indexFile) {
         if 
(IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
-            log.error("IndexStoreService file status not correct, so skip, 
timestamp: {}, status: {}",
+            log.warn("IndexStoreService#doCompactThenUploadFile, file status 
not correct, skip, timestamp={}, status={}",
                 indexFile.getTimestamp(), indexFile.getFileStatus());
             indexFile.destroy();
             return true;
@@ -316,7 +316,7 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
         if (flatAppendFile.getCommitOffset() == 
flatAppendFile.getAppendOffset()) {
             ByteBuffer byteBuffer = indexFile.doCompaction();
             if (byteBuffer == null) {
-                log.error("IndexStoreService found compaction buffer is null, 
timestamp: {}", indexFile.getTimestamp());
+                log.warn("IndexStoreService#doCompactThenUploadFile, 
compaction buffer is null, timestamp={}", indexFile.getTimestamp());
                 return false;
             }
             flatAppendFile.rollingNewFile(Math.max(0L, 
flatAppendFile.getAppendOffset()));
@@ -329,10 +329,10 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
         List<FileSegment> fileSegmentList = 
flatAppendFile.getFileSegmentList();
         FileSegment fileSegment = fileSegmentList.get(fileSegmentList.size() - 
1);
         if (!result || fileSegment == null || fileSegment.getMinTimestamp() != 
indexFile.getTimestamp()) {
-            log.warn("IndexStoreService upload compacted file error, 
timestamp: {}", indexFile.getTimestamp());
+            log.warn("IndexStoreService#doCompactThenUploadFile, upload 
compacted file error, timestamp={}", indexFile.getTimestamp());
             return false;
         } else {
-            log.info("IndexStoreService upload compacted file success, 
timestamp: {}", indexFile.getTimestamp());
+            log.info("IndexStoreService#doCompactThenUploadFile, upload 
compacted file success, timestamp={}", indexFile.getTimestamp());
         }
 
         readWriteLock.writeLock().lock();
@@ -341,7 +341,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
             timeStoreTable.put(storeFile.getTimestamp(), storeFile);
             indexFile.destroy();
         } catch (Exception e) {
-            log.error("IndexStoreService rolling file error, timestamp: {}, cost: {}ms",
+            log.error("IndexStoreService#doCompactThenUploadFile, rolling file error, timestamp={}, cost={}ms",
                 indexFile.getTimestamp(), stopwatch.elapsed(TimeUnit.MILLISECONDS), e);
         } finally {
             readWriteLock.writeLock().unlock();
@@ -361,7 +361,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
             int tableSize = (int) timeStoreTable.entrySet().stream()
                 .filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
                 .count();
-            log.debug("IndexStoreService delete file, timestamp={}, remote={}, table={}, all={}",
+            log.debug("IndexStoreService#destroyExpiredFile, delete file, timestamp={}, remote={}, table={}, all={}",
                 expireTimestamp, flatAppendFile.getFileSegmentList().size(), tableSize, timeStoreTable.size());
         } finally {
             readWriteLock.writeLock().unlock();
@@ -384,7 +384,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
                 flatAppendFile.destroy();
             }
         } catch (Exception e) {
-            log.error("IndexStoreService destroy all file error", e);
+            log.error("IndexStoreService#destroy, destroy all file error", e);
         } finally {
             readWriteLock.writeLock().unlock();
         }
@@ -397,7 +397,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
 
     public void setCompactTimestamp(long timestamp) {
         this.compactTimestamp.set(timestamp);
-        log.debug("IndexStoreService set compact timestamp to: {}", timestamp);
+        log.debug("IndexStoreService#setCompactTimestamp, timestamp={}", timestamp);
     }
 
     protected IndexFile getNextSealedFile() {
@@ -442,7 +442,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
                     }
                 }
             } catch (Throwable e) {
-                log.error("IndexStoreService running error", e);
+                log.error("IndexStoreService#run, dispatch loop error", e);
             }
             this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
         }
@@ -452,13 +452,13 @@ public class IndexStoreService extends ServiceThread implements IndexService {
                 this.forceUpload();
             }
         } catch (Exception e) {
-            log.error("IndexStoreService shutdown error", e);
+            log.error("IndexStoreService#shutdown, shutdown error", e);
         } finally {
             this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
             this.timeStoreTable.clear();
             readWriteLock.writeLock().unlock();
         }
 
-        log.info(this.getServiceName() + " service shutdown");
+        log.info("{} service shutdown", this.getServiceName());
     }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
index 240b20533a..0bedf64452 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
@@ -228,7 +228,7 @@ public abstract class FileSegment implements Comparable<FileSegment>, FileSegmen
         if (fileSegmentInputStream != null) {
             long fileSize = this.getSize();
             if (fileSize == GET_FILE_SIZE_ERROR) {
-                log.error("FileSegment correct position error, fileName={}, commit={}, append={}, buffer={}",
+                log.error("FileSegment#commitAsync, correct position error, fileName={}, commit={}, append={}, buffer={}",
                     this.getPath(), commitPosition, appendPosition, fileSegmentInputStream.getContentLength());
                 releaseCommitLock();
                 return CompletableFuture.completedFuture(false);
@@ -272,7 +272,7 @@ public abstract class FileSegment implements Comparable<FileSegment>, FileSegmen
 
     private boolean handleCommitException(Throwable e) {
 
-        log.warn("FileSegment commit exception, filePath={}", this.filePath, e);
+        log.warn("FileSegment#handleCommitException, commit exception, filePath={}", this.filePath, e);
 
         // Get root cause here
         Throwable rootCause = e.getCause() != null ? e.getCause() : e;
@@ -282,7 +282,7 @@ public abstract class FileSegment implements Comparable<FileSegment>, FileSegmen
 
         long expectPosition = commitPosition + fileSegmentInputStream.getContentLength();
         if (fileSize == GET_FILE_SIZE_ERROR) {
-            log.error("Get file size error after commit, FileName: {}, Commit: {}, Content: {}, Expect: {}, Append: {}",
+            log.error("FileSegment#handleCommitException, get file size error after commit, fileName={}, commit={}, content={}, expect={}, append={}",
                 this.getPath(), commitPosition, fileSegmentInputStream.getContentLength(), expectPosition, appendPosition);
             return false;
         }
@@ -342,10 +342,10 @@ public abstract class FileSegment implements Comparable<FileSegment>, FileSegmen
 
         int readableBytes = (int) (commitPosition - position);
         if (readableBytes < length) {
-            length = readableBytes;
-            log.debug("FileSegment expect request position is greater than commit position, " +
-                    "file: {}, request position: {}, commit position: {}, change length from {} to {}",
+            log.debug("FileSegment#readAsync, request position exceeds commit position, " +
+                    "file={}, requestPosition={}, commitPosition={}, changeLength={} to {}",
                 getPath(), position, commitPosition, length, readableBytes);
+            length = readableBytes;
         }
         return this.read0(position, length);
     }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java
index 93ad74541b..1ad8b77c45 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java
@@ -92,7 +92,7 @@ public class MemoryFileSegment extends FileSegment {
 
         try {
             if (blocker != null && !blocker.get()) {
-                log.info("Commit Blocker Exception for Memory Test");
+                log.debug("MemoryFileSegment#commit0, commit blocker exception for memory test");
                 return CompletableFuture.completedFuture(false);
             }
 
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 3302d0457d..719cd90f8d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -73,7 +73,7 @@ public class PosixFileSegment extends FileSegment {
         String clusterBasePath = String.format("%s_%s", MessageStoreUtil.getHash(clusterName), clusterName);
         fullPath = Paths.get(basePath, clusterBasePath, filePath,
             fileType.toString(), MessageStoreUtil.offset2FileName(baseOffset)).toString();
-        log.info("Constructing Posix FileSegment, filePath: {}", fullPath);
+        log.info("PosixFileSegment#init, constructing, filePath={}", fullPath);
 
         this.createFile();
     }
@@ -123,14 +123,14 @@ public class PosixFileSegment extends FileSegment {
             }
             if (!file.exists()) {
                 if (file.createNewFile()) {
-                    log.debug("Create Posix FileSegment, filePath: {}", fullPath);
+                    log.debug("PosixFileSegment#createFile0, create file, filePath={}", fullPath);
                 }
             }
             this.readFileChannel = new RandomAccessFile(file, "r").getChannel();
             this.writeFileChannel = new RandomAccessFile(file, "rwd").getChannel();
             this.file = file;
         } catch (Exception e) {
-            log.error("PosixFileSegment#createFile: create file {} failed: ", filePath, e);
+            log.error("PosixFileSegment#createFile0, create file failed, filePath={}", filePath, e);
         }
     }
 
@@ -140,9 +140,9 @@ public class PosixFileSegment extends FileSegment {
         this.close();
         if (file != null && file.exists()) {
             if (file.delete()) {
-                log.info("Destroy Posix FileSegment, filePath: {}", fullPath);
+                log.info("PosixFileSegment#destroyFile, destroy file, filePath={}", fullPath);
             } else {
-                log.warn("Destroy Posix FileSegment error, filePath: {}", fullPath);
+                log.warn("PosixFileSegment#destroyFile, destroy file error, filePath={}", fullPath);
             }
         }
     }
@@ -160,7 +160,7 @@ public class PosixFileSegment extends FileSegment {
                 writeFileChannel = null;
             }
         } catch (IOException e) {
-            log.error("Destroy Posix FileSegment failed, filePath: {}", fullPath, e);
+            log.error("PosixFileSegment#close, close failed, filePath={}", fullPath, e);
         }
     }
 
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java
index 560234d050..c8d879ce8b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java
@@ -104,7 +104,9 @@ public class MessageFormatUtil {
     public static List<SelectBufferResult> splitMessageBuffer(ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
 
         if (cqBuffer == null || msgBuffer == null) {
-            log.error("MessageFormatUtil split buffer error, cq buffer or msg buffer is null");
+            log.error("MessageFormatUtil#splitMessageBuffer, cqBuffer={}, msgBuffer={}",
+                cqBuffer == null ? "null" : "size=" + cqBuffer.remaining(),
+                msgBuffer == null ? "null" : "size=" + msgBuffer.remaining());
             return new ArrayList<>();
         }
 
@@ -115,12 +117,13 @@ public class MessageFormatUtil {
             cqBuffer.remaining() / CONSUME_QUEUE_UNIT_SIZE);
 
         if (msgBuffer.remaining() == 0) {
-            log.error("MessageFormatUtil split buffer error, msg buffer length is 0");
+            log.error("MessageFormatUtil#splitMessageBuffer, msgBuffer is empty, cqBufferSize={}", cqBuffer.remaining());
             return bufferResultList;
         }
 
         if (cqBuffer.remaining() == 0 || cqBuffer.remaining() % CONSUME_QUEUE_UNIT_SIZE != 0) {
-            log.error("MessageFormatUtil split buffer error, cq buffer size is {}", cqBuffer.remaining());
+            log.error("MessageFormatUtil#splitMessageBuffer, cqBuffer size invalid, cqBufferSize={}, msgBufferSize={}",
+                cqBuffer.remaining(), msgBuffer.remaining());
             return bufferResultList;
         }
 
@@ -137,8 +140,10 @@ public class MessageFormatUtil {
 
                 int offset = (int) (logOffset - firstCommitLogOffset);
                 if (offset + bufferSize > msgBuffer.limit()) {
-                    log.error("MessageFormatUtil split buffer error, message buffer offset exceeded limit. " +
-                        "Expect length: {}, Actual length: {}", offset + bufferSize, msgBuffer.limit());
+                    log.error("MessageFormatUtil#splitMessageBuffer, message buffer offset exceeded limit, " +
+                            "index={}, total={}, firstCommitLogOffset={}, logOffset={}, bufferSize={}, msgBufferLimit={}",
+                        position / CONSUME_QUEUE_UNIT_SIZE, bufferResultList.size(), firstCommitLogOffset,
+                        logOffset, bufferSize, msgBuffer.limit());
                     break;
                 }
 
@@ -151,14 +156,16 @@ public class MessageFormatUtil {
                 }
                 if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE &&
                     magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
-                    log.error("MessageFormatUtil split buffer error, found unknown magic code. " +
-                        "Message offset: {}, wrong magic code: {}", offset, magicCode);
+                    log.error("MessageFormatUtil#splitMessageBuffer, unknown magic code, " +
+                            "index={}, messageOffset={}, logOffset={}, magicCode={}",
+                        position / CONSUME_QUEUE_UNIT_SIZE, offset, logOffset, magicCode);
                     continue;
                 }
 
                 if (bufferSize != getTotalSize(msgBuffer)) {
-                    log.error("MessageFormatUtil split buffer error, message length not match. " +
-                        "CommitLog length: {}, buffer length: {}", getTotalSize(msgBuffer), bufferSize);
+                    log.error("MessageFormatUtil#splitMessageBuffer, message length not match, " +
+                            "index={}, messageOffset={}, logOffset={}, commitLogLength={}, cqRecordedLength={}",
+                        position / CONSUME_QUEUE_UNIT_SIZE, offset, logOffset, getTotalSize(msgBuffer), bufferSize);
                     continue;
                 }
 


Reply via email to