This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 33dcf2fc14 Make select into error more user-friendly (#8027)
33dcf2fc14 is described below

commit 33dcf2fc14a4fc4cad4fdf74dbd47b70b7a2e018
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Nov 18 19:47:38 2022 +0800

    Make select into error more user-friendly (#8027)
---
 .../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java   |  3 +-
 .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java  | 11 ++--
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  | 12 +++--
 .../schemaregion/rocksdb/RSchemaRegion.java        |  3 +-
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |  2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  2 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  9 ++++
 .../db/mpp/execution/exchange/ISourceHandle.java   |  8 +++
 .../db/mpp/execution/exchange/LocalSinkHandle.java |  9 +++-
 .../mpp/execution/exchange/LocalSourceHandle.java  | 21 ++++++++
 .../execution/exchange/MPPDataExchangeManager.java |  6 ++-
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 23 +++++++-
 .../db/mpp/execution/exchange/SourceHandle.java    |  5 ++
 .../fragment/FragmentInstanceContext.java          |  8 ++-
 .../operator/process/AbstractIntoOperator.java     |  1 -
 .../execution/schedule/AbstractDriverThread.java   |  2 +-
 .../schedule/FragmentInstanceAbortedException.java |  2 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 15 +++---
 .../db/mpp/plan/execution/QueryExecution.java      | 61 ++++++++++++++++------
 .../plan/execution/memory/MemorySourceHandle.java  |  5 ++
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  2 +-
 23 files changed, 162 insertions(+), 52 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
index 7aaecba327..5c014bc444 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.cq;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +39,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
-@Category(ClusterIT.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBCQExecIT {
 
   @BeforeClass
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
index aca7aa661e..3a520e5a71 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.cq;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.AfterClass;
@@ -38,7 +39,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
-@Category(ClusterIT.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBCQIT {
 
   @BeforeClass
@@ -308,10 +309,14 @@ public class IoTDBCQIT {
         statement.execute(sql);
         fail();
       } catch (Exception e) {
-        assertEquals("932: CQ s1_count_cq has already been created.", e.getMessage());
+        assertEquals(
+            TSStatusCode.CQ_AlREADY_EXIST.getStatusCode()
+                + ": CQ s1_count_cq has already been created.",
+            e.getMessage());
+      } finally {
+        statement.execute("DROP CQ s1_count_cq;");
       }
 
-      statement.execute("DROP CQ s1_count_cq;");
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index fd159a3bed..6236fcd7cf 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -469,7 +468,7 @@ public class IoTDBSelectIntoIT {
     executeNonQuery("CREATE TIMESERIES root.sg_error_bk1.new_d.t1 TEXT;");
     assertTestFail(
         "select s1, s2 into root.sg_error_bk1.new_d(t1, t2, t3, t4) from root.sg.*;",
-        "Task was cancelled.");
+        "Fail to insert measurements [t1] caused by [data type of root.sg_error_bk1.new_d.t1 is not consistent, registered type TEXT, inserting type INT32, timestamp 1, value 1]");
   }
 
   @Test
@@ -477,7 +476,7 @@ public class IoTDBSelectIntoIT {
     executeNonQuery("CREATE ALIGNED TIMESERIES root.sg_error_bk2.new_d(t1 INT32, t2 INT32);");
     assertTestFail(
         "select s1, s2 into root.sg_error_bk2.new_d(t1, t2, t3, t4) from root.sg.*;",
-        "Task was cancelled.");
+        "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity. (Path: root.sg_error_bk2.new_d)");
   }
 
   @Test
@@ -503,7 +502,6 @@ public class IoTDBSelectIntoIT {
   }
 
   @Test
-  @Ignore // TODO remove @Ignore after fix error message inconsistent
   public void testPermission2() throws SQLException {
     try (Connection adminCon = EnvFactory.getEnv().getConnection();
         Statement adminStmt = adminCon.createStatement()) {
@@ -516,7 +514,11 @@ public class IoTDBSelectIntoIT {
             "select s1, s2 into root.sg_bk.new_d(t1, t2, t3, t4) from 
root.sg.*;");
         fail("No exception!");
       } catch (SQLException e) {
-        Assert.assertTrue(e.getMessage(), e.getMessage().contains("Task was cancelled."));
+        Assert.assertTrue(
+            e.getMessage(),
+            e.getMessage()
+                .contains(
+                    "No permissions for this operation, please add privilege INSERT_TIMESERIES"));
       }
     }
   }
diff --git 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 90281d27f7..846a13bf8d 100644
--- 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -368,8 +368,7 @@ public class RSchemaRegion implements ISchemaRegion {
             } else if (checkResult.getResult(RMNodeType.ENTITY)) {
               if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
                 throw new AlignedTimeseriesException(
-                    "Timeseries under this entity is aligned, please use createAlignedTimeseries"
-                        + " or change entity.",
+                    "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
                     RSchemaUtils.getPathByLevelPath(levelPath));
               }
             } else {
diff --git 
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
 
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 174f989e86..df3d187d10 100644
--- 
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ 
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -278,7 +278,7 @@ public class TagSchemaRegion implements ISchemaRegion {
     if (deviceEntry != null) {
       if (!deviceEntry.isAligned()) {
         throw new AlignedTimeseriesException(
-            "Timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
+            "timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
             devicePath.getFullPath());
       } else {
         filterExistingMeasurements(plan, 
deviceEntry.getMeasurementMap().keySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 007dc61660..aff501e62a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -307,7 +307,7 @@ public class IoTDBConfig {
   private int flushThreadCount = Runtime.getRuntime().availableProcessors();
 
   /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
-  private int queryThreadCount = Runtime.getRuntime().availableProcessors();
+  private int queryThreadCount = Math.max(4, Runtime.getRuntime().availableProcessors());
 
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
   private int maxAllowedConcurrentQueries = 1000;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 58b1c24550..075e701e2f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -268,7 +268,7 @@ public class MTreeBelowSGCachedImpl implements 
IMTreeBelowSG {
 
           if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
             throw new AlignedTimeseriesException(
-                "Timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
+                "timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
                 device.getFullPath());
           }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index bb3290f72e..4b2e3b359e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -247,7 +247,7 @@ public class MTreeBelowSGMemoryImpl implements 
IMTreeBelowSG {
 
       if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
         throw new AlignedTimeseriesException(
-            "Timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
+            "timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
             device.getFullPath());
       }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index d5aaa093fb..a9a627d299 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.common.QueryId;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -142,6 +143,14 @@ public class QueryStateMachine {
     return "no detailed failure reason in QueryStateMachine";
   }
 
+  public Throwable getFailureException() {
+    if (failureException == null) {
+      return new IoTDBException(getFailureStatus().getMessage(), getFailureStatus().code);
+    } else {
+      return failureException;
+    }
+  }
+
   public TSStatus getFailureStatus() {
     return failureStatus;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
index 5a94184e67..2a958525bb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
@@ -67,6 +67,14 @@ public interface ISourceHandle {
    */
   void abort();
 
+  /**
+   * Abort the handle. Discard all tsblocks which may still be in the memory 
buffer and cancel the
+   * future returned by {@link #isBlocked()}.
+   *
+   * <p>Should only be called in abnormal case
+   */
+  void abort(Throwable t);
+
   /**
    * Close the handle. Discard all tsblocks which may still be in the memory 
buffer and complete the
    * future returned by {@link #isBlocked()}.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index bcf469b38d..c9d03ab833 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Optional;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static 
com.google.common.util.concurrent.Futures.nonCancellationPropagating;
@@ -150,8 +151,12 @@ public class LocalSinkHandle implements ISinkHandle {
           return;
         }
         aborted = true;
-        queue.abort();
-        sinkHandleListener.onAborted(this);
+        Optional<Throwable> t = sinkHandleListener.onAborted(this);
+        if (t.isPresent()) {
+          queue.abort(t.get());
+        } else {
+          queue.abort();
+        }
       }
     }
     logger.debug("[EndAbortLocalSinkHandle]");
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 4db83c9b88..64415f41dc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -176,6 +176,27 @@ public class LocalSourceHandle implements ISourceHandle {
     }
   }
 
+  @Override
+  public void abort(Throwable t) {
+    if (aborted || closed) {
+      return;
+    }
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      logger.debug("[StartAbortLocalSourceHandle]");
+      synchronized (queue) {
+        synchronized (this) {
+          if (aborted || closed) {
+            return;
+          }
+          queue.abort(t);
+          aborted = true;
+          sourceHandleListener.onAborted(this);
+        }
+      }
+      logger.debug("[EndAbortLocalSourceHandle]");
+    }
+  }
+
   @Override
   public void close() {
     if (aborted || closed) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index d0ee58996c..a4bbe224ca 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -44,6 +44,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
@@ -67,7 +68,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
 
     void onEndOfBlocks(ISinkHandle sinkHandle);
 
-    void onAborted(ISinkHandle sinkHandle);
+    Optional<Throwable> onAborted(ISinkHandle sinkHandle);
 
     void onFailure(ISinkHandle sinkHandle, Throwable t);
   }
@@ -279,9 +280,10 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
     }
 
     @Override
-    public void onAborted(ISinkHandle sinkHandle) {
+    public Optional<Throwable> onAborted(ISinkHandle sinkHandle) {
       logger.debug("[SkHListenerOnAbort]");
       removeFromMPPDataExchangeManager(sinkHandle);
+      return context.getFailureCause();
     }
 
     private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index b42a370780..a593bf5c91 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -196,8 +196,6 @@ public class SharedTsBlockQueue {
     }
   }
 
-  // TODO add Throwable t as a parameter of this method, and then call 
blocked.setException(t);
-  // instead of blocked.cancel(true);
   /** Destroy the queue and cancel the future. Should only be called in 
abnormal case */
   public void abort() {
     if (closed) {
@@ -218,4 +216,25 @@ public class SharedTsBlockQueue {
       bufferRetainedSizeInBytes = 0;
     }
   }
+
+  /** Destroy the queue and cancel the future. Should only be called in 
abnormal case */
+  public void abort(Throwable t) {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (!blocked.isDone()) {
+      blocked.setException(t);
+    }
+    if (blockedOnMemory != null) {
+      bufferRetainedSizeInBytes -= 
localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+    }
+    queue.clear();
+    if (bufferRetainedSizeInBytes > 0L) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), 
bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 27e4cb9cd3..db9ea594d6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -274,6 +274,11 @@ public class SourceHandle implements ISourceHandle {
     }
   }
 
+  @Override
+  public void abort(Throwable t) {
+    abort();
+  }
+
   @Override
   public synchronized void close() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 2d11304062..69928dcec3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -185,8 +186,9 @@ public class FragmentInstanceContext extends QueryContext {
   /** @return Message string of all failures */
   public String getFailedCause() {
     return stateMachine.getFailureCauses().stream()
+        .findFirst()
         .map(Throwable::getMessage)
-        .collect(Collectors.joining("; "));
+        .orElse("");
   }
 
   /** @return List of specific throwable and stack trace */
@@ -232,4 +234,8 @@ public class FragmentInstanceContext extends QueryContext {
   public SessionInfo getSessionInfo() {
     return sessionInfo;
   }
+
+  public Optional<Throwable> getFailureCause() {
+    return Optional.ofNullable(stateMachine.getFailureCauses().peek());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index dbcefcafea..632fbeb05a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -120,7 +120,6 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
           String.format(
               "Error occurred while inserting tablets in SELECT INTO: %s",
               executionStatus.getMessage());
-      LOGGER.error(message);
       throw new IntoProcessException(message);
     }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index ae4e997a7a..4a4101da85 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -77,7 +77,7 @@ public abstract class AbstractDriverThread extends Thread 
implements Closeable {
           // reset the thread name here
           try (SetThreadName fragmentInstanceName =
               new 
SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
-            logger.error("[ExecuteFailed]", t);
+            logger.warn("[ExecuteFailed]", t);
             
next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
             scheduler.toAborted(next);
           }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
index 7425adc239..6bbc447c3c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 public class FragmentInstanceAbortedException extends Exception {
 
   public static final String BY_TIMEOUT = "timeout";
-  public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort called";
+  public static final String BY_FRAGMENT_ABORT_CALLED = " called";
   public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
   public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
   public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 6a683fc4ce..31cc8c8deb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -61,6 +62,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -490,24 +492,19 @@ public class ClusterSchemaFetcher implements 
ISchemaFetcher {
           new IoTDBException(executionResult.status.getMessage(), statusCode));
     }
 
-    List<String> failedCreationList = new ArrayList<>();
+    Set<String> failedCreationSet = new HashSet<>();
     List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
     for (TSStatus subStatus : executionResult.status.subStatus) {
       if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
         alreadyExistingMeasurements.add(
             MeasurementPath.parseDataFromString(subStatus.getMessage()));
       } else {
-        failedCreationList.add(subStatus.message);
+        failedCreationSet.add(subStatus.message);
       }
     }
 
-    if (!failedCreationList.isEmpty()) {
-      StringBuilder stringBuilder = new StringBuilder();
-      for (String message : failedCreationList) {
-        stringBuilder.append(message).append("\n");
-      }
-      throw new RuntimeException(
-          new MetadataException(String.format("Failed to auto create schema\n %s", stringBuilder)));
+    if (!failedCreationSet.isEmpty()) {
+      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
     }
 
     return alreadyExistingMeasurements;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index d7707926c6..f80c8b8283 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -153,15 +153,16 @@ public class QueryExecution implements IQueryExecution {
             if (!state.isDone()) {
               return;
             }
-            this.stop();
             // TODO: (xingtanzjr) If the query is in abnormal state, the 
releaseResource() should be
             // invoked
             if (state == QueryState.FAILED
                 || state == QueryState.ABORTED
                 || state == QueryState.CANCELED) {
               logger.debug("[ReleaseQueryResource] state is: {}", state);
-              releaseResource();
+              Throwable cause = stateMachine.getFailureException();
+              releaseResource(cause);
             }
+            this.stop();
           }
         });
     this.stopped = new AtomicBoolean(false);
@@ -213,7 +214,7 @@ public class QueryExecution implements IQueryExecution {
     }
     logger.warn("error when executing query. {}", 
stateMachine.getFailureMessage());
     // stop and clean up resources the QueryExecution used
-    this.stopAndCleanup();
+    this.stopAndCleanup(stateMachine.getFailureException());
     logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
     try {
       Thread.sleep(RETRY_INTERVAL_IN_MS);
@@ -347,6 +348,26 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
+  // Stop the query and clean up all the resources this query occupied
+  public void stopAndCleanup(Throwable t) {
+    stop();
+    releaseResource(t);
+  }
+
+  /** Release the resources that current QueryExecution hold with a specified 
exception */
+  private void releaseResource(Throwable t) {
+    // close ResultHandle to unblock client's getResult request
+    // Actually, we should not close the ResultHandle when the QueryExecution 
is Finished.
+    // There are only two scenarios where the ResultHandle should be closed:
+    //   1. The client fetch all the result and the ResultHandle is finished.
+    //   2. The client's connection is closed that all owned QueryExecution 
should be cleaned up
+    // If the QueryExecution's state is abnormal, we should also abort the 
resultHandle without
+    // waiting it to be finished.
+    if (resultHandle != null) {
+      resultHandle.abort(t);
+    }
+  }
+
   /**
    * This method will be called by the request thread from client connection. 
This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. 
There is no more result
@@ -367,7 +388,8 @@ public class QueryExecution implements IQueryExecution {
                 stateMachine.getFailureStatus().getMessage(), 
stateMachine.getFailureStatus().code);
           } else {
             throw new IoTDBException(
-                stateMachine.getFailureMessage(), 
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+                stateMachine.getFailureMessage(),
+                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
           }
         } else if (resultHandle.isFinished()) {
           logger.debug("[ResultHandleFinished]");
@@ -388,25 +410,30 @@ public class QueryExecution implements IQueryExecution {
           return Optional.empty();
         }
       } catch (ExecutionException | CancellationException e) {
-        stateMachine.transitionToFailed(e.getCause() != null ? e.getCause() : 
e);
-        if (stateMachine.getFailureStatus() != null) {
-          throw new IoTDBException(
-              stateMachine.getFailureStatus().getMessage(), 
stateMachine.getFailureStatus().code);
-        }
-        Throwable t = e.getCause() == null ? e : e.getCause();
-        throwIfUnchecked(t);
-        throw new IoTDBException(t, 
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+        dealWithException(e.getCause() != null ? e.getCause() : e);
       } catch (InterruptedException e) {
-        stateMachine.transitionToFailed(e);
         Thread.currentThread().interrupt();
-        throw new IoTDBException(e, 
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+        dealWithException(e);
       } catch (Throwable t) {
-        stateMachine.transitionToFailed(t);
-        throw t;
+        dealWithException(t);
       }
     }
   }
 
+  private void dealWithException(Throwable t) throws IoTDBException {
+    stateMachine.transitionToFailed(t);
+    if (stateMachine.getFailureStatus() != null) {
+      throw new IoTDBException(
+          stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
+    } else if (stateMachine.getFailureException() != null) {
+      Throwable rootCause = stateMachine.getFailureException();
+      throw new IoTDBException(rootCause, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    } else {
+      throwIfUnchecked(t);
+      throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+    }
+  }
+
   @Override
   public Optional<TsBlock> getBatchResult() throws IoTDBException {
     return getResult(this::getDeserializedTsBlock);
@@ -524,7 +551,7 @@ public class QueryExecution implements IQueryExecution {
       statusCode =
           state == QueryState.FINISHED || state == QueryState.RUNNING
               ? TSStatusCode.SUCCESS_STATUS
-              : TSStatusCode.QUERY_PROCESS_ERROR;
+              : TSStatusCode.EXECUTE_STATEMENT_ERROR;
     }
 
     TSStatus tsstatus = RpcUtils.getStatus(statusCode, 
stateMachine.getFailureMessage());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index bb116f7ef7..29b612aa7d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -100,6 +100,11 @@ public class MemorySourceHandle implements ISourceHandle {
   @Override
   public void abort() {}
 
+  @Override
+  public void abort(Throwable t) {
+    abort();
+  }
+
   @Override
   public void close() {}
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 6c8bca7a51..af1a2fec89 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
       logger.error("can't connect to node {}", endPoint, e);
       TSStatus status = new TSStatus();
       status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
-      status.setMessage("can't connect to node {}" + endPoint);
+      status.setMessage("can't connect to node " + endPoint);
       // If the DataNode cannot be connected, its endPoint will be put into black list
       // so that the following retry will avoid dispatching instance towards this DataNode.
       queryContext.addFailedEndPoint(endPoint);

Reply via email to