This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/Timeout in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 208c2d6d90e2f827169362877fe26d4afbf7c7fc Author: JackieTien97 <[email protected]> AuthorDate: Sun Oct 9 16:48:43 2022 +0800 Make sure FI won't be aborted by timeout checker by mistake --- .../fragment/FragmentInstanceExecution.java | 6 ++++++ .../schedule/DriverTaskTimeoutSentinelThread.java | 24 +++++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index f2dd3605b0..05d15cd7ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -21,17 +21,22 @@ package org.apache.iotdb.db.mpp.execution.fragment; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle; +import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskTimeoutSentinelThread; import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.utils.SetThreadName; import com.google.common.collect.ImmutableList; import io.airlift.stats.CounterStat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FAILED; public class FragmentInstanceExecution { + private static final Logger LOGGER = + LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class); private final FragmentInstanceId instanceId; private final FragmentInstanceContext context; @@ -56,6 +61,7 @@ public class FragmentInstanceExecution { FragmentInstanceExecution execution = new FragmentInstanceExecution(instanceId, context, driver, stateMachine); execution.initialize(failedInstances, scheduler); + LOGGER.info("timeout is {}ms.", timeOut); scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver), timeOut); return execution; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java index ce4bc4884f..7bf30053a5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java @@ -21,9 +21,15 @@ package org.apache.iotdb.db.mpp.execution.schedule; import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** the thread for watching the timeout of {@link DriverTask} */ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread { + private static final Logger LOGGER = + LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class); + public DriverTaskTimeoutSentinelThread( String workerId, ThreadGroup tg, @@ -46,10 +52,26 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread { } // if this task is not timeout, we can wait it to timeout. long waitTime = task.getDDL() - System.currentTimeMillis(); - if (waitTime > 0L) { + while (waitTime > 0L) { // After this time, the task must be timeout. Thread.sleep(waitTime); + waitTime = task.getDDL() - System.currentTimeMillis(); + } + + task.lock(); + try { + // if this task is already in an end state, it means that the resource releasing will be + // handled by other threads, we don't care anymore. + if (task.isEndState()) { + return; + } + } finally { + task.unlock(); } + LOGGER.warn( + "[DriverTaskTimeout] Current time is {}, ddl of task is {}", + System.currentTimeMillis(), + task.getDDL()); task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT); scheduler.toAborted(task); }
