kfaraz commented on code in PR #18950:
URL: https://github.com/apache/druid/pull/18950#discussion_r2769468924
##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -206,4 +206,92 @@ public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToS
.map(s -> s.withLastCompactionState(compactionState))
.collect(Collectors.toSet());
}
+
+
+ public Builder toBuilder()
Review Comment:
Nit: Would it be better to make this method static?
```
public static Builder builder(CompactionState state) {...}
```
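For illustration, a minimal sketch of the suggested static-factory shape, using a simplified stand-in class (the real `CompactionState` has many more fields than shown here):

```java
// Simplified stand-in for CompactionState; the single field is an assumption
// made for illustration only.
public class BuilderSketch
{
  final String partitionsSpec;

  BuilderSketch(String partitionsSpec)
  {
    this.partitionsSpec = partitionsSpec;
  }

  // The suggested shape: a static factory seeded from an existing instance,
  // instead of an instance-level toBuilder().
  public static Builder builder(BuilderSketch state)
  {
    return new Builder().withPartitionsSpec(state.partitionsSpec);
  }

  public static class Builder
  {
    private String partitionsSpec;

    public Builder withPartitionsSpec(String partitionsSpec)
    {
      this.partitionsSpec = partitionsSpec;
      return this;
    }

    public BuilderSketch build()
    {
      return new BuilderSketch(partitionsSpec);
    }
  }
}
```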
##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -1012,7 +1016,10 @@ public void testReplaceOnFoo1WithWhereExtern(String contextName, Map<String, Obj
Collections.emptyList(),
Collections.singletonList(new StringDimensionSchema("user")),
GranularityType.HOUR,
-            Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z")
+            Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z"),
+ new CompactionTransformSpec(
Review Comment:
Do all the tests now have a transform spec? Should we retain some tests
without transform spec too?
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;
private boolean useSegmentMetadataCache = false;
+ private boolean useCentralizedDatasourceSchema = false;
+ private boolean batchSegmentAllocation = true;
  private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean useCentralizedDatasourceSchema)
Review Comment:
Did centralized schema also run into an issue similar to batch allocation?
##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
.to(WindowOperatorQueryKit.class);
binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
}
+
+ @Provides
+ IndexerControllerContext.Builder providesContextBuilder(Injector injector)
Review Comment:
We need not use the `Injector` directly here.
```suggestion
IndexerControllerContext.Builder getControllerContextBuilder(
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
OverlordClient overlordClient
)
```
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -75,6 +75,11 @@
*/
public class IndexerControllerContext implements ControllerContext
{
+ public interface Builder
Review Comment:
It might be cleaner to move this into a separate file and call it
`IndexerControllerContextFactory` to align with other similar factory classes
in Druid.
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestSpyTaskActionClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test utility that wraps a {@link TaskActionClient} to track published segments and their schema mappings.
+ * <p>
+ * This spy intercepts {@link SegmentTransactionalInsertAction}, {@link SegmentTransactionalReplaceAction},
+ * and {@link SegmentTransactionalAppendAction} submissions to collect the segments and schema mappings being
+ * published. All other task actions are delegated to the wrapped client without modification.
+ * <p>
+ * Useful for verifying that tasks publish the expected segments and schemas in integration tests.
+ */
+public class TestSpyTaskActionClient implements TaskActionClient
Review Comment:
This action client is very specific to segment transactions and might not
have a wide usage.
I would suggest the following:
- Move this test class to `CompactionTaskRunBase` itself
- Rename it to something simpler like `TestTaskActionClient` or
`WrappingTaskActionClient`. (The "spy" is a little misleading since it suggests
some usage of Mockito spy utilities.)
- Avoid javadocs since this code is unit-test-only and seems
self-explanatory. Add only 1-line javadocs or regular comments where necessary.
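Plain-Java sketch of the delegating pattern behind the suggested "wrapping" name: record specific actions, forward everything to the delegate. `ActionClient` here is a simplified stand-in for Druid's `TaskActionClient` interface, not its real signature.

```java
import java.util.ArrayList;
import java.util.List;

public class WrappingClientSketch
{
  // Stand-in for TaskActionClient, reduced to a single String-based method.
  public interface ActionClient
  {
    String submit(String action);
  }

  public static class WrappingClient implements ActionClient
  {
    private final ActionClient delegate;
    public final List<String> publishedActions = new ArrayList<>();

    public WrappingClient(ActionClient delegate)
    {
      this.delegate = delegate;
    }

    @Override
    public String submit(String action)
    {
      // Track publish-style actions, but still delegate every submission.
      if (action.startsWith("publish")) {
        publishedActions.add(action);
      }
      return delegate.submit(action);
    }
  }
}
```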
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;
private boolean useSegmentMetadataCache = false;
+ private boolean useCentralizedDatasourceSchema = false;
+ private boolean batchSegmentAllocation = true;
  private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean useCentralizedDatasourceSchema)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+    return this;
+  }
+
+  public TaskActionTestKit setBatchSegmentAllocation(boolean batchSegmentAllocation)
+ {
+ if (configFinalized.get()) {
Review Comment:
Why do we want to impose this limit? It should be okay to be able to change
the config between tests.
The `before` method would be invoked before every test, right?
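A sketch of the alternative the reviewer implies: finalize the config in `before()` and release it in `after()`, so each test can reconfigure the kit. Field and method names mirror the PR, but this class is a simplified stand-in, not the real `TaskActionTestKit`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class TestKitSketch
{
  private final AtomicBoolean configFinalized = new AtomicBoolean();
  private boolean batchSegmentAllocation = true;

  public TestKitSketch setBatchSegmentAllocation(boolean batchSegmentAllocation)
  {
    if (configFinalized.get()) {
      throw new IllegalStateException("Test config already finalized");
    }
    this.batchSegmentAllocation = batchSegmentAllocation;
    return this;
  }

  public boolean isBatchSegmentAllocation()
  {
    return batchSegmentAllocation;
  }

  // Runs before every test (ExternalResource.before in the real class).
  public void before()
  {
    configFinalized.set(true);
  }

  // Runs after every test; releasing the flag allows reconfiguration.
  public void after()
  {
    configFinalized.set(false);
  }
}
```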
##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java:
##########
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Providers;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.SegmentWranglerModule;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CompactionTaskRunBase;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.IndexerControllerContext;
+import org.apache.druid.msq.indexing.MSQCompactionRunner;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.test.MSQTestBase;
+import org.apache.druid.msq.test.MSQTestControllerContext;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.policy.PolicyEnforcer;
+import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
+import org.apache.druid.segment.DataSegmentsWithSchemas;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountedSegmentProvider;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.server.security.Escalator;
+import org.apache.druid.sql.calcite.util.LookylooModule;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for CompactionTask using MSQCompactionRunner.
+ * Extends CompactionTaskRunBase to reuse all test infrastructure.
+ */
+@RunWith(Parameterized.class)
+public class MSQCompactionTaskRunTest extends CompactionTaskRunBase
+{
+  private final ConcurrentHashMap<String, TaskActionClient> taskActionClients = new ConcurrentHashMap<>();
+  private Injector injector;
+
+  @Parameterized.Parameters(name = "name: {0}, inputInterval={5}, segmentGran={6}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+
+    for (LockGranularity lockGranularity : new LockGranularity[]{LockGranularity.TIME_CHUNK}) {
+      for (boolean useCentralizedDatasourceSchema : new boolean[]{false}) {
+        for (boolean useSegmentMetadataCache : new boolean[]{false, true}) {
+          for (boolean useConcurrentLocks : new boolean[]{false, true}) {
+            for (Interval inputInterval : new Interval[]{TEST_INTERVAL}) {
+              for (Granularity segmentGran : new Granularity[]{Granularities.SIX_HOUR}) {
+                String name = StringUtils.format(
Review Comment:
This should be done in the `@Parameters` annotation itself.
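JUnit's `Parameterized` runner builds test names from the `name` template using `MessageFormat`, so the formatting done manually with `StringUtils.format` can live in the annotation instead (e.g. `@Parameterized.Parameters(name = "useCache={0}, concurrentLocks={1}")`). A plain illustration of the substitution the runner performs; the parameter positions are illustrative, not the exact ones in the PR:

```java
import java.text.MessageFormat;

public class NameTemplateSketch
{
  public static String testName(boolean useCache, boolean concurrentLocks)
  {
    // Same substitution JUnit applies to the annotation's name template:
    // {0}, {1}, ... refer to constructor-argument positions.
    return MessageFormat.format("useCache={0}, concurrentLocks={1}", useCache, concurrentLocks);
  }
}
```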
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -108,7 +113,10 @@ public IndexerControllerContext(
this.clientFactory = clientFactory;
this.overlordClient = overlordClient;
this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
-    final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class));
+    final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(
Review Comment:
Suggestion: Change doesn't seem necessary. It's best to avoid formatting
changes unless they really help with readability.
##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java:
##########
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Providers;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.SegmentWranglerModule;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.CompactionTaskRunBase;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.IndexerControllerContext;
+import org.apache.druid.msq.indexing.MSQCompactionRunner;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.test.MSQTestBase;
+import org.apache.druid.msq.test.MSQTestControllerContext;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.policy.PolicyEnforcer;
+import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
+import org.apache.druid.segment.DataSegmentsWithSchemas;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountedSegmentProvider;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.server.security.Escalator;
+import org.apache.druid.sql.calcite.util.LookylooModule;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for CompactionTask using MSQCompactionRunner.
+ * Extends CompactionTaskRunBase to reuse all test infrastructure.
+ */
+@RunWith(Parameterized.class)
+public class MSQCompactionTaskRunTest extends CompactionTaskRunBase
+{
+  private final ConcurrentHashMap<String, TaskActionClient> taskActionClients = new ConcurrentHashMap<>();
+  private Injector injector;
+
+  @Parameterized.Parameters(name = "name: {0}, inputInterval={5}, segmentGran={6}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+
+    for (LockGranularity lockGranularity : new LockGranularity[]{LockGranularity.TIME_CHUNK}) {
+      for (boolean useCentralizedDatasourceSchema : new boolean[]{false}) {
+        for (boolean useSegmentMetadataCache : new boolean[]{false, true}) {
+          for (boolean useConcurrentLocks : new boolean[]{false, true}) {
+            for (Interval inputInterval : new Interval[]{TEST_INTERVAL}) {
+              for (Granularity segmentGran : new Granularity[]{Granularities.SIX_HOUR}) {
Review Comment:
I think only 2 for loops (for `useSegmentMetadataCache` and
`useConcurrentLocks`) should be enough here. The other parameters seem to be
taking a single value only.
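A sketch of the reviewer's suggestion: loop only over the two parameters that actually vary and pass the fixed values directly. The constructor-argument shape here is illustrative, not the exact one in the PR:

```java
import java.util.ArrayList;
import java.util.List;

public class FeederSketch
{
  public static List<Object[]> constructorFeeder()
  {
    final List<Object[]> constructors = new ArrayList<>();
    // Only useSegmentMetadataCache and useConcurrentLocks take multiple
    // values; the remaining parameters can be passed as constants.
    for (boolean useSegmentMetadataCache : new boolean[]{false, true}) {
      for (boolean useConcurrentLocks : new boolean[]{false, true}) {
        constructors.add(new Object[]{useSegmentMetadataCache, useConcurrentLocks});
      }
    }
    return constructors;
  }
}
```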
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;
private boolean useSegmentMetadataCache = false;
+ private boolean useCentralizedDatasourceSchema = false;
+ private boolean batchSegmentAllocation = true;
  private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
+  private AtomicBoolean configFinalized = new AtomicBoolean();
+
+  public TaskActionTestKit setUseSegmentMetadataCache(boolean useSegmentMetadataCache)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useSegmentMetadataCache = useSegmentMetadataCache;
+    return this;
+  }
+
+  public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean useCentralizedDatasourceSchema)
+  {
+    if (configFinalized.get()) {
+      throw new IllegalStateException("Test config already finalized");
+    }
+    this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+    return this;
+  }
+
+  public TaskActionTestKit setBatchSegmentAllocation(boolean batchSegmentAllocation)
Review Comment:
Okay, in that case, let's skip the test case for segment lock and add a
comment or add a test case and mark it disabled. That way, we would be aware
that there is a bug and can address it later.
##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
.to(WindowOperatorQueryKit.class);
binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
}
+
+ @Provides
+ IndexerControllerContext.Builder providesContextBuilder(Injector injector)
Review Comment:
Also, is there any functional change here or is the code just being moved
from `MSQControllerTask`?
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -137,30 +138,35 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-@RunWith(Parameterized.class)
-public class CompactionTaskRunTest extends IngestionTestBase
+public abstract class CompactionTaskRunBase
Review Comment:
The diff for this class seems big and it is a little difficult to ensure
that no assertions have actually changed.
Since we are changing some core logic in this PR, I would advise minimizing
the changes to this test class so that we can be certain that the new code
works correctly with all existing tests.
If any refactor is needed in this test, we can do it in a follow up PR.
For the time being, if some methods need to be reused for the
`MSQCompactionTaskRunTest`, they may be copied over or we may make them `public
static` where applicable.
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -411,12 +411,18 @@ private static List<DimensionSpec> getAggregateDimensions(
{
List<DimensionSpec> dimensionSpecs = new ArrayList<>();
-    if (isQueryGranularityEmptyOrNone(dataSchema)) {
-      // Dimensions in group-by aren't allowed to have time column name as the output name.
-      dimensionSpecs.add(new DefaultDimensionSpec(ColumnHolder.TIME_COLUMN_NAME, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
-    } else {
-      // The changed granularity would result in a new virtual column that needs to be aggregated upon.
-      dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+    if (!dataSchema.getDimensionsSpec().getDimensionNames().contains(ColumnHolder.TIME_COLUMN_NAME)) {
Review Comment:
Please add a 1-line comment before this `if` clause clarifying the reason
behind the check.
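For instance, the clarifying comment might read as follows (the wording is an assumption about the intent, not taken from the PR):

```
// __time may already be an explicit dimension; add the time virtual column
// only when it is absent from the dimensions spec.
```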
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java:
##########
@@ -60,6 +60,7 @@ public CompactionInputSpec getInputSpec()
return inputSpec;
}
+ @Deprecated
Review Comment:
Nit: Adding a short javadoc indicating why this is deprecated would be nice.
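A minimal sketch of what such a deprecation javadoc could look like; the stated reason and member name are assumptions for illustration, not taken from the PR:

```java
public class DeprecatedDocSketch
{
  /**
   * @deprecated Retained only so older serialized task specs still
   * deserialize; new code should not read this flag.
   */
  @Deprecated
  public boolean legacyFlag()
  {
    return false;
  }
}
```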
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -571,513 +479,277 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc
}
}
- Assert.assertTrue(compactionFuture.get().lhs.isSuccess());
-
- dataSegmentsWithSchemas = compactionFuture.get().rhs;
- verifySchema(dataSegmentsWithSchemas);
- segments = new ArrayList<>(dataSegmentsWithSchemas.getSegments());
- Assert.assertEquals(3, segments.size());
-
- for (int i = 0; i < 3; i++) {
- Assert.assertEquals(
- Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
- segments.get(i).getInterval()
- );
- Assert.assertEquals(
- getDefaultCompactionState(
- Granularities.HOUR,
- Granularities.MINUTE,
-          ImmutableList.of(Intervals.of(
-              "2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
- i,
- i + 1
- ))
- ),
- segments.get(i).getLastCompactionState()
- );
- if (lockGranularity == LockGranularity.SEGMENT) {
- Assert.assertEquals(
-            new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1),
- segments.get(i).getShardSpec()
- );
- } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
- }
- }
- }
-
- @Test
- public void testWithSegmentGranularity() throws Exception
Review Comment:
Is this test not needed anymore?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]