This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 33dcf2fc14 Make select into error more user-friendly (#8027)
33dcf2fc14 is described below
commit 33dcf2fc14a4fc4cad4fdf74dbd47b70b7a2e018
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Nov 18 19:47:38 2022 +0800
Make select into error more user-friendly (#8027)
---
.../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java | 3 +-
.../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java | 11 ++--
.../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java | 12 +++--
.../schemaregion/rocksdb/RSchemaRegion.java | 3 +-
.../metadata/tagSchemaRegion/TagSchemaRegion.java | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 2 +-
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 2 +-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 9 ++++
.../db/mpp/execution/exchange/ISourceHandle.java | 8 +++
.../db/mpp/execution/exchange/LocalSinkHandle.java | 9 +++-
.../mpp/execution/exchange/LocalSourceHandle.java | 21 ++++++++
.../execution/exchange/MPPDataExchangeManager.java | 6 ++-
.../mpp/execution/exchange/SharedTsBlockQueue.java | 23 +++++++-
.../db/mpp/execution/exchange/SourceHandle.java | 5 ++
.../fragment/FragmentInstanceContext.java | 8 ++-
.../operator/process/AbstractIntoOperator.java | 1 -
.../execution/schedule/AbstractDriverThread.java | 2 +-
.../schedule/FragmentInstanceAbortedException.java | 2 +-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 15 +++---
.../db/mpp/plan/execution/QueryExecution.java | 61 ++++++++++++++++------
.../plan/execution/memory/MemorySourceHandle.java | 5 ++
.../scheduler/FragmentInstanceDispatcherImpl.java | 2 +-
23 files changed, 162 insertions(+), 52 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
index 7aaecba327..5c014bc444 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.cq;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -38,7 +39,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
-@Category(ClusterIT.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBCQExecIT {
@BeforeClass
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
index aca7aa661e..3a520e5a71 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.cq;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.AfterClass;
@@ -38,7 +39,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
-@Category(ClusterIT.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBCQIT {
@BeforeClass
@@ -308,10 +309,14 @@ public class IoTDBCQIT {
statement.execute(sql);
fail();
} catch (Exception e) {
- assertEquals("932: CQ s1_count_cq has already been created.",
e.getMessage());
+ assertEquals(
+ TSStatusCode.CQ_AlREADY_EXIST.getStatusCode()
+ + ": CQ s1_count_cq has already been created.",
+ e.getMessage());
+ } finally {
+ statement.execute("DROP CQ s1_count_cq;");
}
- statement.execute("DROP CQ s1_count_cq;");
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index fd159a3bed..6236fcd7cf 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -469,7 +468,7 @@ public class IoTDBSelectIntoIT {
executeNonQuery("CREATE TIMESERIES root.sg_error_bk1.new_d.t1 TEXT;");
assertTestFail(
"select s1, s2 into root.sg_error_bk1.new_d(t1, t2, t3, t4) from
root.sg.*;",
- "Task was cancelled.");
+ "Fail to insert measurements [t1] caused by [data type of
root.sg_error_bk1.new_d.t1 is not consistent, registered type TEXT, inserting
type INT32, timestamp 1, value 1]");
}
@Test
@@ -477,7 +476,7 @@ public class IoTDBSelectIntoIT {
executeNonQuery("CREATE ALIGNED TIMESERIES root.sg_error_bk2.new_d(t1
INT32, t2 INT32);");
assertTestFail(
"select s1, s2 into root.sg_error_bk2.new_d(t1, t2, t3, t4) from
root.sg.*;",
- "Task was cancelled.");
+ "timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity. (Path: root.sg_error_bk2.new_d)");
}
@Test
@@ -503,7 +502,6 @@ public class IoTDBSelectIntoIT {
}
@Test
- @Ignore // TODO remove @Ignore after fix error message inconsistent
public void testPermission2() throws SQLException {
try (Connection adminCon = EnvFactory.getEnv().getConnection();
Statement adminStmt = adminCon.createStatement()) {
@@ -516,7 +514,11 @@ public class IoTDBSelectIntoIT {
"select s1, s2 into root.sg_bk.new_d(t1, t2, t3, t4) from
root.sg.*;");
fail("No exception!");
} catch (SQLException e) {
- Assert.assertTrue(e.getMessage(), e.getMessage().contains("Task was
cancelled."));
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "No permissions for this operation, please add privilege
INSERT_TIMESERIES"));
}
}
}
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 90281d27f7..846a13bf8d 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -368,8 +368,7 @@ public class RSchemaRegion implements ISchemaRegion {
} else if (checkResult.getResult(RMNodeType.ENTITY)) {
if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use
createAlignedTimeseries"
- + " or change entity.",
+ "timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
RSchemaUtils.getPathByLevelPath(levelPath));
}
} else {
diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 174f989e86..df3d187d10 100644
--- a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -278,7 +278,7 @@ public class TagSchemaRegion implements ISchemaRegion {
if (deviceEntry != null) {
if (!deviceEntry.isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
+ "timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
devicePath.getFullPath());
} else {
filterExistingMeasurements(plan,
deviceEntry.getMeasurementMap().keySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 007dc61660..aff501e62a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -307,7 +307,7 @@ public class IoTDBConfig {
private int flushThreadCount = Runtime.getRuntime().availableProcessors();
/** How many threads can concurrently execute query statement. When <= 0,
use CPU core number. */
- private int queryThreadCount = Runtime.getRuntime().availableProcessors();
+ private int queryThreadCount = Math.max(4,
Runtime.getRuntime().availableProcessors());
/** How many queries can be concurrently executed. When <= 0, use 1000. */
private int maxAllowedConcurrentQueries = 1000;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 58b1c24550..075e701e2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -268,7 +268,7 @@ public class MTreeBelowSGCachedImpl implements
IMTreeBelowSG {
if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
+ "timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
device.getFullPath());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index bb3290f72e..4b2e3b359e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -247,7 +247,7 @@ public class MTreeBelowSGMemoryImpl implements
IMTreeBelowSG {
if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
+ "timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
device.getFullPath());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index d5aaa093fb..a9a627d299 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.common.QueryId;
import com.google.common.util.concurrent.ListenableFuture;
@@ -142,6 +143,14 @@ public class QueryStateMachine {
return "no detailed failure reason in QueryStateMachine";
}
+ public Throwable getFailureException() {
+ if (failureException == null) {
+ return new IoTDBException(getFailureStatus().getMessage(),
getFailureStatus().code);
+ } else {
+ return failureException;
+ }
+ }
+
public TSStatus getFailureStatus() {
return failureStatus;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
index 5a94184e67..2a958525bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
@@ -67,6 +67,14 @@ public interface ISourceHandle {
*/
void abort();
+ /**
+ * Abort the handle. Discard all tsblocks which may still be in the memory
buffer and cancel the
+ * future returned by {@link #isBlocked()}.
+ *
+ * <p>Should only be called in abnormal case
+ */
+ void abort(Throwable t);
+
/**
* Close the handle. Discard all tsblocks which may still be in the memory
buffer and complete the
* future returned by {@link #isBlocked()}.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index bcf469b38d..c9d03ab833 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Optional;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static
com.google.common.util.concurrent.Futures.nonCancellationPropagating;
@@ -150,8 +151,12 @@ public class LocalSinkHandle implements ISinkHandle {
return;
}
aborted = true;
- queue.abort();
- sinkHandleListener.onAborted(this);
+ Optional<Throwable> t = sinkHandleListener.onAborted(this);
+ if (t.isPresent()) {
+ queue.abort(t.get());
+ } else {
+ queue.abort();
+ }
}
}
logger.debug("[EndAbortLocalSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 4db83c9b88..64415f41dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -176,6 +176,27 @@ public class LocalSourceHandle implements ISourceHandle {
}
}
+ @Override
+ public void abort(Throwable t) {
+ if (aborted || closed) {
+ return;
+ }
+ try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+ logger.debug("[StartAbortLocalSourceHandle]");
+ synchronized (queue) {
+ synchronized (this) {
+ if (aborted || closed) {
+ return;
+ }
+ queue.abort(t);
+ aborted = true;
+ sourceHandleListener.onAborted(this);
+ }
+ }
+ logger.debug("[EndAbortLocalSourceHandle]");
+ }
+ }
+
@Override
public void close() {
if (aborted || closed) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index d0ee58996c..a4bbe224ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -44,6 +44,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@@ -67,7 +68,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
void onEndOfBlocks(ISinkHandle sinkHandle);
- void onAborted(ISinkHandle sinkHandle);
+ Optional<Throwable> onAborted(ISinkHandle sinkHandle);
void onFailure(ISinkHandle sinkHandle, Throwable t);
}
@@ -279,9 +280,10 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
}
@Override
- public void onAborted(ISinkHandle sinkHandle) {
+ public Optional<Throwable> onAborted(ISinkHandle sinkHandle) {
logger.debug("[SkHListenerOnAbort]");
removeFromMPPDataExchangeManager(sinkHandle);
+ return context.getFailureCause();
}
private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index b42a370780..a593bf5c91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -196,8 +196,6 @@ public class SharedTsBlockQueue {
}
}
- // TODO add Throwable t as a parameter of this method, and then call
blocked.setException(t);
- // instead of blocked.cancel(true);
/** Destroy the queue and cancel the future. Should only be called in
abnormal case */
public void abort() {
if (closed) {
@@ -218,4 +216,25 @@ public class SharedTsBlockQueue {
bufferRetainedSizeInBytes = 0;
}
}
+
+ /** Destroy the queue and cancel the future. Should only be called in
abnormal case */
+ public void abort(Throwable t) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (!blocked.isDone()) {
+ blocked.setException(t);
+ }
+ if (blockedOnMemory != null) {
+ bufferRetainedSizeInBytes -=
localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+ }
+ queue.clear();
+ if (bufferRetainedSizeInBytes > 0L) {
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(),
bufferRetainedSizeInBytes);
+ bufferRetainedSizeInBytes = 0;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 27e4cb9cd3..db9ea594d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -274,6 +274,11 @@ public class SourceHandle implements ISourceHandle {
}
}
+ @Override
+ public void abort(Throwable t) {
+ abort();
+ }
+
@Override
public synchronized void close() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 2d11304062..69928dcec3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -185,8 +186,9 @@ public class FragmentInstanceContext extends QueryContext {
/** @return Message string of all failures */
public String getFailedCause() {
return stateMachine.getFailureCauses().stream()
+ .findFirst()
.map(Throwable::getMessage)
- .collect(Collectors.joining("; "));
+ .orElse("");
}
/** @return List of specific throwable and stack trace */
@@ -232,4 +234,8 @@ public class FragmentInstanceContext extends QueryContext {
public SessionInfo getSessionInfo() {
return sessionInfo;
}
+
+ public Optional<Throwable> getFailureCause() {
+ return Optional.ofNullable(stateMachine.getFailureCauses().peek());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index dbcefcafea..632fbeb05a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -120,7 +120,6 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
String.format(
"Error occurred while inserting tablets in SELECT INTO: %s",
executionStatus.getMessage());
- LOGGER.error(message);
throw new IntoProcessException(message);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index ae4e997a7a..4a4101da85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -77,7 +77,7 @@ public abstract class AbstractDriverThread extends Thread
implements Closeable {
// reset the thread name here
try (SetThreadName fragmentInstanceName =
new
SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
- logger.error("[ExecuteFailed]", t);
+ logger.warn("[ExecuteFailed]", t);
next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
scheduler.toAborted(next);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
index 7425adc239..6bbc447c3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/FragmentInstanceAbortedException.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.mpp.execution.driver.IDriver;
public class FragmentInstanceAbortedException extends Exception {
public static final String BY_TIMEOUT = "timeout";
- public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort
called";
+ public static final String BY_FRAGMENT_ABORT_CALLED = " called";
public static final String BY_QUERY_CASCADING_ABORTED = "query cascading
aborted";
public static final String BY_ALREADY_BEING_CANCELLED = "already being
cancelled";
public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error
scheduled";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 6a683fc4ce..31cc8c8deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -61,6 +62,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -490,24 +492,19 @@ public class ClusterSchemaFetcher implements
ISchemaFetcher {
new IoTDBException(executionResult.status.getMessage(), statusCode));
}
- List<String> failedCreationList = new ArrayList<>();
+ Set<String> failedCreationSet = new HashSet<>();
List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
for (TSStatus subStatus : executionResult.status.subStatus) {
if (subStatus.code ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
alreadyExistingMeasurements.add(
MeasurementPath.parseDataFromString(subStatus.getMessage()));
} else {
- failedCreationList.add(subStatus.message);
+ failedCreationSet.add(subStatus.message);
}
}
- if (!failedCreationList.isEmpty()) {
- StringBuilder stringBuilder = new StringBuilder();
- for (String message : failedCreationList) {
- stringBuilder.append(message).append("\n");
- }
- throw new RuntimeException(
- new MetadataException(String.format("Failed to auto create schema\n
%s", stringBuilder)));
+ if (!failedCreationSet.isEmpty()) {
+ throw new SemanticException(new MetadataException(String.join("; ",
failedCreationSet)));
}
return alreadyExistingMeasurements;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index d7707926c6..f80c8b8283 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -153,15 +153,16 @@ public class QueryExecution implements IQueryExecution {
if (!state.isDone()) {
return;
}
- this.stop();
// TODO: (xingtanzjr) If the query is in abnormal state, the
releaseResource() should be
// invoked
if (state == QueryState.FAILED
|| state == QueryState.ABORTED
|| state == QueryState.CANCELED) {
logger.debug("[ReleaseQueryResource] state is: {}", state);
- releaseResource();
+ Throwable cause = stateMachine.getFailureException();
+ releaseResource(cause);
}
+ this.stop();
}
});
this.stopped = new AtomicBoolean(false);
@@ -213,7 +214,7 @@ public class QueryExecution implements IQueryExecution {
}
logger.warn("error when executing query. {}",
stateMachine.getFailureMessage());
// stop and clean up resources the QueryExecution used
- this.stopAndCleanup();
+ this.stopAndCleanup(stateMachine.getFailureException());
logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
try {
Thread.sleep(RETRY_INTERVAL_IN_MS);
@@ -347,6 +348,26 @@ public class QueryExecution implements IQueryExecution {
}
}
+ // Stop the query and clean up all the resources this query occupied
+ public void stopAndCleanup(Throwable t) {
+ stop();
+ releaseResource(t);
+ }
+
+ /** Release the resources that current QueryExecution hold with a specified
exception */
+ private void releaseResource(Throwable t) {
+ // close ResultHandle to unblock client's getResult request
+ // Actually, we should not close the ResultHandle when the QueryExecution
is Finished.
+ // There are only two scenarios where the ResultHandle should be closed:
+ // 1. The client fetch all the result and the ResultHandle is finished.
+ // 2. The client's connection is closed that all owned QueryExecution
should be cleaned up
+ // If the QueryExecution's state is abnormal, we should also abort the
resultHandle without
+ // waiting it to be finished.
+ if (resultHandle != null) {
+ resultHandle.abort(t);
+ }
+ }
+
/**
* This method will be called by the request thread from client connection.
This method will block
* until one of these conditions occurs: 1. There is a batch of result 2.
There is no more result
@@ -367,7 +388,8 @@ public class QueryExecution implements IQueryExecution {
stateMachine.getFailureStatus().getMessage(),
stateMachine.getFailureStatus().code);
} else {
throw new IoTDBException(
- stateMachine.getFailureMessage(),
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ stateMachine.getFailureMessage(),
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
}
} else if (resultHandle.isFinished()) {
logger.debug("[ResultHandleFinished]");
@@ -388,25 +410,30 @@ public class QueryExecution implements IQueryExecution {
return Optional.empty();
}
} catch (ExecutionException | CancellationException e) {
- stateMachine.transitionToFailed(e.getCause() != null ? e.getCause() :
e);
- if (stateMachine.getFailureStatus() != null) {
- throw new IoTDBException(
- stateMachine.getFailureStatus().getMessage(),
stateMachine.getFailureStatus().code);
- }
- Throwable t = e.getCause() == null ? e : e.getCause();
- throwIfUnchecked(t);
- throw new IoTDBException(t,
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ dealWithException(e.getCause() != null ? e.getCause() : e);
} catch (InterruptedException e) {
- stateMachine.transitionToFailed(e);
Thread.currentThread().interrupt();
- throw new IoTDBException(e,
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ dealWithException(e);
} catch (Throwable t) {
- stateMachine.transitionToFailed(t);
- throw t;
+ dealWithException(t);
}
}
}
+ private void dealWithException(Throwable t) throws IoTDBException {
+ stateMachine.transitionToFailed(t);
+ if (stateMachine.getFailureStatus() != null) {
+ throw new IoTDBException(
+ stateMachine.getFailureStatus().getMessage(),
stateMachine.getFailureStatus().code);
+ } else if (stateMachine.getFailureException() != null) {
+ Throwable rootCause = stateMachine.getFailureException();
+ throw new IoTDBException(rootCause,
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ } else {
+ throwIfUnchecked(t);
+ throw new IoTDBException(t,
TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ }
+ }
+
@Override
public Optional<TsBlock> getBatchResult() throws IoTDBException {
return getResult(this::getDeserializedTsBlock);
@@ -524,7 +551,7 @@ public class QueryExecution implements IQueryExecution {
statusCode =
state == QueryState.FINISHED || state == QueryState.RUNNING
? TSStatusCode.SUCCESS_STATUS
- : TSStatusCode.QUERY_PROCESS_ERROR;
+ : TSStatusCode.EXECUTE_STATEMENT_ERROR;
}
TSStatus tsstatus = RpcUtils.getStatus(statusCode,
stateMachine.getFailureMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index bb116f7ef7..29b612aa7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -100,6 +100,11 @@ public class MemorySourceHandle implements ISourceHandle {
@Override
public void abort() {}
+ @Override
+ public void abort(Throwable t) {
+ abort();
+ }
+
@Override
public void close() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 6c8bca7a51..af1a2fec89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
logger.error("can't connect to node {}", endPoint, e);
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
- status.setMessage("can't connect to node {}" + endPoint);
+ status.setMessage("can't connect to node " + endPoint);
// If the DataNode cannot be connected, its endPoint will be put into
black list
// so that the following retry will avoid dispatching instance towards
this DataNode.
queryContext.addFailedEndPoint(endPoint);