cecemei commented on code in PR #18950:
URL: https://github.com/apache/druid/pull/18950#discussion_r2770161133
##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
.to(WindowOperatorQueryKit.class);
binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
}
+
+ @Provides
+ IndexerControllerContext.Builder providesContextBuilder(Injector injector)
Review Comment:
There's no functional change; it just makes testing easier by letting us
inject the context in tests.
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -75,6 +75,11 @@
*/
public class IndexerControllerContext implements ControllerContext
{
+ public interface Builder
Review Comment:
updated
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;
private boolean useSegmentMetadataCache = false;
+ private boolean useCentralizedDatasourceSchema = false;
+ private boolean batchSegmentAllocation = true;
private boolean skipSegmentPayloadFetchForAllocation = new
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+ private AtomicBoolean configFinalized = new AtomicBoolean();
+
+ public TaskActionTestKit setUseSegmentMetadataCache(boolean
useSegmentMetadataCache)
+ {
+ if (configFinalized.get()) {
+ throw new IllegalStateException("Test config already finalized");
+ }
+ this.useSegmentMetadataCache = useSegmentMetadataCache;
+ return this;
+ }
+
+ public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean
useCentralizedDatasourceSchema)
+ {
+ if (configFinalized.get()) {
+ throw new IllegalStateException("Test config already finalized");
+ }
+ this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+ return this;
+ }
+
+ public TaskActionTestKit setBatchSegmentAllocation(boolean
batchSegmentAllocation)
+ {
+ if (configFinalized.get()) {
Review Comment:
Well, this is just to prevent people from accidentally calling it inside a
test method.
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -411,12 +411,18 @@ private static List<DimensionSpec> getAggregateDimensions(
{
List<DimensionSpec> dimensionSpecs = new ArrayList<>();
- if (isQueryGranularityEmptyOrNone(dataSchema)) {
- // Dimensions in group-by aren't allowed to have time column name as the
output name.
- dimensionSpecs.add(new
DefaultDimensionSpec(ColumnHolder.TIME_COLUMN_NAME, TIME_VIRTUAL_COLUMN,
ColumnType.LONG));
- } else {
- // The changed granularity would result in a new virtual column that
needs to be aggregated upon.
- dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN,
TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+ if
(!dataSchema.getDimensionsSpec().getDimensionNames().contains(ColumnHolder.TIME_COLUMN_NAME))
{
Review Comment:
added
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;
private boolean useSegmentMetadataCache = false;
+ private boolean useCentralizedDatasourceSchema = false;
+ private boolean batchSegmentAllocation = true;
private boolean skipSegmentPayloadFetchForAllocation = new
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+ private AtomicBoolean configFinalized = new AtomicBoolean();
+
+ public TaskActionTestKit setUseSegmentMetadataCache(boolean
useSegmentMetadataCache)
+ {
+ if (configFinalized.get()) {
+ throw new IllegalStateException("Test config already finalized");
+ }
+ this.useSegmentMetadataCache = useSegmentMetadataCache;
+ return this;
+ }
+
+ public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean
useCentralizedDatasourceSchema)
Review Comment:
It's not supported by the MSQ engine; the schema is not saved.
##########
multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java:
##########
@@ -239,4 +248,20 @@ public void configure(Binder binder)
.to(WindowOperatorQueryKit.class);
binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
}
+
+ @Provides
+ IndexerControllerContext.Builder providesContextBuilder(Injector injector)
Review Comment:
The controller context needs the injector... I originally added a separate
class as well, but I think some tests break since the Guice module asks for
the class instance even before it asks to provide the context instance. It's
not ideal, but the injector is already used in many places.
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;
private boolean useSegmentMetadataCache = false;
+ private boolean useCentralizedDatasourceSchema = false;
+ private boolean batchSegmentAllocation = true;
private boolean skipSegmentPayloadFetchForAllocation = new
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+ private AtomicBoolean configFinalized = new AtomicBoolean();
+
+ public TaskActionTestKit setUseSegmentMetadataCache(boolean
useSegmentMetadataCache)
+ {
+ if (configFinalized.get()) {
+ throw new IllegalStateException("Test config already finalized");
+ }
+ this.useSegmentMetadataCache = useSegmentMetadataCache;
+ return this;
+ }
+
+ public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean
useCentralizedDatasourceSchema)
+ {
+ if (configFinalized.get()) {
+ throw new IllegalStateException("Test config already finalized");
+ }
+ this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+ return this;
+ }
+
+ public TaskActionTestKit setBatchSegmentAllocation(boolean
batchSegmentAllocation)
+ {
+ if (configFinalized.get()) {
Review Comment:
The @Before method is called before each @Test — won't that mean we can't
change the config between tests, but only during setup?
##########
processing/src/main/java/org/apache/druid/timeline/CompactionState.java:
##########
@@ -206,4 +206,92 @@ public static Function<Set<DataSegment>, Set<DataSegment>>
addCompactionStateToS
.map(s -> s.withLastCompactionState(compactionState))
.collect(Collectors.toSet());
}
+
+
+ public Builder toBuilder()
Review Comment:
Ah, I thought this toBuilder() was a more fluent style? Like:
> getDefaultCompactionState().toBuilder().dimensionSpec().build()
##########
multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -1012,7 +1016,10 @@ public void testReplaceOnFoo1WithWhereExtern(String
contextName, Map<String, Obj
Collections.emptyList(),
Collections.singletonList(new
StringDimensionSchema("user")),
GranularityType.HOUR,
-
Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z")
+
Intervals.of("2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z"),
+ new CompactionTransformSpec(
Review Comment:
Tests without filters are still the same (no transform spec).
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java:
##########
@@ -108,7 +113,10 @@ public IndexerControllerContext(
this.clientFactory = clientFactory;
this.overlordClient = overlordClient;
this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
- final StorageConnectorProvider storageConnectorProvider =
injector.getInstance(Key.get(StorageConnectorProvider.class,
MultiStageQuery.class));
+ final StorageConnectorProvider storageConnectorProvider =
injector.getInstance(Key.get(
Review Comment:
this has been reverted
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestSpyTaskActionClient.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test utility that wraps a {@link TaskActionClient} to track published
segments and their schema mappings.
+ * <p>
+ * This spy intercepts {@link SegmentTransactionalInsertAction}, {@link
SegmentTransactionalReplaceAction},
+ * and {@link SegmentTransactionalAppendAction} submissions to collect the
segments and schema mappings being
+ * published. All other task actions are delegated to the wrapped client
without modification.
+ * <p>
+ * Useful for verifying that tasks publish the expected segments and schemas
in integration tests.
+ */
+public class TestSpyTaskActionClient implements TaskActionClient
Review Comment:
this is done
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -137,30 +138,35 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-@RunWith(Parameterized.class)
-public class CompactionTaskRunTest extends IngestionTestBase
+public abstract class CompactionTaskRunBase
Review Comment:
The motivation to change this test is that it didn't cover the MSQ runner.
While rewriting it, I found some tests too long to read, and some issues in
the MSQ runner were discovered by making it run with the MSQ runner. Test
coverage has improved a lot in this class; prior to this PR, this test only
exercised the native compaction runner and also didn't use a real task action
client. I was surprised by how little test coverage we had on the MSQ
compaction runner, and I want to use this test file for any MSQ compaction
runner related changes in the future.
The changes in this PR were mostly not covered by this test (before this PR);
only the interval locking is related. I re-ran the old tests with the
interval lock change: the failures are due to the interval difference in the
compaction state, and another test failed because the lock interval now
covers the input interval. So everything seems expected.
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java:
##########
@@ -66,7 +68,46 @@ public class TaskActionTestKit extends ExternalResource
private BlockingExecutorService metadataCachePollExec;
private boolean useSegmentMetadataCache = false;
+ private boolean useCentralizedDatasourceSchema = false;
+ private boolean batchSegmentAllocation = true;
private boolean skipSegmentPayloadFetchForAllocation = new
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+ private AtomicBoolean configFinalized = new AtomicBoolean();
+
+ public TaskActionTestKit setUseSegmentMetadataCache(boolean
useSegmentMetadataCache)
+ {
+ if (configFinalized.get()) {
+ throw new IllegalStateException("Test config already finalized");
+ }
+ this.useSegmentMetadataCache = useSegmentMetadataCache;
+ return this;
+ }
+
+ public TaskActionTestKit setUseCentralizedDatasourceSchema(boolean
useCentralizedDatasourceSchema)
+ {
+ if (configFinalized.get()) {
+ throw new IllegalStateException("Test config already finalized");
+ }
+ this.useCentralizedDatasourceSchema = useCentralizedDatasourceSchema;
+ return this;
+ }
+
+ public TaskActionTestKit setBatchSegmentAllocation(boolean
batchSegmentAllocation)
Review Comment:
I could not reproduce the test cases any more; maybe the issue disappeared
after we switched to using the widened interval. Anyway, I added the
segmentQueue test param.
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java:
##########
@@ -571,513 +479,277 @@ public void
testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc
}
}
- Assert.assertTrue(compactionFuture.get().lhs.isSuccess());
-
- dataSegmentsWithSchemas = compactionFuture.get().rhs;
- verifySchema(dataSegmentsWithSchemas);
- segments = new ArrayList<>(dataSegmentsWithSchemas.getSegments());
- Assert.assertEquals(3, segments.size());
-
- for (int i = 0; i < 3; i++) {
- Assert.assertEquals(
- Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
- segments.get(i).getInterval()
- );
- Assert.assertEquals(
- getDefaultCompactionState(
- Granularities.HOUR,
- Granularities.MINUTE,
- ImmutableList.of(Intervals.of(
- "2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
- i,
- i + 1
- ))
- ),
- segments.get(i).getLastCompactionState()
- );
- if (lockGranularity == LockGranularity.SEGMENT) {
- Assert.assertEquals(
- new
NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2,
(short) 1, (short) 1),
- segments.get(i).getShardSpec()
- );
- } else {
- Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(i).getShardSpec());
- }
- }
- }
-
- @Test
- public void testWithSegmentGranularity() throws Exception
Review Comment:
Segment granularity has been tested very thoroughly in this test after this
change, so we don't need it any more.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java:
##########
@@ -60,6 +60,7 @@ public CompactionInputSpec getInputSpec()
return inputSpec;
}
+ @Deprecated
Review Comment:
added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]