szetszwo commented on code in PR #1444:
URL: https://github.com/apache/ratis/pull/1444#discussion_r3237626570


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1092,12 +1092,11 @@ private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest requ
     }
     return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
   }
-
-  ReadRequests getReadRequests() {
-    return getState().getReadRequests();
-  }
-
   private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
+    final Throwable snapshotInstallation = snapshotInstallationHandler.getInProgressInstallSnapshotReadException();

Review Comment:
   Let's create the ReadException with a different message.  It will be easier 
to debug later.
   ```java
     ReadException newReadException(String op, long installSnapshot, boolean started) {
       return new ReadException(getMemberId() + ": Failed to " + op + " readIndex as snapshot (" + installSnapshot
           + ") installation is " + (started ? "started" : "in progress"));
     }
   
     private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
       final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
       if (installSnapshot != RaftLog.INVALID_LOG_INDEX) {
         return JavaUtils.completeExceptionally(newReadException("get", installSnapshot, false));
       }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1146,7 +1145,8 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
       }
 
       return replyFuture
-          .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+          .thenCompose(readIndex -> getState().getReadRequests().waitToAdvance(readIndex,
+              snapshotInstallationHandler::getInProgressInstallSnapshotReadException))

Review Comment:
   Let's add a waitReadIndex method and check there.
   ```java
             .thenCompose(this::waitReadIndex)
   ```
   ```java
     private CompletableFuture<Long> waitReadIndex(long readIndex) {
       final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
       if (installSnapshot != RaftLog.INVALID_LOG_INDEX) {
         return JavaUtils.completeExceptionally(newReadException("start waiting for", installSnapshot, false));
       }
       return getState().getReadRequests().waitToAdvance(readIndex);
     }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -88,6 +101,14 @@ private void handleTimeout(long readIndex) {
       removed.completeExceptionally(new ReadException("Read timeout " + readTimeout + " for index " + readIndex));
     }
 
+    void fail(Throwable cause) {
+      final Collection<CompletableFuture<Long>> futures;
+      synchronized (this) {
+        futures = new ArrayList<>(sorted.values());
+        sorted.clear();

Review Comment:
   - Let's make `sorted` non-final in order to avoid the copying.
   - We may return the futures here and fail them in `ReadRequests.fail(..)`.
   ```java
   //ReadIndexQueue
       synchronized Collection<CompletableFuture<Long>> clear(Throwable cause) {
         final Collection<CompletableFuture<Long>> futures = sorted.values();
         sorted = new TreeMap<>();
         return futures;
       }
   ```
   
   ```java
   //ReadRequests
     void fail(Throwable cause) {
       for(CompletableFuture<Long> f : readIndexQueue.clear(cause)) {
         f.completeExceptionally(cause);
       }
     }
   ```
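
For illustration only, here is a minimal self-contained sketch of the swap-instead-of-copy idea above. `SwapAndFailSketch` and its `size()` helper are hypothetical names, not Ratis classes, and timeout handling is omitted. Since `values()` is a view of the old map, and the old map is no longer reachable from the queue after the swap, the futures can be failed outside the lock without an `ArrayList` copy.

```java
import java.util.Collection;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in combining ReadIndexQueue and ReadRequests; not the actual Ratis code. */
final class SwapAndFailSketch {
  /** Pending futures keyed by read index; non-final so that clear() can swap it. */
  private NavigableMap<Long, CompletableFuture<Long>> sorted = new TreeMap<>();

  /** Return the pending future for the given index, creating it if absent. */
  synchronized CompletableFuture<Long> add(long readIndex) {
    return sorted.computeIfAbsent(readIndex, i -> new CompletableFuture<>());
  }

  /** Detach all pending futures by swapping in a fresh map; nothing is copied. */
  synchronized Collection<CompletableFuture<Long>> clear() {
    final Collection<CompletableFuture<Long>> futures = sorted.values();
    sorted = new TreeMap<>();
    return futures;
  }

  /** Fail the detached futures outside the lock. */
  void fail(Throwable cause) {
    for (CompletableFuture<Long> f : clear()) {
      f.completeExceptionally(cause);
    }
  }

  /** Number of pending futures (helper for this sketch only). */
  synchronized int size() {
    return sorted.size();
  }

  public static void main(String[] args) {
    final SwapAndFailSketch queue = new SwapAndFailSketch();
    final CompletableFuture<Long> pending = queue.add(5L);
    queue.fail(new IllegalStateException("snapshot installation started"));
    System.out.println("failed=" + pending.isCompletedExceptionally() + ", size=" + queue.size());
  }
}
```

A future added after `fail(..)` returns lands in the fresh map and is unaffected by the earlier failure.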



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -20,21 +20,30 @@
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.exceptions.ReadException;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.LongConsumer;
+import java.util.function.Supplier;
 
 /** For supporting linearizable read. */
 class ReadRequests {
   private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);
 
+  static ReadException newException(Object server, long installSnapshot) {
+    return new ReadException(server + ": Failed read as snapshot (" + installSnapshot
+        + ") installation is in progress");
+  }

Review Comment:
   Let's move this method to `RaftServerImpl.newReadException` as mentioned 
earlier.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java:
##########
@@ -52,10 +61,14 @@ static class ReadIndexQueue {
       this.readTimeout = readTimeout;
     }
 
-    CompletableFuture<Long> add(long readIndex) {
+    CompletableFuture<Long> add(long readIndex, Supplier<Throwable> failureSupplier) {
       final CompletableFuture<Long> returned;
       final boolean create;
       synchronized (this) {
+        final Throwable failure = failureSupplier.get();
+        if (failure != null) {
+          return JavaUtils.completeExceptionally(failure);
+        }

Review Comment:
   This can be checked in RaftServerImpl.
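
A rough, self-contained sketch of what checking on the caller side could look like, so that `add(readIndex)` needs no `Supplier<Throwable>` parameter. All names here are hypothetical stand-ins rather than Ratis code: `CallerSideCheckSketch` plays the role of `RaftServerImpl`, the `LongSupplier` stands in for `snapshotInstallationHandler.getInProgressInstallSnapshotIndex()`, and the `-1` sentinel is an assumed value for `RaftLog.INVALID_LOG_INDEX`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the caller side (RaftServerImpl); not the actual Ratis code. */
final class CallerSideCheckSketch {
  /** Assumed sentinel standing in for RaftLog.INVALID_LOG_INDEX. */
  static final long INVALID_LOG_INDEX = -1L;

  /** Stand-in for snapshotInstallationHandler.getInProgressInstallSnapshotIndex(). */
  private final LongSupplier inProgressSnapshotIndex;

  CallerSideCheckSketch(LongSupplier inProgressSnapshotIndex) {
    this.inProgressSnapshotIndex = inProgressSnapshotIndex;
  }

  /** Stand-in for ReadIndexQueue.add(readIndex): it knows nothing about snapshots. */
  private CompletableFuture<Long> add(long readIndex) {
    return CompletableFuture.completedFuture(readIndex);
  }

  /** The caller checks for an in-progress installation first and only then uses the queue. */
  CompletableFuture<Long> waitReadIndex(long readIndex) {
    final long installSnapshot = inProgressSnapshotIndex.getAsLong();
    if (installSnapshot != INVALID_LOG_INDEX) {
      final CompletableFuture<Long> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalStateException(
          "Failed to start waiting for readIndex as snapshot (" + installSnapshot + ") installation is in progress"));
      return failed;
    }
    return add(readIndex);
  }

  public static void main(String[] args) {
    final CallerSideCheckSketch installing = new CallerSideCheckSketch(() -> 42L);
    System.out.println("failed=" + installing.waitReadIndex(10L).isCompletedExceptionally());
  }
}
```

With the check in the caller, the queue's `add` signature stays as it was before this change.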



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
