This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new de58f05bb RATIS-2282. LogAppender Restart Due to Premature Log Entry
Access During Concurrent Write Processing (#1249)
de58f05bb is described below
commit de58f05bbd5d2773a6719d34d288ec9ad7a57abd
Author: GewuNewOne <[email protected]>
AuthorDate: Sun Apr 20 17:40:10 2025 +0800
RATIS-2282. LogAppender Restart Due to Premature Log Entry Access During
Concurrent Write Processing (#1249)
---
.../ratis/server/raftlog/segmented/LogSegment.java | 32 ++++++++++------------
1 file changed, 15 insertions(+), 17 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 712b0f973..e26de524d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -454,10 +454,13 @@ public final class LogSegment {
boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer) {
final LogEntryProto entry = entryRef.retain();
try {
- final LogRecord record = appendLogRecord(op, entry);
+ final LogRecord record = new LogRecord(totalFileSize, entry);
if (keepEntryInCache) {
putEntryCache(record.getTermIndex(), entryRef, op);
}
+ appendLogRecord(op, record);
+ totalFileSize += getEntrySize(entry, op);
+
if (logConsumer != null) {
logConsumer.accept(entry);
}
@@ -466,24 +469,22 @@ public final class LogSegment {
}
}
-
- private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
- Objects.requireNonNull(entry, "entry == null");
+ private void appendLogRecord(Op op, LogRecord record) {
+ Objects.requireNonNull(record, "record == null");
final LogRecord currentLast = records.getLast();
+
+ final long index = record.getTermIndex().getIndex();
if (currentLast == null) {
- Preconditions.assertTrue(entry.getIndex() == startIndex,
- "gap between start index %s and first entry to append %s",
- startIndex, entry.getIndex());
+ Preconditions.assertTrue(index == startIndex,
+ "%s: gap between start index %s and the entry to append %s", op, startIndex, index);
} else {
- Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
- "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
+ final long currentLastIndex = currentLast.getTermIndex().getIndex();
+ Preconditions.assertTrue(index == currentLastIndex + 1,
+ "%s: gap between last entry %s and the entry to append %s", op, currentLastIndex, index);
}
- final LogRecord record = new LogRecord(totalFileSize, entry);
records.append(record);
- totalFileSize += getEntrySize(entry, op);
- endIndex = entry.getIndex();
- return record;
+ endIndex = index;
}
ReferenceCountedObject<LogEntryProto> getEntryFromCache(TermIndex ti) {
@@ -514,10 +515,7 @@ public final class LogSegment {
}
LogRecord getLogRecord(long index) {
- if (index >= startIndex && index <= endIndex) {
- return records.get(index);
- }
- return null;
+ return records.get(index);
}
TermIndex getLastTermIndex() {