This is an automated email from the ASF dual-hosted git repository.
lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 78e680b811 [ISSUE #10462] Standardize log format across tiered storage
module (#10474)
78e680b811 is described below
commit 78e680b811f15e7bbb5d5ef40b44ba1b33cf76e0
Author: lizhimins <[email protected]>
AuthorDate: Thu Jun 11 19:23:52 2026 +0800
[ISSUE #10462] Standardize log format across tiered storage module (#10474)
---
.../rocketmq/tieredstore/TieredMessageStore.java | 20 +++++------
.../core/MessageStoreDispatcherImpl.java | 6 ++--
.../rocketmq/tieredstore/file/FlatAppendFile.java | 6 ++--
.../rocketmq/tieredstore/file/FlatFileStore.java | 10 +++---
.../rocketmq/tieredstore/file/FlatMessageFile.java | 10 +++---
.../rocketmq/tieredstore/index/IndexStoreFile.java | 41 ++++++++++-----------
.../tieredstore/index/IndexStoreService.java | 42 +++++++++++-----------
.../rocketmq/tieredstore/provider/FileSegment.java | 12 +++----
.../tieredstore/provider/MemoryFileSegment.java | 2 +-
.../tieredstore/provider/PosixFileSegment.java | 12 +++----
.../tieredstore/util/MessageFormatUtil.java | 25 ++++++++-----
11 files changed, 94 insertions(+), 92 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 38946fd161..ed67badf56 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -247,8 +247,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
if (next.checkInStoreByConsumeOffset(topic, queueId,
offset)) {
TieredStoreMetricsManager.fallbackTotal.add(1,
latencyAttributes);
- log.debug("GetMessageAsync not found, then back to
next store, result: {}, " +
- "topic: {}, queue: {}, queue offset: {},
offset range: {}-{}",
+ log.debug("TieredMessageStore#getMessageAsync, not
found, fall back to next store, result={}, topic={}, queue={}, queueOffset={},
offsetRange={}-{}",
result.getStatus(), topic, queueId, offset,
result.getMinOffset(), result.getMaxOffset());
return next.getMessage(group, topic, queueId, offset,
maxMsgNums, messageFilter);
}
@@ -260,8 +259,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
result.getStatus() != GetMessageStatus.OFFSET_TOO_SMALL &&
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE
&&
result.getStatus() !=
GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
- log.warn("GetMessageAsync not found and message is not in
next store, result: {}, " +
- "topic: {}, queue: {}, queue offset: {}, offset
range: {}-{}",
+ log.warn("TieredMessageStore#getMessageAsync, not found
and not in next store, result={}, topic={}, queue={}, queueOffset={},
offsetRange={}-{}",
result.getStatus(), topic, queueId, offset,
result.getMinOffset(), result.getMaxOffset());
}
@@ -288,8 +286,8 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
// otherwise it will cause repeated consumption after next
start offset over commit offset.
if (storeConfig.isRecordGetMessageResult()) {
- log.info("GetMessageAsync result, {}, group: {}, topic:
{}, queueId: {}, offset: {}, count:{}",
- result, group, topic, queueId, offset, maxMsgNums);
+ log.info("TieredMessageStore#getMessageAsync, result={},
group={}, topic={}, queueId={}, offset={}, count={}",
+ result.getStatus(), group, topic, queueId, offset,
maxMsgNums);
}
return result;
@@ -419,7 +417,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
int maxNum, long begin, long end) {
long earliestTimeInNextStore = next.getEarliestMessageTime();
if (earliestTimeInNextStore <= 0) {
- log.warn("TieredMessageStore#queryMessageAsync: get earliest
message time in next store failed: {}", earliestTimeInNextStore);
+ log.warn("TieredMessageStore#queryMessageAsync, get earliest
message time in next store failed, earliestTime={}", earliestTimeInNextStore);
}
boolean isForce = storeConfig.getTieredStorageLevel() ==
MessageStoreConfig.TieredStorageLevel.FORCE;
QueryMessageResult result = end < earliestTimeInNextStore || isForce ?
@@ -442,7 +440,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
return result;
});
} catch (Exception e) {
- log.error("TieredMessageStore#queryMessageAsync: query message
in tiered store failed", e);
+ log.error("TieredMessageStore#queryMessageAsync, query message
in tiered store failed, topic={}, key={}", topic, key, e);
return CompletableFuture.completedFuture(result);
}
}
@@ -453,7 +451,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
public CompletableFuture<QueryMessageResult> queryMessageAsync(String
topic, String key, int maxNum, long begin, long end, String indexType, String
lastKey) {
long earliestTimeInNextStore = next.getEarliestMessageTime();
if (earliestTimeInNextStore <= 0) {
- log.warn("TieredMessageStore queryMessageAsync: get earliest
message time in next store failed: {}", earliestTimeInNextStore);
+ log.warn("TieredMessageStore#queryMessageAsync, get earliest
message time in next store failed, earliestTime={}", earliestTimeInNextStore);
}
boolean isForce = storeConfig.getTieredStorageLevel() ==
MessageStoreConfig.TieredStorageLevel.FORCE;
QueryMessageResult result = end < earliestTimeInNextStore || isForce ?
new QueryMessageResult() : next.queryMessage(topic, key, maxNum, begin, end,
indexType, lastKey);
@@ -474,7 +472,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
return result;
});
} catch (Exception e) {
- log.error("TieredMessageStore#queryMessageAsync: query message
in tiered store failed", e);
+ log.error("TieredMessageStore#queryMessageAsync, query message
in tiered store failed, topic={}, key={}, indexType={}", topic, key, indexType,
e);
return CompletableFuture.completedFuture(result);
}
}
@@ -520,7 +518,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
flatFileStore.destroyFile(queueMetadata.getQueue());
});
metadataStore.deleteTopic(topic);
- log.info("MessageStore delete topic success, topicName={}", topic);
+ log.info("TieredMessageStore#deleteTopics, delete topic success,
topic={}", topic);
}
return next.deleteTopics(deleteTopics);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 52e1c82a1d..2a0dfed7a7 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -176,7 +176,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
if (commitOffset < currentOffset) {
this.commitAsync(flatFile).whenComplete((result, throwable) ->
{
if (throwable != null) {
- log.error("MessageDispatcher#flatFile commitOffset
less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ",
topic, queueId, throwable);
+ log.error("MessageDispatcher#doScheduleDispatch,
commitOffset less than currentOffset, commitAsync again failed, topic={},
queueId={}", topic, queueId, throwable);
}
});
return CompletableFuture.completedFuture(false);
@@ -316,7 +316,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
//next commit async,execute constructIndexFile.
GroupCommitContext oldCommit =
failedGroupCommitMap.put(flatFile, groupCommitContext);
if (oldCommit != null) {
- log.warn("MessageDispatcher#commitAsync
failed,flatFile old failed commit context not release, topic={}, queueId={} ",
topic, queueId);
+
log.warn("MessageDispatcher#doScheduleDispatch, commitAsync failed, old failed
commit context not released, topic={}, queueId={}", topic, queueId);
oldCommit.release();
}
}
@@ -347,7 +347,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
groupCommitContext.getDispatchRequests().forEach(request
-> constructIndexFile0(topicId, request));
}
catch (Throwable e) {
- log.error("constructIndexFile error {}", topicId, e);
+ log.error("MessageDispatcher#constructIndexFile, construct
index file error, topicId={}", topicId, e);
}
}
groupCommitContext.release();
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index a7586e0b9f..8188febcc4 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -108,7 +108,7 @@ public class FlatAppendFile {
if (fileSegment.getCommitPosition() != fileSize) {
fileSegment.initPosition(fileSize);
flushFileSegmentMeta(fileSegment);
- log.warn("FlatAppendFile last file size not correct, filePath:
{}", this.filePath);
+ log.warn("FlatAppendFile#recoverFileSize, last file size not
correct, filePath={}", this.filePath);
}
}
@@ -202,7 +202,7 @@ public class FlatAppendFile {
result = fileSegment.append(buffer, timestamp);
if (result == AppendResult.FILE_FULL) {
boolean commitResult = fileSegment.commitAsync().join();
- log.info("FlatAppendFile#append not successful for the file {}
is full, commit result={}",
+ log.debug("FlatAppendFile#append, file is full, filePath={},
commitResult={}",
fileSegment.getPath(), commitResult);
if (commitResult) {
this.flushFileSegmentMeta(fileSegment);
@@ -279,7 +279,7 @@ public class FlatAppendFile {
if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
fileSegment.getMaxTimestamp() >= expireTimestamp) {
- log.debug("FileSegment has not expired, filePath={},
fileType={}, " +
+ log.debug("FlatAppendFile#destroyExpiredFile, file not
expired, filePath={}, fileType={}, " +
"offset={}, expireTimestamp={}, maxTimestamp={}",
filePath, fileType,
fileSegment.getBaseOffset(), expireTimestamp,
fileSegment.getMaxTimestamp());
break;
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
index 700f12d0d6..8e33e43c2d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
@@ -59,12 +59,12 @@ public class FlatFileStore {
try {
this.flatFileConcurrentMap.clear();
this.recover();
- log.info("FlatFileStore recover finished, total cost={}ms",
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ log.info("FlatFileStore#load, recover finished, totalCost={}ms",
stopwatch.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- log.info("FlatFileStore recover error, total cost={}ms", costTime);
+ log.error("FlatFileStore#load, recover error, totalCost={}ms",
costTime, e);
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME)
- .error("FlatFileStore recover error, total cost={}ms",
costTime, e);
+ .error("FlatFileStore#load, recover error, totalCost={}ms",
costTime, e);
return false;
}
return true;
@@ -78,7 +78,7 @@ public class FlatFileStore {
futures.add(this.recoverAsync(topicMetadata)
.whenComplete((unused, throwable) -> {
if (throwable != null) {
- log.error("FlatFileStore recover file error,
topic={}", topicMetadata.getTopic(), throwable);
+ log.error("FlatFileStore#recoverAsync, recover file
error, topic={}", topicMetadata.getTopic(), throwable);
}
semaphore.release();
}));
@@ -115,7 +115,7 @@ public class FlatFileStore {
flatFile.destroyExpiredFile(System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours()));
} catch (Exception e) {
- log.error("FlatFileStore delete expire file error", e);
+ log.error("FlatFileStore#scheduleDeleteExpireFile, delete
expire file error", e);
} finally {
flatFile.getFileLock().unlock();
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 89d6a00abd..1427c6cef9 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -287,16 +287,16 @@ public class FlatMessageFile implements FlatFileInterface
{
ByteBuffer buffer = getMessageAsync(cqMax).join();
long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
if (storeTime < timestamp) {
- log.info("FlatMessageFile getQueueOffsetByTimeAsync, exceeded
maximum time, " +
- "filePath={}, timestamp={}, result={}", filePath, timestamp,
cqMax + 1);
+ log.debug("FlatMessageFile#getQueueOffsetByTimeAsync, exceeded
maximum time, filePath={}, timestamp={}, result={}",
+ filePath, timestamp, cqMax + 1);
return CompletableFuture.completedFuture(cqMax + 1);
}
buffer = getMessageAsync(cqMin).join();
storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
if (storeTime > timestamp) {
- log.info("FlatMessageFile getQueueOffsetByTimeAsync, less than
minimum time, " +
- "filePath={}, timestamp={}, result={}", filePath, timestamp,
cqMin);
+ log.debug("FlatMessageFile#getQueueOffsetByTimeAsync, less than
minimum time, filePath={}, timestamp={}, result={}",
+ filePath, timestamp, cqMin);
return CompletableFuture.completedFuture(cqMin);
}
@@ -351,7 +351,7 @@ public class FlatMessageFile implements FlatFileInterface {
}
}
- log.info("FlatMessageFile getQueueOffsetByTimeAsync, filePath={},
timestamp={}, result={}, log={}",
+ log.debug("FlatMessageFile#getQueueOffsetByTimeAsync, filePath={},
timestamp={}, result={}, log={}",
filePath, timestamp, offset, JSON.toJSONString(queryLog));
return CompletableFuture.completedFuture(offset);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index ed624ae620..8fd4b2961b 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -275,13 +275,13 @@ public class IndexStoreFile implements IndexFile {
}
this.flushNewMetadata(byteBuffer, indexItemMaxCount ==
this.indexItemCount.get() + 1);
- log.trace("IndexStoreFile put key, timestamp: {}, topic: {},
key: {}, slot: {}, item: {}, previous item: {}, content: {}",
+ log.trace("IndexStoreFile#putKey, put key, timestamp={},
topic={}, key={}, slot={}, item={}, previousItem={}, content={}",
this.getTimestamp(), topic, key, hashCode %
this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem);
}
return AppendResult.SUCCESS;
} catch (Throwable e) {
- log.error("IndexStoreFile put key error, topic: {}, topicId: {},
queueId: {}, keySet: {}, offset: {}, " +
- "size: {}, timestamp: {}", topic, topicId, queueId, keySet,
offset, size, timestamp, e);
+ log.error("IndexStoreFile#putKey, put key error, topic={},
topicId={}, queueId={}, keySet={}, offset={}, size={}, timestamp={}",
+ topic, topicId, queueId, keySet, offset, size, timestamp, e);
} finally {
fileReadWriteLock.writeLock().unlock();
}
@@ -347,12 +347,11 @@ public class IndexStoreFile implements IndexFile {
left--;
}
- log.debug("IndexStoreFile query from unsealed mapped file,
timestamp: {}, result size: {}, " +
- "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+ log.debug("IndexStoreFile#queryAsyncFromUnsealedFile, query from
unsealed mapped file, timestamp={}, resultSize={}, key={}, hashCode={},
maxCount={}, timeRange={}-{}",
getTimestamp(), result.size(), key, hashCode, maxCount,
beginTime, endTime);
} catch (Exception e) {
- log.error("IndexStoreFile query from unsealed mapped file error,
timestamp: {}, " +
- "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key,
maxCount, beginTime, endTime, e);
+ log.error("IndexStoreFile#queryAsyncFromUnsealedFile, query from
unsealed mapped file error, timestamp={}, key={}, maxCount={}, timeRange={}-{}",
+ getTimestamp(), key, maxCount, beginTime, endTime, e);
} finally {
fileReadWriteLock.readLock().unlock();
if (held) {
@@ -378,8 +377,8 @@ public class IndexStoreFile implements IndexFile {
CompletableFuture<List<IndexItem>> future =
this.fileSegment.readAsync(slotPosition, HASH_SLOT_SIZE)
.thenCompose(slotBuffer -> {
if (slotBuffer.remaining() < HASH_SLOT_SIZE) {
- log.error("IndexStoreFile query from tiered storage return
error slot buffer, " +
- "key: {}, maxCount: {}, timestamp={}-{}", key,
maxCount, beginTime, endTime);
+ log.error("IndexStoreFile#queryAsyncFromSegmentFile, slot
buffer too small, key={}, maxCount={}, timeRange={}-{}",
+ key, maxCount, beginTime, endTime);
return CompletableFuture.completedFuture(null);
}
int indexPosition = slotBuffer.getInt();
@@ -396,8 +395,8 @@ public class IndexStoreFile implements IndexFile {
}
if (itemBuffer.remaining() % COMPACT_INDEX_ITEM_SIZE != 0) {
- log.error("IndexStoreFile query from tiered storage return
error item buffer, " +
- "key: {}, maxCount: {}, timestamp={}-{}", key,
maxCount, beginTime, endTime);
+ log.error("IndexStoreFile#queryAsyncFromSegmentFile, item
buffer size mismatch, key={}, maxCount={}, timeRange={}-{}",
+ key, maxCount, beginTime, endTime);
return result;
}
@@ -419,8 +418,7 @@ public class IndexStoreFile implements IndexFile {
return future.whenComplete((result, throwable) -> {
long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
if (throwable != null) {
- log.error("IndexStoreFile query from segment file, cost: {}ms,
timestamp: {}, " +
- "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+ log.error("IndexStoreFile#queryAsyncFromSegmentFile, query
from segment file error, cost={}ms, timestamp={}, key={}, hashCode={},
maxCount={}, timeRange={}-{}",
costTime, getTimestamp(), key, hashCode, maxCount,
beginTime, endTime, throwable);
} else {
String details = Optional.ofNullable(result)
@@ -429,8 +427,7 @@ public class IndexStoreFile implements IndexFile {
.collect(Collectors.joining(", ")))
.orElse("");
- log.debug("IndexStoreFile query from segment file, cost: {}ms,
timestamp: {}, result size: {}, ({}), " +
- "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+ log.debug("IndexStoreFile#queryAsyncFromSegmentFile, query
from segment file, cost={}ms, timestamp={}, resultSize={}, ({}), key={},
hashCode={}, maxCount={}, timeRange={}-{}",
costTime, getTimestamp(), result != null ? result.size() :
0, details, key, hashCode, maxCount, beginTime, endTime);
}
});
@@ -442,12 +439,12 @@ public class IndexStoreFile implements IndexFile {
ByteBuffer buffer;
try {
buffer = compactToNewFile();
- log.debug("IndexStoreFile do compaction, timestamp: {}, file size:
{}, cost: {}ms",
+ log.debug("IndexStoreFile#doCompaction, compaction done,
timestamp={}, fileSize={}, cost={}ms",
this.getTimestamp(), buffer.capacity(),
stopwatch.elapsed(TimeUnit.MICROSECONDS));
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
} catch (Throwable e) {
- log.error("IndexStoreFile do compaction, timestamp: {}, cost:
{}ms",
+ log.error("IndexStoreFile#doCompaction, compaction failed,
timestamp={}, cost={}ms",
this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS),
e);
return null;
}
@@ -457,7 +454,7 @@ public class IndexStoreFile implements IndexFile {
fileReadWriteLock.writeLock().lock();
fileStatus.set(IndexStatusEnum.SEALED);
} catch (Exception e) {
- log.error("IndexStoreFile change file status to sealed error,
timestamp={}", this.getTimestamp());
+ log.error("IndexStoreFile#doCompaction, change file status to
sealed error, timestamp={}", this.getTimestamp());
} finally {
fileReadWriteLock.writeLock().unlock();
}
@@ -554,7 +551,7 @@ public class IndexStoreFile implements IndexFile {
this.compactMappedFile.cleanResources();
}
} catch (Throwable e) {
- log.error("IndexStoreFile shutdown failed, timestamp: {}, status:
{}", this.getTimestamp(), fileStatus.get(), e);
+ log.error("IndexStoreFile#shutdown, shutdown failed, timestamp={},
status={}", this.getTimestamp(), fileStatus.get(), e);
} finally {
fileReadWriteLock.writeLock().unlock();
}
@@ -575,13 +572,13 @@ public class IndexStoreFile implements IndexFile {
if (this.compactMappedFile != null) {
this.compactMappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
}
- log.debug("IndexStoreService destroy local file,
timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get());
+ log.debug("IndexStoreFile#destroy, destroy local file,
timestamp={}, status={}", this.getTimestamp(), fileStatus.get());
break;
case UPLOAD:
- log.warn("[BUG] IndexStoreService destroy remote file,
timestamp: {}", this.getTimestamp());
+ log.warn("IndexStoreFile#destroy, unexpected destroy for
upload status, timestamp={}", this.getTimestamp());
}
} catch (Exception e) {
- log.error("IndexStoreService destroy failed, timestamp: {},
status: {}", this.getTimestamp(), fileStatus.get(), e);
+ log.error("IndexStoreFile#destroy, destroy failed, timestamp={},
status={}", this.getTimestamp(), fileStatus.get(), e);
} finally {
fileReadWriteLock.writeLock().unlock();
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 5a55adcb27..0a768e3d1d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -105,7 +105,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
}
} catch (Exception e) {
- log.error("IndexStoreService do convert old format error, file:
{}", filePath, e);
+ log.error("IndexStoreService#doConvertOldFormatFile, convert old
format error, file={}", filePath, e);
}
}
@@ -134,9 +134,9 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
try {
IndexFile indexFile = new IndexStoreFile(storeConfig,
Long.parseLong(file.getName()));
timeStoreTable.put(indexFile.getTimestamp(), indexFile);
- log.info("IndexStoreService recover load local file,
timestamp: {}", indexFile.getTimestamp());
+ log.info("IndexStoreService#recover, load local file,
timestamp={}", indexFile.getTimestamp());
} catch (Exception e) {
- log.error("IndexStoreService recover, load local file
error", e);
+ log.error("IndexStoreService#recover, load local file
error, destroying={}", file.getName(), e);
}
}
}
@@ -162,11 +162,11 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
localFile.destroy();
}
timeStoreTable.put(indexFile.getTimestamp(), indexFile);
- log.info("IndexStoreService recover load remote file, timestamp:
{}, end timestamp: {}",
+ log.info("IndexStoreService#recover, load remote file,
timestamp={}, endTimestamp={}",
indexFile.getTimestamp(), indexFile.getEndTimestamp());
}
- log.info("IndexStoreService recover finished, total: {}, cost: {}ms,
directory: {}",
+ log.info("IndexStoreService#recover, recover finished, total={},
cost={}ms, directory={}",
timeStoreTable.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS),
dir.getAbsolutePath());
}
@@ -186,9 +186,9 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
IndexStoreFile newStoreFile = new IndexStoreFile(storeConfig,
timestamp);
this.timeStoreTable.put(timestamp, newStoreFile);
this.currentWriteFile = newStoreFile;
- log.info("IndexStoreService construct next file, timestamp: {}",
timestamp);
+ log.info("IndexStoreService#createNewIndexFile, construct next
file, timestamp={}", timestamp);
} catch (Exception e) {
- log.error("IndexStoreService construct next file, timestamp: {}",
timestamp, e);
+ log.error("IndexStoreService#createNewIndexFile, construct next
file error, timestamp={}", timestamp, e);
} finally {
this.readWriteLock.writeLock().unlock();
}
@@ -223,8 +223,8 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
}
}
- log.error("IndexStoreService put key three times return error, topic:
{}, topicId: {}, " +
- "queueId: {}, keySize: {}, timestamp: {}", topic, topicId,
queueId, keySet.size(), timestamp);
+ log.error("IndexStoreService#putKey, put key three times return error,
topic={}, topicId={}, queueId={}, keySize={}, timestamp={}",
+ topic, topicId, queueId, keySet.size(), timestamp);
return AppendResult.SUCCESS;
}
@@ -297,7 +297,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
}
}
} catch (Exception e) {
- log.error("IndexStoreService force upload error", e);
+ log.error("IndexStoreService#forceUpload, force upload error", e);
throw new RuntimeException(e);
} finally {
readWriteLock.writeLock().unlock();
@@ -306,7 +306,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
public boolean doCompactThenUploadFile(IndexFile indexFile) {
if
(IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
- log.error("IndexStoreService file status not correct, so skip,
timestamp: {}, status: {}",
+ log.warn("IndexStoreService#doCompactThenUploadFile, file status
not correct, skip, timestamp={}, status={}",
indexFile.getTimestamp(), indexFile.getFileStatus());
indexFile.destroy();
return true;
@@ -316,7 +316,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
if (flatAppendFile.getCommitOffset() ==
flatAppendFile.getAppendOffset()) {
ByteBuffer byteBuffer = indexFile.doCompaction();
if (byteBuffer == null) {
- log.error("IndexStoreService found compaction buffer is null,
timestamp: {}", indexFile.getTimestamp());
+ log.warn("IndexStoreService#doCompactThenUploadFile,
compaction buffer is null, timestamp={}", indexFile.getTimestamp());
return false;
}
flatAppendFile.rollingNewFile(Math.max(0L,
flatAppendFile.getAppendOffset()));
@@ -329,10 +329,10 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
List<FileSegment> fileSegmentList =
flatAppendFile.getFileSegmentList();
FileSegment fileSegment = fileSegmentList.get(fileSegmentList.size() -
1);
if (!result || fileSegment == null || fileSegment.getMinTimestamp() !=
indexFile.getTimestamp()) {
- log.warn("IndexStoreService upload compacted file error,
timestamp: {}", indexFile.getTimestamp());
+ log.warn("IndexStoreService#doCompactThenUploadFile, upload
compacted file error, timestamp={}", indexFile.getTimestamp());
return false;
} else {
- log.info("IndexStoreService upload compacted file success,
timestamp: {}", indexFile.getTimestamp());
+ log.info("IndexStoreService#doCompactThenUploadFile, upload
compacted file success, timestamp={}", indexFile.getTimestamp());
}
readWriteLock.writeLock().lock();
@@ -341,7 +341,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
timeStoreTable.put(storeFile.getTimestamp(), storeFile);
indexFile.destroy();
} catch (Exception e) {
- log.error("IndexStoreService rolling file error, timestamp: {},
cost: {}ms",
+ log.error("IndexStoreService#doCompactThenUploadFile, rolling file
error, timestamp={}, cost={}ms",
indexFile.getTimestamp(),
stopwatch.elapsed(TimeUnit.MILLISECONDS), e);
} finally {
readWriteLock.writeLock().unlock();
@@ -361,7 +361,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
int tableSize = (int) timeStoreTable.entrySet().stream()
.filter(entry ->
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
.count();
- log.debug("IndexStoreService delete file, timestamp={}, remote={},
table={}, all={}",
+ log.debug("IndexStoreService#destroyExpiredFile, delete file,
timestamp={}, remote={}, table={}, all={}",
expireTimestamp, flatAppendFile.getFileSegmentList().size(),
tableSize, timeStoreTable.size());
} finally {
readWriteLock.writeLock().unlock();
@@ -384,7 +384,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
flatAppendFile.destroy();
}
} catch (Exception e) {
- log.error("IndexStoreService destroy all file error", e);
+ log.error("IndexStoreService#destroy, destroy all file error", e);
} finally {
readWriteLock.writeLock().unlock();
}
@@ -397,7 +397,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
public void setCompactTimestamp(long timestamp) {
this.compactTimestamp.set(timestamp);
- log.debug("IndexStoreService set compact timestamp to: {}", timestamp);
+ log.debug("IndexStoreService#setCompactTimestamp, timestamp={}",
timestamp);
}
protected IndexFile getNextSealedFile() {
@@ -442,7 +442,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
}
}
} catch (Throwable e) {
- log.error("IndexStoreService running error", e);
+ log.error("IndexStoreService#run, dispatch loop error", e);
}
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
}
@@ -452,13 +452,13 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
this.forceUpload();
}
} catch (Exception e) {
- log.error("IndexStoreService shutdown error", e);
+ log.error("IndexStoreService#shutdown, shutdown error", e);
} finally {
this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
this.timeStoreTable.clear();
readWriteLock.writeLock().unlock();
}
- log.info(this.getServiceName() + " service shutdown");
+ log.info("{} service shutdown", this.getServiceName());
}
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
index 240b20533a..0bedf64452 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
@@ -228,7 +228,7 @@ public abstract class FileSegment implements
Comparable<FileSegment>, FileSegmen
if (fileSegmentInputStream != null) {
long fileSize = this.getSize();
if (fileSize == GET_FILE_SIZE_ERROR) {
- log.error("FileSegment correct position error, fileName={},
commit={}, append={}, buffer={}",
+ log.error("FileSegment#commitAsync, correct position error,
fileName={}, commit={}, append={}, buffer={}",
this.getPath(), commitPosition, appendPosition,
fileSegmentInputStream.getContentLength());
releaseCommitLock();
return CompletableFuture.completedFuture(false);
@@ -272,7 +272,7 @@ public abstract class FileSegment implements
Comparable<FileSegment>, FileSegmen
private boolean handleCommitException(Throwable e) {
- log.warn("FileSegment commit exception, filePath={}", this.filePath,
e);
+ log.warn("FileSegment#handleCommitException, commit exception,
filePath={}", this.filePath, e);
// Get root cause here
Throwable rootCause = e.getCause() != null ? e.getCause() : e;
@@ -282,7 +282,7 @@ public abstract class FileSegment implements
Comparable<FileSegment>, FileSegmen
long expectPosition = commitPosition +
fileSegmentInputStream.getContentLength();
if (fileSize == GET_FILE_SIZE_ERROR) {
- log.error("Get file size error after commit, FileName: {}, Commit:
{}, Content: {}, Expect: {}, Append: {}",
+ log.error("FileSegment#handleCommitException, get file size error
after commit, fileName={}, commit={}, content={}, expect={}, append={}",
this.getPath(), commitPosition,
fileSegmentInputStream.getContentLength(), expectPosition, appendPosition);
return false;
}
@@ -342,10 +342,10 @@ public abstract class FileSegment implements
Comparable<FileSegment>, FileSegmen
int readableBytes = (int) (commitPosition - position);
if (readableBytes < length) {
- length = readableBytes;
- log.debug("FileSegment expect request position is greater than
commit position, " +
- "file: {}, request position: {}, commit position: {},
change length from {} to {}",
+ log.debug("FileSegment#readAsync, request position exceeds commit
position, " +
+ "file={}, requestPosition={}, commitPosition={},
changeLength={} to {}",
getPath(), position, commitPosition, length, readableBytes);
+ length = readableBytes;
}
return this.read0(position, length);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java
index 93ad74541b..1ad8b77c45 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/MemoryFileSegment.java
@@ -92,7 +92,7 @@ public class MemoryFileSegment extends FileSegment {
try {
if (blocker != null && !blocker.get()) {
- log.info("Commit Blocker Exception for Memory Test");
+ log.debug("MemoryFileSegment#commit0, commit blocker exception
for memory test");
return CompletableFuture.completedFuture(false);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 3302d0457d..719cd90f8d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -73,7 +73,7 @@ public class PosixFileSegment extends FileSegment {
String clusterBasePath = String.format("%s_%s",
MessageStoreUtil.getHash(clusterName), clusterName);
fullPath = Paths.get(basePath, clusterBasePath, filePath,
fileType.toString(),
MessageStoreUtil.offset2FileName(baseOffset)).toString();
- log.info("Constructing Posix FileSegment, filePath: {}", fullPath);
+ log.info("PosixFileSegment#init, constructing, filePath={}", fullPath);
this.createFile();
}
@@ -123,14 +123,14 @@ public class PosixFileSegment extends FileSegment {
}
if (!file.exists()) {
if (file.createNewFile()) {
- log.debug("Create Posix FileSegment, filePath: {}",
fullPath);
+ log.debug("PosixFileSegment#createFile0, create file,
filePath={}", fullPath);
}
}
this.readFileChannel = new RandomAccessFile(file,
"r").getChannel();
this.writeFileChannel = new RandomAccessFile(file,
"rwd").getChannel();
this.file = file;
} catch (Exception e) {
- log.error("PosixFileSegment#createFile: create file {} failed: ",
filePath, e);
+ log.error("PosixFileSegment#createFile0, create file failed,
filePath={}", filePath, e);
}
}
@@ -140,9 +140,9 @@ public class PosixFileSegment extends FileSegment {
this.close();
if (file != null && file.exists()) {
if (file.delete()) {
- log.info("Destroy Posix FileSegment, filePath: {}", fullPath);
+ log.info("PosixFileSegment#destroyFile, destroy file,
filePath={}", fullPath);
} else {
- log.warn("Destroy Posix FileSegment error, filePath: {}",
fullPath);
+ log.warn("PosixFileSegment#destroyFile, destroy file error,
filePath={}", fullPath);
}
}
}
@@ -160,7 +160,7 @@ public class PosixFileSegment extends FileSegment {
writeFileChannel = null;
}
} catch (IOException e) {
- log.error("Destroy Posix FileSegment failed, filePath: {}",
fullPath, e);
+ log.error("PosixFileSegment#close, close failed, filePath={}",
fullPath, e);
}
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java
index 560234d050..c8d879ce8b 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageFormatUtil.java
@@ -104,7 +104,9 @@ public class MessageFormatUtil {
public static List<SelectBufferResult> splitMessageBuffer(ByteBuffer
cqBuffer, ByteBuffer msgBuffer) {
if (cqBuffer == null || msgBuffer == null) {
- log.error("MessageFormatUtil split buffer error, cq buffer or msg
buffer is null");
+ log.error("MessageFormatUtil#splitMessageBuffer, cqBuffer={},
msgBuffer={}",
+ cqBuffer == null ? "null" : "size=" + cqBuffer.remaining(),
+ msgBuffer == null ? "null" : "size=" + msgBuffer.remaining());
return new ArrayList<>();
}
@@ -115,12 +117,13 @@ public class MessageFormatUtil {
cqBuffer.remaining() / CONSUME_QUEUE_UNIT_SIZE);
if (msgBuffer.remaining() == 0) {
- log.error("MessageFormatUtil split buffer error, msg buffer length
is 0");
+ log.error("MessageFormatUtil#splitMessageBuffer, msgBuffer is
empty, cqBufferSize={}", cqBuffer.remaining());
return bufferResultList;
}
if (cqBuffer.remaining() == 0 || cqBuffer.remaining() %
CONSUME_QUEUE_UNIT_SIZE != 0) {
- log.error("MessageFormatUtil split buffer error, cq buffer size is
{}", cqBuffer.remaining());
+ log.error("MessageFormatUtil#splitMessageBuffer, cqBuffer size
invalid, cqBufferSize={}, msgBufferSize={}",
+ cqBuffer.remaining(), msgBuffer.remaining());
return bufferResultList;
}
@@ -137,8 +140,10 @@ public class MessageFormatUtil {
int offset = (int) (logOffset - firstCommitLogOffset);
if (offset + bufferSize > msgBuffer.limit()) {
- log.error("MessageFormatUtil split buffer error, message
buffer offset exceeded limit. " +
- "Expect length: {}, Actual length: {}", offset +
bufferSize, msgBuffer.limit());
+ log.error("MessageFormatUtil#splitMessageBuffer, message
buffer offset exceeded limit, " +
+ "index={}, total={}, firstCommitLogOffset={},
logOffset={}, bufferSize={}, msgBufferLimit={}",
+ position / CONSUME_QUEUE_UNIT_SIZE,
bufferResultList.size(), firstCommitLogOffset,
+ logOffset, bufferSize, msgBuffer.limit());
break;
}
@@ -151,14 +156,16 @@ public class MessageFormatUtil {
}
if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE &&
magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
- log.error("MessageFormatUtil split buffer error, found
unknown magic code. " +
- "Message offset: {}, wrong magic code: {}", offset,
magicCode);
+ log.error("MessageFormatUtil#splitMessageBuffer, unknown
magic code, " +
+ "index={}, messageOffset={}, logOffset={},
magicCode={}",
+ position / CONSUME_QUEUE_UNIT_SIZE, offset, logOffset,
magicCode);
continue;
}
if (bufferSize != getTotalSize(msgBuffer)) {
- log.error("MessageFormatUtil split buffer error, message
length not match. " +
- "CommitLog length: {}, buffer length: {}",
getTotalSize(msgBuffer), bufferSize);
+ log.error("MessageFormatUtil#splitMessageBuffer, message
length not match, " +
+ "index={}, messageOffset={}, logOffset={},
commitLogLength={}, cqRecordedLength={}",
+ position / CONSUME_QUEUE_UNIT_SIZE, offset, logOffset,
getTotalSize(msgBuffer), bufferSize);
continue;
}