This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 79f2277aad Make sure FI won't be aborted by timeout checker by mistake (#7550)
79f2277aad is described below
commit 79f2277aad39d437483d3479fb06510af46ea37e
Author: Jackie Tien <[email protected]>
AuthorDate: Mon Oct 10 08:48:40 2022 +0800
Make sure FI won't be aborted by timeout checker by mistake (#7550)
---
.../fragment/FragmentInstanceExecution.java | 4 ++++
.../schedule/DriverTaskTimeoutSentinelThread.java | 24 +++++++++++++++++++++-
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index f2dd3605b0..a84f4e9c4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -26,12 +26,15 @@ import org.apache.iotdb.db.utils.SetThreadName;
import com.google.common.collect.ImmutableList;
import io.airlift.stats.CounterStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FAILED;
public class FragmentInstanceExecution {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceExecution.class);
private final FragmentInstanceId instanceId;
private final FragmentInstanceContext context;
@@ -56,6 +59,7 @@ public class FragmentInstanceExecution {
FragmentInstanceExecution execution =
new FragmentInstanceExecution(instanceId, context, driver,
stateMachine);
execution.initialize(failedInstances, scheduler);
+ LOGGER.info("timeout is {}ms.", timeOut);
scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver),
timeOut);
return execution;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
index ce4bc4884f..7bf30053a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.db.mpp.execution.schedule;
import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/** the thread for watching the timeout of {@link DriverTask} */
public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
+
public DriverTaskTimeoutSentinelThread(
String workerId,
ThreadGroup tg,
@@ -46,10 +52,26 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
}
// if this task is not timeout, we can wait it to timeout.
long waitTime = task.getDDL() - System.currentTimeMillis();
- if (waitTime > 0L) {
+ while (waitTime > 0L) {
// After this time, the task must be timeout.
Thread.sleep(waitTime);
+ waitTime = task.getDDL() - System.currentTimeMillis();
+ }
+
+ task.lock();
+ try {
+ // if this task is already in an end state, it means that the resource releasing will be
+ // handled by other threads, we don't care anymore.
+ if (task.isEndState()) {
+ return;
+ }
+ } finally {
+ task.unlock();
}
+ LOGGER.warn(
+ "[DriverTaskTimeout] Current time is {}, ddl of task is {}",
+ System.currentTimeMillis(),
+ task.getDDL());
task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
scheduler.toAborted(task);
}