This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f2277aad Make sure FI won't be aborted by timeout checker by mistake (#7550)
79f2277aad is described below

commit 79f2277aad39d437483d3479fb06510af46ea37e
Author: Jackie Tien <[email protected]>
AuthorDate: Mon Oct 10 08:48:40 2022 +0800

    Make sure FI won't be aborted by timeout checker by mistake (#7550)
---
 .../fragment/FragmentInstanceExecution.java        |  4 ++++
 .../schedule/DriverTaskTimeoutSentinelThread.java  | 24 +++++++++++++++++++++-
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index f2dd3605b0..a84f4e9c4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -26,12 +26,15 @@ import org.apache.iotdb.db.utils.SetThreadName;
 
 import com.google.common.collect.ImmutableList;
 import io.airlift.stats.CounterStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FAILED;
 
 public class FragmentInstanceExecution {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FragmentInstanceExecution.class);
   private final FragmentInstanceId instanceId;
   private final FragmentInstanceContext context;
 
@@ -56,6 +59,7 @@ public class FragmentInstanceExecution {
     FragmentInstanceExecution execution =
         new FragmentInstanceExecution(instanceId, context, driver, 
stateMachine);
     execution.initialize(failedInstances, scheduler);
+    LOGGER.info("timeout is {}ms.", timeOut);
     scheduler.submitDrivers(instanceId.getQueryId(), ImmutableList.of(driver), 
timeOut);
     return execution;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
index ce4bc4884f..7bf30053a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.db.mpp.execution.schedule;
 import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /** the thread for watching the timeout of {@link DriverTask} */
 public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
+
   public DriverTaskTimeoutSentinelThread(
       String workerId,
       ThreadGroup tg,
@@ -46,10 +52,26 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread {
     }
     // if this task is not timeout, we can wait it to timeout.
     long waitTime = task.getDDL() - System.currentTimeMillis();
-    if (waitTime > 0L) {
+    while (waitTime > 0L) {
       // After this time, the task must be timeout.
       Thread.sleep(waitTime);
+      waitTime = task.getDDL() - System.currentTimeMillis();
+    }
+
+    task.lock();
+    try {
+      // if this task is already in an end state, it means that the resource releasing will be
+      // handled by other threads, we don't care anymore.
+      if (task.isEndState()) {
+        return;
+      }
+    } finally {
+      task.unlock();
     }
+    LOGGER.warn(
+        "[DriverTaskTimeout] Current time is {}, ddl of task is {}",
+        System.currentTimeMillis(),
+        task.getDDL());
     task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
     scheduler.toAborted(task);
   }

Reply via email to