This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1d26f8240 Fix mmless ingestion and index tasks (#15372)
3d1d26f8240 is described below
commit 3d1d26f8240b983a57e0c620a5a1620f34bf9733
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Nov 28 10:06:07 2023 -0500
Fix mmless ingestion and index tasks (#15372)
* Fix mmless ingestion and index tasks
* Move comment
* remove dup test
---
.../druid/indexing/common/task/IndexTask.java | 17 +++++-
.../parallel/ParallelIndexSupervisorTask.java | 4 +-
.../druid/indexing/common/task/IndexTaskTest.java | 67 ++++++++++++++++++++++
3 files changed, 85 insertions(+), 3 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index d880f3eb86a..b18a6f0f174 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -169,6 +169,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
private IngestionState ingestionState;
+ private boolean shouldCleanup;
+
@MonotonicNonNull
private ParseExceptionHandler determinePartitionsParseExceptionHandler;
@@ -206,7 +208,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
null,
ingestionSchema,
context,
- -1
+ -1,
+ true
);
}
@@ -218,7 +221,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@Nullable String baseSequenceName,
IndexIngestionSpec ingestionSchema,
Map<String, Object> context,
- int maxAllowedLockCount
+ int maxAllowedLockCount,
+ boolean shouldCleanup
)
{
super(
@@ -233,6 +237,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
this.baseSequenceName = baseSequenceName == null ? getId() :
baseSequenceName;
this.ingestionSchema = ingestionSchema;
this.ingestionState = IngestionState.NOT_STARTED;
+ this.shouldCleanup = shouldCleanup;
}
@Override
@@ -1080,6 +1085,14 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
return ingestionSchema.getIOConfig().getNonNullInputFormat();
}
+ @Override
+ public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus)
throws Exception
+ {
+ if (shouldCleanup) {
+ super.cleanUp(toolbox, taskStatus);
+ }
+ }
+
public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig,
IndexTuningConfig>
{
private final DataSchema dataSchema;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index e99ef35d942..9940dfdcb4d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1204,7 +1204,9 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
),
getContext(),
- getIngestionSchema().getTuningConfig().getMaxAllowedLockCount()
+ getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(),
+ // Don't run cleanup in the IndexTask since we are wrapping it in the
ParallelIndexSupervisorTask
+ false
);
if (currentSubTaskHolder.setTask(sequentialIndexTask)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 459c29db37a..1ade29d0d8a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@@ -2690,6 +2691,72 @@ public class IndexTaskTest extends IngestionTestBase
);
}
+ // If shouldCleanup is false, cleanup should be a no-op
+ @Test
+ public void testCleanupIndexTask() throws Exception
+ {
+ new IndexTask(
+ null,
+ null,
+ null,
+ "dataSource",
+ null,
+ createDefaultIngestionSpec(
+ jsonMapper,
+ temporaryFolder.newFolder(),
+ new UniformGranularitySpec(
+ Granularities.MINUTE,
+ Granularities.MINUTE,
+
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+ ),
+ null,
+ createTuningConfigWithMaxRowsPerSegment(10, true),
+ false,
+ false
+ ),
+ null,
+ 0,
+ false
+ ).cleanUp(null, null);
+ }
+
+ /* if shouldCleanup is true, we should fall back to AbstractTask.cleanup,
+ * check isEncapsulatedTask=false, and then exit.
+ */
+ @Test
+ public void testCleanup() throws Exception
+ {
+ TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
+ TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
+ EasyMock.expect(toolbox.getConfig()).andReturn(taskConfig);
+ EasyMock.expect(taskConfig.isEncapsulatedTask()).andReturn(false);
+ EasyMock.replay(toolbox, taskConfig);
+ new IndexTask(
+ null,
+ null,
+ null,
+ "dataSource",
+ null,
+ createDefaultIngestionSpec(
+ jsonMapper,
+ temporaryFolder.newFolder(),
+ new UniformGranularitySpec(
+ Granularities.MINUTE,
+ Granularities.MINUTE,
+
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+ ),
+ null,
+ createTuningConfigWithMaxRowsPerSegment(10, true),
+ false,
+ false
+ ),
+ null,
+ 0,
+ true
+ ).cleanUp(toolbox, null);
+ EasyMock.verify(toolbox, taskConfig);
+ }
+
public static void
checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status)
{
// full stacktrace will be too long and make tests brittle (e.g. if line #
changes), just match the main message
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]