ivandika3 commented on code in PR #1448:
URL: https://github.com/apache/ratis/pull/1448#discussion_r3239938621
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1107,6 +1121,68 @@ private CompletableFuture<Long>
getReadIndex(RaftClientRequest request, LeaderSt
return
writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
}
+ private CompletableFuture<Long>
getReadIndex(CompletableFuture<ReadIndexReplyProto> readIndexReply) {
+ return readIndexReply.thenApply(reply -> {
+ if (reply.getServerReply().getSuccess()) {
+ return reply.getReadIndex();
+ } else {
+ throw new CompletionException(new ReadIndexException(getId()
+ + ": Failed to get read index from the leader: " + reply));
+ }
+ });
+ }
+
+ private CompletableFuture<Long> getReadIndexForReadOnly(ClientId clientId,
ReadRequestTypeProto readRequestType) {
+ final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+ if (leader != null) {
+ return leader.getReadIndex(null);
+ }
+
+ final RaftPeerId leaderId = getInfo().getLeaderId();
+ if (leaderId == null) {
+ return JavaUtils.completeExceptionally(new
ReadIndexException(getMemberId() + ": Leader is unknown."));
+ }
+
+ return getReadIndex(sendReadIndexAsync(clientId, readRequestType,
leaderId));
+ }
+
+ private <T> CompletableFuture<T> checkLeaderStateForReadOnly() {
+ if (!getInfo().isLeader()) {
+ return JavaUtils.completeExceptionally(generateNotLeaderException());
+ }
+ if (!getInfo().isLeaderReady()) {
+ return JavaUtils.completeExceptionally(new
LeaderNotReadyException(getMemberId()));
+ }
+ return null;
+ }
+
+ private static <T> CompletableFuture<T>
supplyReadOnly(Supplier<CompletableFuture<T>> query) {
+ try {
+ return Objects.requireNonNull(query.get(), "query returned null");
+ } catch (Throwable t) {
+ return JavaUtils.completeExceptionally(t);
+ }
+ }
+
+ @Override
+ public <T> CompletableFuture<T> readOnlyAsync(ClientId clientId,
ReadRequestTypeProto readRequestType,
+ Supplier<CompletableFuture<T>> query) throws IOException {
+ Objects.requireNonNull(clientId, "clientId == null");
+ Objects.requireNonNull(readRequestType, "readRequestType == null");
+ Objects.requireNonNull(query, "query == null");
+ assertLifeCycleState(LifeCycle.States.RUNNING);
+ if (readRequestType.getPreferNonLinearizable() || readOption ==
RaftServerConfigKeys.Read.Option.DEFAULT) {
+ final CompletableFuture<T> reply = checkLeaderStateForReadOnly();
+ return reply != null ? reply : supplyReadOnly(query);
+ } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+ return getReadIndexForReadOnly(clientId, readRequestType)
+ .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+ .thenCompose(readIndex -> supplyReadOnly(query));
Review Comment:
I wonder whether it would be better to use `thenComposeAsync` here to improve performance:
```java
return getReadIndexForReadOnly(clientId, readRequestType)
.thenComposeAsync(readIndex ->
getReadRequests().waitToAdvance(readIndex), clientExecutor)
.thenComposeAsync(readIndex -> supplyReadOnly(query),
clientExecutor);
```
or, alternatively, use `thenComposeAsync` only for the final `supplyReadOnly` stage:
```java
return getReadIndexForReadOnly(clientId, readRequestType)
.thenCompose(readIndex ->
getReadRequests().waitToAdvance(readIndex))
.thenComposeAsync(readIndex -> supplyReadOnly(query),
clientExecutor);
```
Let me benchmark it. The same change could also be applied to `readAsync`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]