szetszwo commented on code in PR #1448:
URL: https://github.com/apache/ratis/pull/1448#discussion_r3243413408


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1107,6 +1121,68 @@ private CompletableFuture<Long> 
getReadIndex(RaftClientRequest request, LeaderSt
     return 
writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
   }
 
+  private CompletableFuture<Long> 
getReadIndex(CompletableFuture<ReadIndexReplyProto> readIndexReply) {
+    return readIndexReply.thenApply(reply -> {
+      if (reply.getServerReply().getSuccess()) {
+        return reply.getReadIndex();
+      } else {
+        throw new CompletionException(new ReadIndexException(getId()
+            + ": Failed to get read index from the leader: " + reply));
+      }
+    });
+  }
+
+  private CompletableFuture<Long> getReadIndexForReadOnly(ClientId clientId, 
ReadRequestTypeProto readRequestType) {
+    final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+    if (leader != null) {
+      return leader.getReadIndex(null);
+    }
+
+    final RaftPeerId leaderId = getInfo().getLeaderId();
+    if (leaderId == null) {
+      return JavaUtils.completeExceptionally(new 
ReadIndexException(getMemberId() + ": Leader is unknown."));
+    }
+
+    return getReadIndex(sendReadIndexAsync(clientId, readRequestType, 
leaderId));
+  }
+
+  private <T> CompletableFuture<T> checkLeaderStateForReadOnly() {
+    if (!getInfo().isLeader()) {
+      return JavaUtils.completeExceptionally(generateNotLeaderException());
+    }
+    if (!getInfo().isLeaderReady()) {
+      return JavaUtils.completeExceptionally(new 
LeaderNotReadyException(getMemberId()));
+    }
+    return null;
+  }
+
+  private static <T> CompletableFuture<T> 
supplyReadOnly(Supplier<CompletableFuture<T>> query) {
+    try {
+      return Objects.requireNonNull(query.get(), "query returned null");
+    } catch (Throwable t) {
+      return JavaUtils.completeExceptionally(t);
+    }
+  }
+
+  @Override
+  public <T> CompletableFuture<T> readOnlyAsync(ClientId clientId, 
ReadRequestTypeProto readRequestType,
+      Supplier<CompletableFuture<T>> query) throws IOException {
+    Objects.requireNonNull(clientId, "clientId == null");
+    Objects.requireNonNull(readRequestType, "readRequestType == null");
+    Objects.requireNonNull(query, "query == null");
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    if (readRequestType.getPreferNonLinearizable() || readOption == 
RaftServerConfigKeys.Read.Option.DEFAULT) {
+      final CompletableFuture<T> reply = checkLeaderStateForReadOnly();
+      return reply != null ? reply : supplyReadOnly(query);
+    } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+      return getReadIndexForReadOnly(clientId, readRequestType)
+          .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+          .thenCompose(readIndex -> supplyReadOnly(query));

Review Comment:
   We should use clientExecutor as earliest as possible.  It will keep using 
the executor for the later calls; see below:
   ```
   //output
                                       main: main
                                supplyAsync: pool-A-thread1
           thenCompose after pool-A-thread1: pool-A-thread1
      thenComposeAsync after pool-A-thread1: pool-B-thread1
     thenCompose again after pool-B-thread1: pool-B-thread1
   ```
   ```java
     public static void main(String[] args) {
       final ExecutorService poolA = 
Executors.newCachedThreadPool(ConcurrentUtils.newThreadFactory("pool-A"));
       final ExecutorService poolB = 
Executors.newCachedThreadPool(ConcurrentUtils.newThreadFactory("pool-B"));
   
       printCurrentThread("main");
       CompletableFuture.supplyAsync(() -> printCurrentThread("supplyAsync"), 
poolA)
           .thenCompose(s -> 
CompletableFuture.completedFuture(printCurrentThread("thenCompose after " + s)))
           .thenComposeAsync(s -> 
CompletableFuture.completedFuture(printCurrentThread("thenComposeAsync after " 
+ s)), poolB)
           .thenCompose(s -> 
CompletableFuture.completedFuture(printCurrentThread("thenCompose again after " 
+ s)))
           ;
     }
   
     static String printCurrentThread(String label) {
       final String name = Thread.currentThread().getName();
       System.out.printf("%40s: %s%n", label, name);
       return name;
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to