This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/31.0.0 by this push:
new f7010253dab Fix issues with MSQ Compaction (#17250) (#17263)
f7010253dab is described below
commit f7010253dab259498581b62395a17d3f1bb2ee68
Author: Vishesh Garg <[email protected]>
AuthorDate: Mon Oct 7 08:42:34 2024 +0530
Fix issues with MSQ Compaction (#17250) (#17263)
The patch makes the following changes:
1. Fixes a bug causing compaction to fail on array, complex, and other
non-primitive-type columns.
2. Updates the compaction status check to be aware of partition dimensions
when comparing dimension ordering (a sketch of the filtering follows the
CompactionStatus.java diff below).
3. Ensures only string columns are specified as partition dimensions (see
the sketch after the file list below).
4. Ensures `rollup` is true if and only if metricsSpec is non-empty (see
the sketch after the cherry-pick note below).
5. Ensures disjoint intervals aren't submitted for compaction.
6. Adds `compactionReason` to the compaction task context.
(cherry picked from commit 7e35e50052ee1b4f4d65222e0d5c4883e9fa26da)
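
For reference, the rollup rule in change 4 reduces to a single equality
check. The sketch below restates the validation added to
ClientCompactionRunnerInfo.validateRollupForMSQ in this commit with
simplified types; the class name and the String return value are
illustrative, not part of the patch:

import javax.annotation.Nullable;
import org.apache.druid.query.aggregation.AggregatorFactory;

// Illustrative sketch only; mirrors the check added in this commit.
class RollupValidationSketch
{
  @Nullable
  static String validateRollup(@Nullable AggregatorFactory[] metricsSpec, @Nullable Boolean isRollup)
  {
    // Valid only when "metricsSpec is non-empty" and "rollup is true" agree.
    if ((metricsSpec != null && metricsSpec.length > 0) != Boolean.TRUE.equals(isRollup)) {
      return "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified";
    }
    return null; // valid
  }
}

Note that a null rollup combined with a non-empty metricsSpec now fails
validation, which is what the renamed
testMSQEngineWithRollupNullWithMetricsSpecIsInvalid test asserts.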
---
.../druid/msq/indexing/MSQCompactionRunner.java | 33 ++++++---
.../msq/indexing/MSQCompactionRunnerTest.java | 83 +++++++++++++++++-----
.../indexing/common/task/CompactionRunner.java | 5 +-
.../druid/indexing/common/task/CompactionTask.java | 5 +-
.../common/task/NativeCompactionRunner.java | 3 +-
.../indexing/ClientCompactionRunnerInfo.java | 54 ++++++++++----
.../druid/server/compaction/CompactionStatus.java | 44 ++++++++++--
.../server/coordinator/duty/CompactSegments.java | 5 ++
.../indexing/ClientCompactionRunnerInfoTest.java | 68 ++++++++++++++----
.../compaction/NewestSegmentFirstPolicyTest.java | 77 ++++++++++++++++++++
10 files changed, 317 insertions(+), 60 deletions(-)
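
Change 3 (string-only partition dimensions for 'range' partitioning) boils
down to scanning the configured partition dimensions for any non-string
column type, as done in validatePartitionsSpecForMSQ further down. A minimal
sketch, with a plain name-to-type map standing in for the DimensionSchema
lookup the real code builds:

import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch only; the real check operates on DimensionSchema objects.
class RangePartitionDimensionSketch
{
  static Optional<String> firstNonStringDimension(
      List<String> partitionDimensions,
      Map<String, String> dimensionTypes // column name -> type name, e.g. "string", "long"
  )
  {
    // Any non-string partition dimension makes 'range' partitioning unsupported by MSQ.
    return partitionDimensions.stream()
                              .filter(dim -> !"string".equals(dimensionTypes.get(dim)))
                              .findAny();
  }
}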
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 417fdb60d0f..e20188d5829 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -129,21 +130,35 @@ public class MSQCompactionRunner implements CompactionRunner
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
+ * <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
- * <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's null.
- * Null is treated as true if metricsSpec exist and false if empty.</li>
- * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
+ * <li>Rollup without metricsSpec being specified or vice-versa.</li>
+ * <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
+ * <li>Multiple disjoint intervals in a compaction task.</li>
* </ul>
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask
+ CompactionTask compactionTask,
+ Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
+ if (intervalToDataSchemaMap.size() > 1) {
+ // We are currently not able to handle multiple intervals in the map for multiple reasons, one of them being that
+ // the subsequent worker ids clash -- since they are derived from MSQControllerTask ID which in turn is equal to
+ // CompactionTask ID for each sequentially launched MSQControllerTask.
+ return CompactionConfigValidationResult.failure(
+ "MSQ: Disjoint compaction intervals[%s] not supported",
+ intervalToDataSchemaMap.keySet()
+ );
+ }
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
if (compactionTask.getTuningConfig() != null) {
- validationResults.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
- compactionTask.getTuningConfig().getPartitionsSpec())
+ validationResults.add(
+ ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
+ compactionTask.getTuningConfig().getPartitionsSpec(),
+ Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
+ )
);
}
if (compactionTask.getGranularitySpec() != null) {
@@ -300,7 +315,7 @@ public class MSQCompactionRunner implements CompactionRunner
rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG);
}
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
- rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
+ rowSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType());
}
// There can be columns that are part of metricsSpec for a datasource.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
@@ -416,7 +431,9 @@ public class MSQCompactionRunner implements CompactionRunner
{
if (dataSchema.getGranularitySpec() != null) {
// If rollup is true without any metrics, all columns are treated as dimensions and
- // duplicate rows are removed in line with native compaction.
+ // duplicate rows are removed in line with native compaction. This case can only happen if the rollup is
+ // specified as null in the compaction spec and is then inferred to be true by segment analysis. The metrics=null and
+ // rollup=true combination in turn can only have been recorded for natively ingested segments.
return dataSchema.getGranularitySpec().isRollup();
}
// If no rollup specified, decide based on whether metrics are present.
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 15b12be1575..0b5395d727f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@@ -41,6 +42,7 @@ import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
@@ -96,7 +98,6 @@ public class MSQCompactionRunnerTest
private static final int MAX_ROWS_PER_SEGMENT = 150000;
private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR;
private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
- private static List<String> PARTITION_DIMENSIONS;
private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false);
private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null);
@@ -106,24 +107,49 @@ public class MSQCompactionRunnerTest
LONG_DIMENSION,
MV_STRING_DIMENSION
);
+ private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
+ COMPACTION_INTERVAL,
+ new DataSchema.Builder()
+ .withDataSource(DATA_SOURCE)
+ .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
+ .withDimensions(new DimensionsSpec(DIMENSIONS))
+ .build()
+ );
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added");
private static final List<AggregatorFactory> AGGREGATORS = ImmutableList.of(AGG1, AGG2);
private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null);
+ private static final List<String> PARTITION_DIMENSIONS = Collections.singletonList(STRING_DIMENSION.getName());
+
@BeforeClass
public static void setupClass()
{
NullHandling.initializeForTests();
+ }
- final StringDimensionSchema stringDimensionSchema = new StringDimensionSchema(
- "string_dim",
+ @Test
+ public void testMultipleDisjointCompactionIntervalsAreInvalid()
+ {
+ Map<Interval, DataSchema> intervalDataschemas = new HashMap<>(INTERVAL_DATASCHEMAS);
+ intervalDataschemas.put(Intervals.of("2017-07-01/2018-01-01"), null);
+ CompactionTask compactionTask = createCompactionTask(
+ new HashedPartitionsSpec(3, null, ImmutableList.of("dummy")),
+ null,
+ Collections.emptyMap(),
null,
null
);
-
- PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName());
+ CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
+ compactionTask,
+ intervalDataschemas
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ StringUtils.format("MSQ: Disjoint compaction intervals[%s] not supported", intervalDataschemas.keySet()),
+ validationResult.getReason()
+ );
}
@Test
@@ -136,11 +162,11 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
- public void testDimensionRangePartitionsSpecIsValid()
+ public void testStringDimensionInRangePartitionsSpecIsValid()
{
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false),
@@ -149,7 +175,29 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
+ }
+
+ @Test
+ public void testLongDimensionInRangePartitionsSpecIsInvalid()
+ {
+ List<String> longPartitionDimension = Collections.singletonList(LONG_DIMENSION.getName());
+ CompactionTask compactionTask = createCompactionTask(
+ new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, longPartitionDimension, false),
+ null,
+ Collections.emptyMap(),
+ null,
+ null
+ );
+
+ CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
+ INTERVAL_DATASCHEMAS
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Non-string partition dimension[long_dim] of type[long] not supported with 'range' partition spec",
+ validationResult.getReason()
+ );
}
@Test
@@ -162,7 +210,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -175,7 +223,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -188,7 +236,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -201,7 +249,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -214,7 +262,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, true),
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@@ -227,13 +275,16 @@ public class MSQCompactionRunnerTest
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
- new ClientCompactionTaskGranularitySpec(null, null, null),
+ new ClientCompactionTaskGranularitySpec(null, null, true),
+ new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
- CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
+ CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
+ compactionTask,
+ INTERVAL_DATASCHEMAS
+ );
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
+ "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
validationResult.getReason()
);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
index 8d30a60d04e..0abaeed8eb2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
@@ -57,6 +57,9 @@ public interface CompactionRunner
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
*/
- CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
+ CompactionConfigValidationResult validateCompactionTask(
+ CompactionTask compactionTask,
+ Map<Interval, DataSchema> intervalToDataSchemaMap
+ );
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 73c8a35405c..fef1ba6635f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -469,7 +469,10 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
- CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
+ CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(
+ this,
+ intervalDataSchemas
+ );
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 2074d14f0f9..541f24fe088 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -85,7 +85,8 @@ public class NativeCompactionRunner implements CompactionRunner
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask
+ CompactionTask compactionTask,
+ Map<Interval, DataSchema> intervalDataSchemaMap
)
{
return CompactionConfigValidationResult.success();
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index 806b35e9481..f6a009afe1c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -21,12 +21,14 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -36,6 +38,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
@@ -102,16 +107,20 @@ public class ClientCompactionRunnerInfo
* Checks if the provided compaction config is supported by MSQ. The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
+ * <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
- * <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's empty.</li>
- * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
+ * <li>Rollup without metricsSpec being specified or vice-versa.</li>
+ * <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
* </ul>
*/
private static CompactionConfigValidationResult compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
if (newConfig.getTuningConfig() != null) {
- validationResults.add(validatePartitionsSpecForMSQ(newConfig.getTuningConfig().getPartitionsSpec()));
+ validationResults.add(validatePartitionsSpecForMSQ(
+ newConfig.getTuningConfig().getPartitionsSpec(),
+ newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions()
+ ));
}
if (newConfig.getGranularitySpec() != null) {
validationResults.add(validateRollupForMSQ(
@@ -128,9 +137,13 @@ public class ClientCompactionRunnerInfo
}
/**
- * Validate that partitionSpec is either 'dynamic' or 'range', and if 'dynamic', ensure 'maxTotalRows' is null.
+ * Validate that partitionSpec is either 'dynamic' or 'range'. If 'dynamic', ensure 'maxTotalRows' is null. If 'range',
+ * ensure all partition columns are of string type.
*/
- public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(PartitionsSpec partitionsSpec)
+ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
+ @Nullable PartitionsSpec partitionsSpec,
+ @Nullable List<DimensionSchema> dimensionSchemas
+ )
{
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
@@ -146,11 +159,28 @@ public class ClientCompactionRunnerInfo
"MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"
);
}
+ if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) {
+ Map<String, DimensionSchema> dimensionSchemaMap = dimensionSchemas.stream().collect(
+ Collectors.toMap(DimensionSchema::getName, Function.identity())
+ );
+ Optional<String> nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
+ .getPartitionDimensions()
+ .stream()
+ .filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType()))
+ .findAny();
+ if (nonStringDimension.isPresent()) {
+ return CompactionConfigValidationResult.failure(
+ "MSQ: Non-string partition dimension[%s] of type[%s] not supported with 'range' partition spec",
+ nonStringDimension.get(),
+ dimensionSchemaMap.get(nonStringDimension.get()).getTypeName()
+ );
+ }
+ }
return CompactionConfigValidationResult.success();
}
/**
- * Validate rollup in granularitySpec is set to true when metricsSpec is specified and false if it's null.
+ * Validate rollup in granularitySpec is set to true iff metricsSpec is specified.
* If rollup set to null, all existing segments are analyzed, and it's set to true iff all segments have rollup
* set to true.
*/
@@ -159,13 +189,9 @@ public class ClientCompactionRunnerInfo
@Nullable Boolean isRollup
)
{
- if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && !isRollup) {
- return CompactionConfigValidationResult.failure(
- "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified"
- );
- } else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != null && isRollup) {
+ if ((metricsSpec != null && metricsSpec.length > 0) != Boolean.TRUE.equals(isRollup)) {
return CompactionConfigValidationResult.failure(
- "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null"
+ "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified"
);
}
return CompactionConfigValidationResult.success();
@@ -190,7 +216,7 @@ public class ClientCompactionRunnerInfo
}
/**
- * Validate each metric is idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
+ * Validate each metric defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
*/
public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
{
@@ -202,7 +228,7 @@ public class ClientCompactionRunnerInfo
.findFirst()
.map(aggregatorFactory ->
CompactionConfigValidationResult.failure(
- "MSQ: Non-idempotent aggregator[%s] not
supported in 'metricsSpec'.",
+ "MSQ: Aggregator[%s] not supported in
'metricsSpec'",
aggregatorFactory.getName()
)
).orElse(CompactionConfigValidationResult.success());
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index ffbdd44bce6..77621a9d13c 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -25,7 +25,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.config.Configs;
-import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Represents the status of compaction for a given {@link CompactionCandidate}.
@@ -230,6 +231,21 @@ public class CompactionStatus
}
}
+ private static List<DimensionSchema> getNonPartitioningDimensions(
+ @Nullable final List<DimensionSchema> dimensionSchemas,
+ @Nullable final PartitionsSpec partitionsSpec
+ )
+ {
+ if (dimensionSchemas == null || !(partitionsSpec instanceof DimensionRangePartitionsSpec)) {
+ return dimensionSchemas;
+ }
+
+ final List<String> partitionsDimensions = ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions();
+ return dimensionSchemas.stream()
+ .filter(dim -> !partitionsDimensions.contains(dim.getName()))
+ .collect(Collectors.toList());
+ }
+
/**
* Converts to have only the effective maxRowsPerSegment to avoid false positives when targetRowsPerSegment is set but
* effectively translates to the same maxRowsPerSegment.
@@ -389,18 +405,34 @@ public class CompactionStatus
}
}
+ /**
+ * Removes partition dimensions before comparison, since they are placed in front of the sort order --
+ * which can create a mismatch between expected and actual order of dimensions. Partition dimensions are separately
+ * covered by the {@link Evaluator#partitionsSpecIsUpToDate()} check.
+ */
private CompactionStatus dimensionsSpecIsUpToDate()
{
if (compactionConfig.getDimensionsSpec() == null) {
return COMPLETE;
} else {
- final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
- return CompactionStatus.completeIfEqual(
- "dimensionsSpec",
+ List<DimensionSchema> existingDimensions = getNonPartitioningDimensions(
+ lastCompactionState.getDimensionsSpec() == null
+ ? null
+ : lastCompactionState.getDimensionsSpec().getDimensions(),
+ lastCompactionState.getPartitionsSpec()
+ );
+ List<DimensionSchema> configuredDimensions = getNonPartitioningDimensions(
compactionConfig.getDimensionsSpec().getDimensions(),
- existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions(),
- String::valueOf
+ compactionConfig.getTuningConfig() == null ? null : compactionConfig.getTuningConfig().getPartitionsSpec()
);
+ {
+ return CompactionStatus.completeIfEqual(
+ "dimensionsSpec",
+ configuredDimensions,
+ existingDimensions,
+ String::valueOf
+ );
+ }
}
}
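
The change above (change 2 in the summary) compares dimension ordering only
after dropping the partition dimensions, which 'range' partitioning moves to
the front of the sort order. A minimal sketch of that filtering with plain
strings, mirroring getNonPartitioningDimensions above (the real code filters
DimensionSchema objects):

import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only; not part of the patch.
class NonPartitioningDimensionsSketch
{
  static List<String> nonPartitioningDimensions(List<String> dimensions, List<String> partitionDimensions)
  {
    // Partition dimensions are checked separately by the partitionsSpec comparison,
    // so drop them before comparing dimension ordering.
    return dimensions.stream()
                     .filter(dim -> !partitionDimensions.contains(dim))
                     .collect(Collectors.toList());
  }
}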
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b347a57dcb6..035286692bf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -86,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty
* Must be the same as org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY
*/
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
+ private static final String COMPACTION_REASON_KEY = "compactionReason";
private static final Logger LOG = new Logger(CompactSegments.class);
@@ -567,6 +568,10 @@ public class CompactSegments implements CoordinatorCustomDuty
slotsRequiredForCurrentTask = findMaxNumTaskSlotsUsedByOneNativeCompactionTask(config.getTuningConfig());
}
+ if (entry.getCurrentStatus() != null) {
+ autoCompactionContext.put(COMPACTION_REASON_KEY, entry.getCurrentStatus().getReason());
+ }
+
final String taskId = compactSegments(
entry,
config.getTaskPriority(),
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index 011a4640da3..b1f06542280 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.client.indexing;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -36,6 +38,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
@@ -45,6 +48,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class ClientCompactionRunnerInfoTest
@@ -56,6 +60,7 @@ public class ClientCompactionRunnerInfoTest
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
+ null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -76,6 +81,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
+ null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -96,6 +102,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(100, null),
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@@ -103,18 +110,40 @@ public class ClientCompactionRunnerInfoTest
}
@Test
- public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
+ public void testMSQEngineWithStringDimensionsInRangePartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
.isValid());
}
+ @Test
+ public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsInvalid()
+ {
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+ new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
+ Collections.emptyMap(),
+ null,
+ null,
+ ImmutableList.of(new LongDimensionSchema("partitionDim"))
+ );
+ CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Non-string partition dimension[partitionDim] of type[long] not
supported with 'range' partition spec",
+ validationResult.getReason()
+ );
+ }
+
@Test
public void testMSQEngineWithQueryGranularityAllIsValid()
{
@@ -122,6 +151,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(Granularities.ALL, Granularities.ALL, false),
+ null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@@ -135,7 +165,8 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, false),
- new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
+ new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
+ null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
@@ -143,7 +174,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is
specified",
+ "MSQ: 'granularitySpec.rollup' must be true if and only if
'metricsSpec' is specified",
validationResult.getReason()
);
}
@@ -155,6 +186,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, true),
+ null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -163,7 +195,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null",
+ "MSQ: 'granularitySpec.rollup' must be true if and only if
'metricsSpec' is specified",
validationResult.getReason()
);
}
@@ -177,8 +209,9 @@ public class ClientCompactionRunnerInfoTest
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
- new UserCompactionTaskGranularityConfig(null, null, null),
- new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
+ null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
@@ -186,29 +219,38 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: Non-idempotent aggregator[sum_added] not supported in
'metricsSpec'.",
+ "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
validationResult.getReason()
);
}
@Test
- public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
+ public void testMSQEngineWithRollupNullWithMetricsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
- new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
+ new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
+ null
+ );
+ CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: 'granularitySpec.rollup' must be true if and only if
'metricsSpec' is specified",
+ validationResult.getReason()
);
- Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
- .isValid());
}
private static DataSourceCompactionConfig createMSQCompactionConfig(
PartitionsSpec partitionsSpec,
Map<String, Object> context,
@Nullable UserCompactionTaskGranularityConfig granularitySpec,
- @Nullable AggregatorFactory[] metricsSpec
+ @Nullable AggregatorFactory[] metricsSpec,
+ List<DimensionSchema> dimensions
)
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
@@ -219,7 +261,7 @@ public class ClientCompactionRunnerInfoTest
new Period(3600),
createTuningConfig(partitionsSpec),
granularitySpec,
- null,
+ new UserCompactionTaskDimensionsConfig(dimensions),
metricsSpec,
null,
null,
diff --git a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
index 7580582685b..5659a0ff5bf 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -1137,6 +1138,82 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
+ @Test
+ public void testIteratorDoesNotReturnSegmentsWhenPartitionDimensionsPrefixed()
+ {
+ // Same indexSpec as what is set in the auto compaction config
+ Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
+ // Set range partitions spec with dimensions ["dim2", "dim4"] -- the same as what is set in the auto compaction config
+ PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
+ null,
+ Integer.MAX_VALUE,
+ ImmutableList.of("dim2", "dim4"),
+ false
+ );
+
+ // Create segments that were compacted (CompactionState != null) and have
+ // Dimensions=["dim2", "dim4", "dim3", "dim1"] with ["dim2", "dim4"] as
partition dimensions for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+ // Dimensions=["dim2", "dim4", "dim1", "dim3"] with ["dim2", "dim4"] as
partition dimensions for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+ final SegmentTimeline timeline = createTimeline(
+ createSegments()
+ .startingAt("2017-10-01")
+ .withNumPartitions(4)
+ .withCompactionState(
+ new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim3", "dim1"))), null, null, indexSpec, null)
+ ),
+ createSegments()
+ .startingAt("2017-10-02")
+ .withNumPartitions(4)
+ .withCompactionState(
+ new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim1", "dim3"))), null, null, indexSpec, null)
+ )
+ );
+
+ // Auto compaction config sets Dimensions=["dim1", "dim2", "dim3", "dim4"] and partition dimensions as ["dim2", "dim4"]
+ CompactionSegmentIterator iterator = createIterator(
+ configBuilder().withDimensionsSpec(
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "dim4")))
+ )
+ .withTuningConfig(
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ 1000L,
+ null,
+ partitionsSpec,
+ IndexSpec.DEFAULT,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ )
+ .build(),
+ timeline
+ );
+ // We should get only interval 2017-10-01T00:00:00/2017-10-02T00:00:00 since 2017-10-02T00:00:00/2017-10-03T00:00:00
+ // has dimension order as expected post reordering of partition dimensions.
+ Assert.assertTrue(iterator.hasNext());
+ List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next().getSegments())
+ );
+ // No more
+ Assert.assertFalse(iterator.hasNext());
+ }
+
@Test
public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentFilter() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]