szetszwo commented on code in PR #845:
URL: https://github.com/apache/ratis/pull/845#discussion_r1138017225


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java:
##########
@@ -93,49 +132,76 @@ boolean isSteppingDown() {
   }
 
   void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo 
follower) {
-    final Optional<RaftPeerId> transferee = getTransferee();
     // If the transferee has just append some entries and becomes up-to-date,
     // send StartLeaderElection to it
-    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
-        && leaderState.sendStartLeaderElection(follower)) {
-      LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
-          server.getMemberId(), transferee.get());
+    if (getTransferee().filter(t -> t.equals(follower.getId())).isPresent()) {
+      final String error = leaderState.sendStartLeaderElection(follower, 
leaderState.getLastEntry());
+      if (error == null) {
+        LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
+            server.getMemberId(), follower.getId());
+      }
     }
   }
 
-  private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId 
transferee) {
+  private String tryTransferLeadership(LeaderStateImpl leaderState, Context 
context) {
+    final RaftPeerId transferee = context.getTransfereeId();
     LOG.info("{}: start transferring leadership to {}", server.getMemberId(), 
transferee);
-    final LogAppender appender = 
leaderState.getLogAppender(transferee).orElse(null);
-
+    final LogAppender appender = context.getTransfereeLogAppender();
     if (appender == null) {
-      LOG.error("{}: cannot find LogAppender for transferee {}", 
server.getMemberId(), transferee);
-      return;
+      return "LogAppender for transferee " + transferee + " is null";
     }
     final FollowerInfo follower = appender.getFollower();
-    if (leaderState.sendStartLeaderElection(follower)) {
+    final String error = leaderState.sendStartLeaderElection(follower, 
context.getLeaderLastEntry());
+    if (error == null) {
       LOG.info("{}: sent StartLeaderElection to transferee {} immediately as 
it already has up-to-date log",
           server.getMemberId(), transferee);
-    } else {
-      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date",
-          server.getMemberId(), transferee);
+    } else if (error.contains("not up-to-date")) {
+      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date: {}",
+          server.getMemberId(), transferee, error);
       appender.notifyLogAppender();
+      return null;
     }
+    return error;
+  }
+
+  TransferLeadershipRequest newRequest(RaftPeerId transfereeId) {
+    // TransferLeadership will block client request, so we don't want wait too 
long.
+    // If everything goes well, transferee should be elected within an 
election timeout.
+    final long timeout = 
server.getRandomElectionTimeout().toLong(TimeUnit.MILLISECONDS);
+    return new TransferLeadershipRequest(ClientId.emptyClientId(),
+        server.getId(), server.getMemberId().getGroupId(), 0, transfereeId, 
timeout);
+  }
+
+  void start(LeaderStateImpl leaderState, LogAppender transferee, TermIndex 
leaderLastEntry) {
+    start(leaderState, new Context(newRequest(transferee.getFollowerId()), () 
-> leaderLastEntry, () -> transferee));
   }
 
   CompletableFuture<RaftClientReply> start(LeaderStateImpl leaderState, 
TransferLeadershipRequest request) {
+    final Context context = new Context(request,
+        JavaUtils.memoize(leaderState::getLastEntry),
+        JavaUtils.memoize(() -> 
leaderState.getLogAppender(request.getNewLeader()).orElse(null)));
+    return start(leaderState, context);
+  }
+
+  private CompletableFuture<RaftClientReply> start(LeaderStateImpl 
leaderState, Context context) {
+    final TransferLeadershipRequest request = context.getRequest();
     final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> 
new PendingRequest(request));
     final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: 
supplier.get());
     if (previous != null) {
-      return createReplyFutureFromPreviousRequest(request, previous);
+      return request != null? createReplyFutureFromPreviousRequest(request, 
previous): null;

Review Comment:
   Sorry, this needs to be reverted since `request` won't be null after we 
added `Context`.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java:
##########
@@ -93,49 +132,76 @@ boolean isSteppingDown() {
   }
 
   void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo 
follower) {
-    final Optional<RaftPeerId> transferee = getTransferee();
     // If the transferee has just append some entries and becomes up-to-date,
     // send StartLeaderElection to it
-    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
-        && leaderState.sendStartLeaderElection(follower)) {
-      LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
-          server.getMemberId(), transferee.get());
+    if (getTransferee().filter(t -> t.equals(follower.getId())).isPresent()) {
+      final String error = leaderState.sendStartLeaderElection(follower, 
leaderState.getLastEntry());
+      if (error == null) {
+        LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
+            server.getMemberId(), follower.getId());
+      }
     }
   }
 
