[ 
https://issues.apache.org/jira/browse/RATIS-386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686481#comment-16686481
 ] 

Shashikant Banerjee edited comment on RATIS-386 at 11/15/18 1:07 AM:
---------------------------------------------------------------------

Thanks [~szetszwo] for the comments. Moving the retryPolicy Check to here :
{code:java}
private CompletableFuture<RaftClientReply> sendRequestAsync( RaftClientRequest 
request, int    attemptCount) {
  LOG.debug("{}: send* {}", clientId, request);
  return clientRpc.sendRequestAsync(request).thenApply(reply -> {
    LOG.info("{}: receive* {}", clientId, reply);
    reply = handleNotLeaderException(request, reply);
    if (reply == null) {
      if (!retryPolicy.shouldRetry(attemptCount)) {
        LOG.info(" fail with max attempts failed");
        reply = new RaftClientReply(request, new RaftException("Failed " + 
request + " for " 
            + attemptCount + " attempts with " + retryPolicy), null);
      }
    }
    if (reply != null) {
      getSlidingWindow(request).receiveReply(
          request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
    }
    return reply;
  }).exceptionally(e -> {
    if (LOG.isTraceEnabled()) {
      LOG.trace(clientId + ": Failed " + request, e);
    } else {
      LOG.debug("{}: Failed {} with {}", clientId, request, e);
    }
    e = JavaUtils.unwrapCompletionException(e);
    if (e instanceof GroupMismatchException) {
      throw new CompletionException(e);
    } else if (e instanceof IOException) {
      handleIOException(request, (IOException)e, null);
    } else {
      throw new CompletionException(e);
    }
    return null;
  });
}{code}
In case, clientRpc.sendRequestAsync(request) timeout, it will execute the code 
in exceptionally Path. In such case, #sendRequestWithRetryAsync will keep on 
retrying calling #sendRequestAsync as the retry validation will only be 
executed if clientRpc.sendRequestAsync(request) completes normally.

Also, in case the retryValidation check fails, we just return null for 
RaftClientReply for the sync API here without throwing any exception:
{code:java}
private RaftClientReply sendRequestWithRetry(
    Supplier<RaftClientRequest> supplier)
    throws InterruptedIOException, StateMachineException, 
GroupMismatchException {
  for(int attemptCount = 0;; attemptCount++) {
    final RaftClientRequest request = supplier.get();
    final RaftClientReply reply = sendRequest(request);
    if (reply != null) {
      return reply;
    }
    if (!retryPolicy.shouldRetry(attemptCount)) {
      return null;
    }
    try {
      retryPolicy.getSleepTime().sleep();
    } catch (InterruptedException e) {
      throw new InterruptedIOException("retry policy=" + retryPolicy);
    }
  }
}

{code}
I think ,we probably should have same result for sync/async api's here.

Let me know if i am missing something here.


was (Author: shashikant):
Thanks [~szetszwo] for the comments. Moving the retryPolicy Check to here :
{code:java}
private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync( 
RaftClientRequest request, int    attemptCount) {
  LOG.debug("{}: send* {}", clientId, request);
  return clientRpc.sendRequestAsync(request).thenApply(reply -> {
    LOG.info("{}: receive* {}", clientId, reply);
    reply = handleNotLeaderException(request, reply);
    if (reply == null) {
      if (!retryPolicy.shouldRetry(attemptCount)) {
        LOG.info(" fail with max attempts failed");
        reply = new RaftClientReply(request, new RaftException("Failed " + 
request + " for " 
            + attemptCount + " attempts with " + retryPolicy), null);
      }
    }
    if (reply != null) {
      getSlidingWindow(request).receiveReply(
          request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
    }
    return reply;
  }).exceptionally(e -> {
    if (LOG.isTraceEnabled()) {
      LOG.trace(clientId + ": Failed " + request, e);
    } else {
      LOG.debug("{}: Failed {} with {}", clientId, request, e);
    }
    e = JavaUtils.unwrapCompletionException(e);
    if (e instanceof GroupMismatchException) {
      throw new CompletionException(e);
    } else if (e instanceof IOException) {
      handleIOException(request, (IOException)e, null);
    } else {
      throw new CompletionException(e);
    }
    return null;
  });
}{code}
In case, clientRpc.sendRequestAsync(request) timeout, it will execute the code 
in exceptionally Path. In such case, #sendRequestWithRetryAsync will keep on 
retrying calling #sendRequestAsync as the retry validation will only be 
executed if clientRpc.sendRequestAsync(request) completes normally.

Also, in case the retryValidation check fails, we just return null for 
RaftClientReply for the sync API here without throwing any exception:
{code:java}
private RaftClientReply sendRequestWithRetry(
    Supplier<RaftClientRequest> supplier)
    throws InterruptedIOException, StateMachineException, 
GroupMismatchException {
  for(int attemptCount = 0;; attemptCount++) {
    final RaftClientRequest request = supplier.get();
    final RaftClientReply reply = sendRequest(request);
    if (reply != null) {
      return reply;
    }
    if (!retryPolicy.shouldRetry(attemptCount)) {
      return null;
    }
    try {
      retryPolicy.getSleepTime().sleep();
    } catch (InterruptedException e) {
      throw new InterruptedIOException("retry policy=" + retryPolicy);
    }
  }
}

{code}
I think ,we probably should have same result for sync/async api's here.

Let me know if i am missing something here.

> Raft Client Async API's should honor Retry Policy 
> --------------------------------------------------
>
>                 Key: RATIS-386
>                 URL: https://issues.apache.org/jira/browse/RATIS-386
>             Project: Ratis
>          Issue Type: Improvement
>          Components: client
>    Affects Versions: 0.3.0
>            Reporter: Shashikant Banerjee
>            Assignee: Shashikant Banerjee
>            Priority: Major
>             Fix For: 0.3.0
>
>         Attachments: RATIS-386.000.patch
>
>
> Raft client sync Api has support for retry policies. Similarly, for Async 
> API's including watch Api, support for Retry Policy is required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to