This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/sonar by this push:
new 0bb317ebc49 server/src/main/java/org/apache/iotdb/db/mpp/execution
done except for operator package
0bb317ebc49 is described below
commit 0bb317ebc49a6a3ddf764a8a5fe291bc7675d74a
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 20 17:01:59 2023 +0800
server/src/main/java/org/apache/iotdb/db/mpp/execution done except for
operator package
---
.../iotdb/db/mpp/execution/FutureStateChange.java | 1 +
.../iotdb/db/mpp/execution/QueryIdGenerator.java | 7 +-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 1 +
.../iotdb/db/mpp/execution/StateMachine.java | 51 ++++---
.../iotdb/db/mpp/execution/driver/DataDriver.java | 15 +-
.../db/mpp/execution/driver/DataDriverContext.java | 2 +-
.../iotdb/db/mpp/execution/driver/Driver.java | 23 +--
.../db/mpp/execution/driver/DriverContext.java | 1 +
.../iotdb/db/mpp/execution/driver/IDriver.java | 16 ++-
.../db/mpp/execution/driver/SchemaDriver.java | 1 +
.../mpp/execution/driver/SchemaDriverContext.java | 2 +-
.../mpp/execution/executor/RegionReadExecutor.java | 13 +-
.../execution/executor/RegionWriteExecutor.java | 155 ++++++++++++---------
.../db/mpp/execution/fragment/FragmentInfo.java | 17 +++
.../fragment/FragmentInstanceContext.java | 44 +++---
.../fragment/FragmentInstanceExecution.java | 64 +++++----
.../fragment/FragmentInstanceFailureInfo.java | 36 +++--
.../fragment/FragmentInstanceFailureListener.java | 1 +
.../execution/fragment/FragmentInstanceInfo.java | 1 +
.../fragment/FragmentInstanceManager.java | 4 +
.../execution/fragment/FragmentInstanceState.java | 1 +
.../fragment/FragmentInstanceStateMachine.java | 6 +-
.../db/mpp/execution/fragment/FragmentState.java | 1 +
.../timer/RuleBasedTimeSliceAllocator.java | 2 +-
24 files changed, 280 insertions(+), 185 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
index 0d33be9b161..a3261925296 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
@@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.iotdb.db.mpp.execution;
import com.google.common.collect.ImmutableSet;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
index 49ad488d08e..c8a17b6d7a4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -56,9 +57,9 @@ public class QueryIdGenerator {
private int counter;
public QueryIdGenerator() {
- int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
- checkArgument(dataNodeId != -1, "DataNodeId should be init first!");
- this.dataNodeId = String.valueOf(dataNodeId);
+ int dataNode = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+ checkArgument(dataNode != -1, "DataNodeId should be init first!");
+ this.dataNodeId = String.valueOf(dataNode);
}
public String getCoordinatorId() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index ffc174cbce8..e3b39a1c952 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index c4482b87eb7..f1e94464e40 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -11,12 +11,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.iotdb.db.mpp.execution;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@@ -36,6 +39,12 @@ import static java.util.Objects.requireNonNull;
@ThreadSafe
public class StateMachine<T> {
+
+ private static final String LOCK_HELD_ERROR_MSG = "Cannot set state while
holding the lock";
+ private static final String STATE_IS_NULL = "newState is null";
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StateMachine.class);
+
private final String name;
private final Executor executor;
private final Object lock = new Object();
@@ -110,12 +119,12 @@ public class StateMachine<T> {
* @return the state before the possible state change
*/
public T trySet(T newState) {
- checkState(!Thread.holdsLock(lock), "Cannot set state while holding the
lock");
- requireNonNull(newState, "newState is null");
+ checkState(!Thread.holdsLock(lock), LOCK_HELD_ERROR_MSG);
+ requireNonNull(newState, STATE_IS_NULL);
T oldState;
- FutureStateChange<T> futureStateChange;
- ImmutableList<StateChangeListener<T>> stateChangeListeners;
+ FutureStateChange<T> oldFutureStateChange;
+ ImmutableList<StateChangeListener<T>> curStateChangeListeners;
synchronized (lock) {
if (state.equals(newState) || isTerminalState(state)) {
return state;
@@ -124,8 +133,8 @@ public class StateMachine<T> {
oldState = state;
state = newState;
- futureStateChange = this.futureStateChange.getAndSet(new
FutureStateChange<>());
- stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+ oldFutureStateChange = this.futureStateChange.getAndSet(new
FutureStateChange<>());
+ curStateChangeListeners =
ImmutableList.copyOf(this.stateChangeListeners);
// if we are now in a terminal state, free the listeners since this will
be the last
// notification
@@ -134,7 +143,7 @@ public class StateMachine<T> {
}
}
- fireStateChanged(newState, futureStateChange, stateChangeListeners);
+ fireStateChanged(newState, oldFutureStateChange, curStateChangeListeners);
return oldState;
}
@@ -145,8 +154,8 @@ public class StateMachine<T> {
* @return true if the state is set
*/
public boolean setIf(T newState, Predicate<T> predicate) {
- checkState(!Thread.holdsLock(lock), "Cannot set state while holding the
lock");
- requireNonNull(newState, "newState is null");
+ checkState(!Thread.holdsLock(lock), LOCK_HELD_ERROR_MSG);
+ requireNonNull(newState, STATE_IS_NULL);
while (true) {
// check if the current state passes the predicate
@@ -176,12 +185,12 @@ public class StateMachine<T> {
* @return true if the state is set
*/
public boolean compareAndSet(T expectedState, T newState) {
- checkState(!Thread.holdsLock(lock), "Cannot set state while holding the
lock");
+ checkState(!Thread.holdsLock(lock), LOCK_HELD_ERROR_MSG);
requireNonNull(expectedState, "expectedState is null");
- requireNonNull(newState, "newState is null");
+ requireNonNull(newState, STATE_IS_NULL);
- FutureStateChange<T> futureStateChange;
- ImmutableList<StateChangeListener<T>> stateChangeListeners;
+ FutureStateChange<T> oldFutureStateChange;
+ ImmutableList<StateChangeListener<T>> curStateChangeListeners;
synchronized (lock) {
if (!state.equals(expectedState)) {
return false;
@@ -197,8 +206,8 @@ public class StateMachine<T> {
state = newState;
- futureStateChange = this.futureStateChange.getAndSet(new
FutureStateChange<>());
- stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+ oldFutureStateChange = this.futureStateChange.getAndSet(new
FutureStateChange<>());
+ curStateChangeListeners =
ImmutableList.copyOf(this.stateChangeListeners);
// if we are now in a terminal state, free the listeners since this will
be the last
// notification
@@ -207,16 +216,17 @@ public class StateMachine<T> {
}
}
- fireStateChanged(newState, futureStateChange, stateChangeListeners);
+ fireStateChanged(newState, oldFutureStateChange, curStateChangeListeners);
return true;
}
+ @SuppressWarnings("squid:S1181")
private void fireStateChanged(
T newState,
FutureStateChange<T> futureStateChange,
List<StateChangeListener<T>> stateChangeListeners) {
checkState(!Thread.holdsLock(lock), "Cannot fire state change event while
holding the lock");
- requireNonNull(newState, "newState is null");
+ requireNonNull(newState, STATE_IS_NULL);
// always fire listener callbacks from a different thread
safeExecute(
@@ -225,7 +235,7 @@ public class StateMachine<T> {
try {
futureStateChange.complete(newState);
} catch (Throwable e) {
- // log.error(e, "Error setting future state for
%s", name);
+ LOGGER.error("Error setting future state for {}", name, e);
}
for (StateChangeListener<T> stateChangeListener :
stateChangeListeners) {
fireStateChangedListener(newState, stateChangeListener);
@@ -233,11 +243,12 @@ public class StateMachine<T> {
});
}
+ @SuppressWarnings("squid:S1181")
private void fireStateChangedListener(T newState, StateChangeListener<T>
stateChangeListener) {
try {
stateChangeListener.stateChanged(newState);
} catch (Throwable e) {
- // log.error(e, "Error notifying state change listener for
%s", name);
+ LOGGER.error("Error notifying state change listener for {}", name, e);
}
}
@@ -303,12 +314,12 @@ public class StateMachine<T> {
return get().toString();
}
+ @SuppressWarnings("squid:S112")
private void safeExecute(Runnable command) {
try {
executor.execute(command);
} catch (RejectedExecutionException e) {
if ((executor instanceof ExecutorService) && ((ExecutorService)
executor).isShutdown()) {
- // TODO: (xingtanzjr) handle the exception
throw new RuntimeException("Server is shutting down", e);
}
throw e;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index c7ff76b8de2..266e4372bbd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -48,6 +49,7 @@ public class DataDriver extends Driver {
this.estimatedMemorySize = estimatedMemorySize;
}
+ @SuppressWarnings("squid:S1181")
@Override
protected boolean init(SettableFuture<?> blockedFuture) {
if (!init) {
@@ -65,8 +67,12 @@ public class DataDriver extends Driver {
}
/**
- * init seq file list and unseq file list in QueryDataSource and set it into
each SourceNode TODO
- * we should change all the blocked lock operation into tryLock
+ * init seq file list and unseq file list in QueryDataSource and set it into
each SourceNode.
+ *
+ * @throws QueryProcessException if the query resource cannot be
+ * initialized
+ * @throws IllegalStateException if QueryDataSource is still null after
+ * initialization
*/
private void initialize() throws QueryProcessException {
long startTime = System.nanoTime();
@@ -107,7 +113,10 @@ public class DataDriver extends Driver {
/**
* The method is called in mergeLock() when executing query. This method
will get all the
- * QueryDataSource needed for this query
+ * QueryDataSource needed for this query.
+ *
+ * @throws QueryProcessException if the query resource cannot be
+ * initialized
*/
private QueryDataSource initQueryDataSource() throws QueryProcessException {
return ((DataDriverContext) driverContext).getSharedQueryDataSource();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
index 0b0ad4332fd..88941ddce56 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.commons.path.PartialPath;
@@ -28,7 +29,6 @@ import
org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
import java.util.ArrayList;
import java.util.List;
-/** TODO Add javadoc for context */
public class DataDriverContext extends DriverContext {
private final List<PartialPath> paths;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c6c7b73cd13..55e8e94ad50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.commons.utils.FileUtils;
@@ -58,6 +59,9 @@ import static
org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DRIVER_INTE
public abstract class Driver implements IDriver {
protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
+ protected static final QueryMetricsManager QUERY_METRICS =
QueryMetricsManager.getInstance();
+ protected static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
+ QueryExecutionMetricSet.getInstance();
protected final DriverContext driverContext;
protected final Operator root;
@@ -67,10 +71,6 @@ public abstract class Driver implements IDriver {
protected final DriverLock exclusiveLock = new DriverLock();
- protected final QueryMetricsManager QUERY_METRICS =
QueryMetricsManager.getInstance();
- protected final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
- QueryExecutionMetricSet.getInstance();
-
protected enum State {
ALIVE,
NEED_DESTRUCTION,
@@ -104,13 +104,13 @@ public abstract class Driver implements IDriver {
}
/**
- * do initialization
+ * do initialization.
*
* @return true if init succeed, false otherwise
*/
protected abstract boolean init(SettableFuture<?> blockedFuture);
- /** release resource this driver used */
+ /** release resource this driver used. */
protected abstract void releaseResource();
public int getDependencyDriverIndex() {
@@ -194,6 +194,7 @@ public abstract class Driver implements IDriver {
return sink;
}
+ @SuppressWarnings("squid:S112")
@GuardedBy("exclusiveLock")
private boolean isFinishedInternal() {
checkLockHeld("Lock must be held to call isFinishedInternal");
@@ -214,6 +215,7 @@ public abstract class Driver implements IDriver {
return finished;
}
+ @SuppressWarnings({"squid:S1181", "squid:S112"})
private ListenableFuture<?> processInternal() {
long startTimeNanos = System.nanoTime();
try {
@@ -261,7 +263,7 @@ public abstract class Driver implements IDriver {
driverBlockedFuture.set(newDriverBlockedFuture);
sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null),
directExecutor());
- // TODO Although we don't have memory management for operator now, we
should consider it for
+ // Although we don't have memory management for operator now, we should
consider it for
// future
// it's possible that memory revoking is requested for some operator
// before we update driverBlockedFuture above and we don't want to miss
that
@@ -340,6 +342,7 @@ public abstract class Driver implements IDriver {
return result;
}
+ @SuppressWarnings({"squid:S1181", "squid:S112"})
@GuardedBy("exclusiveLock")
private void destroyIfNecessary() {
checkLockHeld("Lock must be held to call destroyIfNecessary");
@@ -372,6 +375,7 @@ public abstract class Driver implements IDriver {
}
}
+ @SuppressWarnings("squid:S1181")
private Throwable closeAndDestroyOperators() {
// record the current interrupted status (and clear the flag); we'll reset
it later
boolean wasInterrupted = Thread.interrupted();
@@ -405,7 +409,6 @@ public abstract class Driver implements IDriver {
// don't record the stack
wasInterrupted = true;
} catch (Throwable t) {
- // TODO currently, we won't know exact operator which is failed in
closing
inFlightException =
addSuppressedException(
inFlightException,
@@ -431,7 +434,9 @@ public abstract class Driver implements IDriver {
+ driverContext.getPipelineId()
+ File.separator;
File tmpPipeLineDir = new File(pipeLineSortDir);
- if (!tmpPipeLineDir.exists()) return;
+ if (!tmpPipeLineDir.exists()) {
+ return;
+ }
FileUtils.deleteDirectory(tmpPipeLineDir);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 001cc5c4ede..0035932f879 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.commons.utils.TestOnly;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 24a4ef9d5d7..be015d961aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -27,12 +28,12 @@ import io.airlift.units.Duration;
/**
* IDriver encapsulates some methods which are necessary for
FragmentInstanceTaskExecutor to run a
- * fragment instance
+ * fragment instance.
*/
public interface IDriver {
/**
- * Used to judge whether this IDriver should be scheduled for execution
anymore
+ * Used to judge whether this IDriver should be scheduled for execution
anymore.
*
* @return true if the IDriver is done or terminated due to failure,
otherwise false.
*/
@@ -40,14 +41,15 @@ public interface IDriver {
/**
* run the IDriver for {@param duration} time slice, the time of this run is
likely not to be
- * equal to {@param duration}, the actual run time should be calculated by
the caller
+ * equal to {@param duration}, the actual run time should be calculated by
the caller.
*
* @param duration how long should this IDriver run
- * @return the returned ListenableFuture<Void> is used to represent status
of this processing if
+ * @return the returned ListenableFuture is used to represent status of this
processing if
* isDone() return true, meaning that this IDriver is not blocked and is
ready for next
* processing. Otherwise, meaning that this IDriver is blocked and not
ready for next
* processing.
*/
+ @SuppressWarnings("squid:S1452")
ListenableFuture<?> processFor(Duration duration);
/**
@@ -63,17 +65,17 @@ public interface IDriver {
void setDriverTaskId(DriverTaskId driverTaskId);
- /** clear resource used by this fragment instance */
+ /** clear resource used by this fragment instance. */
void close();
/**
- * fail current driver
+ * fail current driver.
*
* @param t reason cause this failure
*/
void failed(Throwable t);
- /** @return get Sink of current IDriver */
+ /** Returns the sink of the current IDriver. */
ISink getSink();
DriverContext getDriverContext();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 670670de310..5ad7c764d5c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
index b93e4101270..fa828d846ba 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
-/** TODO Add javadoc for context */
public class SchemaDriverContext extends DriverContext {
private final ISchemaRegion schemaRegion;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
index 9d15aaa0097..583e522774c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
@@ -37,6 +37,9 @@ public class RegionReadExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(RegionReadExecutor.class);
+ private static final String ERROR_MSG_FORMAT = "Execute FragmentInstance
failed: %s";
+
+ @SuppressWarnings("squid:S1181")
public RegionExecutionResult execute(
ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
// execute fragment instance in state machine
@@ -59,8 +62,9 @@ public class RegionReadExecutor {
readResponse.getException());
resp.setAccepted(false);
resp.setMessage(
- "Execute FragmentInstance failed: "
- + (readResponse.getException() == null
+ String.format(
+ ERROR_MSG_FORMAT,
+ readResponse.getException() == null
? ""
: readResponse.getException().getMessage()));
} else {
@@ -73,11 +77,12 @@ public class RegionReadExecutor {
LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.",
groupId, t);
RegionExecutionResult resp = new RegionExecutionResult();
resp.setAccepted(false);
- resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+ resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
return resp;
}
}
+ @SuppressWarnings("squid:S1181")
public RegionExecutionResult execute(FragmentInstance fragmentInstance) {
// execute fragment instance in state machine
try (SetThreadName threadName = new
SetThreadName(fragmentInstance.getId().getFullId())) {
@@ -93,7 +98,7 @@ public class RegionReadExecutor {
LOGGER.error("Execute FragmentInstance in QueryExecutor failed.", t);
RegionExecutionResult resp = new RegionExecutionResult();
resp.setAccepted(false);
- resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+ resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
return resp;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 8c8793d64b4..66d582d8a61 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -88,6 +88,9 @@ public class RegionWriteExecutor {
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
PerformanceOverviewMetrics.getInstance();
+ private static final String METADATA_ERROR_MSG = "Metadata error: ";
+
+ @SuppressWarnings("squid:S1181")
public RegionExecutionResult execute(ConsensusGroupId groupId, PlanNode
planNode) {
try {
WritePlanNodeExecutionContext context =
@@ -164,7 +167,6 @@ public class RegionWriteExecutor {
ConsensusWriteResponse writeResponse =
executePlanNodeInConsensusLayer(context.getRegionId(), node);
- // TODO need consider more status
if (writeResponse.getStatus() != null) {
response.setAccepted(
TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode());
@@ -231,7 +233,6 @@ public class RegionWriteExecutor {
ConsensusWriteResponse writeResponse =
fireTriggerAndInsert(context.getRegionId(), insertNode);
- // TODO need consider more status
if (writeResponse.getStatus() != null) {
response.setAccepted(
TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
writeResponse.getStatus().getCode());
@@ -292,7 +293,7 @@ public class RegionWriteExecutor {
return super.visitCreateTimeSeries(node, context);
} else {
MetadataException metadataException = failingMeasurementMap.get(0);
- LOGGER.error("Metadata error: ", metadataException);
+ LOGGER.error(METADATA_ERROR_MSG, metadataException);
result = new RegionExecutionResult();
result.setAccepted(false);
result.setMessage(metadataException.getMessage());
@@ -330,7 +331,7 @@ public class RegionWriteExecutor {
return super.visitCreateAlignedTimeSeries(node, context);
} else {
MetadataException metadataException =
failingMeasurementMap.values().iterator().next();
- LOGGER.error("Metadata error: ", metadataException);
+ LOGGER.error(METADATA_ERROR_MSG, metadataException);
result = new RegionExecutionResult();
result.setAccepted(false);
result.setMessage(metadataException.getMessage());
@@ -368,52 +369,23 @@ public class RegionWriteExecutor {
List<TSStatus> failingStatus = new ArrayList<>();
Map<PartialPath, MeasurementGroup> measurementGroupMap =
node.getMeasurementGroupMap();
List<PartialPath> emptyDeviceList = new ArrayList<>();
- for (Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
- Map<Integer, MetadataException> failingMeasurementMap =
- schemaRegion.checkMeasurementExistence(
- entry.getKey(),
- entry.getValue().getMeasurements(),
- entry.getValue().getAliasList());
- if (failingMeasurementMap.isEmpty()) {
- continue;
- }
-
- for (Map.Entry<Integer, MetadataException> failingMeasurement :
- failingMeasurementMap.entrySet()) {
- LOGGER.error("Metadata error: ", failingMeasurement.getValue());
- failingStatus.add(
- RpcUtils.getStatus(
- failingMeasurement.getValue().getErrorCode(),
- failingMeasurement.getValue().getMessage()));
- }
-
entry.getValue().removeMeasurements(failingMeasurementMap.keySet());
- if (entry.getValue().isEmpty()) {
- emptyDeviceList.add(entry.getKey());
- }
- }
+ checkMeasurementExistence(
+ measurementGroupMap, schemaRegion, failingStatus,
emptyDeviceList);
for (PartialPath emptyDevice : emptyDeviceList) {
measurementGroupMap.remove(emptyDevice);
}
- if (!measurementGroupMap.isEmpty()) {
- // try registering the rest timeseries
- RegionExecutionResult executionResult =
super.visitCreateMultiTimeSeries(node, context);
- if (failingStatus.isEmpty()) {
- return executionResult;
- }
+ RegionExecutionResult failingResult =
+ registerTimeSeries(measurementGroupMap, node, context,
failingStatus);
- TSStatus executionStatus = executionResult.getStatus();
- if (executionStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- failingStatus.addAll(executionStatus.getSubStatus());
- } else if (executionStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failingStatus.add(executionStatus);
- }
+ if (failingResult != null) {
+ return failingResult;
}
TSStatus status = RpcUtils.getStatus(failingStatus);
- RegionExecutionResult failingResult = new RegionExecutionResult();
+ failingResult = new RegionExecutionResult();
failingResult.setAccepted(false);
failingResult.setMessage(status.getMessage());
failingResult.setStatus(status);
@@ -426,6 +398,59 @@ public class RegionWriteExecutor {
}
}
+ private void checkMeasurementExistence(
+ Map<PartialPath, MeasurementGroup> measurementGroupMap,
+ ISchemaRegion schemaRegion,
+ List<TSStatus> failingStatus,
+ List<PartialPath> emptyDeviceList) {
+ for (Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
+ Map<Integer, MetadataException> failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence(
+ entry.getKey(),
+ entry.getValue().getMeasurements(),
+ entry.getValue().getAliasList());
+ if (failingMeasurementMap.isEmpty()) {
+ continue;
+ }
+
+ for (Map.Entry<Integer, MetadataException> failingMeasurement :
+ failingMeasurementMap.entrySet()) {
+ LOGGER.error(METADATA_ERROR_MSG, failingMeasurement.getValue());
+ failingStatus.add(
+ RpcUtils.getStatus(
+ failingMeasurement.getValue().getErrorCode(),
+ failingMeasurement.getValue().getMessage()));
+ }
+ entry.getValue().removeMeasurements(failingMeasurementMap.keySet());
+
+ if (entry.getValue().isEmpty()) {
+ emptyDeviceList.add(entry.getKey());
+ }
+ }
+ }
+
+ private RegionExecutionResult registerTimeSeries(
+ Map<PartialPath, MeasurementGroup> measurementGroupMap,
+ CreateMultiTimeSeriesNode node,
+ WritePlanNodeExecutionContext context,
+ List<TSStatus> failingStatus) {
+ if (!measurementGroupMap.isEmpty()) {
+ // try registering the rest timeseries
+ RegionExecutionResult executionResult =
super.visitCreateMultiTimeSeries(node, context);
+ if (failingStatus.isEmpty()) {
+ return executionResult;
+ }
+
+ TSStatus executionStatus = executionResult.getStatus();
+ if (executionStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ failingStatus.addAll(executionStatus.getSubStatus());
+ } else if (executionStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
+ }
+ }
+ return null;
+ }
+
@Override
public RegionExecutionResult visitInternalCreateTimeSeries(
InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
@@ -463,7 +488,7 @@ public class RegionWriteExecutor {
((MeasurementAlreadyExistException)
metadataException)
.getMeasurementPath())));
} else {
- LOGGER.warn("Metadata error: ", metadataException);
+ LOGGER.warn(METADATA_ERROR_MSG, metadataException);
failingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
metadataException.getMessage()));
@@ -529,7 +554,7 @@ public class RegionWriteExecutor {
((MeasurementAlreadyExistException)
metadataException)
.getMeasurementPath())));
} else {
- LOGGER.warn("Metadata error: ", metadataException);
+ LOGGER.warn(METADATA_ERROR_MSG, metadataException);
failingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
metadataException.getMessage()));
@@ -575,6 +600,31 @@ public class RegionWriteExecutor {
List<TSStatus> alreadyExistingStatus) {
TSStatus executionStatus = executionResult.getStatus();
+ separateMeasurementAlreadyExistException(
+ failingStatus, executionStatus, alreadyExistingStatus);
+
+ RegionExecutionResult result = new RegionExecutionResult();
+ TSStatus status;
+ if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
+ status = RpcUtils.SUCCESS_STATUS;
+ result.setAccepted(true);
+ } else if (failingStatus.isEmpty()) {
+ status = RpcUtils.getStatus(alreadyExistingStatus);
+ result.setAccepted(true);
+ } else {
+ status = RpcUtils.getStatus(failingStatus);
+ result.setAccepted(false);
+ }
+
+ result.setMessage(status.getMessage());
+ result.setStatus(status);
+ return result;
+ }
+
+ private void separateMeasurementAlreadyExistException(
+ List<TSStatus> failingStatus,
+ TSStatus executionStatus,
+ List<TSStatus> alreadyExistingStatus) {
// separate the measurement_already_exist exception and other exceptions process,
// measurement_already_exist exception is acceptable due to concurrent timeseries creation
if (failingStatus.isEmpty()) {
@@ -599,23 +649,6 @@ public class RegionWriteExecutor {
failingStatus.add(executionStatus);
}
}
-
- RegionExecutionResult result = new RegionExecutionResult();
- TSStatus status;
- if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
- status = RpcUtils.SUCCESS_STATUS;
- result.setAccepted(true);
- } else if (failingStatus.isEmpty()) {
- status = RpcUtils.getStatus(alreadyExistingStatus);
- result.setAccepted(true);
- } else {
- status = RpcUtils.getStatus(failingStatus);
- result.setAccepted(false);
- }
-
- result.setMessage(status.getMessage());
- result.setStatus(status);
- return result;
}
@Override
@@ -781,7 +814,7 @@ public class RegionWriteExecutor {
// if there is some exception, handle each exception and return first of them.
if (!failingMetadataException.isEmpty()) {
MetadataException metadataException =
failingMetadataException.get(0);
- LOGGER.error("Metadata error: ", metadataException);
+ LOGGER.error(METADATA_ERROR_MSG, metadataException);
RegionExecutionResult result = new RegionExecutionResult();
result.setAccepted(false);
result.setMessage(metadataException.getMessage());
@@ -791,8 +824,6 @@ public class RegionWriteExecutor {
return result;
}
// step 2. make sure all source paths are existed.
- // TODO: CRTODO use a more efficient method
- // List<PartialPath> sourcePaths = node.getAllTimeSeriesPathInSource();
return super.visitCreateLogicalView(node, context);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java
index 064897f9c08..babc711b68e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -41,4 +42,20 @@ public class FragmentInfo {
this.plan = plan;
this.childrenFragments = childrenFragments;
}
+
+ public PlanFragmentId getStageId() {
+ return stageId;
+ }
+
+ public FragmentState getState() {
+ return state;
+ }
+
+ public PlanFragment getPlan() {
+ return plan;
+ }
+
+ public List<FragmentInfo> getChildrenFragments() {
+ return childrenFragments;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 6434e8faa06..6549d2501f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.commons.path.PartialPath;
@@ -59,15 +60,13 @@ public class FragmentInstanceContext extends QueryContext {
private List<PartialPath> sourcePaths;
// Shared by all scan operators in this fragment instance to avoid memory problem
private QueryDataSource sharedQueryDataSource;
- /** closed tsfile used in this fragment instance */
+ /** closed tsfile used in this fragment instance. */
private Set<TsFileResource> closedFilePaths;
- /** unClosed tsfile used in this fragment instance */
+ /** unClosed tsfile used in this fragment instance. */
private Set<TsFileResource> unClosedFilePaths;
- /** check if there is tmp file to be deleted */
+ /** check if there is tmp file to be deleted. */
private boolean mayHaveTmpFile = false;
- private final long createNanos = System.nanoTime();
-
private final AtomicLong startNanos = new AtomicLong();
private final AtomicLong endNanos = new AtomicLong();
@@ -80,14 +79,6 @@ public class FragmentInstanceContext extends QueryContext {
// session info
private SessionInfo sessionInfo;
- // private final GcMonitor gcMonitor;
- // private final AtomicLong startNanos = new AtomicLong();
- // private final AtomicLong startFullGcCount = new AtomicLong(-1);
- // private final AtomicLong startFullGcTimeNanos = new AtomicLong(-1);
- // private final AtomicLong endNanos = new AtomicLong();
- // private final AtomicLong endFullGcCount = new AtomicLong(-1);
- // private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
-
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine,
SessionInfo sessionInfo) {
FragmentInstanceContext instanceContext =
@@ -114,6 +105,17 @@ public class FragmentInstanceContext extends QueryContext {
return new FragmentInstanceContext(queryId);
}
+ @TestOnly
+ public static FragmentInstanceContext createFragmentInstanceContext(
+ FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+ FragmentInstanceContext instanceContext =
+ new FragmentInstanceContext(
+ id, stateMachine, new SessionInfo(1, "test",
ZoneId.systemDefault().getId()));
+ instanceContext.initialize();
+ instanceContext.start();
+ return instanceContext;
+ }
+
private FragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
@@ -136,17 +138,6 @@ public class FragmentInstanceContext extends QueryContext {
this.sessionInfo = sessionInfo;
}
- @TestOnly
- public static FragmentInstanceContext createFragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
- FragmentInstanceContext instanceContext =
- new FragmentInstanceContext(
- id, stateMachine, new SessionInfo(1, "test",
ZoneId.systemDefault().getId()));
- instanceContext.initialize();
- instanceContext.start();
- return instanceContext;
- }
-
@TestOnly
public void setDataRegion(IDataRegionForQuery dataRegion) {
this.dataRegion = dataRegion;
@@ -201,7 +192,7 @@ public class FragmentInstanceContext extends QueryContext {
stateMachine.failed(cause);
}
- /** @return Message string of all failures */
+ /** return Message string of all failures */
public String getFailedCause() {
return stateMachine.getFailureCauses().stream()
.findFirst()
@@ -209,7 +200,7 @@ public class FragmentInstanceContext extends QueryContext {
.orElse("");
}
- /** @return List of specific throwable and stack trace */
+ /** return List of specific throwable and stack trace */
public List<FragmentInstanceFailureInfo> getFailureInfoList() {
return stateMachine.getFailureCauses().stream()
.map(FragmentInstanceFailureInfo::toFragmentInstanceFailureInfo)
@@ -364,6 +355,7 @@ public class FragmentInstanceContext extends QueryContext {
allDriversClosed.countDown();
}
+ @SuppressWarnings("squid:S2142")
public void releaseResourceWhenAllDriversAreClosed() {
while (true) {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 04f6043bdc7..1bcc32d076e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.commons.utils.FileUtils;
@@ -55,8 +56,7 @@ public class FragmentInstanceExecution {
private final long timeoutInMs;
- private long lastHeartbeat;
-
+ @SuppressWarnings("squid:S107")
public static FragmentInstanceExecution createFragmentInstanceExecution(
IDriverScheduler scheduler,
FragmentInstanceId instanceId,
@@ -90,14 +90,6 @@ public class FragmentInstanceExecution {
this.timeoutInMs = timeoutInMs;
}
- public void recordHeartbeat() {
- lastHeartbeat = System.currentTimeMillis();
- }
-
- public void setLastHeartbeat(long lastHeartbeat) {
- this.lastHeartbeat = lastHeartbeat;
- }
-
public FragmentInstanceState getInstanceState() {
return stateMachine.getState();
}
@@ -123,6 +115,7 @@ public class FragmentInstanceExecution {
}
// this is a separate method to ensure that the `this` reference is not leaked during construction
+ @SuppressWarnings("squid:S1181")
private void initialize(CounterStat failedInstances, IDriverScheduler
scheduler) {
requireNonNull(failedInstances, "failedInstances is null");
stateMachine.addStateChangeListener(
@@ -141,39 +134,28 @@ public class FragmentInstanceExecution {
failedInstances.update(1);
}
- if (newState.isFailed()) {
- sink.abort();
- } else {
- sink.close();
- }
- // help for gc
- sink = null;
+ clearShuffleSinkHandle(newState);
// delete tmp file if exists
- if (context.mayHaveTmpFile()) {
- String tmpFilePath =
- IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
- + File.separator
- + context.getId().getFullId()
- + File.separator;
- File tmpFile = new File(tmpFilePath);
- if (tmpFile.exists()) {
- FileUtils.deleteDirectory(tmpFile);
- }
- }
+ deleteTmpFile();
// close the driver after sink is aborted or closed because in driver.close() it
// will try to call ISink.setNoMoreTsBlocks()
for (IDriver driver : drivers) {
driver.close();
}
- context.releaseResourceWhenAllDriversAreClosed();
// help for gc
drivers = null;
+
+ // release file handlers
+ context.releaseResourceWhenAllDriversAreClosed();
+
+ // release memory
MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.deRegisterFragmentInstanceFromMemoryPool(
instanceId.getQueryId().getId(),
instanceId.getFragmentInstanceId());
+
if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
}
@@ -185,4 +167,28 @@ public class FragmentInstanceExecution {
}
});
}
+
+ private void clearShuffleSinkHandle(FragmentInstanceState newState) {
+ if (newState.isFailed()) {
+ sink.abort();
+ } else {
+ sink.close();
+ }
+ // help for gc
+ sink = null;
+ }
+
+ private void deleteTmpFile() {
+ if (context.mayHaveTmpFile()) {
+ String tmpFilePath =
+ IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+ + File.separator
+ + context.getId().getFullId()
+ + File.separator;
+ File tmpFile = new File(tmpFilePath);
+ if (tmpFile.exists()) {
+ FileUtils.deleteDirectory(tmpFile);
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java
index 3bae6edbe4f..349beb06b6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java
@@ -41,8 +41,9 @@ import static java.util.Objects.requireNonNull;
/**
* This class is inspired by Trino <a
- * href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/ExecutionFailureInfo.java">...</a>
+ * href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/ExecutionFailureInfo.java">...</a>.
*/
+@SuppressWarnings("squid:S5852")
public class FragmentInstanceFailureInfo implements Serializable {
private static final Pattern STACK_TRACE_PATTERN =
Pattern.compile("(.*)\\.(.*)\\(([^:]*)(?::(.*))?\\)");
@@ -85,19 +86,6 @@ public class FragmentInstanceFailureInfo implements
Serializable {
return toException(this);
}
- public static FragmentInstanceFailureInfo
toFragmentInstanceFailureInfo(Throwable throwable) {
- if (throwable == null) {
- return null;
- }
- return new FragmentInstanceFailureInfo(
- throwable.getMessage(),
- toFragmentInstanceFailureInfo(throwable.getCause()),
- Arrays.stream(throwable.getSuppressed())
- .map(FragmentInstanceFailureInfo::toFragmentInstanceFailureInfo)
- .collect(Collectors.toList()),
-
Arrays.stream(throwable.getStackTrace()).map(Objects::toString).collect(toImmutableList()));
- }
-
private static FailureException toException(FragmentInstanceFailureInfo
failureInfo) {
if (failureInfo == null) {
return null;
@@ -116,6 +104,19 @@ public class FragmentInstanceFailureInfo implements
Serializable {
return failure;
}
+ public static FragmentInstanceFailureInfo
toFragmentInstanceFailureInfo(Throwable throwable) {
+ if (throwable == null) {
+ return null;
+ }
+ return new FragmentInstanceFailureInfo(
+ throwable.getMessage(),
+ toFragmentInstanceFailureInfo(throwable.getCause()),
+ Arrays.stream(throwable.getSuppressed())
+ .map(FragmentInstanceFailureInfo::toFragmentInstanceFailureInfo)
+ .collect(Collectors.toList()),
+
Arrays.stream(throwable.getStackTrace()).map(Objects::toString).collect(toImmutableList()));
+ }
+
public static StackTraceElement toStackTraceElement(String stack) {
Matcher matcher = STACK_TRACE_PATTERN.matcher(stack);
if (matcher.matches()) {
@@ -162,7 +163,7 @@ public class FragmentInstanceFailureInfo implements
Serializable {
}
public static FragmentInstanceFailureInfo deserialize(ByteBuffer byteBuffer)
{
- String message = ReadWriteIOUtils.readString(byteBuffer);
+ final String message = ReadWriteIOUtils.readString(byteBuffer);
FragmentInstanceFailureInfo cause;
List<FragmentInstanceFailureInfo> suppressed = new ArrayList<>();
List<String> stack = new ArrayList<>();
@@ -185,6 +186,11 @@ public class FragmentInstanceFailureInfo implements
Serializable {
// end region
+ @Override
+ public int hashCode() {
+ return Objects.hash(message, cause, suppressed, stack);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java
index a34f57e66b2..2f041b23b97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
index 97fa83b7eef..26d88c886ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.consensus.common.DataSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 74e2985207d..c983334e247 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -54,6 +55,7 @@ import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
import static
org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
+@SuppressWarnings("squid:S6548")
public class FragmentInstanceManager {
private static final Logger logger =
LoggerFactory.getLogger(FragmentInstanceManager.class);
@@ -115,6 +117,7 @@ public class FragmentInstanceManager {
return instanceExecution.size();
}
+ @SuppressWarnings("squid:S1181")
public FragmentInstanceInfo execDataQueryFragmentInstance(
FragmentInstance instance, IDataRegionForQuery dataRegion) {
long startTime = System.nanoTime();
@@ -186,6 +189,7 @@ public class FragmentInstanceManager {
}
}
+ @SuppressWarnings("squid:S1181")
public FragmentInstanceInfo execSchemaQueryFragmentInstance(
FragmentInstance instance, ISchemaRegion schemaRegion) {
FragmentInstanceId instanceId = instance.getId();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
index c8f94f143ae..b931ce549c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import java.util.Set;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
index 2071fcc8b25..2c415c5c59d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -159,10 +160,7 @@ public class FragmentInstanceStateMachine {
sourceInstanceFailureListeners.add(listener);
failures = ImmutableMap.copyOf(sourceInstanceFailures);
}
- executor.execute(
- () -> {
- failures.forEach(listener::onTaskFailed);
- });
+ executor.execute(() -> failures.forEach(listener::onTaskFailed));
}
public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable
failure) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java
index a3382fa0415..bd803ebe1ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.fragment;
import java.util.Set;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java
index 910b2295bc2..ccc5a22dad6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java
@@ -32,7 +32,7 @@ import static com.google.common.base.Preconditions.checkState;
public class RuleBasedTimeSliceAllocator implements ITimeSliceAllocator {
- private final long EXECUTION_TIME_SLICE_IN_MS =
+ private static final long EXECUTION_TIME_SLICE_IN_MS =
DriverTaskThread.EXECUTION_TIME_SLICE.roundTo(TimeUnit.MILLISECONDS);
private final Map<OperatorContext, Integer> operatorToWeightMap;