This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1d26f8240 Fix mmless ingestion and index tasks (#15372)
3d1d26f8240 is described below

commit 3d1d26f8240b983a57e0c620a5a1620f34bf9733
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Nov 28 10:06:07 2023 -0500

    Fix mmless ingestion and index tasks (#15372)
    
    * Fix mmless ingestion and index tasks
    
    * Move comment
    
    * remove dup test
---
 .../druid/indexing/common/task/IndexTask.java      | 17 +++++-
 .../parallel/ParallelIndexSupervisorTask.java      |  4 +-
 .../druid/indexing/common/task/IndexTaskTest.java  | 67 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index d880f3eb86a..b18a6f0f174 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -169,6 +169,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
   private IngestionState ingestionState;
 
+  private boolean shouldCleanup;
+
   @MonotonicNonNull
   private ParseExceptionHandler determinePartitionsParseExceptionHandler;
 
@@ -206,7 +208,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         null,
         ingestionSchema,
         context,
-        -1
+        -1,
+        true
     );
   }
 
@@ -218,7 +221,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       @Nullable String baseSequenceName,
       IndexIngestionSpec ingestionSchema,
       Map<String, Object> context,
-      int maxAllowedLockCount
+      int maxAllowedLockCount,
+      boolean shouldCleanup
   )
   {
     super(
@@ -233,6 +237,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName;
     this.ingestionSchema = ingestionSchema;
     this.ingestionState = IngestionState.NOT_STARTED;
+    this.shouldCleanup = shouldCleanup;
   }
 
   @Override
@@ -1080,6 +1085,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     return ingestionSchema.getIOConfig().getNonNullInputFormat();
   }
 
+  @Override
+  public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
+  {
+    if (shouldCleanup) {
+      super.cleanUp(toolbox, taskStatus);
+    }
+  }
+
   public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
   {
     private final DataSchema dataSchema;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index e99ef35d942..9940dfdcb4d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1204,7 +1204,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
             convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
         ),
         getContext(),
-        getIngestionSchema().getTuningConfig().getMaxAllowedLockCount()
+        getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(),
+        // Don't run cleanup in the IndexTask since we are wrapping it in the ParallelIndexSupervisorTask
+        false
     );
 
     if (currentSubTaskHolder.setTask(sequentialIndexTask)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 459c29db37a..1ade29d0d8a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@@ -2690,6 +2691,72 @@ public class IndexTaskTest extends IngestionTestBase
     );
   }
 
+  // If shouldCleanup is false, cleanup should be a no-op
+  @Test
+  public void testCleanupIndexTask() throws Exception
+  {
+    new IndexTask(
+        null,
+        null,
+        null,
+        "dataSource",
+        null,
+        createDefaultIngestionSpec(
+            jsonMapper,
+            temporaryFolder.newFolder(),
+            new UniformGranularitySpec(
+                Granularities.MINUTE,
+                Granularities.MINUTE,
+                Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+            ),
+            null,
+            createTuningConfigWithMaxRowsPerSegment(10, true),
+            false,
+            false
+        ),
+        null,
+        0,
+        false
+    ).cleanUp(null, null);
+  }
+
+  /* if shouldCleanup is true, we should fall back to AbstractTask.cleanup,
+   * check isEncapsulatedTask=false, and then exit.
+   */
+  @Test
+  public void testCleanup() throws Exception
+  {
+    TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+    TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
+    EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
+    EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
+    EasyMock.replay(toolbox, taskConfig);
+    new IndexTask(
+        null,
+        null,
+        null,
+        "dataSource",
+        null,
+        createDefaultIngestionSpec(
+            jsonMapper,
+            temporaryFolder.newFolder(),
+            new UniformGranularitySpec(
+                Granularities.MINUTE,
+                Granularities.MINUTE,
+                Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+            ),
+            null,
+            createTuningConfigWithMaxRowsPerSegment(10, true),
+            false,
+            false
+        ),
+        null,
+        0,
+        true
+    ).cleanUp(toolbox, null);
+    EasyMock.verify(toolbox, taskConfig);
+  }
+
   public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status)
   {
     // full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to