This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit b8cc947408b401f07ab2a946f479d700f5c6adfa
Author: GewuNewOne <[email protected]>
AuthorDate: Sat Apr 5 21:26:18 2025 +0800

    RATIS-2271. Leadership Loss Causes ClosedByInterruptException and NullPointerException in LogAppender Thread (#1245)
---
 .../org/apache/ratis/server/raftlog/segmented/LogSegment.java  | 10 ++++++++--
 .../server/raftlog/segmented/SegmentedRaftLogInputStream.java  |  7 ++++++-
 .../ratis/server/raftlog/segmented/SegmentedRaftLogReader.java | 10 +++++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 1b55b9727..1b0064e44 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -244,7 +244,11 @@ public final class LogSegment {
         }
       });
       loadingTimes.incrementAndGet();
-      return Objects.requireNonNull(toReturn.get());
+      final LogEntryProto proto = toReturn.get();
+      if (proto == null) {
+        throw new RaftLogIOException("Failed to load log entry " + key);
+      }
+      return proto;
     }
   }
 
@@ -346,8 +350,10 @@ public final class LogSegment {
     }
     try {
       return cacheLoader.load(record);
+    } catch (RaftLogIOException e) {
+      throw e;
     } catch (Exception e) {
-      throw new RaftLogIOException(e);
+      throw new RaftLogIOException("Failed to loadCache for log entry " + record, e);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index 050c472dd..3cc8767fa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Optional;
 
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -104,7 +105,11 @@ public class SegmentedRaftLogInputStream implements Closeable {
         try {
           init();
         } catch (Exception e) {
-          LOG.error("caught exception initializing " + this, e);
+          if (e.getCause() instanceof ClosedByInterruptException) {
+            LOG.warn("Initialization is interrupted: {}", this, e);
+          } else {
+            LOG.error("Failed to initialize {}", this, e);
+          }
           throw IOUtils.asIOException(e);
         }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index 7d03105b9..57baffb2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -42,6 +42,7 @@ import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Optional;
 import java.util.zip.Checksum;
 
@@ -169,7 +170,14 @@ class SegmentedRaftLogReader implements Closeable {
    */
   boolean verifyHeader() throws IOException {
     final int headerLength = SegmentedRaftLogFormat.getHeaderLength();
-    final int readLength = in.read(temp, 0, headerLength);
+    final int readLength;
+    try{
+      readLength = in.read(temp, 0, headerLength);
+    } catch (ClosedByInterruptException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while reading the header of " + file, e);
+    }
+
     Preconditions.assertTrue(readLength <= headerLength);
     final int matchLength = SegmentedRaftLogFormat.matchHeader(temp, 0, readLength);
     Preconditions.assertTrue(matchLength <= readLength);

Reply via email to