This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/migration_add_wait_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c5098b4a70465de8fa1f12f3b533c0c8601eaa4d
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Oct 26 16:26:06 2022 +0800

    tmp save
---
 .../service/MultiLeaderRPCServiceProcessor.java     |  9 +++++++++
 .../org/apache/iotdb/db/mpp/plan/TestRPCClient.java | 21 ++++++++++++++++++---
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 20b6aad421..4592756100 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -238,6 +238,15 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for 
waitSyncLogComplete request", groupId);
+      logger.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
+      return;
+    }
     long searchIndex = impl.getIndex();
     long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
     resultHandler.onComplete(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
index 4a5b4169fe..cda96d3379 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -33,6 +33,8 @@ import 
org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
@@ -62,11 +64,24 @@ public class TestRPCClient {
 
   public static void main(String args[]) {
     TestRPCClient client = new TestRPCClient();
-    //    client.removeRegionPeer();
-    client.addPeer();
+    client.removeRegionPeer();
+//    client.testWaitSyncLog();
     //    client.loadSnapshot();
   }
 
+  private void testWaitSyncLog() {
+    try (SyncMultiLeaderServiceClient client =
+             syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 
40012))) {
+      TWaitSyncLogCompleteRes res =
+          client.waitSyncLogComplete(
+              new TWaitSyncLogCompleteReq(new 
DataRegionId(1).convertToTConsensusGroupId()));
+      System.out.printf("%s, %d, %d",res.complete, res.searchIndex, 
res.safeIndex);
+    } catch (IOException | TException e) {
+      System.out.println("Error: " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+  }
+
   private void loadSnapshot() {
     try (SyncMultiLeaderServiceClient client =
         syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) {
@@ -96,7 +111,7 @@ public class TestRPCClient {
     try (SyncDataNodeInternalServiceClient client =
         INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new 
TEndPoint("127.0.0.1", 9003))) {
       client.removeRegionPeer(
-          new TMaintainPeerReq(new 
DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
+          new TMaintainPeerReq(new 
DataRegionId(1).convertToTConsensusGroupId(), getLocation3(3)));
     } catch (IOException | TException e) {
       throw new RuntimeException(e);
     }

Reply via email to