This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 86087cee0a0 Fix Peon not fail gracefully (#14880)
86087cee0a0 is described below

commit 86087cee0a06cd5ef93b09653d923ec896efe889
Author: YongGang <mail....@gmail.com>
AuthorDate: Fri Sep 29 12:39:59 2023 -0700

    Fix Peon not fail gracefully (#14880)
    
    * fix Peon not fail gracefully
    
    * move methods to Task interface
    
    * fix checkstyle
    
    * extract to interface
    
    * check runThread nullability
    
    * fix merge conflict
    
    * minor refine
    
    * minor refine
    
    * fix unit test
    
    * increase latch waiting time
---
 .../druid/indexing/common/task/AbstractTask.java   | 34 +++++++++++++++++++++-
 .../apache/druid/indexing/common/task/Task.java    | 26 +++++++++++++++++
 .../overlord/SingleTaskBackgroundRunner.java       |  1 +
 .../SeekableStreamIndexTaskRunner.java             |  5 +++-
 .../apache/druid/indexing/common/TestTasks.java    |  8 ++++-
 .../overlord/SingleTaskBackgroundRunnerTest.java   | 10 +++++++
 .../main/java/org/apache/druid/cli/CliPeon.java    |  6 +++-
 7 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 680684000ff..e5b6ab1b731 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -55,6 +55,8 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractTask implements Task
 {
@@ -101,6 +103,8 @@ public abstract class AbstractTask implements Task
 
   private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
 
+  private volatile CountDownLatch cleanupCompletionLatch;
+
   protected AbstractTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
   {
     this(id, null, null, dataSource, context, ingestionMode);
@@ -166,6 +170,7 @@ public abstract class AbstractTask implements Task
   {
     TaskStatus taskStatus = TaskStatus.running(getId());
     try {
+      cleanupCompletionLatch = new CountDownLatch(1);
       String errorMessage = setup(taskToolbox);
       if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
         return TaskStatus.failure(getId(), errorMessage);
@@ -178,14 +183,23 @@ public abstract class AbstractTask implements Task
       throw e;
     }
     finally {
-      cleanUp(taskToolbox, taskStatus);
+      try {
+        cleanUp(taskToolbox, taskStatus);
+      }
+      finally {
+        cleanupCompletionLatch.countDown();
+      }
     }
   }
 
   public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
 
+  @Override
   public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
   {
+    // clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
+    Thread.interrupted();
+
     if (!toolbox.getConfig().isEncapsulatedTask()) {
       log.debug("Not pushing task logs and reports from task.");
       return;
@@ -216,6 +230,24 @@ public abstract class AbstractTask implements Task
     }
   }
 
+  @Override
+  public boolean waitForCleanupToFinish()
+  {
+    try {
+      if (cleanupCompletionLatch != null) {
+        // block until the cleanup process completes
+        return cleanupCompletionLatch.await(300, TimeUnit.SECONDS);
+      }
+
+      return true;
+    }
+    catch (InterruptedException e) {
+      log.warn("Interrupted while waiting for task cleanUp to finish!");
+      Thread.currentThread().interrupt();
+      return false;
+    }
+  }
+
   public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource)
   {
     return getOrMakeId(id, typeName, dataSource, null);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 58a2ad435b0..81a55aae1b4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -257,6 +257,32 @@ public interface Task
    */
   TaskStatus run(TaskToolbox toolbox) throws Exception;
 
+  /**
+   * Performs cleanup operations after the task execution.
+   * This method is intended to be overridden by tasks that need to perform
+   * specific cleanup actions upon task completion or termination.
+   *
+   * @param toolbox Toolbox for this task
+   * @param taskStatus Provides the final status of the task, indicating if the task
+   *                   was successful, failed, or was killed.
+   * @throws Exception If any error occurs during the cleanup process.
+   */
+  default void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
+  {
+  }
+
+  /**
+   * Waits for the cleanup operations to finish.
+   * This method can be overridden by tasks that need to ensure that certain cleanup
+   * operations have completed before proceeding further.
+   *
+   * @return true if the cleanup completed successfully, false otherwise.
+   */
+  default boolean waitForCleanupToFinish()
+  {
+    return true;
+  }
+
   default Map<String, Object> addToContext(String key, Object val)
   {
     getContext().put(key, val);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index 26358deea3e..7f0e95ce08d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -185,6 +185,7 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
       // stopGracefully for resource cleaning
       log.info("Starting graceful shutdown of task[%s].", task.getId());
       task.stopGracefully(taskConfig);
+      task.waitForCleanupToFinish();
 
       if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
         try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e44dfe9a451..27909aea83c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1422,7 +1422,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     log.info("Stopping forcefully (status: [%s])", status);
     stopRequested.set(true);
-    runThread.interrupt();
+    // Interrupt if the task has started to run
+    if (runThread != null) {
+      runThread.interrupt();
+    }
   }
 
   public void stopGracefully()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
index fc7a6c99156..22485806c51 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
@@ -84,6 +84,8 @@ public class TestTasks
   @JsonTypeName("unending")
   public static class UnendingTask extends AbstractTask
   {
+    private Thread runningThread;
+
     @JsonCreator
     public UnendingTask(@JsonProperty("id") String id)
     {
@@ -105,12 +107,16 @@ public class TestTasks
     @Override
     public void stopGracefully(TaskConfig taskConfig)
     {
+      if (runningThread != null) {
+        runningThread.interrupt();
+      }
     }
 
     @Override
     public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     {
-      while (!Thread.currentThread().isInterrupted()) {
+      runningThread = Thread.currentThread();
+      while (!runningThread.isInterrupted()) {
         Thread.sleep(1000);
       }
       return TaskStatus.failure(getId(), "Dummy task status failure for testing");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index fc7bd923601..087ae3e1fc1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -179,14 +179,24 @@ public class SingleTaskBackgroundRunnerTest
   @Test
   public void testStop() throws ExecutionException, InterruptedException, TimeoutException
   {
+    AtomicReference<Boolean> methodCallHolder = new AtomicReference<>();
     final ListenableFuture<TaskStatus> future = runner.run(
         new NoopTask(null, null, null, Long.MAX_VALUE, 0, null) // infinite task
+        {
+          @Override
+          public boolean waitForCleanupToFinish()
+          {
+            methodCallHolder.set(true);
+            return true;
+          }
+        }
     );
     runner.stop();
     Assert.assertEquals(
         TaskState.FAILED,
         future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
     );
+    Assert.assertTrue(methodCallHolder.get());
   }
 
   @Test
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 2ba88117cd5..50dc64a1e06 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -56,6 +56,7 @@ import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ManageLifecycleServer;
 import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.QueryRunnerFactoryModule;
 import org.apache.druid.guice.QueryableModule;
@@ -246,7 +247,10 @@ public class CliPeon extends GuiceRunnable
 
             binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
             binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
-            binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
+            // Bind to ManageLifecycleServer to ensure SingleTaskBackgroundRunner is closed before
+            // its dependent services, such as DiscoveryServiceLocator and OverlordClient.
+            // This order ensures that tasks can finalize their cleanup operations before service location closure.
+            binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class);
 
             bindRealtimeCache(binder);
             bindCoordinatorHandoffNotifer(binder);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to