This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
     new 6541178  Support segmentGranularity for auto-compaction (#10843)
6541178 is described below

commit 6541178c21839530a42af4b4675a9bc680bffca6
Author: Maytas Monsereenusorn <mayt...@apache.org>
AuthorDate: Fri Feb 12 03:03:20 2021 -0800

    Support segmentGranularity for auto-compaction (#10843)

    * Support segmentGranularity for auto-compaction
    * Support segmentGranularity for auto-compaction
    * Support segmentGranularity for auto-compaction
    * Support segmentGranularity for auto-compaction
    * resolve conflict
    * Support segmentGranularity for auto-compaction
    * Support segmentGranularity for auto-compaction
    * fix tests
    * fix more tests
    * fix checkstyle
    * add unit tests
    * fix checkstyle
    * fix checkstyle
    * fix checkstyle
    * add unit tests
    * add integration tests
    * fix checkstyle
    * fix checkstyle
    * fix failing tests
    * address comments
    * address comments
    * fix tests
    * fix tests
    * fix test
    * fix test
    * fix test
    * fix test
    * fix test
    * fix test
    * fix test
    * fix test
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   1 +
 .../org/apache/druid/timeline/CompactionState.java |  19 +-
 .../druid/timeline/partition/PartitionChunk.java   |   2 +-
 .../org/apache/druid/timeline/DataSegmentTest.java |   4 +-
 .../common/task/AbstractBatchIndexTask.java        |   6 +-
 .../druid/indexing/common/task/CompactionTask.java |  35 ++-
 .../druid/indexing/common/task/IndexTask.java      |   3 +-
 .../parallel/ParallelIndexSupervisorTask.java      |   3 +-
 .../task/ClientCompactionTaskQuerySerdeTest.java   |  18 ++
 .../common/task/CompactionTaskParallelRunTest.java |  84 +++++--
 .../common/task/CompactionTaskRunTest.java         |  81 +++++--
 .../indexing/common/task/CompactionTaskTest.java   |  41 ++++
 .../coordinator/duty/ITAutoCompactionTest.java     |  80 ++++++-
 .../druid/tests/indexer/ITCompactionTaskTest.java  |  75 +++++--
 ...edia_compaction_task_with_granularity_spec.json |  17 ++
 ...a_compaction_task_with_segment_granularity.json |  15 ++
 .../client/indexing/ClientCompactionTaskQuery.java |  14 +-
 .../ClientCompactionTaskQueryGranularitySpec.java  |  95 ++++++++
 .../client/indexing/HttpIndexingServiceClient.java |   4 +-
 .../client/indexing/IndexingServiceClient.java     |   1 +
 .../granularity/ArbitraryGranularitySpec.java      |   1 -
 .../indexing/granularity/BaseGranularitySpec.java  |  22 +-
 .../indexing/granularity/GranularitySpec.java      |   4 +
 .../granularity/UniformGranularitySpec.java        |   5 -
 .../coordinator/DataSourceCompactionConfig.java    |  34 +++
 .../server/coordinator/duty/CompactSegments.java   |  33 +++
 .../duty/NewestSegmentFirstIterator.java           | 133 +++++++++++-
 .../client/indexing/NoopIndexingServiceClient.java |   1 +
 .../granularity/ArbitraryGranularityTest.java      |  27 +++
 .../granularity/UniformGranularityTest.java        |  33 ++-
 .../DataSourceCompactionConfigTest.java            |  98 +++++++++
 .../coordinator/duty/CompactSegmentsTest.java      | 241 ++++++++++++++++++++-
 .../duty/NewestSegmentFirstIteratorTest.java       |  10 +
 .../duty/NewestSegmentFirstPolicyTest.java         | 216 ++++++++++++++++--
 .../druid/sql/calcite/schema/SystemSchemaTest.java |   3 +-
 35 files changed, 1336 insertions(+), 123 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index 91c1409..e744bf9 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -99,6 +99,7 @@ public class NewestSegmentFirstPolicyBenchmark null, null, null, + null, null ) ); diff --git a/core/src/main/java/org/apache/druid/timeline/CompactionState.java b/core/src/main/java/org/apache/druid/timeline/CompactionState.java index c30f427..8588717 100644 --- a/core/src/main/java/org/apache/druid/timeline/CompactionState.java +++ b/core/src/main/java/org/apache/druid/timeline/CompactionState.java @@ -43,15 +43,20 @@ public class CompactionState // org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which // has a dependency on the 'core' module where this class is. private final Map<String, Object> indexSpec; + // org.apache.druid.segment.indexing.granularity.GranularitySpec cannot be used here because it's in the + // 'server' module which has a dependency on the 'core' module where this class is. + private final Map<String, Object> granularitySpec; @JsonCreator public CompactionState( @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, - @JsonProperty("indexSpec") Map<String, Object> indexSpec + @JsonProperty("indexSpec") Map<String, Object> indexSpec, + @JsonProperty("granularitySpec") Map<String, Object> granularitySpec ) { this.partitionsSpec = partitionsSpec; this.indexSpec = indexSpec; + this.granularitySpec = granularitySpec; } @JsonProperty @@ -66,6 +71,12 @@ public class CompactionState return indexSpec; } + @JsonProperty + public Map<String, Object> getGranularitySpec() + { + return granularitySpec; + } + @Override public boolean equals(Object o) { @@ -77,13 +88,14 @@ public class CompactionState } CompactionState that = (CompactionState) o; return Objects.equals(partitionsSpec, that.partitionsSpec) && - Objects.equals(indexSpec, that.indexSpec); + Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(granularitySpec, that.granularitySpec); } @Override public int hashCode() { - return Objects.hash(partitionsSpec, indexSpec); + return Objects.hash(partitionsSpec, indexSpec, granularitySpec); } @Override @@ -92,6 +104,7 @@ public class CompactionState return "CompactionState{" + "partitionsSpec=" + partitionsSpec + ", indexSpec=" + indexSpec + + ", granularitySpec=" + granularitySpec + '}'; } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java b/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java index 10b43b8..54aaf03 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java @@ -58,7 +58,7 @@ public interface PartitionChunk<T> extends Comparable<PartitionChunk<T>> * Returns true if this chunk is the end of the partition. Most commonly, that means it represents the range * [X, infinity] for some concrete X. 
* - * @return true if the chunk is the beginning of the partition + * @return true if the chunk is the end of the partition */ boolean isEnd(); diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java index 5483642..66c3400 100644 --- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java +++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java @@ -120,6 +120,7 @@ public class DataSegmentTest new NumberedShardSpec(3, 0), new CompactionState( new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")), + ImmutableMap.of(), ImmutableMap.of() ), TEST_VERSION, @@ -231,7 +232,8 @@ public class DataSegmentTest { final CompactionState compactionState = new CompactionState( new DynamicPartitionsSpec(null, null), - Collections.singletonMap("test", "map") + Collections.singletonMap("test", "map"), + Collections.singletonMap("test2", "map2") ); final DataSegment segment1 = DataSegment.builder() .dataSource("foo") diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 4a7fa0d..6f7cdf6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -473,12 +473,14 @@ public abstract class AbstractBatchIndexTask extends AbstractTask public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction( boolean storeCompactionState, TaskToolbox toolbox, - IndexTuningConfig tuningConfig + IndexTuningConfig tuningConfig, + GranularitySpec granularitySpec ) { if (storeCompactionState) { final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()); - final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap); + final Map<String, Object> granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper()); + final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, granularitySpecMap); return segments -> segments .stream() .map(s -> s.withLastCompactionState(compactionState)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 530a400..5d971c3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -141,6 +141,8 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable private final Granularity segmentGranularity; @Nullable + private final GranularitySpec granularitySpec; + @Nullable private final ParallelIndexTuningConfig tuningConfig; @JsonIgnore private final SegmentProvider segmentProvider; @@ -172,7 +174,8 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions, @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec, @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec, - @JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity, + @JsonProperty("segmentGranularity") @Deprecated 
@Nullable final Granularity segmentGranularity, + @JsonProperty("granularitySpec") @Nullable final GranularitySpec granularitySpec, @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig, @JsonProperty("context") @Nullable final Map<String, Object> context, @JacksonInject SegmentLoaderFactory segmentLoaderFactory, @@ -202,6 +205,16 @@ public class CompactionTask extends AbstractBatchIndexTask this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec; this.metricsSpec = metricsSpec; this.segmentGranularity = segmentGranularity; + if (granularitySpec == null && segmentGranularity != null) { + this.granularitySpec = new UniformGranularitySpec( + segmentGranularity, + null, + null, + null + ); + } else { + this.granularitySpec = granularitySpec; + } this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null; this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec()); this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig); @@ -288,7 +301,14 @@ public class CompactionTask extends AbstractBatchIndexTask @Override public Granularity getSegmentGranularity() { - return segmentGranularity; + return granularitySpec == null ? null : granularitySpec.getSegmentGranularity(); + } + + @JsonProperty + @Nullable + public GranularitySpec getGranularitySpec() + { + return granularitySpec; } @Nullable @@ -348,7 +368,7 @@ public class CompactionTask extends AbstractBatchIndexTask partitionConfigurationManager, dimensionsSpec, metricsSpec, - segmentGranularity, + getSegmentGranularity(), toolbox.getCoordinatorClient(), segmentLoaderFactory, retryPolicyFactory @@ -892,6 +912,8 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable private Granularity segmentGranularity; @Nullable + private GranularitySpec granularitySpec; + @Nullable private TuningConfig tuningConfig; @Nullable private Map<String, Object> context; @@ -941,6 +963,12 @@ public class CompactionTask extends AbstractBatchIndexTask return this; } + public Builder granularitySpec(GranularitySpec granularitySpec) + { + this.granularitySpec = granularitySpec; + return this; + } + public Builder tuningConfig(TuningConfig tuningConfig) { this.tuningConfig = tuningConfig; @@ -966,6 +994,7 @@ public class CompactionTask extends AbstractBatchIndexTask dimensionsSpec, metricsSpec, segmentGranularity, + granularitySpec, tuningConfig, context, segmentLoaderFactory, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 88779ab..517c08a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -904,7 +904,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler compactionStateAnnotateFunction( storeCompactionState, toolbox, - ingestionSchema.getTuningConfig() + ingestionSchema.getTuningConfig(), + ingestionSchema.getDataSchema().getGranularitySpec() ); // Probably we can publish atomicUpdateGroup along with segments. 
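To make the new state concrete: a minimal sketch of how a granularity spec ends up inside the three-argument CompactionState introduced by this commit. The spec values, the partitions spec, and DefaultObjectMapper below are illustrative stand-ins rather than what the task toolbox actually supplies at runtime.

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.jackson.DefaultObjectMapper;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.segment.IndexSpec;
    import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
    import org.apache.druid.timeline.CompactionState;

    import java.util.Map;

    static CompactionState exampleCompactionState() throws JsonProcessingException
    {
      ObjectMapper mapper = new DefaultObjectMapper();
      // The granularitySpec is stored as a plain Map because CompactionState lives in the
      // 'core' module and cannot reference GranularitySpec from the 'server' module.
      Map<String, Object> granularitySpecMap =
          new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null).asMap(mapper);
      return new CompactionState(
          new DynamicPartitionsSpec(null, null),
          mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class),
          granularitySpecMap
      );
    }

This is the same shape that the tests below assert against via segment.getLastCompactionState().
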
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index bd64fbd..af4352b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -929,7 +929,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction( storeCompactionState, toolbox, - ingestionSchema.getTuningConfig() + ingestionSchema.getTuningConfig(), + ingestionSchema.getDataSchema().getGranularitySpec() ); final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index ef3cf68..1e9628f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -28,6 +28,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.IndexingServiceClient; @@ -45,11 +46,13 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; @@ -113,6 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest 1000, 100 ), + new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true), ImmutableMap.of("key", "value") ); @@ -186,6 +190,18 @@ public class ClientCompactionTaskQuerySerdeTest query.getTuningConfig().getTotalNumMergeTasks().intValue(), task.getTuningConfig().getTotalNumMergeTasks() ); + Assert.assertEquals( + query.getGranularitySpec().getQueryGranularity(), + 
task.getGranularitySpec().getQueryGranularity() + ); + Assert.assertEquals( + query.getGranularitySpec().getSegmentGranularity(), + task.getGranularitySpec().getSegmentGranularity() + ); + Assert.assertEquals( + query.getGranularitySpec().isRollup(), + task.getGranularitySpec().isRollup() + ); Assert.assertEquals(query.getContext(), task.getContext()); } @@ -243,6 +259,7 @@ public class ClientCompactionTaskQuerySerdeTest null ) ) + .granularitySpec(new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, null)) .build(); final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( @@ -284,6 +301,7 @@ public class ClientCompactionTaskQuerySerdeTest 1000, 100 ), + new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true), new HashMap<>() ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 7b7005b..7fe8781 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -129,7 +129,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis } @Test - public void testRunParallelWithDynamicPartitioningMatchCompactionState() + public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws Exception { runIndexTask(null, true); @@ -144,22 +144,33 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis .build(); final Set<DataSegment> compactedSegments = runTask(compactionTask); - final CompactionState expectedState = new CompactionState( - new DynamicPartitionsSpec(null, Long.MAX_VALUE), - compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) - ); for (DataSegment segment : compactedSegments) { Assert.assertSame( lockGranularity == LockGranularity.TIME_CHUNK ? 
NumberedShardSpec.class : NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass() ); // Expect compaction state to exist as store compaction state by default + CompactionState expectedState = new CompactionState( + new DynamicPartitionsSpec(null, Long.MAX_VALUE), + compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), + getObjectMapper().readValue( + getObjectMapper().writeValueAsString( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + true, + ImmutableList.of(segment.getInterval()) + ) + ), + Map.class + ) + ); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } } @Test - public void testRunParallelWithHashPartitioningMatchCompactionState() + public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exception { // Hash partitioning is not supported with segment lock yet Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); @@ -176,19 +187,30 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis .build(); final Set<DataSegment> compactedSegments = runTask(compactionTask); - final CompactionState expectedState = new CompactionState( - new HashedPartitionsSpec(null, 3, null), - compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) - ); for (DataSegment segment : compactedSegments) { // Expect compaction state to exist as store compaction state by default Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); + CompactionState expectedState = new CompactionState( + new HashedPartitionsSpec(null, 3, null), + compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), + getObjectMapper().readValue( + getObjectMapper().writeValueAsString( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + true, + ImmutableList.of(segment.getInterval()) + ) + ), + Map.class + ) + ); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } } @Test - public void testRunParallelWithRangePartitioning() + public void testRunParallelWithRangePartitioning() throws Exception { // Range partitioning is not supported with segment lock yet Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); @@ -205,19 +227,30 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis .build(); final Set<DataSegment> compactedSegments = runTask(compactionTask); - final CompactionState expectedState = new CompactionState( - new SingleDimensionPartitionsSpec(7, null, "dim", false), - compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) - ); for (DataSegment segment : compactedSegments) { // Expect compaction state to exist as store compaction state by default Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); + CompactionState expectedState = new CompactionState( + new SingleDimensionPartitionsSpec(7, null, "dim", false), + compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), + getObjectMapper().readValue( + getObjectMapper().writeValueAsString( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + true, + ImmutableList.of(segment.getInterval()) + ) + ), + Map.class + ) + ); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } } @Test - public void testRunParallelWithRangePartitioningWithSingleTask() + public void testRunParallelWithRangePartitioningWithSingleTask() throws Exception { // Range partitioning is not supported with segment lock yet 
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT); @@ -234,13 +267,24 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis .build(); final Set<DataSegment> compactedSegments = runTask(compactionTask); - final CompactionState expectedState = new CompactionState( - new SingleDimensionPartitionsSpec(7, null, "dim", false), - compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) - ); for (DataSegment segment : compactedSegments) { // Expect compaction state to exist as store compaction state by default Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); + CompactionState expectedState = new CompactionState( + new SingleDimensionPartitionsSpec(7, null, "dim", false), + compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), + getObjectMapper().readValue( + getObjectMapper().writeValueAsString( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + true, + ImmutableList.of(segment.getInterval()) + ) + ), + Map.class + ) + ); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index c3f30ec..bd1f819 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; @@ -88,7 +89,6 @@ import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -131,9 +131,6 @@ public class CompactionTaskRunTest extends IngestionTestBase 0 ); - // Expecte compaction state to exist after compaction as we store compaction state by default - private static CompactionState DEFAULT_COMPACTION_STATE; - private static final List<String> TEST_ROWS = ImmutableList.of( "2014-01-01T00:00:10Z,a,1\n", "2014-01-01T00:00:10Z,b,2\n", @@ -186,14 +183,26 @@ public class CompactionTaskRunTest extends IngestionTestBase this.lockGranularity = lockGranularity; } - @BeforeClass - public static void setupClass() throws JsonProcessingException + public static CompactionState getDefaultCompactionState(Granularity segmentGranularity, + Granularity queryGranularity, + List<Interval> intervals) throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); - - DEFAULT_COMPACTION_STATE = new CompactionState( + // Expected compaction state to exist after compaction as we store compaction state by default + return new CompactionState( new DynamicPartitionsSpec(5000000, Long.MAX_VALUE), - mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class) + mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class), + mapper.readValue( + 
mapper.writeValueAsString( + new UniformGranularitySpec( + segmentGranularity, + queryGranularity, + true, + intervals + ) + ), + Map.class + ) ); } @@ -238,7 +247,10 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); - Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); + Assert.assertEquals( + getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))), + segments.get(i).getLastCompactionState() + ); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1), @@ -311,10 +323,6 @@ public class CompactionTaskRunTest extends IngestionTestBase final List<DataSegment> segments = resultPair.rhs; Assert.assertEquals(6, segments.size()); - final CompactionState expectedState = new CompactionState( - new HashedPartitionsSpec(null, 3, null), - compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) - ); for (int i = 0; i < 3; i++) { final Interval interval = Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1); @@ -324,6 +332,21 @@ public class CompactionTaskRunTest extends IngestionTestBase interval, segments.get(segmentIdx).getInterval() ); + CompactionState expectedState = new CompactionState( + new HashedPartitionsSpec(null, 3, null), + compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), + getObjectMapper().readValue( + getObjectMapper().writeValueAsString( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + true, + ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1)) + ) + ), + Map.class + ) + ); Assert.assertEquals(expectedState, segments.get(segmentIdx).getLastCompactionState()); Assert.assertSame(HashBasedNumberedShardSpec.class, segments.get(segmentIdx).getShardSpec().getClass()); } @@ -361,7 +384,10 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); - Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); + Assert.assertEquals( + getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))), + segments.get(i).getLastCompactionState() + ); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1), @@ -388,7 +414,10 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); - Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); + Assert.assertEquals( + getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))), + segments.get(i).getLastCompactionState() + ); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec( @@ -487,7 +516,10 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); - 
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); + Assert.assertEquals( + getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))), + segments.get(i).getLastCompactionState() + ); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1), @@ -526,7 +558,10 @@ public class CompactionTaskRunTest extends IngestionTestBase Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval()); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); - Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(0).getLastCompactionState()); + Assert.assertEquals( + getDefaultCompactionState(Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))), + segments.get(0).getLastCompactionState() + ); // hour segmentGranularity final CompactionTask compactionTask2 = builder @@ -544,7 +579,10 @@ public class CompactionTaskRunTest extends IngestionTestBase for (int i = 0; i < 3; i++) { Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval()); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec()); - Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); + Assert.assertEquals( + getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))), + segments.get(i).getLastCompactionState() + ); } } @@ -781,7 +819,10 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); - Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); + Assert.assertEquals( + getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of()), + segments.get(i).getLastCompactionState() + ); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 21b7f73..71b603e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -356,6 +356,47 @@ public class CompactionTaskTest } @Test + public void testCreateCompactionTaskWithGranularitySpec() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ); + builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); + builder.tuningConfig(createTuningConfig()); + builder.segmentGranularity(Granularities.HOUR); + final CompactionTask taskCreatedWithSegmentGranularity = builder.build(); + + final Builder builder2 = new Builder( + DATA_SOURCE, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ); + builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); + builder2.tuningConfig(createTuningConfig()); + builder2.granularitySpec(new 
UniformGranularitySpec(Granularities.HOUR, Granularities.DAY, null)); + final CompactionTask taskCreatedWithGranularitySpec = builder2.build(); + Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(), taskCreatedWithSegmentGranularity.getSegmentGranularity()); + } + + @Test + public void testCreateCompactionTaskWithGranularitySpecOverrideSegmentGranularity() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ); + builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); + builder.tuningConfig(createTuningConfig()); + builder.segmentGranularity(Granularities.HOUR); + builder.granularitySpec(new UniformGranularitySpec(Granularities.MINUTE, Granularities.DAY, null)); + final CompactionTask taskCreatedWithSegmentGranularity = builder.build(); + Assert.assertEquals(Granularities.MINUTE, taskCreatedWithSegmentGranularity.getSegmentGranularity()); + } + + @Test public void testSerdeWithInterval() throws IOException { final Builder builder = new Builder( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 5d1d55b..91bac02 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -28,7 +28,11 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -41,7 +45,9 @@ import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractITBatchIndexTest; import org.apache.druid.tests.indexer.AbstractIndexerTest; import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; import org.joda.time.Period; +import org.joda.time.chrono.ISOChronology; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; @@ -52,8 +58,10 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; @Test(groups = {TestNGGroup.COMPACTION}) @@ -160,7 +168,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest LOG.info("Auto compaction test with hash partitioning"); final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); - submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1); + submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null); // 2 segments published per day after compaction. 
forceTriggerAutoCompaction(4); verifyQuery(INDEX_QUERIES_RESOURCE); @@ -175,7 +183,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest "city", false ); - submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1); + submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(rangePartitionsSpec, 2); @@ -278,6 +286,55 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } + @Test + public void testAutoCompactionDutyWithSegmentGranularity() throws Exception + { + loadData(INDEX_TASK); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 4 segments across 2 days (4 total)... + verifySegmentsCount(4); + verifyQuery(INDEX_QUERIES_RESOURCE); + + Granularity newGranularity = Granularities.YEAR; + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UniformGranularitySpec(newGranularity, null, null)); + + LOG.info("Auto compaction test with YEAR segment granularity"); + + List<String> expectedIntervalAfterCompaction = new ArrayList<>(); + for (String interval : intervalsBeforeCompaction) { + for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { + expectedIntervalAfterCompaction.add(newinterval.toString()); + } + } + forceTriggerAutoCompaction(1); + verifyQuery(INDEX_QUERIES_RESOURCE); + verifySegmentsCompacted(1, 1000); + checkCompactionIntervals(expectedIntervalAfterCompaction); + + newGranularity = Granularities.DAY; + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UniformGranularitySpec(newGranularity, null, null)); + + LOG.info("Auto compaction test with DAY segment granularity"); + + // The earlier segment with YEAR granularity is still 'used' as it's not fully overshadowed. + // This is because we only have newer version on 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02. + // The version for the YEAR segment is still the latest for 2013-01-01 to 2013-08-31 and 2013-09-02 to 2014-01-01.
+ // Hence, all three segments are available and the expected intervals are combined from the DAY and YEAR segment granularities + // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 2013-01-01 to 2014-01-01) + for (String interval : intervalsBeforeCompaction) { + for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { + expectedIntervalAfterCompaction.add(newinterval.toString()); + } + } + forceTriggerAutoCompaction(3); + verifyQuery(INDEX_QUERIES_RESOURCE); + verifySegmentsCompacted(3, 1000); + checkCompactionIntervals(expectedIntervalAfterCompaction); + } + } + private void loadData(String indexTask) throws Exception { String taskSpec = getResourceAsString(indexTask); @@ -314,13 +371,19 @@ public class ITAutoCompactionTest extends AbstractIndexerTest private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest) throws Exception { - submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1); + submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null); + } + + private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, GranularitySpec granularitySpec) throws Exception + { + submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec); } private void submitCompactionConfig( PartitionsSpec partitionsSpec, Period skipOffsetFromLatest, - int maxNumConcurrentSubTasks + int maxNumConcurrentSubTasks, + GranularitySpec granularitySpec ) throws Exception { DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig( @@ -348,6 +411,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest null, 1 ), + granularitySpec, null ); compactionResource.submitCompactionConfig(compactionConfig); @@ -415,11 +479,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest private void checkCompactionIntervals(List<String> expectedIntervals) { + Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals); ITRetryUtil.retryUntilTrue( () -> { - final List<String> actualIntervals = coordinator.getSegmentIntervals(fullDatasourceName); - actualIntervals.sort(null); - return actualIntervals.equals(expectedIntervals); + final Set<String> actualIntervals = new HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName)); + System.out.println("ACTUAL: " + actualIntervals); + System.out.println("EXPECTED: " + expectedIntervalsSet); + return actualIntervals.equals(expectedIntervalsSet); }, "Compaction interval check" ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index ed7b44e..f4ee559 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -23,11 +23,14 @@ import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.ITRetryUtil; import 
org.apache.druid.tests.TestNGGroup; +import org.joda.time.Interval; +import org.joda.time.chrono.ISOChronology; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -37,7 +40,10 @@ import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; @Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE}) @Guice(moduleFactory = DruidTestModuleFactory.class) @@ -49,6 +55,8 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private static final String INDEX_DATASOURCE = "wikipedia_index_test"; private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; + private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json"; + private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_compaction_task_with_granularity_spec.json"; private static final String INDEX_TASK_WITH_TIMESTAMP = "/indexer/wikipedia_with_timestamp_index_task.json"; @@ -66,24 +74,41 @@ public class ITCompactionTaskTest extends AbstractIndexerTest @Test public void testCompaction() throws Exception { - loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE); + loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null); + } + + @Test + public void testCompactionWithSegmentGranularity() throws Exception + { + loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_SEGMENT_GRANULARITY, GranularityType.MONTH); + } + + @Test + public void testCompactionWithGranularitySpec() throws Exception + { + loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH); } @Test public void testCompactionWithTimestampDimension() throws Exception { - loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE); + loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, null); } - private void loadDataAndCompact(String indexTask, String queriesResource) throws Exception + private void loadDataAndCompact( + String indexTask, + String queriesResource, + String compactionResource, + GranularityType newSegmentGranularity + ) throws Exception { loadData(indexTask); // 4 segments across 2 days checkNumberOfSegments(4); - final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); - intervalsBeforeCompaction.sort(null); + List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + expectedIntervalAfterCompaction.sort(null); try (final Closeable ignored = unloader(fullDatasourceName)) { String queryResponseTemplate; try { @@ -102,12 +127,23 @@ public class ITCompactionTaskTest extends AbstractIndexerTest queryHelper.testQueriesFromString(queryResponseTemplate); - compactData(); + compactData(compactionResource, newSegmentGranularity); // The original 4 segments should be compacted into 2 new segments checkNumberOfSegments(2); queryHelper.testQueriesFromString(queryResponseTemplate); - checkCompactionIntervals(intervalsBeforeCompaction); + + + if (newSegmentGranularity != null) { + List<String> newIntervals = new ArrayList<>(); + for (String interval : expectedIntervalAfterCompaction) { + for (Interval newinterval : 
newSegmentGranularity.getDefaultGranularity().getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { + newIntervals.add(newinterval.toString()); + } + } + expectedIntervalAfterCompaction = newIntervals; + } + checkCompactionIntervals(expectedIntervalAfterCompaction); } } private void loadData(String indexTask) throws Exception @@ -124,12 +160,19 @@ public class ITCompactionTaskTest extends AbstractIndexerTest ); } - private void compactData() throws Exception + private void compactData(String compactionResource, GranularityType newSegmentGranularity) throws Exception { - final String template = getResourceAsString(COMPACTION_TASK); - String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName); + String template = getResourceAsString(compactionResource); + template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName); + if (newSegmentGranularity != null) { + template = StringUtils.replace( + template, + "%%SEGMENTGRANULARITY%%", + newSegmentGranularity.name() + ); + } - final String taskID = indexer.submitTask(taskSpec); + final String taskID = indexer.submitTask(template); LOG.info("TaskID for compaction task %s", taskID); indexer.waitUntilTaskCompletes(taskID); @@ -153,13 +196,13 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private void checkCompactionIntervals(List<String> expectedIntervals) { + Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals); ITRetryUtil.retryUntilTrue( () -> { - final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName); - intervalsAfterCompaction.sort(null); - System.out.println("AFTER: " + intervalsAfterCompaction); - System.out.println("EXPECTED: " + expectedIntervals); - return intervalsAfterCompaction.equals(expectedIntervals); + final Set<String> intervalsAfterCompaction = new HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName)); + System.out.println("ACTUAL: " + intervalsAfterCompaction); + System.out.println("EXPECTED: " + expectedIntervalsSet); + return intervalsAfterCompaction.equals(expectedIntervalsSet); }, "Compaction interval check" ); diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json new file mode 100644 index 0000000..828579b --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json @@ -0,0 +1,17 @@ +{ + "type" : "compact", + "dataSource" : "%%DATASOURCE%%", + "ioConfig" : { + "type": "compact", + "inputSpec": { + "type": "interval", + "interval": "2013-08-31/2013-09-02" + } + }, + "granularitySpec": { + "segmentGranularity": "%%SEGMENTGRANULARITY%%" + }, + "context" : { + "storeCompactionState" : true + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json new file mode 100644 index 0000000..254926f --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json @@ -0,0 +1,15 @@ +{ + "type" : "compact", + "dataSource" : "%%DATASOURCE%%", + "ioConfig" : { + "type": "compact", + "inputSpec": { + "type": "interval", + "interval": "2013-08-31/2013-09-02" + } + }, + "segmentGranularity": "%%SEGMENTGRANULARITY%%", + "context" : { + 
"storeCompactionState" : true + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java index d0dd175..ea41555 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java @@ -38,6 +38,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery private final String dataSource; private final ClientCompactionIOConfig ioConfig; private final ClientCompactionTaskQueryTuningConfig tuningConfig; + private final ClientCompactionTaskQueryGranularitySpec granularitySpec; private final Map<String, Object> context; @JsonCreator @@ -46,6 +47,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery @JsonProperty("dataSource") String dataSource, @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig, @JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig, + @JsonProperty("granularitySpec") ClientCompactionTaskQueryGranularitySpec granularitySpec, @JsonProperty("context") Map<String, Object> context ) { @@ -53,6 +55,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery this.dataSource = dataSource; this.ioConfig = ioConfig; this.tuningConfig = tuningConfig; + this.granularitySpec = granularitySpec; this.context = context; } @@ -90,11 +93,18 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery } @JsonProperty + public ClientCompactionTaskQueryGranularitySpec getGranularitySpec() + { + return granularitySpec; + } + + @JsonProperty public Map<String, Object> getContext() { return context; } + @Override public boolean equals(Object o) { @@ -109,13 +119,14 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery Objects.equals(dataSource, that.dataSource) && Objects.equals(ioConfig, that.ioConfig) && Objects.equals(tuningConfig, that.tuningConfig) && + Objects.equals(granularitySpec, that.granularitySpec) && Objects.equals(context, that.context); } @Override public int hashCode() { - return Objects.hash(id, dataSource, ioConfig, tuningConfig, context); + return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, context); } @Override @@ -126,6 +137,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery ", dataSource='" + dataSource + '\'' + ", ioConfig=" + ioConfig + ", tuningConfig=" + tuningConfig + + ", granularitySpec=" + granularitySpec + ", context=" + context + '}'; } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java new file mode 100644 index 0000000..6f12ea7 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec; + +import java.util.Objects; + +public class ClientCompactionTaskQueryGranularitySpec +{ + private final Granularity segmentGranularity; + private final Granularity queryGranularity; + private final boolean rollup; + + @JsonCreator + public ClientCompactionTaskQueryGranularitySpec( + @JsonProperty("segmentGranularity") Granularity segmentGranularity, + @JsonProperty("queryGranularity") Granularity queryGranularity, + @JsonProperty("rollup") Boolean rollup + ) + { + this.queryGranularity = queryGranularity == null ? BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY : queryGranularity; + this.rollup = rollup == null ? BaseGranularitySpec.DEFAULT_ROLLUP : rollup; + this.segmentGranularity = segmentGranularity == null ? BaseGranularitySpec.DEFAULT_SEGMENT_GRANULARITY : segmentGranularity; + } + + @JsonProperty + public Granularity getSegmentGranularity() + { + return segmentGranularity; + } + + @JsonProperty + public Granularity getQueryGranularity() + { + return queryGranularity; + } + + @JsonProperty + public boolean isRollup() + { + return rollup; + } + + @Override + public String toString() + { + return "ClientCompactionTaskQueryGranularitySpec{" + + "segmentGranularity=" + segmentGranularity + + ", queryGranularity=" + queryGranularity + + ", rollup=" + rollup + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactionTaskQueryGranularitySpec that = (ClientCompactionTaskQueryGranularitySpec) o; + return Objects.equals(segmentGranularity, that.segmentGranularity) && + Objects.equals(queryGranularity, that.queryGranularity) && + Objects.equals(rollup, that.rollup); + } + + @Override + public int hashCode() + { + return Objects.hash(segmentGranularity, queryGranularity, rollup); + } +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 2b6e47b..321831f 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -78,7 +78,8 @@ public class HttpIndexingServiceClient implements IndexingServiceClient String idPrefix, List<DataSegment> segments, int compactionTaskPriority, - ClientCompactionTaskQueryTuningConfig tuningConfig, + @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, + @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec, @Nullable Map<String, Object> context ) { @@ -99,6 +100,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient dataSource, new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)), 
tuningConfig, + granularitySpec, context ); return runTask(taskId, taskQuery); diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index 14d4359..8eb8034 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -40,6 +40,7 @@ public interface IndexingServiceClient List<DataSegment> segments, int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, + @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec, @Nullable Map<String, Object> context ); diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index 0f97749..8bf6396 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -143,5 +143,4 @@ public class ArbitraryGranularitySpec extends BaseGranularitySpec { return lookupTableBucketByDateTime; } - } diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java index 9ff114b..779952b 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java @@ -20,10 +20,14 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -31,10 +35,15 @@ import org.joda.time.Interval; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.TreeSet; -abstract class BaseGranularitySpec implements GranularitySpec +public abstract class BaseGranularitySpec implements GranularitySpec { + public static final Boolean DEFAULT_ROLLUP = Boolean.TRUE; + public static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY; + public static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE; + protected List<Interval> inputIntervals; protected final Boolean rollup; @@ -45,7 +54,7 @@ abstract class BaseGranularitySpec implements GranularitySpec } else { this.inputIntervals = Collections.emptyList(); } - this.rollup = rollup == null ? Boolean.TRUE : rollup; + this.rollup = rollup == null ? 
DEFAULT_ROLLUP : rollup; } @Override @@ -76,6 +85,15 @@ abstract class BaseGranularitySpec implements GranularitySpec protected abstract LookupIntervalBuckets getLookupTableBuckets(); + @Override + public Map<String, Object> asMap(ObjectMapper objectMapper) + { + return objectMapper.convertValue( + this, + new TypeReference<Map<String, Object>>() {} + ); + } + /** * This is a helper class to facilitate sharing the code for sortedBucketIntervals among * the various GranularitySpec implementations. In particular, the UniformGranularitySpec diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java index c3bb579..148f039 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java @@ -21,12 +21,14 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import org.apache.druid.java.util.common.granularity.Granularity; import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.List; +import java.util.Map; import java.util.TreeSet; /** @@ -79,4 +81,6 @@ public interface GranularitySpec Granularity getQueryGranularity(); GranularitySpec withIntervals(List<Interval> inputIntervals); + + Map<String, Object> asMap(ObjectMapper objectMapper); } diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java index 5fe6a9e..cd7c7a4 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java @@ -21,7 +21,6 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.joda.time.Interval; @@ -30,9 +29,6 @@ import java.util.List; public class UniformGranularitySpec extends BaseGranularitySpec { - private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY; - private static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE; - private final Granularity segmentGranularity; private final Granularity queryGranularity; private final IntervalsByGranularity intervalsByGranularity; @@ -144,5 +140,4 @@ public class UniformGranularitySpec extends BaseGranularitySpec { return lookupTableBucketByDateTime; } - } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index c5a1263..692eb81 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator; import 
com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.joda.time.Period; import javax.annotation.Nullable; @@ -46,6 +49,7 @@ public class DataSourceCompactionConfig private final Integer maxRowsPerSegment; private final Period skipOffsetFromLatest; private final UserCompactionTaskQueryTuningConfig tuningConfig; + private final GranularitySpec granularitySpec; private final Map<String, Object> taskContext; @JsonCreator @@ -56,6 +60,7 @@ public class DataSourceCompactionConfig @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig, + @JsonProperty("granularitySpec") @Nullable GranularitySpec granularitySpec, @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext ) { @@ -69,6 +74,24 @@ public class DataSourceCompactionConfig this.maxRowsPerSegment = maxRowsPerSegment; this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest; this.tuningConfig = tuningConfig; + if (granularitySpec != null) { + Preconditions.checkArgument( + granularitySpec instanceof UniformGranularitySpec, + "Auto compaction granularitySpec only supports uniform type" + ); + Preconditions.checkArgument( + granularitySpec.isRollup() == BaseGranularitySpec.DEFAULT_ROLLUP, + "Auto compaction granularitySpec only supports default rollup value" + ); + Preconditions.checkArgument( + granularitySpec.getQueryGranularity().equals(BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY), + "Auto compaction granularitySpec only supports default query granularity value"); + Preconditions.checkArgument( + granularitySpec.inputIntervals().isEmpty(), + "Auto compaction granularitySpec does not support interval value" + ); + } + this.granularitySpec = granularitySpec; this.taskContext = taskContext; } @@ -113,6 +136,13 @@ public class DataSourceCompactionConfig @JsonProperty @Nullable + public GranularitySpec getGranularitySpec() + { + return granularitySpec; + } + + @JsonProperty + @Nullable public Map<String, Object> getTaskContext() { return taskContext; @@ -131,8 +161,10 @@ public class DataSourceCompactionConfig return taskPriority == that.taskPriority && inputSegmentSizeBytes == that.inputSegmentSizeBytes && Objects.equals(dataSource, that.dataSource) && + Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) && Objects.equals(tuningConfig, that.tuningConfig) && + Objects.equals(granularitySpec, that.granularitySpec) && Objects.equals(taskContext, that.taskContext); } @@ -143,8 +175,10 @@ public class DataSourceCompactionConfig dataSource, taskPriority, inputSegmentSizeBytes, + maxRowsPerSegment, skipOffsetFromLatest, tuningConfig, + granularitySpec, taskContext ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 3b7ee31..dcd7fab 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -24,12 +24,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.inject.Inject; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.CompactionStatistics; @@ -123,8 +125,27 @@ public class CompactSegments implements CoordinatorDuty } if (COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) { final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload(); + DataSourceCompactionConfig dataSourceCompactionConfig = compactionConfigs.get(status.getDataSource()); + if (dataSourceCompactionConfig != null && dataSourceCompactionConfig.getGranularitySpec() != null) { + Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity(); + if (configuredSegmentGranularity != null + && compactionTaskQuery.getGranularitySpec() != null + && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity())) { + // We will cancel active compaction task if segmentGranularity changes and we will need to + // re-compact the interval + LOG.info("Canceled task[%s] as task segmentGranularity is [%s] but compaction config " + + "segmentGranularity is [%s]", + status.getId(), + compactionTaskQuery.getGranularitySpec().getSegmentGranularity(), + configuredSegmentGranularity); + indexingServiceClient.cancelTask(status.getId()); + continue; + } + } + // Skip interval as the current active compaction task is good final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval(); compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval); + // Since we keep the current active compaction task running, we count the active task slots numEstimatedNonCompleteCompactionTasks += findMaxNumTaskSlotsUsedByOneCompactionTask( compactionTaskQuery.getTuningConfig() ); @@ -289,12 +310,24 @@ public class CompactSegments implements CoordinatorDuty snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size()); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); + ClientCompactionTaskQueryGranularitySpec queryGranularitySpec; + if (config.getGranularitySpec() != null) { + queryGranularitySpec = new ClientCompactionTaskQueryGranularitySpec( + config.getGranularitySpec().getSegmentGranularity(), + config.getGranularitySpec().getQueryGranularity(), + config.getGranularitySpec().isRollup() + ); + } else { + queryGranularitySpec = null; + } + // make tuningConfig final String taskId = indexingServiceClient.compactSegments( "coordinator-issued", segmentsToCompact, config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment()), + queryGranularitySpec, newAutoCompactionContext(config.getTaskContext()) ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index c03c4b4..90bfb66 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -28,19 +28,27 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.server.coordinator.CompactionStatistics; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.partition.NumberedPartitionChunk; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.utils.Streams; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -51,12 +59,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.PriorityQueue; +import java.util.Set; import java.util.stream.Collectors; /** @@ -75,6 +85,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator // searchIntervals keeps track of the current state of which interval should be considered to search segments to // compact. private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators; + // This is needed for datasource that has segmentGranularity configured + // If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments + // can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each + // run of the compaction job and skip any interval that was already previously compacted. 
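To make the interval bookkeeping described in that comment concrete, here is a minimal standalone sketch (not part of the patch; the class name and values are illustrative) that uses Druid's own Granularity utilities to show how a single cross-boundary segment interval lands in two MONTH buckets, which is the duplicate case the iterator has to remember and skip:

    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.joda.time.Interval;

    public class GranularityBucketingSketch
    {
      public static void main(String[] args)
      {
        // A WEEK-aligned segment interval that crosses the Jan/Feb month boundary.
        final Interval weekSegment = Intervals.of("2020-01-28/2020-02-03");

        // Bucketing by MONTH yields two intervals, 2020-01 and 2020-02, so the
        // same segment belongs to both monthly buckets in the rebuilt timeline.
        for (Interval monthBucket : Granularities.MONTH.getIterable(weekSegment)) {
          System.out.println(monthBucket);
        }
      }
    }

Both buckets printed here contain the same underlying segment, which is why the iterator tracks the umbrella intervals it has already handed out.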
+ private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>(); private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>( (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, o1.interval) @@ -93,12 +108,53 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> { final DataSourceCompactionConfig config = compactionConfigs.get(dataSource); - + Granularity configuredSegmentGranularity = null; if (config != null && !timeline.isEmpty()) { + Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new HashMap<>(); + if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) { + Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>(); + configuredSegmentGranularity = config.getGranularitySpec().getSegmentGranularity(); + // Create a new timeline to hold segments in the new configured segment granularity + VersionedIntervalTimeline<String, DataSegment> timelineWithConfiguredSegmentGranularity = new VersionedIntervalTimeline<>(Comparator.naturalOrder()); + Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); + for (DataSegment segment : segments) { + // Convert the original segment intervals into granularity buckets defined by configuredSegmentGranularity + // For example, if the original segment has the interval 2020-01-28/2020-02-03 with WEEK granularity + // and the configuredSegmentGranularity is MONTH, the segment will be mapped to the two buckets + // of 2020-01/2020-02 and 2020-02/2020-03. + for (Interval interval : configuredSegmentGranularity.getIterable(segment.getInterval())) { + intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>()).add(segment); + } + } + for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : intervalToPartitionMap.entrySet()) { + Interval interval = partitionsPerInterval.getKey(); + int partitionNum = 0; + Set<DataSegment> segmentSet = partitionsPerInterval.getValue(); + int partitions = segmentSet.size(); + for (DataSegment segment : segmentSet) { + DataSegment segmentsForCompact = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions)); + // PartitionHolder can only hold chunks of one partition space + // However, a partition in the new timeline (timelineWithConfiguredSegmentGranularity) can hold multiple + // partitions of the original timeline (when the new segmentGranularity is larger than the original + // segmentGranularity). Hence, we group all the segments of the original timeline into intervals bucketed + // by the new configuredSegmentGranularity. We then convert each segment into a new partition space so that + // there is no duplicate partitionNum across all segments of each new Interval. We will have to save the + // original ShardSpec to convert the segment back when returning from the iterator.
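A rough sketch of that save-and-restore round trip, using the same DataSegment/NumberedShardSpec APIs the patch relies on (the class name and segment values here are made up for illustration):

    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.Pair;
    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.SegmentId;
    import org.apache.druid.timeline.partition.NumberedShardSpec;
    import org.apache.druid.timeline.partition.ShardSpec;
    import org.joda.time.Interval;

    import java.util.HashMap;
    import java.util.Map;

    public class ShardSpecRoundTripSketch
    {
      public static void main(String[] args)
      {
        // A segment that was partition 7 of 10 in its original partition space.
        final DataSegment original = DataSegment.builder()
            .dataSource("wiki")
            .interval(Intervals.of("2020-01-28/2020-02-03"))
            .version("v1")
            .shardSpec(new NumberedShardSpec(7, 10))
            .size(1024)
            .build();

        final Interval monthBucket = Intervals.of("2020-02-01/2020-03-01");
        final Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new HashMap<>();

        // Renumber the segment into a fresh partition space for the month bucket
        // and remember its original ShardSpec, keyed by (bucket, renumbered id).
        final DataSegment renumbered = original.withShardSpec(new NumberedShardSpec(0, 1));
        originalShardSpecs.put(new Pair<>(monthBucket, renumbered.getId()), original.getShardSpec());

        // On the way out of the iterator, the original ShardSpec is put back.
        final DataSegment restored =
            renumbered.withShardSpec(originalShardSpecs.get(new Pair<>(monthBucket, renumbered.getId())));
        System.out.println(restored.getShardSpec()); // partition 7 of 10 again
      }
    }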
+ originalShardSpecs.put(new Pair<>(interval, segmentsForCompact.getId()), segment.getShardSpec()); + timelineWithConfiguredSegmentGranularity.add( + interval, + segmentsForCompact.getVersion(), + NumberedPartitionChunk.make(partitionNum, partitions, segmentsForCompact) + ); + partitionNum += 1; + } + } + timeline = timelineWithConfiguredSegmentGranularity; + } final List<Interval> searchIntervals = - findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource)); + findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), configuredSegmentGranularity, skipIntervals.get(dataSource)); if (!searchIntervals.isEmpty()) { - timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals)); + timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, originalShardSpecs)); } } }); @@ -187,10 +243,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>> { private final List<TimelineObjectHolder<String, DataSegment>> holders; + private final Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs; CompactibleTimelineObjectHolderCursor( VersionedIntervalTimeline<String, DataSegment> timeline, - List<Interval> totalIntervalsToSearch + List<Interval> totalIntervalsToSearch, + Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs ) { this.holders = totalIntervalsToSearch @@ -201,6 +259,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator .filter(holder -> isCompactibleHolder(interval, holder)) ) .collect(Collectors.toList()); + this.originalShardSpecs = originalShardSpecs; } private boolean isCompactibleHolder(Interval interval, TimelineObjectHolder<String, DataSegment> holder) @@ -220,6 +279,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator return partitionBytes > 0; } + private DataSegment transformShardSpecIfNeeded(DataSegment dataSegment, Interval interval) + { + if (originalShardSpecs != null && !originalShardSpecs.isEmpty()) { + return dataSegment.withShardSpec(originalShardSpecs.get(new Pair<>(interval, dataSegment.getId()))); + } + return dataSegment; + } + @Override public boolean hasNext() { @@ -232,8 +299,10 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator if (holders.isEmpty()) { throw new NoSuchElementException(); } - return Streams.sequentialStreamFrom(holders.remove(holders.size() - 1).getObject()) + TimelineObjectHolder<String, DataSegment> timelineObjectHolder = holders.remove(holders.size() - 1); + return Streams.sequentialStreamFrom(timelineObjectHolder.getObject()) .map(PartitionChunk::getObject) + .map(dataSegment -> transformShardSpecIfNeeded(dataSegment, timelineObjectHolder.getTrueInterval())) .collect(Collectors.toList()); } } @@ -257,10 +326,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator } } - private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConfig, SegmentsToCompact candidates) + private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates) { Preconditions.checkState(!candidates.isEmpty(), "Empty candidates"); - + final ClientCompactionTaskQueryTuningConfig tuningConfig = + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()); final PartitionsSpec partitionsSpecFromConfig = 
findPartitinosSpecFromConfig(tuningConfig); final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState(); if (lastCompactionState == null) { @@ -314,6 +384,22 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator needsCompaction = true; } + // Only checks for segmentGranularity as auto compaction currently only supports segmentGranularity + final Granularity segmentGranularity = lastCompactionState.getGranularitySpec() != null ? + objectMapper.convertValue(lastCompactionState.getGranularitySpec(), GranularitySpec.class).getSegmentGranularity() : + null; + + if (config.getGranularitySpec() != null && + config.getGranularitySpec().getSegmentGranularity() != null && + !config.getGranularitySpec().getSegmentGranularity().equals(segmentGranularity)) { + log.info( + "Configured granularitySpec[%s] is different from the one[%s] of segments. Needs compaction", + config.getGranularitySpec(), + segmentGranularity + ); + needsCompaction = true; + } + return needsCompaction; } @@ -334,16 +420,25 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator final long inputSegmentSize = config.getInputSegmentSizeBytes(); while (compactibleTimelineObjectHolderCursor.hasNext()) { - final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next()); - + List<DataSegment> segments = compactibleTimelineObjectHolderCursor.next(); + final SegmentsToCompact candidates = new SegmentsToCompact(segments); if (!candidates.isEmpty()) { final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize; final boolean needsCompaction = needsCompaction( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), + config, candidates ); if (isCompactibleSize && needsCompaction) { + if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) { + Interval interval = candidates.getUmbrellaInterval(); + Set<Interval> intervalsCompacted = intervalCompactedForDatasource.computeIfAbsent(dataSourceName, k -> new HashSet<>()); + // Skip this candidates if we have compacted the interval already + if (intervalsCompacted.contains(interval)) { + continue; + } + intervalsCompacted.add(interval); + } return candidates; } else { if (!needsCompaction) { @@ -396,6 +491,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator private static List<Interval> findInitialSearchInterval( VersionedIntervalTimeline<String, DataSegment> timeline, Period skipOffset, + Granularity configuredSegmentGranularity, @Nullable List<Interval> skipIntervals ) { @@ -407,6 +503,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest( last.getInterval().getEnd(), skipOffset, + configuredSegmentGranularity, skipIntervals ); @@ -447,19 +544,27 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator static List<Interval> sortAndAddSkipIntervalFromLatest( DateTime latest, Period skipOffset, + Granularity configuredSegmentGranularity, @Nullable List<Interval> skipIntervals ) { final List<Interval> nonNullSkipIntervals = skipIntervals == null ? 
new ArrayList<>(1) : new ArrayList<>(skipIntervals.size()); + final Interval skipFromLatest; + if (configuredSegmentGranularity != null) { + DateTime skipFromLastest = new DateTime(latest, latest.getZone()).minus(skipOffset); + DateTime skipOffsetBucketToSegmentGranularity = configuredSegmentGranularity.bucketStart(skipFromLastest); + skipFromLatest = new Interval(skipOffsetBucketToSegmentGranularity, latest); + } else { + skipFromLatest = new Interval(skipOffset, latest); + } if (skipIntervals != null) { final List<Interval> sortedSkipIntervals = new ArrayList<>(skipIntervals); sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd()); final List<Interval> overlapIntervals = new ArrayList<>(); - final Interval skipFromLatest = new Interval(skipOffset, latest); for (Interval interval : sortedSkipIntervals) { if (interval.overlaps(skipFromLatest)) { @@ -476,7 +581,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator nonNullSkipIntervals.add(skipFromLatest); } } else { - final Interval skipFromLatest = new Interval(skipOffset, latest); nonNullSkipIntervals.add(skipFromLatest); } @@ -579,6 +683,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator return segments.size(); } + private Interval getUmbrellaInterval() + { + return JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())); + } + private long getNumberOfIntervals() { return segments.stream().map(DataSegment::getInterval).distinct().count(); diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 34800cc..09f4c69 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -50,6 +50,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient List<DataSegment> segments, int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, + @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec, @Nullable Map<String, Object> context ) { diff --git a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java index a015422..25bf848 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java @@ -32,6 +32,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.List; +import java.util.Map; public class ArbitraryGranularityTest { @@ -218,6 +219,32 @@ public class ArbitraryGranularityTest } @Test + public void testAsMap() + { + final GranularitySpec spec = new ArbitraryGranularitySpec(Granularities.NONE, Lists.newArrayList( + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-02-01T00Z/2012-03-01T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + )); + + Map<String, Object> map = spec.asMap(JSON_MAPPER); + final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map, GranularitySpec.class); + Assert.assertEquals( + "Round-trip", + ImmutableList.copyOf(spec.sortedBucketIntervals()), + 
ImmutableList.copyOf(rtSpec.sortedBucketIntervals()) + ); + Assert.assertEquals( + "Round-trip", + ImmutableList.copyOf(spec.inputIntervals()), + ImmutableList.copyOf(rtSpec.inputIntervals()) + ); + Assert.assertEquals(spec, rtSpec); + } + + @Test public void testNullInputIntervals() { final GranularitySpec spec = new ArbitraryGranularitySpec(Granularities.NONE, null); diff --git a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java index 27760f6..0c19724 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java @@ -38,10 +38,11 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; public class UniformGranularityTest { - private static final ObjectMapper JOSN_MAPPER = new DefaultObjectMapper(); + private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); @Test public void testSimple() @@ -161,7 +162,7 @@ public class UniformGranularityTest ); try { - final GranularitySpec rtSpec = JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), GranularitySpec.class); + final GranularitySpec rtSpec = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), GranularitySpec.class); Assert.assertEquals( "Round-trip sortedBucketIntervals", ImmutableList.copyOf(spec.sortedBucketIntervals()), @@ -179,6 +180,34 @@ public class UniformGranularityTest } @Test + public void testAsMap() + { + final GranularitySpec spec = new UniformGranularitySpec( + Granularities.DAY, + null, + Lists.newArrayList( + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + ) + ); + Map<String, Object> map = spec.asMap(JSON_MAPPER); + final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map, GranularitySpec.class); + Assert.assertEquals( + "Round-trip sortedBucketIntervals", + ImmutableList.copyOf(spec.sortedBucketIntervals()), + ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator()) + ); + Assert.assertEquals( + "Round-trip granularity", + spec.getSegmentGranularity(), + rtSpec.getSegmentGranularity() + ); + Assert.assertEquals(spec, rtSpec); + } + + @Test public void testEquals() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 96bfcc1..01cfabb 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -20,15 +20,20 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import 
org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.joda.time.Duration; import org.joda.time.Period; @@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest null, new Period(3600), null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -68,6 +74,7 @@ public class DataSourceCompactionConfigTest Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); } @Test @@ -80,6 +87,7 @@ public class DataSourceCompactionConfigTest 30, new Period(3600), null, + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -122,6 +130,7 @@ public class DataSourceCompactionConfigTest null, null ), + null, ImmutableMap.of("key", "val") ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -164,6 +173,7 @@ public class DataSourceCompactionConfigTest null, null ), + null, ImmutableMap.of("key", "val") ); @@ -217,4 +227,92 @@ public class DataSourceCompactionConfigTest OBJECT_MAPPER.readValue(json, UserCompactionTaskQueryTuningConfig.class); Assert.assertEquals(tuningConfig, fromJson); } + + @Test + public void testSerdeGranularitySpec() throws IOException + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new UniformGranularitySpec(Granularities.HOUR, null, null), + ImmutableMap.of("key", "val") + ); + final String json = OBJECT_MAPPER.writeValueAsString(config); + final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); + + Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); + Assert.assertEquals(25, fromJson.getTaskPriority()); + Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); + Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); + Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); + Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); + Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); + } + + @Test(expected = IllegalArgumentException.class) + public void testFailIfGranularitySpecContainsNonDefaultQueryGranularity() + { + new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, null), + ImmutableMap.of("key", "val") + ); + } + + @Test(expected = IllegalArgumentException.class) + public void testFailIfGranularitySpecContainsNonDefaultRollup() + { + new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new 
UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, false, null), + ImmutableMap.of("key", "val") + ); + } + + @Test(expected = IllegalArgumentException.class) + public void testFailIfGranularitySpecContainsNonEmptyInterval() + { + new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, ImmutableList.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z"))), + ImmutableMap.of("key", "val") + ); + } + + @Test(expected = IllegalArgumentException.class) + public void testFailIfGranularitySpecIsNotUniform() + { + new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new ArbitraryGranularitySpec(null, null, null), + ImmutableMap.of("key", "val") + ); + } + + } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index c8ef83d..c892810 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -28,16 +28,24 @@ import com.google.common.collect.Lists; import junitparams.converters.Nullable; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.client.indexing.ClientCompactionIOConfig; +import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.IndexingWorker; import org.apache.druid.client.indexing.IndexingWorkerInfo; +import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; @@ -47,8 +55,10 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; @@ -81,6 +91,8 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import 
org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import java.io.IOException; @@ -89,6 +101,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -101,6 +114,7 @@ public class CompactSegmentsTest { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final String DATA_SOURCE_PREFIX = "dataSource_"; + private static final int PARTITION_PER_TIME_INTERVAL = 4; // Each dataSource starts with 440 byte, 44 segments, and 11 intervals needing compaction private static final int TOTAL_BYTE_PER_DATASOURCE = 440; private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44; @@ -144,6 +158,7 @@ public class CompactSegmentsTest private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory; private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources; + Map<String, List<DataSegment>> datasourceToSegments = new HashMap<>(); public CompactSegmentsTest(PartitionsSpec partitionsSpec, BiFunction<Integer, Integer, ShardSpec> shardSpecFactory) { @@ -154,18 +169,23 @@ public class CompactSegmentsTest @Before public void setup() { - List<DataSegment> segments = new ArrayList<>(); + List<DataSegment> allSegments = new ArrayList<>(); for (int i = 0; i < 3; i++) { final String dataSource = DATA_SOURCE_PREFIX + i; for (int j : new int[]{0, 1, 2, 3, 7, 8}) { - for (int k = 0; k < 4; k++) { - segments.add(createSegment(dataSource, j, true, k)); - segments.add(createSegment(dataSource, j, false, k)); + for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) { + List<DataSegment> segmentForDatasource = datasourceToSegments.computeIfAbsent(dataSource, key -> new ArrayList<>()); + DataSegment dataSegment = createSegment(dataSource, j, true, k); + allSegments.add(dataSegment); + segmentForDatasource.add(dataSegment); + dataSegment = createSegment(dataSource, j, false, k); + allSegments.add(dataSegment); + segmentForDatasource.add(dataSegment); } } } dataSources = DataSourcesSnapshot - .fromUsedSegments(segments, ImmutableMap.of()) + .fromUsedSegments(allSegments, ImmutableMap.of()) .getUsedSegmentsTimelinesPerDataSource(); } @@ -351,17 +371,17 @@ public class CompactSegmentsTest String dataSourceName = DATA_SOURCE_PREFIX + 1; List<DataSegment> segments = new ArrayList<>(); for (int j : new int[]{0, 1, 2, 3, 7, 8}) { - for (int k = 0; k < 4; k++) { + for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) { DataSegment beforeNoon = createSegment(dataSourceName, j, true, k); DataSegment afterNoon = createSegment(dataSourceName, j, false, k); if (j == 3) { // Make two intervals on this day compacted (two compacted intervals back-to-back) - beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of())); - afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of())); + beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of())); + afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of())); } if (j == 1) { // Make one interval on this day compacted - afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of())); + afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, 
ImmutableMap.of(), ImmutableMap.of())); } segments.add(beforeNoon); segments.add(afterNoon); } } @@ -539,6 +559,205 @@ public class CompactSegmentsTest } @Test + public void testCompactWithoutGranularitySpec() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient); + final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + null, + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class); + ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + segmentsCaptor.capture(), + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(), + granularitySpecArgumentCaptor.capture(), + ArgumentMatchers.any() + ); + // Only the same number of segments as the original PARTITION_PER_TIME_INTERVAL since the segment granularity is unchanged + Assert.assertEquals(PARTITION_PER_TIME_INTERVAL, segmentsCaptor.getValue().size()); + Assert.assertNull(granularitySpecArgumentCaptor.getValue()); + } + + @Test + public void testCompactWithGranularitySpec() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient); + final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + new UniformGranularitySpec(Granularities.YEAR, null, null), + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class); + ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + segmentsCaptor.capture(), + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(), + granularitySpecArgumentCaptor.capture(), + ArgumentMatchers.any() + ); + // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments + // are within the same year + Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); + Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity()); + } + + @Test + public void
testCompactWithGranularitySpecConflictWithActiveCompactionTask() + { + final String dataSource = DATA_SOURCE_PREFIX + 0; + final String conflictTaskId = "taskIdDummy"; + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + TaskStatusPlus runningConflictCompactionTask = new TaskStatusPlus( + conflictTaskId, + "groupId", + "compact", + DateTimes.EPOCH, + DateTimes.EPOCH, + TaskState.RUNNING, + RunnerTaskState.RUNNING, + -1L, + TaskLocation.unknown(), + dataSource, + null + ); + TaskPayloadResponse runningConflictCompactionTaskPayload = new TaskPayloadResponse( + conflictTaskId, + new ClientCompactionTaskQuery( + conflictTaskId, + dataSource, + new ClientCompactionIOConfig( + new ClientCompactionIntervalSpec( + Intervals.of("2000/2099"), + "testSha256OfSortedSegmentIds" + ) + ), + null, + new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, null, null), + null + ) + ); + Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask)); + Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload); + + final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient); + final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>(); + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + new UniformGranularitySpec(Granularities.YEAR, null, null), + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + // Verify that conflict task was canceled + Mockito.verify(mockIndexingServiceClient).cancelTask(conflictTaskId); + // The active conflict task has interval of 2000/2099 + // Make sure that we do not skip interval of conflict task. 
+ // Since we cancel the task and will have to compact those intervals with the new segmentGranularity + ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class); + ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + segmentsCaptor.capture(), + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(), + granularitySpecArgumentCaptor.capture(), + ArgumentMatchers.any() + ); + // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments + // are within the same year + Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); + Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity()); + } + + @Test public void testRunParallelCompactionMultipleCompactionTaskSlots() { final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); @@ -831,6 +1050,7 @@ public class CompactSegmentsTest null, null ), + null, null ) ); @@ -984,7 +1204,8 @@ public class CompactSegmentsTest "lz4", "longEncoding", "longs" - ) + ), + ImmutableMap.of() ), 1, segmentSize diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java index 76a84bc..b32a7b7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java @@ -71,6 +71,7 @@ public class NewestSegmentFirstIteratorTest final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest( DateTimes.of("2019-01-01"), new Period(72, 0, 0, 0), + null, ImmutableList.of( Intervals.of("2018-12-30/2018-12-31"), Intervals.of("2018-12-24/2018-12-25") @@ -90,6 +91,7 @@ public class NewestSegmentFirstIteratorTest null, null, null, + null, null ); Assert.assertEquals( @@ -128,6 +130,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( @@ -166,6 +169,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( @@ -204,6 +208,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( @@ -242,6 +247,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( @@ -280,6 +286,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( @@ -318,6 +325,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( @@ -356,6 +364,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( @@ -394,6 +403,7 @@ public class NewestSegmentFirstIteratorTest null, null ), + null, null ); Assert.assertEquals( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index 749f9d7..cc23f6a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -22,10 +22,15 @@ package org.apache.druid.server.coordinator.duty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; @@ -58,7 +63,7 @@ public class NewestSegmentFirstPolicyTest { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -83,7 +88,7 @@ public class NewestSegmentFirstPolicyTest { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -116,7 +121,7 @@ public class NewestSegmentFirstPolicyTest { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -149,7 +154,7 @@ public class NewestSegmentFirstPolicyTest public void testHugeShard() { final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -199,7 +204,7 @@ public class NewestSegmentFirstPolicyTest public void testManySegmentsPerShard() { final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -259,9 +264,9 @@ public class NewestSegmentFirstPolicyTest final CompactionSegmentIterator iterator = policy.reset( ImmutableMap.of( unknownDataSource, - createCompactionConfig(10000, new Period("P2D")), + createCompactionConfig(10000, new Period("P2D"), null), DATA_SOURCE, - createCompactionConfig(10000, new Period("P2D")) + createCompactionConfig(10000, new Period("P2D"), null) ), ImmutableMap.of( DATA_SOURCE, @@ -307,7 +312,7 @@ public class NewestSegmentFirstPolicyTest ) ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"))), + 
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -340,7 +345,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -361,7 +366,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -370,11 +375,87 @@ public class NewestSegmentFirstPolicyTest } @Test + public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityEqual() + { + final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( + new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("P1D")), + new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D")) + ); + + final CompactionSegmentIterator iterator = policy.reset( + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.DAY, null, null))), + ImmutableMap.of(DATA_SOURCE, timeline), + Collections.emptyMap() + ); + + // We should only get segments in Oct + final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE) + ); + + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator)))); + } + + @Test + public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityLarger() + { + final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( + // This contains segment that + // - Cross between month boundary of latest month (starts in Nov and ends in Dec). This should be skipped + // - Fully in latest month (starts in Dec and ends in Dec). This should be skipped + // - Does not overlap latest month (starts in Oct and ends in Oct). 
This should not be skipped + new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")), + new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) + ); + + final CompactionSegmentIterator iterator = policy.reset( + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MONTH, null, null))), + ImmutableMap.of(DATA_SOURCE, timeline), + Collections.emptyMap() + ); + + // We should only get segments in Oct + final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE) + ); + + Assert.assertTrue(iterator.hasNext()); + List<DataSegment> actual = iterator.next(); + Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size()); + Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual)); + Assert.assertFalse(iterator.hasNext()); + } + + @Test + public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularitySmaller() + { + final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( + new SegmentGenerateSpec(Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")), + new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) + ); + + final CompactionSegmentIterator iterator = policy.reset( + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))), + ImmutableMap.of(DATA_SOURCE, timeline), + Collections.emptyMap() + ); + + // We should only get segments in Oct + final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>( + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE) + ); + + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator)))); + } + + @Test public void testWithSkipIntervals() { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -414,7 +495,7 @@ public class NewestSegmentFirstPolicyTest { final Period segmentPeriod = new Period("PT1H"); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)), ImmutableMap.of( DATA_SOURCE, createTimeline( @@ -455,6 +536,113 @@ public class NewestSegmentFirstPolicyTest ); } + @Test + public void testIteratorReturnsSegmentsInConfiguredSegmentGranularity() + { + final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline( + // Segments with day interval from Oct to Dec + new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D")) + ); + + final CompactionSegmentIterator iterator = policy.reset( + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))), + 
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+
+    // We should get all segments in timeline back since skip offset is P0D.
+    // However, we only need to iterate 3 times (once for each month) since the configured segmentGranularity is MONTH,
+    // and hence the iterator returns all segments bucketed into the configured segmentGranularity.
+    // Month of Dec
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-31T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // Month of Nov
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-11-01T00:00:00/2017-12-01T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // Month of Oct
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-11-01T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIteratorReturnsSegmentsInMultipleIntervalIfConfiguredSegmentGranularityCrossBoundary()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(Intervals.of("2020-01-01/2020-01-08"), new Period("P7D")),
+        new SegmentGenerateSpec(Intervals.of("2020-01-28/2020-02-03"), new Period("P7D")),
+        new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // We should get the segment "2020-01-28/2020-02-03" back twice: once when the iterator returns for Feb and
+    // once when it returns for Jan.
+
+    // Month of Feb
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> actual = iterator.next();
+    Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+    // Month of Jan
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertTrue(iterator.hasNext());
+    actual = iterator.next();
+    Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIteratorDoesNotReturnCompactedInterval()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+
+    final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(iterator.next()));
+    // The iterator should return only once: every MINUTE bucket of this interval maps back to the same
+    // already-returned set of segments, so it must not be handed out again.
+    Assert.assertFalse(iterator.hasNext());
+  }
+
 private static void assertCompactSegmentIntervals(
     CompactionSegmentIterator iterator,
     Period segmentPeriod,
@@ -546,7 +734,8 @@ public class NewestSegmentFirstPolicyTest
   private DataSourceCompactionConfig createCompactionConfig(
       long inputSegmentSizeBytes,
-      Period skipOffsetFromLatest
+      Period skipOffsetFromLatest,
+      GranularitySpec granularitySpec
   )
   {
     return new DataSourceCompactionConfig(
@@ -556,6 +745,7 @@ public class NewestSegmentFirstPolicyTest
         null,
         skipOffsetFromLatest,
         null,
+        granularitySpec,
         null
     );
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 61e0a32..ed6599a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -277,7 +277,8 @@ public class SystemSchemaTest extends CalciteTestBase
   private final CompactionState expectedCompactionState = new CompactionState(
       new DynamicPartitionsSpec(null, null),
-      Collections.singletonMap("test", "map")
+      Collections.singletonMap("test", "map"),
+      Collections.singletonMap("test2", "map2")
   );
 
   private final DataSegment publishedCompactedSegment1 = new DataSegment(
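As a reading aid, the two behaviors these new tests pin down can be reproduced directly with Druid's existing Granularity utilities. The sketch below is not part of this patch; the class name and variables are invented for illustration, but Granularities, Granularity.getIterable, Granularity.bucketStart, Intervals, and DateTimes are real Druid APIs:

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

// Hypothetical illustration only; not code from this commit.
public class SegmentGranularityBucketingSketch
{
  public static void main(String[] args)
  {
    // 1) Cross-boundary bucketing: a segment spanning 2020-01-28/2020-02-03 touches
    //    two MONTH buckets, which is why the cross-boundary test expects that
    //    segment back from the iterator twice (once for Feb, then once for Jan).
    final Interval crossing = Intervals.of("2020-01-28/2020-02-03");
    for (Interval bucket : Granularities.MONTH.getIterable(crossing)) {
      System.out.println("bucket: " + bucket); // Jan bucket, then Feb bucket
    }

    // 2) Skip offset vs. a larger segmentGranularity: with skipOffsetFromLatest = P1D
    //    and the latest data at 2017-12-03, the raw skip window is 2017-12-02/2017-12-03.
    //    Under a MONTH segmentGranularity that window falls inside the December bucket,
    //    so every segment overlapping December (including one starting 2017-11-30T23:00)
    //    ends up skipped, matching the "Larger" test's Oct-only expectation.
    final DateTime latest = DateTimes.of("2017-12-03");
    final Interval rawSkipWindow = new Interval(new Period("P1D"), latest);
    System.out.println("raw skip window: " + rawSkipWindow);
    System.out.println("skip window's bucket starts at: " + Granularities.MONTH.bucketStart(rawSkipWindow.getStart()));
  }
}

The design choice the tests encode is that candidate segments are bucketed by the configured segmentGranularity rather than by their own intervals, newest bucket first. That is what lets a single auto-compaction pass rewrite a whole month of daily segments, and why a boundary-crossing segment legitimately appears in two consecutive iterator results.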