This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit be53e3a8a92e5adecb37497a7b1605ec0dab6c53
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Jul 13 17:21:23 2022 +0800

    add retry logic to QueryExecution
---
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  1 +
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 15 ++++++++++++
 .../db/mpp/plan/execution/QueryExecution.java      | 28 ++++++++++++++++++++--
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  4 ++--
 4 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index 383875a022..d4ed15d9ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -28,6 +28,7 @@ public enum QueryState {
   QUEUED(false),
   PLANNED(false),
   DISPATCHING(false),
+  RETRYING(false),
   RUNNING(false),
   FINISHED(true),
   CANCELED(true),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 801a1d3df2..42e793b41f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -100,6 +100,14 @@ public class QueryStateMachine {
     queryState.set(QueryState.DISPATCHING);
   }
 
+  public void transitionToRetrying(Throwable throwable) {
+    if (queryState.get().isDone()) {
+      return;
+    }
+    queryState.set(QueryState.RETRYING);
+    this.failureException = throwable;
+  }
+
   public void transitionToRunning() {
     queryState.set(QueryState.RUNNING);
   }
@@ -125,6 +133,13 @@ public class QueryStateMachine {
     queryState.set(QueryState.ABORTED);
   }
 
+  public void transitionToFailed() {
+    if (queryState.get().isDone()) {
+      return;
+    }
+    queryState.set(QueryState.FAILED);
+  }
+
   public void transitionToFailed(Throwable throwable) {
     if (queryState.get().isDone()) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4bae3eeafc..75ea799ac5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -87,14 +87,15 @@ public class QueryExecution implements IQueryExecution {
   private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class);
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
+  private static final int MAX_RETRY_COUNT = 3;
+  private int retryCount = 0;
   private final MPPQueryContext context;
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;
 
   private final List<PlanOptimizer> planOptimizers;
 
-  private final Analysis analysis;
+  private Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
@@ -138,6 +139,10 @@ public class QueryExecution implements IQueryExecution {
     stateMachine.addStateChangeListener(
         state -> {
           try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
+            if (state == QueryState.RETRYING) {
+              retry();
+              return;
+            }
             if (!state.isDone()) {
               return;
             }
@@ -169,6 +174,25 @@ public class QueryExecution implements IQueryExecution {
     schedule();
   }
 
+  private void retry() {
+    if (retryCount >= MAX_RETRY_COUNT) {
+      logger.error("reach max retry count. transit query to failed");
+      stateMachine.transitionToFailed();
+      return;
+    }
+    retryCount++;
+    logger.error("error when executing query. {}", stateMachine.getFailureMessage());
+    logger.warn("start to retry. Retry count is: {}", retryCount);
+    // stop and clean up resources the QueryExecution used
+    this.stopAndCleanup();
+    // force invalid PartitionCache
+    partitionFetcher.invalidAllCache();
+    // re-analyze the query
+    this.analysis = analyze(this.analysis.getStatement(), context, partitionFetcher, schemaFetcher);
+    // re-start the QueryExecution
+    this.start();
+  }
+
   private boolean skipExecute() {
     return analysis.isFinishQueryAfterAnalyze()
         || (context.getQueryType() == QueryType.READ && !analysis.hasDataSource());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 0a6d51eb4a..bca37f6816 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -109,11 +109,11 @@ public class ClusterScheduler implements IScheduler {
         return;
       }
     } catch (InterruptedException | ExecutionException e) {
-      // If the dispatch failed, we make the QueryState as failed, and return.
+      // If the dispatch request cannot be sent or TException is caught, we will retry this query.
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
       }
-      stateMachine.transitionToFailed(e);
+      stateMachine.transitionToRetrying(e);
       return;
     }
 

Reply via email to