-  private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId 
transferee) {
+  private String tryTransferLeadership(LeaderStateImpl leaderState, Context 
context) {
+    final RaftPeerId transferee = context.getTransfereeId();
     LOG.info("{}: start transferring leadership to {}", server.getMemberId(), 
transferee);
-    final LogAppender appender = 
leaderState.getLogAppender(transferee).orElse(null);
-
+    final LogAppender appender = context.getTransfereeLogAppender();
     if (appender == null) {
-      LOG.error("{}: cannot find LogAppender for transferee {}", 
server.getMemberId(), transferee);
-      return;
+      return "LogAppender for transferee " + transferee + " is null";
     }
     final FollowerInfo follower = appender.getFollower();
-    if (leaderState.sendStartLeaderElection(follower)) {
+    final String error = leaderState.sendStartLeaderElection(follower, 
context.getLeaderLastEntry());
+    if (error == null) {
       LOG.info("{}: sent StartLeaderElection to transferee {} immediately as 
it already has up-to-date log",
           server.getMemberId(), transferee);
-    } else {
-      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date",
-          server.getMemberId(), transferee);
+    } else if (error.contains("not up-to-date")) {
+      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date: {}",
+          server.getMemberId(), transferee, error);
       appender.notifyLogAppender();
+      return null;
     }
+    return error;
+  }
+
+  TransferLeadershipRequest newRequest(RaftPeerId transfereeId) {

Review Comment:
   Now `newRequest` is used only once.  Let's move its code into the `start(..)` 
method below.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java:
##########
@@ -93,49 +132,76 @@ boolean isSteppingDown() {
   }
 
   void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo 
follower) {
-    final Optional<RaftPeerId> transferee = getTransferee();
     // If the transferee has just append some entries and becomes up-to-date,
     // send StartLeaderElection to it
-    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
-        && leaderState.sendStartLeaderElection(follower)) {
-      LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
-          server.getMemberId(), transferee.get());
+    if (getTransferee().filter(t -> t.equals(follower.getId())).isPresent()) {
+      final String error = leaderState.sendStartLeaderElection(follower, 
leaderState.getLastEntry());
+      if (error == null) {
+        LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
+            server.getMemberId(), follower.getId());
+      }
     }
   }
 
-  private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId 
transferee) {
+  private String tryTransferLeadership(LeaderStateImpl leaderState, Context 
context) {
+    final RaftPeerId transferee = context.getTransfereeId();
     LOG.info("{}: start transferring leadership to {}", server.getMemberId(), 
transferee);
-    final LogAppender appender = 
leaderState.getLogAppender(transferee).orElse(null);
-
+    final LogAppender appender = context.getTransfereeLogAppender();
     if (appender == null) {
-      LOG.error("{}: cannot find LogAppender for transferee {}", 
server.getMemberId(), transferee);
-      return;
+      return "LogAppender for transferee " + transferee + " is null";
     }
     final FollowerInfo follower = appender.getFollower();
-    if (leaderState.sendStartLeaderElection(follower)) {
+    final String error = leaderState.sendStartLeaderElection(follower, 
context.getLeaderLastEntry());
+    if (error == null) {
       LOG.info("{}: sent StartLeaderElection to transferee {} immediately as 
it already has up-to-date log",
           server.getMemberId(), transferee);
-    } else {
-      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date",
-          server.getMemberId(), transferee);
+    } else if (error.contains("not up-to-date")) {

Review Comment:
   Let's add a `Result` class to avoid depending on the error message.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java:
##########
@@ -93,49 +132,76 @@ boolean isSteppingDown() {
   }
 
   void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo 
follower) {
-    final Optional<RaftPeerId> transferee = getTransferee();
     // If the transferee has just append some entries and becomes up-to-date,
     // send StartLeaderElection to it
-    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
-        && leaderState.sendStartLeaderElection(follower)) {
-      LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
-          server.getMemberId(), transferee.get());
+    if (getTransferee().filter(t -> t.equals(follower.getId())).isPresent()) {
+      final String error = leaderState.sendStartLeaderElection(follower, 
leaderState.getLastEntry());
+      if (error == null) {
+        LOG.info("{}: sent StartLeaderElection to transferee {} after received 
AppendEntriesResponse",
+            server.getMemberId(), follower.getId());
+      }
     }
   }
 
-  private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId 
transferee) {
+  private String tryTransferLeadership(LeaderStateImpl leaderState, Context 
context) {
+    final RaftPeerId transferee = context.getTransfereeId();
     LOG.info("{}: start transferring leadership to {}", server.getMemberId(), 
transferee);
-    final LogAppender appender = 
leaderState.getLogAppender(transferee).orElse(null);
-
+    final LogAppender appender = context.getTransfereeLogAppender();
     if (appender == null) {
-      LOG.error("{}: cannot find LogAppender for transferee {}", 
server.getMemberId(), transferee);
-      return;
+      return "LogAppender for transferee " + transferee + " is null";
     }
     final FollowerInfo follower = appender.getFollower();
-    if (leaderState.sendStartLeaderElection(follower)) {
+    final String error = leaderState.sendStartLeaderElection(follower, 
context.getLeaderLastEntry());
+    if (error == null) {
       LOG.info("{}: sent StartLeaderElection to transferee {} immediately as 
it already has up-to-date log",
           server.getMemberId(), transferee);
-    } else {
-      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date",
-          server.getMemberId(), transferee);
+    } else if (error.contains("not up-to-date")) {
+      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee 
{} is not up-to-date: {}",
+          server.getMemberId(), transferee, error);
       appender.notifyLogAppender();
+      return null;
     }
+    return error;
+  }
+
+  TransferLeadershipRequest newRequest(RaftPeerId transfereeId) {
+    // TransferLeadership will block client request, so we don't want wait too 
long.
+    // If everything goes well, transferee should be elected within an 
election timeout.
+    final long timeout = 
server.getRandomElectionTimeout().toLong(TimeUnit.MILLISECONDS);

Review Comment:
   It should use `server.properties().minRpcTimeoutMs()` (or max).  A random 
timeout does not sound correct here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to