szetszwo commented on code in PR #1448:
URL: https://github.com/apache/ratis/pull/1448#discussion_r3243413408
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1107,6 +1121,68 @@ private CompletableFuture<Long>
getReadIndex(RaftClientRequest request, LeaderSt
return
writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
}
+ private CompletableFuture<Long>
getReadIndex(CompletableFuture<ReadIndexReplyProto> readIndexReply) {
+ return readIndexReply.thenApply(reply -> {
+ if (reply.getServerReply().getSuccess()) {
+ return reply.getReadIndex();
+ } else {
+ throw new CompletionException(new ReadIndexException(getId()
+ + ": Failed to get read index from the leader: " + reply));
+ }
+ });
+ }
+
+ private CompletableFuture<Long> getReadIndexForReadOnly(ClientId clientId,
ReadRequestTypeProto readRequestType) {
+ final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+ if (leader != null) {
+ return leader.getReadIndex(null);
+ }
+
+ final RaftPeerId leaderId = getInfo().getLeaderId();
+ if (leaderId == null) {
+ return JavaUtils.completeExceptionally(new
ReadIndexException(getMemberId() + ": Leader is unknown."));
+ }
+
+ return getReadIndex(sendReadIndexAsync(clientId, readRequestType,
leaderId));
+ }
+
+ private <T> CompletableFuture<T> checkLeaderStateForReadOnly() {
+ if (!getInfo().isLeader()) {
+ return JavaUtils.completeExceptionally(generateNotLeaderException());
+ }
+ if (!getInfo().isLeaderReady()) {
+ return JavaUtils.completeExceptionally(new
LeaderNotReadyException(getMemberId()));
+ }
+ return null;
+ }
+
+ private static <T> CompletableFuture<T>
supplyReadOnly(Supplier<CompletableFuture<T>> query) {
+ try {
+ return Objects.requireNonNull(query.get(), "query returned null");
+ } catch (Throwable t) {
+ return JavaUtils.completeExceptionally(t);
+ }
+ }
+
+ @Override
+ public <T> CompletableFuture<T> readOnlyAsync(ClientId clientId,
ReadRequestTypeProto readRequestType,
+ Supplier<CompletableFuture<T>> query) throws IOException {
+ Objects.requireNonNull(clientId, "clientId == null");
+ Objects.requireNonNull(readRequestType, "readRequestType == null");
+ Objects.requireNonNull(query, "query == null");
+ assertLifeCycleState(LifeCycle.States.RUNNING);
+ if (readRequestType.getPreferNonLinearizable() || readOption ==
RaftServerConfigKeys.Read.Option.DEFAULT) {
+ final CompletableFuture<T> reply = checkLeaderStateForReadOnly();
+ return reply != null ? reply : supplyReadOnly(query);
+ } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+ return getReadIndexForReadOnly(clientId, readRequestType)
+ .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+ .thenCompose(readIndex -> supplyReadOnly(query));
Review Comment:
We should use clientExecutor as earliest as possible. It will keep using
the executor for the later calls; see below:
```
//output
main: main
supplyAsync: pool-A-thread1
thenCompose after pool-A-thread1: pool-A-thread1
thenComposeAsync after pool-A-thread1: pool-B-thread1
thenCompose again after pool-B-thread1: pool-B-thread1
```
```java
public static void main(String[] args) {
final ExecutorService poolA =
Executors.newCachedThreadPool(ConcurrentUtils.newThreadFactory("pool-A"));
final ExecutorService poolB =
Executors.newCachedThreadPool(ConcurrentUtils.newThreadFactory("pool-B"));
printCurrentThread("main");
CompletableFuture.supplyAsync(() -> printCurrentThread("supplyAsync"),
poolA)
.thenCompose(s ->
CompletableFuture.completedFuture(printCurrentThread("thenCompose after " + s)))
.thenComposeAsync(s ->
CompletableFuture.completedFuture(printCurrentThread("thenComposeAsync after "
+ s)), poolB)
.thenCompose(s ->
CompletableFuture.completedFuture(printCurrentThread("thenCompose again after "
+ s)))
;
}
static String printCurrentThread(String label) {
final String name = Thread.currentThread().getName();
System.out.printf("%40s: %s%n", label, name);
return name;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]