This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 688bbaee7 RATIS-2271 Leadership Loss Causes ClosedByInterruptException
and NullPointerException in LogAppender Thread (#1245)
688bbaee7 is described below
commit 688bbaee7b7b388c0e1eaf49b210a6a3877e4bd4
Author: GewuNewOne <[email protected]>
AuthorDate: Sat Apr 5 21:26:18 2025 +0800
RATIS-2271 Leadership Loss Causes ClosedByInterruptException and
NullPointerException in LogAppender Thread (#1245)
---
.../org/apache/ratis/server/raftlog/segmented/LogSegment.java | 10 ++++++++--
.../server/raftlog/segmented/SegmentedRaftLogInputStream.java | 7 ++++++-
.../ratis/server/raftlog/segmented/SegmentedRaftLogReader.java | 10 +++++++++-
3 files changed, 23 insertions(+), 4 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2549e13e0..712b0f973 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -292,7 +292,11 @@ public final class LogSegment {
}
});
loadingTimes.incrementAndGet();
- return Objects.requireNonNull(toReturn.get(), () -> "toReturn == null
for " + key);
+ final ReferenceCountedObject<LogEntryProto> proto = toReturn.get();
+ if (proto == null) {
+ throw new RaftLogIOException("Failed to load log entry " + key);
+ }
+ return proto;
}
}
@@ -502,8 +506,10 @@ public final class LogSegment {
}
try {
return cacheLoader.load(ti);
+ } catch (RaftLogIOException e) {
+ throw e;
} catch (Exception e) {
- throw new RaftLogIOException(e);
+ throw new RaftLogIOException("Failed to loadCache for log entry " + ti,
e);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index 050c472dd..3cc8767fa 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.util.Optional;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -104,7 +105,11 @@ public class SegmentedRaftLogInputStream implements
Closeable {
try {
init();
} catch (Exception e) {
- LOG.error("caught exception initializing " + this, e);
+ if (e.getCause() instanceof ClosedByInterruptException) {
+ LOG.warn("Initialization is interrupted: {}", this, e);
+ } else {
+ LOG.error("Failed to initialize {}", this, e);
+ }
throw IOUtils.asIOException(e);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index 7d03105b9..57baffb2f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -42,6 +42,7 @@ import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.channels.ClosedByInterruptException;
import java.util.Optional;
import java.util.zip.Checksum;
@@ -169,7 +170,14 @@ class SegmentedRaftLogReader implements Closeable {
*/
boolean verifyHeader() throws IOException {
final int headerLength = SegmentedRaftLogFormat.getHeaderLength();
- final int readLength = in.read(temp, 0, headerLength);
+ final int readLength;
+ try{
+ readLength = in.read(temp, 0, headerLength);
+ } catch (ClosedByInterruptException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while reading the header of " + file,
e);
+ }
+
Preconditions.assertTrue(readLength <= headerLength);
final int matchLength = SegmentedRaftLogFormat.matchHeader(temp, 0,
readLength);
Preconditions.assertTrue(matchLength <= readLength);