This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6541178  Support segmentGranularity for auto-compaction (#10843)
6541178 is described below

commit 6541178c21839530a42af4b4675a9bc680bffca6
Author: Maytas Monsereenusorn <mayt...@apache.org>
AuthorDate: Fri Feb 12 03:03:20 2021 -0800

    Support segmentGranularity for auto-compaction (#10843)
    
    * Support segmentGranularity for auto-compaction
    
    * Support segmentGranularity for auto-compaction
    
    * Support segmentGranularity for auto-compaction
    
    * Support segmentGranularity for auto-compaction
    
    * resolve conflict
    
    * Support segmentGranularity for auto-compaction
    
    * Support segmentGranularity for auto-compaction
    
    * fix tests
    
    * fix more tests
    
    * fix checkstyle
    
    * add unit tests
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix checkstyle
    
    * add unit tests
    
    * add integration tests
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix failing tests
    
    * address comments
    
    * address comments
    
    * fix tests
    
    * fix tests
    
    * fix test
    
    * fix test
    
    * fix test
    
    * fix test
    
    * fix test
    
    * fix test
    
    * fix test
    
    * fix test
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   1 +
 .../org/apache/druid/timeline/CompactionState.java |  19 +-
 .../druid/timeline/partition/PartitionChunk.java   |   2 +-
 .../org/apache/druid/timeline/DataSegmentTest.java |   4 +-
 .../common/task/AbstractBatchIndexTask.java        |   6 +-
 .../druid/indexing/common/task/CompactionTask.java |  35 ++-
 .../druid/indexing/common/task/IndexTask.java      |   3 +-
 .../parallel/ParallelIndexSupervisorTask.java      |   3 +-
 .../task/ClientCompactionTaskQuerySerdeTest.java   |  18 ++
 .../common/task/CompactionTaskParallelRunTest.java |  84 +++++--
 .../common/task/CompactionTaskRunTest.java         |  81 +++++--
 .../indexing/common/task/CompactionTaskTest.java   |  41 ++++
 .../coordinator/duty/ITAutoCompactionTest.java     |  80 ++++++-
 .../druid/tests/indexer/ITCompactionTaskTest.java  |  75 +++++--
 ...edia_compaction_task_with_granularity_spec.json |  17 ++
 ...a_compaction_task_with_segment_granularity.json |  15 ++
 .../client/indexing/ClientCompactionTaskQuery.java |  14 +-
 .../ClientCompactionTaskQueryGranularitySpec.java  |  95 ++++++++
 .../client/indexing/HttpIndexingServiceClient.java |   4 +-
 .../client/indexing/IndexingServiceClient.java     |   1 +
 .../granularity/ArbitraryGranularitySpec.java      |   1 -
 .../indexing/granularity/BaseGranularitySpec.java  |  22 +-
 .../indexing/granularity/GranularitySpec.java      |   4 +
 .../granularity/UniformGranularitySpec.java        |   5 -
 .../coordinator/DataSourceCompactionConfig.java    |  34 +++
 .../server/coordinator/duty/CompactSegments.java   |  33 +++
 .../duty/NewestSegmentFirstIterator.java           | 133 +++++++++++-
 .../client/indexing/NoopIndexingServiceClient.java |   1 +
 .../granularity/ArbitraryGranularityTest.java      |  27 +++
 .../granularity/UniformGranularityTest.java        |  33 ++-
 .../DataSourceCompactionConfigTest.java            |  98 +++++++++
 .../coordinator/duty/CompactSegmentsTest.java      | 241 ++++++++++++++++++++-
 .../duty/NewestSegmentFirstIteratorTest.java       |  10 +
 .../duty/NewestSegmentFirstPolicyTest.java         | 216 ++++++++++++++++--
 .../druid/sql/calcite/schema/SystemSchemaTest.java |   3 +-
 35 files changed, 1336 insertions(+), 123 deletions(-)
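
A rough sketch (not part of the patch itself) of a datasource auto-compaction config using the new granularitySpec field; the field names follow DataSourceCompactionConfig and UniformGranularitySpec as changed below, while the datasource name and skip offset are illustrative assumptions:

    {
      "dataSource": "wikipedia",
      "skipOffsetFromLatest": "P1D",
      "granularitySpec": {
        "segmentGranularity": "DAY"
      }
    }

With storeCompactionState enabled (the default), the granularity spec is forwarded to the generated compaction task and recorded in each segment's lastCompactionState, as the changes below show.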

diff --git 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 91c1409..e744bf9 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -99,6 +99,7 @@ public class NewestSegmentFirstPolicyBenchmark
               null,
               null,
               null,
+              null,
               null
           )
       );
diff --git a/core/src/main/java/org/apache/druid/timeline/CompactionState.java 
b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
index c30f427..8588717 100644
--- a/core/src/main/java/org/apache/druid/timeline/CompactionState.java
+++ b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
@@ -43,15 +43,20 @@ public class CompactionState
   // org.apache.druid.segment.IndexSpec cannot be used here because it's in 
the 'processing' module which
   // has a dependency on the 'core' module where this class is.
   private final Map<String, Object> indexSpec;
+  // org.apache.druid.segment.indexing.granularity.GranularitySpec cannot be 
used here because it's in the
+  // 'server' module which has a dependency on the 'core' module where this 
class is.
+  private final Map<String, Object> granularitySpec;
 
   @JsonCreator
   public CompactionState(
       @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
-      @JsonProperty("indexSpec") Map<String, Object> indexSpec
+      @JsonProperty("indexSpec") Map<String, Object> indexSpec,
+      @JsonProperty("granularitySpec") Map<String, Object> granularitySpec
   )
   {
     this.partitionsSpec = partitionsSpec;
     this.indexSpec = indexSpec;
+    this.granularitySpec = granularitySpec;
   }
 
   @JsonProperty
@@ -66,6 +71,12 @@ public class CompactionState
     return indexSpec;
   }
 
+  @JsonProperty
+  public Map<String, Object> getGranularitySpec()
+  {
+    return granularitySpec;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -77,13 +88,14 @@ public class CompactionState
     }
     CompactionState that = (CompactionState) o;
     return Objects.equals(partitionsSpec, that.partitionsSpec) &&
-           Objects.equals(indexSpec, that.indexSpec);
+           Objects.equals(indexSpec, that.indexSpec) &&
+           Objects.equals(granularitySpec, that.granularitySpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(partitionsSpec, indexSpec);
+    return Objects.hash(partitionsSpec, indexSpec, granularitySpec);
   }
 
   @Override
@@ -92,6 +104,7 @@ public class CompactionState
     return "CompactionState{" +
            "partitionsSpec=" + partitionsSpec +
            ", indexSpec=" + indexSpec +
+           ", granularitySpec=" + granularitySpec +
            '}';
   }
 }
diff --git 
a/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java 
b/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java
index 10b43b8..54aaf03 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java
@@ -58,7 +58,7 @@ public interface PartitionChunk<T> extends 
Comparable<PartitionChunk<T>>
    * Returns true if this chunk is the end of the partition.  Most commonly, 
that means it represents the range
    * [X, infinity] for some concrete X.
    *
-   * @return true if the chunk is the beginning of the partition
+   * @return true if the chunk is the end of the partition
    */
   boolean isEnd();
 
diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java 
b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
index 5483642..66c3400 100644
--- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -120,6 +120,7 @@ public class DataSegmentTest
         new NumberedShardSpec(3, 0),
         new CompactionState(
             new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+            ImmutableMap.of(),
             ImmutableMap.of()
         ),
         TEST_VERSION,
@@ -231,7 +232,8 @@ public class DataSegmentTest
   {
     final CompactionState compactionState = new CompactionState(
         new DynamicPartitionsSpec(null, null),
-        Collections.singletonMap("test", "map")
+        Collections.singletonMap("test", "map"),
+        Collections.singletonMap("test2", "map2")
     );
     final DataSegment segment1 = DataSegment.builder()
                                             .dataSource("foo")
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 4a7fa0d..6f7cdf6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -473,12 +473,14 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
   public static Function<Set<DataSegment>, Set<DataSegment>> 
compactionStateAnnotateFunction(
       boolean storeCompactionState,
       TaskToolbox toolbox,
-      IndexTuningConfig tuningConfig
+      IndexTuningConfig tuningConfig,
+      GranularitySpec granularitySpec
   )
   {
     if (storeCompactionState) {
       final Map<String, Object> indexSpecMap = 
tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
-      final CompactionState compactionState = new 
CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap);
+      final Map<String, Object> granularitySpecMap = 
granularitySpec.asMap(toolbox.getJsonMapper());
+      final CompactionState compactionState = new 
CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, 
granularitySpecMap);
       return segments -> segments
           .stream()
           .map(s -> s.withLastCompactionState(compactionState))
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 530a400..5d971c3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -141,6 +141,8 @@ public class CompactionTask extends AbstractBatchIndexTask
   @Nullable
   private final Granularity segmentGranularity;
   @Nullable
+  private final GranularitySpec granularitySpec;
+  @Nullable
   private final ParallelIndexTuningConfig tuningConfig;
   @JsonIgnore
   private final SegmentProvider segmentProvider;
@@ -172,7 +174,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       @JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
       @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec 
dimensionsSpec,
       @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] 
metricsSpec,
-      @JsonProperty("segmentGranularity") @Nullable final Granularity 
segmentGranularity,
+      @JsonProperty("segmentGranularity") @Deprecated @Nullable final 
Granularity segmentGranularity,
+      @JsonProperty("granularitySpec") @Nullable final GranularitySpec 
granularitySpec,
       @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
       @JsonProperty("context") @Nullable final Map<String, Object> context,
       @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@@ -202,6 +205,16 @@ public class CompactionTask extends AbstractBatchIndexTask
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
     this.metricsSpec = metricsSpec;
     this.segmentGranularity = segmentGranularity;
+    if (granularitySpec == null && segmentGranularity != null) {
+      this.granularitySpec = new UniformGranularitySpec(
+          segmentGranularity,
+          null,
+          null,
+          null
+      );
+    } else {
+      this.granularitySpec = granularitySpec;
+    }
     this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : 
null;
     this.segmentProvider = new SegmentProvider(dataSource, 
this.ioConfig.getInputSpec());
     this.partitionConfigurationManager = new 
PartitionConfigurationManager(this.tuningConfig);
@@ -288,7 +301,14 @@ public class CompactionTask extends AbstractBatchIndexTask
   @Override
   public Granularity getSegmentGranularity()
   {
-    return segmentGranularity;
+    return granularitySpec == null ? null : 
granularitySpec.getSegmentGranularity();
+  }
+
+  @JsonProperty
+  @Nullable
+  public GranularitySpec getGranularitySpec()
+  {
+    return granularitySpec;
   }
 
   @Nullable
@@ -348,7 +368,7 @@ public class CompactionTask extends AbstractBatchIndexTask
         partitionConfigurationManager,
         dimensionsSpec,
         metricsSpec,
-        segmentGranularity,
+        getSegmentGranularity(),
         toolbox.getCoordinatorClient(),
         segmentLoaderFactory,
         retryPolicyFactory
@@ -892,6 +912,8 @@ public class CompactionTask extends AbstractBatchIndexTask
     @Nullable
     private Granularity segmentGranularity;
     @Nullable
+    private GranularitySpec granularitySpec;
+    @Nullable
     private TuningConfig tuningConfig;
     @Nullable
     private Map<String, Object> context;
@@ -941,6 +963,12 @@ public class CompactionTask extends AbstractBatchIndexTask
       return this;
     }
 
+    public Builder granularitySpec(GranularitySpec granularitySpec)
+    {
+      this.granularitySpec = granularitySpec;
+      return this;
+    }
+
     public Builder tuningConfig(TuningConfig tuningConfig)
     {
       this.tuningConfig = tuningConfig;
@@ -966,6 +994,7 @@ public class CompactionTask extends AbstractBatchIndexTask
           dimensionsSpec,
           metricsSpec,
           segmentGranularity,
+          granularitySpec,
           tuningConfig,
           context,
           segmentLoaderFactory,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 88779ab..517c08a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -904,7 +904,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
           compactionStateAnnotateFunction(
               storeCompactionState,
               toolbox,
-              ingestionSchema.getTuningConfig()
+              ingestionSchema.getTuningConfig(),
+              ingestionSchema.getDataSchema().getGranularitySpec()
           );
 
       // Probably we can publish atomicUpdateGroup along with segments.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index bd64fbd..af4352b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -929,7 +929,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = 
compactionStateAnnotateFunction(
         storeCompactionState,
         toolbox,
-        ingestionSchema.getTuningConfig()
+        ingestionSchema.getTuningConfig(),
+        ingestionSchema.getDataSchema().getGranularitySpec()
     );
     final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, 
segmentsToPublish, commitMetadata) ->
         toolbox.getTaskActionClient().submit(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index ef3cf68..1e9628f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionIOConfig;
 import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import 
org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -45,11 +46,13 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
@@ -113,6 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest
             1000,
             100
         ),
+        new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, 
Granularities.HOUR, true),
         ImmutableMap.of("key", "value")
     );
 
@@ -186,6 +190,18 @@ public class ClientCompactionTaskQuerySerdeTest
         query.getTuningConfig().getTotalNumMergeTasks().intValue(),
         task.getTuningConfig().getTotalNumMergeTasks()
     );
+    Assert.assertEquals(
+        query.getGranularitySpec().getQueryGranularity(),
+        task.getGranularitySpec().getQueryGranularity()
+    );
+    Assert.assertEquals(
+        query.getGranularitySpec().getSegmentGranularity(),
+        task.getGranularitySpec().getSegmentGranularity()
+    );
+    Assert.assertEquals(
+        query.getGranularitySpec().isRollup(),
+        task.getGranularitySpec().isRollup()
+    );
     Assert.assertEquals(query.getContext(), task.getContext());
   }
 
@@ -243,6 +259,7 @@ public class ClientCompactionTaskQuerySerdeTest
                 null
             )
         )
+        .granularitySpec(new UniformGranularitySpec(Granularities.DAY, 
Granularities.HOUR, null))
         .build();
 
     final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@@ -284,6 +301,7 @@ public class ClientCompactionTaskQuerySerdeTest
             1000,
             100
         ),
+        new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, 
Granularities.HOUR, true),
         new HashMap<>()
     );
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 7b7005b..7fe8781 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -129,7 +129,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
   }
 
   @Test
-  public void testRunParallelWithDynamicPartitioningMatchCompactionState()
+  public void testRunParallelWithDynamicPartitioningMatchCompactionState() 
throws Exception
   {
     runIndexTask(null, true);
 
@@ -144,22 +144,33 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
-    final CompactionState expectedState = new CompactionState(
-        new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
-    );
     for (DataSegment segment : compactedSegments) {
       Assert.assertSame(
           lockGranularity == LockGranularity.TIME_CHUNK ? 
NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
           segment.getShardSpec().getClass()
       );
       // Expect compaction state to exist as store compaction state by default
+      CompactionState expectedState = new CompactionState(
+          new DynamicPartitionsSpec(null, Long.MAX_VALUE),
+          
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+          getObjectMapper().readValue(
+              getObjectMapper().writeValueAsString(
+                  new UniformGranularitySpec(
+                      Granularities.HOUR,
+                      Granularities.NONE,
+                      true,
+                      ImmutableList.of(segment.getInterval())
+                  )
+              ),
+              Map.class
+          )
+      );
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
 
   @Test
-  public void testRunParallelWithHashPartitioningMatchCompactionState()
+  public void testRunParallelWithHashPartitioningMatchCompactionState() throws 
Exception
   {
     // Hash partitioning is not supported with segment lock yet
     Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
@@ -176,19 +187,30 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
-    final CompactionState expectedState = new CompactionState(
-        new HashedPartitionsSpec(null, 3, null),
-        
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
-    );
     for (DataSegment segment : compactedSegments) {
       // Expect compaction state to exist as store compaction state by default
       Assert.assertSame(HashBasedNumberedShardSpec.class, 
segment.getShardSpec().getClass());
+      CompactionState expectedState = new CompactionState(
+          new HashedPartitionsSpec(null, 3, null),
+          
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+          getObjectMapper().readValue(
+              getObjectMapper().writeValueAsString(
+                  new UniformGranularitySpec(
+                      Granularities.HOUR,
+                      Granularities.NONE,
+                      true,
+                      ImmutableList.of(segment.getInterval())
+                  )
+              ),
+              Map.class
+          )
+      );
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
 
   @Test
-  public void testRunParallelWithRangePartitioning()
+  public void testRunParallelWithRangePartitioning() throws Exception
   {
     // Range partitioning is not supported with segment lock yet
     Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
@@ -205,19 +227,30 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
-    final CompactionState expectedState = new CompactionState(
-        new SingleDimensionPartitionsSpec(7, null, "dim", false),
-        
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
-    );
     for (DataSegment segment : compactedSegments) {
       // Expect compaction state to exist as store compaction state by default
       Assert.assertSame(SingleDimensionShardSpec.class, 
segment.getShardSpec().getClass());
+      CompactionState expectedState = new CompactionState(
+          new SingleDimensionPartitionsSpec(7, null, "dim", false),
+          
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+          getObjectMapper().readValue(
+              getObjectMapper().writeValueAsString(
+                  new UniformGranularitySpec(
+                      Granularities.HOUR,
+                      Granularities.NONE,
+                      true,
+                      ImmutableList.of(segment.getInterval())
+                  )
+              ),
+              Map.class
+          )
+      );
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
 
   @Test
-  public void testRunParallelWithRangePartitioningWithSingleTask()
+  public void testRunParallelWithRangePartitioningWithSingleTask() throws 
Exception
   {
     // Range partitioning is not supported with segment lock yet
     Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
@@ -234,13 +267,24 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
-    final CompactionState expectedState = new CompactionState(
-        new SingleDimensionPartitionsSpec(7, null, "dim", false),
-        
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
-    );
     for (DataSegment segment : compactedSegments) {
       // Expect compaction state to exist as store compaction state by default
       Assert.assertSame(SingleDimensionShardSpec.class, 
segment.getShardSpec().getClass());
+      CompactionState expectedState = new CompactionState(
+          new SingleDimensionPartitionsSpec(7, null, "dim", false),
+          
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+          getObjectMapper().readValue(
+              getObjectMapper().writeValueAsString(
+                  new UniformGranularitySpec(
+                      Granularities.HOUR,
+                      Granularities.NONE,
+                      true,
+                      ImmutableList.of(segment.getInterval())
+                  )
+              ),
+              Map.class
+          )
+      );
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index c3f30ec..bd1f819 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -88,7 +89,6 @@ import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -131,9 +131,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
       0
   );
 
-  // Expecte compaction state to exist after compaction as we store compaction 
state by default
-  private static CompactionState DEFAULT_COMPACTION_STATE;
-
   private static final List<String> TEST_ROWS = ImmutableList.of(
       "2014-01-01T00:00:10Z,a,1\n",
       "2014-01-01T00:00:10Z,b,2\n",
@@ -186,14 +183,26 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     this.lockGranularity = lockGranularity;
   }
 
-  @BeforeClass
-  public static void setupClass() throws JsonProcessingException
+  public static CompactionState getDefaultCompactionState(Granularity 
segmentGranularity,
+                                                          Granularity 
queryGranularity,
+                                                          List<Interval> 
intervals) throws JsonProcessingException
   {
     ObjectMapper mapper = new DefaultObjectMapper();
-
-    DEFAULT_COMPACTION_STATE = new CompactionState(
+    // The compaction state is expected to exist after compaction since we store compaction state by default
+    return new CompactionState(
       new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
-      mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class)
+      mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class),
+      mapper.readValue(
+          mapper.writeValueAsString(
+              new UniformGranularitySpec(
+                  segmentGranularity,
+                  queryGranularity,
+                  true,
+                  intervals
+              )
+          ),
+          Map.class
+      )
     );
   }
 
@@ -238,7 +247,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
           segments.get(i).getInterval()
       );
-      Assert.assertEquals(DEFAULT_COMPACTION_STATE, 
segments.get(i).getLastCompactionState());
+      Assert.assertEquals(
+          getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          segments.get(i).getLastCompactionState()
+      );
       if (lockGranularity == LockGranularity.SEGMENT) {
         Assert.assertEquals(
             new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
@@ -311,10 +323,6 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
 
     final List<DataSegment> segments = resultPair.rhs;
     Assert.assertEquals(6, segments.size());
-    final CompactionState expectedState = new CompactionState(
-        new HashedPartitionsSpec(null, 3, null),
-        
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
-    );
 
     for (int i = 0; i < 3; i++) {
       final Interval interval = 
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1);
@@ -324,6 +332,21 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
             interval,
             segments.get(segmentIdx).getInterval()
         );
+        CompactionState expectedState = new CompactionState(
+            new HashedPartitionsSpec(null, 3, null),
+            
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+            getObjectMapper().readValue(
+                getObjectMapper().writeValueAsString(
+                    new UniformGranularitySpec(
+                        Granularities.HOUR,
+                        Granularities.NONE,
+                        true,
+                        
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))
+                    )
+                ),
+                Map.class
+            )
+        );
         Assert.assertEquals(expectedState, 
segments.get(segmentIdx).getLastCompactionState());
         Assert.assertSame(HashBasedNumberedShardSpec.class, 
segments.get(segmentIdx).getShardSpec().getClass());
       }
@@ -361,7 +384,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
           segments.get(i).getInterval()
       );
-      Assert.assertEquals(DEFAULT_COMPACTION_STATE, 
segments.get(i).getLastCompactionState());
+      Assert.assertEquals(
+          getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          segments.get(i).getLastCompactionState()
+      );
       if (lockGranularity == LockGranularity.SEGMENT) {
         Assert.assertEquals(
             new 
NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, 
(short) 1, (short) 1),
@@ -388,7 +414,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
           segments.get(i).getInterval()
       );
-      Assert.assertEquals(DEFAULT_COMPACTION_STATE, 
segments.get(i).getLastCompactionState());
+      Assert.assertEquals(
+          getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          segments.get(i).getLastCompactionState()
+      );
       if (lockGranularity == LockGranularity.SEGMENT) {
         Assert.assertEquals(
             new NumberedOverwriteShardSpec(
@@ -487,7 +516,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
           segments.get(i).getInterval()
       );
-      Assert.assertEquals(DEFAULT_COMPACTION_STATE, 
segments.get(i).getLastCompactionState());
+      Assert.assertEquals(
+          getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          segments.get(i).getLastCompactionState()
+      );
       if (lockGranularity == LockGranularity.SEGMENT) {
         Assert.assertEquals(
             new 
NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, 
(short) 1, (short) 1),
@@ -526,7 +558,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
 
     Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), 
segments.get(0).getInterval());
     Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
-    Assert.assertEquals(DEFAULT_COMPACTION_STATE, 
segments.get(0).getLastCompactionState());
+    Assert.assertEquals(
+        getDefaultCompactionState(Granularities.DAY, Granularities.NONE, 
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
+        segments.get(0).getLastCompactionState()
+    );
 
     // hour segmentGranularity
     final CompactionTask compactionTask2 = builder
@@ -544,7 +579,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     for (int i = 0; i < 3; i++) {
       
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 
i, i + 1), segments.get(i).getInterval());
       Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(i).getShardSpec());
-      Assert.assertEquals(DEFAULT_COMPACTION_STATE, 
segments.get(i).getLastCompactionState());
+      Assert.assertEquals(
+          getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, 
ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
+          segments.get(i).getLastCompactionState()
+      );
     }
   }
 
@@ -781,7 +819,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
           segments.get(i).getInterval()
       );
-      Assert.assertEquals(DEFAULT_COMPACTION_STATE, 
segments.get(i).getLastCompactionState());
+      Assert.assertEquals(
+          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of()),
+          segments.get(i).getLastCompactionState()
+      );
       if (lockGranularity == LockGranularity.SEGMENT) {
         Assert.assertEquals(
             new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 21b7f73..71b603e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -356,6 +356,47 @@ public class CompactionTaskTest
   }
 
   @Test
+  public void testCreateCompactionTaskWithGranularitySpec()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentLoaderFactory,
+        RETRY_POLICY_FACTORY
+    );
+    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder.tuningConfig(createTuningConfig());
+    builder.segmentGranularity(Granularities.HOUR);
+    final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
+
+    final Builder builder2 = new Builder(
+        DATA_SOURCE,
+        segmentLoaderFactory,
+        RETRY_POLICY_FACTORY
+    );
+    builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder2.tuningConfig(createTuningConfig());
+    builder2.granularitySpec(new UniformGranularitySpec(Granularities.HOUR, 
Granularities.DAY, null));
+    final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
+    
Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(), 
taskCreatedWithSegmentGranularity.getSegmentGranularity());
+  }
+
+  @Test
+  public void 
testCreateCompactionTaskWithGranularitySpecOverrideSegmentGranularity()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentLoaderFactory,
+        RETRY_POLICY_FACTORY
+    );
+    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder.tuningConfig(createTuningConfig());
+    builder.segmentGranularity(Granularities.HOUR);
+    builder.granularitySpec(new UniformGranularitySpec(Granularities.MINUTE, 
Granularities.DAY, null));
+    final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
+    Assert.assertEquals(Granularities.MINUTE, 
taskCreatedWithSegmentGranularity.getSegmentGranularity());
+  }
+
+  @Test
   public void testSerdeWithInterval() throws IOException
   {
     final Builder builder = new Builder(
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 5d1d55b..91bac02 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -28,7 +28,11 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -41,7 +45,9 @@ import org.apache.druid.tests.TestNGGroup;
 import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
 import org.apache.druid.tests.indexer.AbstractIndexerTest;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 import org.joda.time.Period;
+import org.joda.time.chrono.ISOChronology;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
@@ -52,8 +58,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 @Test(groups = {TestNGGroup.COMPACTION})
@@ -160,7 +168,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       LOG.info("Auto compaction test with hash partitioning");
 
       final HashedPartitionsSpec hashedPartitionsSpec = new 
HashedPartitionsSpec(null, 3, null);
-      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1);
+      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null);
       // 2 segments published per day after compaction.
       forceTriggerAutoCompaction(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -175,7 +183,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           "city",
           false
       );
-      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1);
+      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null);
       forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(rangePartitionsSpec, 2);
@@ -278,6 +286,55 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithSegmentGranularity() throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      Granularity newGranularity = Granularities.YEAR;
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UniformGranularitySpec(newGranularity, null, null));
+
+      LOG.info("Auto compaction test with YEAR segment granularity");
+
+      List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(1);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      newGranularity = Granularities.DAY;
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UniformGranularitySpec(newGranularity, null, null));
+
+      LOG.info("Auto compaction test with DAY segment granularity");
+
+      // The earlier segment with YEAR granularity is still 'used' as it's not fully overshadowed.
+      // This is because we only have a newer version on 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02.
+      // The version for the YEAR segment is still the latest for 2013-01-01 to 2013-08-31 and 2013-09-02 to 2014-01-01.
+      // Hence, all three segments are available and the expected intervals are combined from the DAY and YEAR segment granularities
+      // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 2013-01-01 to 2014-01-01)
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(3);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(3, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     String taskSpec = getResourceAsString(indexTask);
@@ -314,13 +371,19 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
   private void submitCompactionConfig(Integer maxRowsPerSegment, Period 
skipOffsetFromLatest) throws Exception
   {
-    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), 
skipOffsetFromLatest, 1);
+    submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null);
+  }
+
+  private void submitCompactionConfig(Integer maxRowsPerSegment, Period 
skipOffsetFromLatest, GranularitySpec granularitySpec) throws Exception
+  {
+    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), 
skipOffsetFromLatest, 1, granularitySpec);
   }
 
   private void submitCompactionConfig(
       PartitionsSpec partitionsSpec,
       Period skipOffsetFromLatest,
-      int maxNumConcurrentSubTasks
+      int maxNumConcurrentSubTasks,
+      GranularitySpec granularitySpec
   ) throws Exception
   {
     DataSourceCompactionConfig compactionConfig = new 
DataSourceCompactionConfig(
@@ -348,6 +411,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
             null,
             1
         ),
+        granularitySpec,
         null
     );
     compactionResource.submitCompactionConfig(compactionConfig);
@@ -415,11 +479,13 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
   private void checkCompactionIntervals(List<String> expectedIntervals)
   {
+    Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals);
     ITRetryUtil.retryUntilTrue(
         () -> {
-          final List<String> actualIntervals = 
coordinator.getSegmentIntervals(fullDatasourceName);
-          actualIntervals.sort(null);
-          return actualIntervals.equals(expectedIntervals);
+          final Set<String> actualIntervals = new 
HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName));
+          System.out.println("ACTUAL: " + actualIntervals);
+          System.out.println("EXPECTED: " + expectedIntervalsSet);
+          return actualIntervals.equals(expectedIntervalsSet);
         },
         "Compaction interval check"
     );
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index ed7b44e..f4ee559 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -23,11 +23,14 @@ import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -37,7 +40,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 @Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE})
 @Guice(moduleFactory = DruidTestModuleFactory.class)
@@ -49,6 +55,8 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private static final String INDEX_DATASOURCE = "wikipedia_index_test";
 
   private static final String COMPACTION_TASK = 
"/indexer/wikipedia_compaction_task.json";
+  private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = 
"/indexer/wikipedia_compaction_task_with_segment_granularity.json";
+  private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = 
"/indexer/wikipedia_compaction_task_with_granularity_spec.json";
 
   private static final String INDEX_TASK_WITH_TIMESTAMP = 
"/indexer/wikipedia_with_timestamp_index_task.json";
 
@@ -66,24 +74,41 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
   @Test
   public void testCompaction() throws Exception
   {
-    loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE);
+    loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK, 
null);
+  }
+
+  @Test
+  public void testCompactionWithSegmentGranularity() throws Exception
+  {
+    loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, 
COMPACTION_TASK_WITH_SEGMENT_GRANULARITY, GranularityType.MONTH);
+  }
+
+  @Test
+  public void testCompactionWithGranularitySpec() throws Exception
+  {
+    loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, 
COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH);
   }
 
   @Test
   public void testCompactionWithTimestampDimension() throws Exception
   {
-    loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE);
+    loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE, 
COMPACTION_TASK, null);
   }
 
-  private void loadDataAndCompact(String indexTask, String queriesResource) 
throws Exception
+  private void loadDataAndCompact(
+      String indexTask,
+      String queriesResource,
+      String compactionResource,
+      GranularityType newSegmentGranularity
+  ) throws Exception
   {
     loadData(indexTask);
 
     // 4 segments across 2 days
     checkNumberOfSegments(4);
 
-    final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
-    intervalsBeforeCompaction.sort(null);
+    List<String> expectedIntervalAfterCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
+    expectedIntervalAfterCompaction.sort(null);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
       String queryResponseTemplate;
       try {
@@ -102,12 +127,23 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
 
 
       queryHelper.testQueriesFromString(queryResponseTemplate);
-      compactData();
+      compactData(compactionResource, newSegmentGranularity);
 
       // The original 4 segments should be compacted into 2 new segments
       checkNumberOfSegments(2);
       queryHelper.testQueriesFromString(queryResponseTemplate);
-      checkCompactionIntervals(intervalsBeforeCompaction);
+
+
+      if (newSegmentGranularity != null) {
+        List<String> newIntervals = new ArrayList<>();
+        for (String interval : expectedIntervalAfterCompaction) {
+          for (Interval newinterval : 
newSegmentGranularity.getDefaultGranularity().getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+            newIntervals.add(newinterval.toString());
+          }
+        }
+        expectedIntervalAfterCompaction = newIntervals;
+      }
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
     }
   }
   private void loadData(String indexTask) throws Exception
@@ -124,12 +160,19 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
     );
   }
 
-  private void compactData() throws Exception
+  private void compactData(String compactionResource, GranularityType 
newSegmentGranularity) throws Exception
   {
-    final String template = getResourceAsString(COMPACTION_TASK);
-    String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", 
fullDatasourceName);
+    String template = getResourceAsString(compactionResource);
+    template = StringUtils.replace(template, "%%DATASOURCE%%", 
fullDatasourceName);
+    if (newSegmentGranularity != null) {
+      template = StringUtils.replace(
+          template,
+          "%%SEGMENTGRANULARITY%%",
+          newSegmentGranularity.name()
+      );
+    }
 
-    final String taskID = indexer.submitTask(taskSpec);
+    final String taskID = indexer.submitTask(template);
     LOG.info("TaskID for compaction task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
@@ -153,13 +196,13 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
 
   private void checkCompactionIntervals(List<String> expectedIntervals)
   {
+    Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals);
     ITRetryUtil.retryUntilTrue(
         () -> {
-          final List<String> intervalsAfterCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
-          intervalsAfterCompaction.sort(null);
-          System.out.println("AFTER: " + intervalsAfterCompaction);
-          System.out.println("EXPECTED: " + expectedIntervals);
-          return intervalsAfterCompaction.equals(expectedIntervals);
+          final Set<String> intervalsAfterCompaction = new 
HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName));
+          System.out.println("ACTUAL: " + intervalsAfterCompaction);
+          System.out.println("EXPECTED: " + expectedIntervalsSet);
+          return intervalsAfterCompaction.equals(expectedIntervalsSet);
         },
         "Compaction interval check"
     );
diff --git 
a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json
 
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json
new file mode 100644
index 0000000..828579b
--- /dev/null
+++ 
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json
@@ -0,0 +1,17 @@
+{
+  "type" : "compact",
+  "dataSource" : "%%DATASOURCE%%",
+  "ioConfig" : {
+    "type": "compact",
+    "inputSpec": {
+      "type": "interval",
+      "interval": "2013-08-31/2013-09-02"
+    }
+  },
+  "granularitySpec": {
+    "segmentGranularity": "%%SEGMENTGRANULARITY%%"
+  },
+  "context" : {
+    "storeCompactionState" : true
+  }
+}
\ No newline at end of file
diff --git 
a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json
 
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json
new file mode 100644
index 0000000..254926f
--- /dev/null
+++ 
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json
@@ -0,0 +1,15 @@
+{
+  "type" : "compact",
+  "dataSource" : "%%DATASOURCE%%",
+  "ioConfig" : {
+    "type": "compact",
+    "inputSpec": {
+      "type": "interval",
+      "interval": "2013-08-31/2013-09-02"
+    }
+  },
+  "segmentGranularity": "%%SEGMENTGRANULARITY%%",
+  "context" : {
+    "storeCompactionState" : true
+  }
+}
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
index d0dd175..ea41555 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
@@ -38,6 +38,7 @@ public class ClientCompactionTaskQuery implements 
ClientTaskQuery
   private final String dataSource;
   private final ClientCompactionIOConfig ioConfig;
   private final ClientCompactionTaskQueryTuningConfig tuningConfig;
+  private final ClientCompactionTaskQueryGranularitySpec granularitySpec;
   private final Map<String, Object> context;
 
   @JsonCreator
@@ -46,6 +47,7 @@ public class ClientCompactionTaskQuery implements 
ClientTaskQuery
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
       @JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig 
tuningConfig,
+      @JsonProperty("granularitySpec") 
ClientCompactionTaskQueryGranularitySpec granularitySpec,
       @JsonProperty("context") Map<String, Object> context
   )
   {
@@ -53,6 +55,7 @@ public class ClientCompactionTaskQuery implements 
ClientTaskQuery
     this.dataSource = dataSource;
     this.ioConfig = ioConfig;
     this.tuningConfig = tuningConfig;
+    this.granularitySpec = granularitySpec;
     this.context = context;
   }
 
@@ -90,11 +93,18 @@ public class ClientCompactionTaskQuery implements 
ClientTaskQuery
   }
 
   @JsonProperty
+  public ClientCompactionTaskQueryGranularitySpec getGranularitySpec()
+  {
+    return granularitySpec;
+  }
+
+  @JsonProperty
   public Map<String, Object> getContext()
   {
     return context;
   }
 
+
   @Override
   public boolean equals(Object o)
   {
@@ -109,13 +119,14 @@ public class ClientCompactionTaskQuery implements 
ClientTaskQuery
            Objects.equals(dataSource, that.dataSource) &&
            Objects.equals(ioConfig, that.ioConfig) &&
            Objects.equals(tuningConfig, that.tuningConfig) &&
+           Objects.equals(granularitySpec, that.granularitySpec) &&
            Objects.equals(context, that.context);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(id, dataSource, ioConfig, tuningConfig, context);
+    return Objects.hash(id, dataSource, ioConfig, tuningConfig, 
granularitySpec, context);
   }
 
   @Override
@@ -126,6 +137,7 @@ public class ClientCompactionTaskQuery implements 
ClientTaskQuery
            ", dataSource='" + dataSource + '\'' +
            ", ioConfig=" + ioConfig +
            ", tuningConfig=" + tuningConfig +
+           ", granularitySpec=" + granularitySpec +
            ", context=" + context +
            '}';
   }
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java
new file mode 100644
index 0000000..6f12ea7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;
+
+import java.util.Objects;
+
+public class ClientCompactionTaskQueryGranularitySpec
+{
+  private final Granularity segmentGranularity;
+  private final Granularity queryGranularity;
+  private final boolean rollup;
+
+  @JsonCreator
+  public ClientCompactionTaskQueryGranularitySpec(
+      @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+      @JsonProperty("queryGranularity") Granularity queryGranularity,
+      @JsonProperty("rollup") Boolean rollup
+  )
+  {
+    this.queryGranularity = queryGranularity == null ? 
BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY : queryGranularity;
+    this.rollup = rollup == null ? BaseGranularitySpec.DEFAULT_ROLLUP : rollup;
+    this.segmentGranularity = segmentGranularity == null ? 
BaseGranularitySpec.DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
+  }
+
+  @JsonProperty
+  public Granularity getSegmentGranularity()
+  {
+    return segmentGranularity;
+  }
+
+  @JsonProperty
+  public Granularity getQueryGranularity()
+  {
+    return queryGranularity;
+  }
+
+  @JsonProperty
+  public boolean isRollup()
+  {
+    return rollup;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactionTaskQueryGranularitySpec{" +
+           "segmentGranularity=" + segmentGranularity +
+           ", queryGranularity=" + queryGranularity +
+           ", rollup=" + rollup +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientCompactionTaskQueryGranularitySpec that = 
(ClientCompactionTaskQueryGranularitySpec) o;
+    return Objects.equals(segmentGranularity, that.segmentGranularity) &&
+           Objects.equals(queryGranularity, that.queryGranularity) &&
+           Objects.equals(rollup, that.rollup);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(segmentGranularity, queryGranularity, rollup);
+  }
+}
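
For illustration, a minimal sketch of how the defaults above behave (not part of the patch; it only assumes Druid's Granularities constants already used in this commit): passing null for queryGranularity and rollup falls back to the BaseGranularitySpec defaults, so the coordinator only needs to supply the configured segmentGranularity.

    ClientCompactionTaskQueryGranularitySpec spec =
        new ClientCompactionTaskQueryGranularitySpec(Granularities.MONTH, null, null);

    spec.getSegmentGranularity();  // Granularities.MONTH
    spec.getQueryGranularity();    // Granularities.NONE (DEFAULT_QUERY_GRANULARITY)
    spec.isRollup();               // true (DEFAULT_ROLLUP)
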
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 2b6e47b..321831f 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -78,7 +78,8 @@ public class HttpIndexingServiceClient implements 
IndexingServiceClient
       String idPrefix,
       List<DataSegment> segments,
       int compactionTaskPriority,
-      ClientCompactionTaskQueryTuningConfig tuningConfig,
+      @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
+      @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
       @Nullable Map<String, Object> context
   )
   {
@@ -99,6 +100,7 @@ public class HttpIndexingServiceClient implements 
IndexingServiceClient
         dataSource,
         new 
ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
         tuningConfig,
+        granularitySpec,
         context
     );
     return runTask(taskId, taskQuery);
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 14d4359..8eb8034 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -40,6 +40,7 @@ public interface IndexingServiceClient
       List<DataSegment> segments,
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
+      @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
       @Nullable Map<String, Object> context
   );
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
index 0f97749..8bf6396 100644
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
+++ 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
@@ -143,5 +143,4 @@ public class ArbitraryGranularitySpec extends 
BaseGranularitySpec
   {
     return lookupTableBucketByDateTime;
   }
-
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
index 9ff114b..779952b 100644
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
+++ 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
@@ -20,10 +20,14 @@
 package org.apache.druid.segment.indexing.granularity;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -31,10 +35,15 @@ import org.joda.time.Interval;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeSet;
 
-abstract class BaseGranularitySpec implements GranularitySpec
+public abstract class BaseGranularitySpec implements GranularitySpec
 {
+  public static final Boolean DEFAULT_ROLLUP = Boolean.TRUE;
+  public static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.DAY;
+  public static final Granularity DEFAULT_QUERY_GRANULARITY = 
Granularities.NONE;
+
   protected List<Interval> inputIntervals;
   protected final Boolean rollup;
 
@@ -45,7 +54,7 @@ abstract class BaseGranularitySpec implements GranularitySpec
     } else {
       this.inputIntervals = Collections.emptyList();
     }
-    this.rollup = rollup == null ? Boolean.TRUE : rollup;
+    this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
   }
 
   @Override
@@ -76,6 +85,15 @@ abstract class BaseGranularitySpec implements GranularitySpec
 
   protected abstract LookupIntervalBuckets getLookupTableBuckets();
 
+  @Override
+  public Map<String, Object> asMap(ObjectMapper objectMapper)
+  {
+    return objectMapper.convertValue(
+        this,
+        new TypeReference<Map<String, Object>>() {}
+    );
+  }
+
   /**
    * This is a helper class to facilitate sharing the code for 
sortedBucketIntervals among
    * the various GranularitySpec implementations. In particular, the 
UniformGranularitySpec
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
index c3bb579..148f039 100644
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
+++ 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
@@ -21,12 +21,14 @@ package org.apache.druid.segment.indexing.granularity;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.List;
+import java.util.Map;
 import java.util.TreeSet;
 
 /**
@@ -79,4 +81,6 @@ public interface GranularitySpec
   Granularity getQueryGranularity();
 
   GranularitySpec withIntervals(List<Interval> inputIntervals);
+
+  Map<String, Object> asMap(ObjectMapper objectMapper);
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
index 5fe6a9e..cd7c7a4 100644
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++ 
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.indexing.granularity;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.joda.time.Interval;
@@ -30,9 +29,6 @@ import java.util.List;
 
 public class UniformGranularitySpec extends BaseGranularitySpec
 {
-  private static final Granularity DEFAULT_SEGMENT_GRANULARITY = 
Granularities.DAY;
-  private static final Granularity DEFAULT_QUERY_GRANULARITY = 
Granularities.NONE;
-
   private final Granularity segmentGranularity;
   private final Granularity queryGranularity;
   private final IntervalsByGranularity intervalsByGranularity;
@@ -144,5 +140,4 @@ public class UniformGranularitySpec extends 
BaseGranularitySpec
   {
     return lookupTableBucketByDateTime;
   }
-
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index c5a1263..692eb81 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
@@ -46,6 +49,7 @@ public class DataSourceCompactionConfig
   private final Integer maxRowsPerSegment;
   private final Period skipOffsetFromLatest;
   private final UserCompactionTaskQueryTuningConfig tuningConfig;
+  private final GranularitySpec granularitySpec;
   private final Map<String, Object> taskContext;
 
   @JsonCreator
@@ -56,6 +60,7 @@ public class DataSourceCompactionConfig
       @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer 
maxRowsPerSegment,
       @JsonProperty("skipOffsetFromLatest") @Nullable Period 
skipOffsetFromLatest,
       @JsonProperty("tuningConfig") @Nullable 
UserCompactionTaskQueryTuningConfig tuningConfig,
+      @JsonProperty("granularitySpec") @Nullable GranularitySpec 
granularitySpec,
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
   )
   {
@@ -69,6 +74,24 @@ public class DataSourceCompactionConfig
     this.maxRowsPerSegment = maxRowsPerSegment;
     this.skipOffsetFromLatest = skipOffsetFromLatest == null ? 
DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
     this.tuningConfig = tuningConfig;
+    if (granularitySpec != null) {
+      Preconditions.checkArgument(
+          granularitySpec instanceof UniformGranularitySpec,
+          "Auto compaction granularitySpec only supports uniform type"
+      );
+      Preconditions.checkArgument(
+          granularitySpec.isRollup() == BaseGranularitySpec.DEFAULT_ROLLUP,
+          "Auto compaction granularitySpec only supports default rollup value"
+      );
+      Preconditions.checkArgument(
+          
granularitySpec.getQueryGranularity().equals(BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY),
+          "Auto compaction granularitySpec only supports default query 
granularity value");
+      Preconditions.checkArgument(
+          granularitySpec.inputIntervals().isEmpty(),
+          "Auto compaction granularitySpec does not supports interval value"
+      );
+    }
+    this.granularitySpec = granularitySpec;
     this.taskContext = taskContext;
   }
 
@@ -113,6 +136,13 @@ public class DataSourceCompactionConfig
 
   @JsonProperty
   @Nullable
+  public GranularitySpec getGranularitySpec()
+  {
+    return granularitySpec;
+  }
+
+  @JsonProperty
+  @Nullable
   public Map<String, Object> getTaskContext()
   {
     return taskContext;
@@ -131,8 +161,10 @@ public class DataSourceCompactionConfig
     return taskPriority == that.taskPriority &&
            inputSegmentSizeBytes == that.inputSegmentSizeBytes &&
            Objects.equals(dataSource, that.dataSource) &&
+           Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
            Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
            Objects.equals(tuningConfig, that.tuningConfig) &&
+           Objects.equals(granularitySpec, that.granularitySpec) &&
            Objects.equals(taskContext, that.taskContext);
   }
 
@@ -143,8 +175,10 @@ public class DataSourceCompactionConfig
         dataSource,
         taskPriority,
         inputSegmentSizeBytes,
+        maxRowsPerSegment,
         skipOffsetFromLatest,
         tuningConfig,
+        granularitySpec,
         taskContext
     );
   }
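
Given the preconditions above, the only granularitySpec shape an auto-compaction config accepts is a uniform spec that sets segmentGranularity and leaves queryGranularity, rollup, and intervals at their defaults. A sketch (not part of the patch; datasource name hypothetical, argument order follows the constructor added above):

    DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "wikipedia",                 // dataSource (hypothetical)
        null,                        // taskPriority (default)
        500L,                        // inputSegmentSizeBytes
        null,                        // maxRowsPerSegment (deprecated)
        new Period(3600),            // skipOffsetFromLatest
        null,                        // tuningConfig
        new UniformGranularitySpec(Granularities.DAY, null, null),
        null                         // taskContext
    );
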
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 3b7ee31..dcd7fab 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -24,12 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import 
org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -123,8 +125,27 @@ public class CompactSegments implements CoordinatorDuty
           }
           if (COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
             final ClientCompactionTaskQuery compactionTaskQuery = 
(ClientCompactionTaskQuery) response.getPayload();
+            DataSourceCompactionConfig dataSourceCompactionConfig = 
compactionConfigs.get(status.getDataSource());
+            if (dataSourceCompactionConfig != null && 
dataSourceCompactionConfig.getGranularitySpec() != null) {
+              Granularity configuredSegmentGranularity = 
dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity();
+              if (configuredSegmentGranularity != null
+                  && compactionTaskQuery.getGranularitySpec() != null
+                  && 
!configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity()))
 {
+                // Cancel the active compaction task if segmentGranularity has changed, since the interval
+                // will need to be re-compacted with the new granularity
+                LOG.info("Canceled task[%s] as task segmentGranularity is [%s] 
but compaction config "
+                         + "segmentGranularity is [%s]",
+                         status.getId(),
+                         
compactionTaskQuery.getGranularitySpec().getSegmentGranularity(),
+                         configuredSegmentGranularity);
+                indexingServiceClient.cancelTask(status.getId());
+                continue;
+              }
+            }
+            // Skip interval as the current active compaction task is good
             final Interval interval = 
compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
             compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k 
-> new ArrayList<>()).add(interval);
+            // Since we keep the current active compaction task running, we 
count the active task slots
             numEstimatedNonCompleteCompactionTasks += 
findMaxNumTaskSlotsUsedByOneCompactionTask(
                 compactionTaskQuery.getTuningConfig()
             );
@@ -289,12 +310,24 @@ public class CompactSegments implements CoordinatorDuty
         
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
 
         final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSourceName);
+        ClientCompactionTaskQueryGranularitySpec queryGranularitySpec;
+        if (config.getGranularitySpec() != null) {
+          queryGranularitySpec = new ClientCompactionTaskQueryGranularitySpec(
+              config.getGranularitySpec().getSegmentGranularity(),
+              config.getGranularitySpec().getQueryGranularity(),
+              config.getGranularitySpec().isRollup()
+          );
+        } else {
+          queryGranularitySpec = null;
+        }
+
         // make tuningConfig
         final String taskId = indexingServiceClient.compactSegments(
             "coordinator-issued",
             segmentsToCompact,
             config.getTaskPriority(),
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment()),
+            queryGranularitySpec,
             newAutoCompactionContext(config.getTaskContext())
         );
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index c03c4b4..90bfb66 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -28,19 +28,27 @@ import 
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.utils.Streams;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -51,12 +59,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -75,6 +85,11 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   // searchIntervals keeps track of the current state of which interval should 
be considered to search segments to
   // compact.
   private final Map<String, CompactibleTimelineObjectHolderCursor> 
timelineIterators;
+  // This is needed for datasources that have segmentGranularity configured.
+  // If the segmentGranularity configured in the config is finer than the current segmentGranularity, the same set
+  // of segments can belong to multiple intervals in the timeline. We keep track of the compacted intervals between
+  // each run of the compaction job and skip any interval that was already compacted.
+  private final Map<String, Set<Interval>> intervalCompactedForDatasource = 
new HashMap<>();
 
   private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
       (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, 
o1.interval)
@@ -93,12 +108,53 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
 
     dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, 
DataSegment> timeline) -> {
       final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSource);
-
+      Granularity configuredSegmentGranularity = null;
       if (config != null && !timeline.isEmpty()) {
+        Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new 
HashMap<>();
+        if (config.getGranularitySpec() != null && 
config.getGranularitySpec().getSegmentGranularity() != null) {
+          Map<Interval, Set<DataSegment>> intervalToPartitionMap = new 
HashMap<>();
+          configuredSegmentGranularity = 
config.getGranularitySpec().getSegmentGranularity();
+          // Create a new timeline to hold segments in the new configured 
segment granularity
+          VersionedIntervalTimeline<String, DataSegment> 
timelineWithConfiguredSegmentGranularity = new 
VersionedIntervalTimeline<>(Comparator.naturalOrder());
+          Set<DataSegment> segments = 
timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, 
Partitions.ONLY_COMPLETE);
+          for (DataSegment segment : segments) {
+            // Convert the original segment intervals into buckets of the configured segmentGranularity.
+            // For example, if the original segment covers the interval 2020-01-28/2020-02-03 with WEEK granularity
+            // and the configuredSegmentGranularity is MONTH, the segment will be split into two segments
+            // covering 2020-01/2020-02 and 2020-02/2020-03.
+            for (Interval interval : 
configuredSegmentGranularity.getIterable(segment.getInterval())) {
+              intervalToPartitionMap.computeIfAbsent(interval, k -> new 
HashSet<>()).add(segment);
+            }
+          }
+          for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : 
intervalToPartitionMap.entrySet()) {
+            Interval interval = partitionsPerInterval.getKey();
+            int partitionNum = 0;
+            Set<DataSegment> segmentSet = partitionsPerInterval.getValue();
+            int partitions = segmentSet.size();
+            for (DataSegment segment : segmentSet) {
+              DataSegment segmentsForCompact = segment.withShardSpec(new 
NumberedShardSpec(partitionNum, partitions));
+              // A PartitionHolder can only hold chunks of one partition space.
+              // However, a partition in the new timeline (timelineWithConfiguredSegmentGranularity) can hold multiple
+              // partitions of the original timeline (when the new segmentGranularity is coarser than the original
+              // segmentGranularity). Hence, we group all the segments of the original timeline into interval buckets
+              // of the new configuredSegmentGranularity. We then convert each segment into a new partition space so
+              // that there is no duplicate partitionNum across all segments of each new interval. We save the
+              // original ShardSpec so the segment can be converted back when returned from the iterator.
+              originalShardSpecs.put(new Pair<>(interval, 
segmentsForCompact.getId()), segment.getShardSpec());
+              timelineWithConfiguredSegmentGranularity.add(
+                  interval,
+                  segmentsForCompact.getVersion(),
+                  NumberedPartitionChunk.make(partitionNum, partitions, 
segmentsForCompact)
+              );
+              partitionNum += 1;
+            }
+          }
+          timeline = timelineWithConfiguredSegmentGranularity;
+        }
         final List<Interval> searchIntervals =
-            findInitialSearchInterval(timeline, 
config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
+            findInitialSearchInterval(timeline, 
config.getSkipOffsetFromLatest(), configuredSegmentGranularity, 
skipIntervals.get(dataSource));
         if (!searchIntervals.isEmpty()) {
-          timelineIterators.put(dataSource, new 
CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
+          timelineIterators.put(dataSource, new 
CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, 
originalShardSpecs));
         }
       }
     });
@@ -187,10 +243,12 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   private static class CompactibleTimelineObjectHolderCursor implements 
Iterator<List<DataSegment>>
   {
     private final List<TimelineObjectHolder<String, DataSegment>> holders;
+    private final Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs;
 
     CompactibleTimelineObjectHolderCursor(
         VersionedIntervalTimeline<String, DataSegment> timeline,
-        List<Interval> totalIntervalsToSearch
+        List<Interval> totalIntervalsToSearch,
+        Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs
     )
     {
       this.holders = totalIntervalsToSearch
@@ -201,6 +259,7 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
               .filter(holder -> isCompactibleHolder(interval, holder))
           )
           .collect(Collectors.toList());
+      this.originalShardSpecs = originalShardSpecs;
     }
 
     private boolean isCompactibleHolder(Interval interval, 
TimelineObjectHolder<String, DataSegment> holder)
@@ -220,6 +279,14 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
       return partitionBytes > 0;
     }
 
+    private DataSegment transformShardSpecIfNeeded(DataSegment dataSegment, 
Interval interval)
+    {
+      if (originalShardSpecs != null && !originalShardSpecs.isEmpty()) {
+        return dataSegment.withShardSpec(originalShardSpecs.get(new 
Pair<>(interval, dataSegment.getId())));
+      }
+      return dataSegment;
+    }
+
     @Override
     public boolean hasNext()
     {
@@ -232,8 +299,10 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
       if (holders.isEmpty()) {
         throw new NoSuchElementException();
       }
-      return Streams.sequentialStreamFrom(holders.remove(holders.size() - 
1).getObject())
+      TimelineObjectHolder<String, DataSegment> timelineObjectHolder = 
holders.remove(holders.size() - 1);
+      return Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
                     .map(PartitionChunk::getObject)
+                    .map(dataSegment -> 
transformShardSpecIfNeeded(dataSegment, timelineObjectHolder.getTrueInterval()))
                     .collect(Collectors.toList());
     }
   }
@@ -257,10 +326,11 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
     }
   }
 
-  private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig 
tuningConfig, SegmentsToCompact candidates)
+  private boolean needsCompaction(DataSourceCompactionConfig config, 
SegmentsToCompact candidates)
   {
     Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
-
+    final ClientCompactionTaskQueryTuningConfig tuningConfig =
+        ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment());
     final PartitionsSpec partitionsSpecFromConfig = 
findPartitinosSpecFromConfig(tuningConfig);
     final CompactionState lastCompactionState = 
candidates.segments.get(0).getLastCompactionState();
     if (lastCompactionState == null) {
@@ -314,6 +384,22 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
       needsCompaction = true;
     }
 
+    // Only checks for segmentGranularity as auto compaction currently only 
supports segmentGranularity
+    final Granularity segmentGranularity = 
lastCompactionState.getGranularitySpec() != null ?
+                                           
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), 
GranularitySpec.class).getSegmentGranularity() :
+                                           null;
+
+    if (config.getGranularitySpec() != null &&
+        config.getGranularitySpec().getSegmentGranularity() != null &&
+        
!config.getGranularitySpec().getSegmentGranularity().equals(segmentGranularity))
 {
+      log.info(
+          "Configured granularitySpec[%s] is different from the one[%s] of 
segments. Needs compaction",
+          config.getGranularitySpec(),
+          segmentGranularity
+      );
+      needsCompaction = true;
+    }
+
     return needsCompaction;
   }
 
@@ -334,16 +420,25 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
 
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
-      final SegmentsToCompact candidates = new 
SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
-
+      List<DataSegment> segments = 
compactibleTimelineObjectHolderCursor.next();
+      final SegmentsToCompact candidates = new SegmentsToCompact(segments);
       if (!candidates.isEmpty()) {
         final boolean isCompactibleSize = candidates.getTotalSize() <= 
inputSegmentSize;
         final boolean needsCompaction = needsCompaction(
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment()),
+            config,
             candidates
         );
 
         if (isCompactibleSize && needsCompaction) {
+          if (config.getGranularitySpec() != null && 
config.getGranularitySpec().getSegmentGranularity() != null) {
+            Interval interval = candidates.getUmbrellaInterval();
+            Set<Interval> intervalsCompacted = 
intervalCompactedForDatasource.computeIfAbsent(dataSourceName, k -> new 
HashSet<>());
+            // Skip these candidates if we have already compacted this interval
+            if (intervalsCompacted.contains(interval)) {
+              continue;
+            }
+            intervalsCompacted.add(interval);
+          }
           return candidates;
         } else {
           if (!needsCompaction) {
@@ -396,6 +491,7 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   private static List<Interval> findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
       Period skipOffset,
+      Granularity configuredSegmentGranularity,
       @Nullable List<Interval> skipIntervals
   )
   {
@@ -407,6 +503,7 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
     final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
         last.getInterval().getEnd(),
         skipOffset,
+        configuredSegmentGranularity,
         skipIntervals
     );
 
@@ -447,19 +544,27 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   static List<Interval> sortAndAddSkipIntervalFromLatest(
       DateTime latest,
       Period skipOffset,
+      Granularity configuredSegmentGranularity,
       @Nullable List<Interval> skipIntervals
   )
   {
     final List<Interval> nonNullSkipIntervals = skipIntervals == null
                                                 ? new ArrayList<>(1)
                                                 : new 
ArrayList<>(skipIntervals.size());
+    final Interval skipFromLatest;
+    if (configuredSegmentGranularity != null) {
+      DateTime skipFromLastest = new DateTime(latest, 
latest.getZone()).minus(skipOffset);
+      DateTime skipOffsetBucketToSegmentGranularity = 
configuredSegmentGranularity.bucketStart(skipFromLastest);
+      skipFromLatest = new Interval(skipOffsetBucketToSegmentGranularity, 
latest);
+    } else {
+      skipFromLatest = new Interval(skipOffset, latest);
+    }
 
     if (skipIntervals != null) {
       final List<Interval> sortedSkipIntervals = new 
ArrayList<>(skipIntervals);
       sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd());
 
       final List<Interval> overlapIntervals = new ArrayList<>();
-      final Interval skipFromLatest = new Interval(skipOffset, latest);
 
       for (Interval interval : sortedSkipIntervals) {
         if (interval.overlaps(skipFromLatest)) {
@@ -476,7 +581,6 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
         nonNullSkipIntervals.add(skipFromLatest);
       }
     } else {
-      final Interval skipFromLatest = new Interval(skipOffset, latest);
       nonNullSkipIntervals.add(skipFromLatest);
     }
 
@@ -579,6 +683,11 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
       return segments.size();
     }
 
+    private Interval getUmbrellaInterval()
+    {
+      return 
JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
+    }
+
     private long getNumberOfIntervals()
     {
       return 
segments.stream().map(DataSegment::getInterval).distinct().count();
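
As a sketch of the interval bucketing described in the comments above (not part of the patch; it only relies on Granularity.getIterable, which this change itself calls, and reuses the interval from the example comment), a WEEK-sized segment mapped through a coarser MONTH configuredSegmentGranularity lands in every month bucket it overlaps:

    Interval original = Intervals.of("2020-01-28/2020-02-03");
    for (Interval bucket : Granularities.MONTH.getIterable(original)) {
      // yields the two MONTH buckets 2020-01/2020-02 and 2020-02/2020-03,
      // so the single original segment is registered under both intervals
      System.out.println(bucket);
    }
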
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 34800cc..09f4c69 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -50,6 +50,7 @@ public class NoopIndexingServiceClient implements 
IndexingServiceClient
       List<DataSegment> segments,
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
+      @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
       @Nullable Map<String, Object> context
   )
   {
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
 
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
index a015422..25bf848 100644
--- 
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
@@ -32,6 +32,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Map;
 
 public class ArbitraryGranularityTest
 {
@@ -218,6 +219,32 @@ public class ArbitraryGranularityTest
   }
 
   @Test
+  public void testAsMap()
+  {
+    final GranularitySpec spec = new 
ArbitraryGranularitySpec(Granularities.NONE, Lists.newArrayList(
+        Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+        Intervals.of("2012-02-01T00Z/2012-03-01T00Z"),
+        Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+        Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+        Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+    ));
+
+    Map<String, Object> map = spec.asMap(JSON_MAPPER);
+    final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map, 
GranularitySpec.class);
+    Assert.assertEquals(
+        "Round-trip",
+        ImmutableList.copyOf(spec.sortedBucketIntervals()),
+        ImmutableList.copyOf(rtSpec.sortedBucketIntervals())
+    );
+    Assert.assertEquals(
+        "Round-trip",
+        ImmutableList.copyOf(spec.inputIntervals()),
+        ImmutableList.copyOf(rtSpec.inputIntervals())
+    );
+    Assert.assertEquals(spec, rtSpec);
+  }
+
+  @Test
   public void testNullInputIntervals()
   {
     final GranularitySpec spec = new 
ArbitraryGranularitySpec(Granularities.NONE, null);
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
 
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
index 27760f6..0c19724 100644
--- 
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
@@ -38,10 +38,11 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 public class UniformGranularityTest
 {
-  private static final ObjectMapper JOSN_MAPPER = new DefaultObjectMapper();
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
 
   @Test
   public void testSimple()
@@ -161,7 +162,7 @@ public class UniformGranularityTest
     );
 
     try {
-      final GranularitySpec rtSpec = 
JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), 
GranularitySpec.class);
+      final GranularitySpec rtSpec = 
JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), 
GranularitySpec.class);
       Assert.assertEquals(
           "Round-trip sortedBucketIntervals",
           ImmutableList.copyOf(spec.sortedBucketIntervals()),
@@ -179,6 +180,34 @@ public class UniformGranularityTest
   }
 
   @Test
+  public void testAsMap()
+  {
+    final GranularitySpec spec = new UniformGranularitySpec(
+        Granularities.DAY,
+        null,
+        Lists.newArrayList(
+            Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+            Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+            Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+            Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+        )
+    );
+    Map<String, Object> map = spec.asMap(JSON_MAPPER);
+    final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map, 
GranularitySpec.class);
+    Assert.assertEquals(
+        "Round-trip sortedBucketIntervals",
+        ImmutableList.copyOf(spec.sortedBucketIntervals()),
+        ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator())
+    );
+    Assert.assertEquals(
+        "Round-trip granularity",
+        spec.getSegmentGranularity(),
+        rtSpec.getSegmentGranularity()
+    );
+    Assert.assertEquals(spec, rtSpec);
+  }
+
+  @Test
   public void testEquals()
   {
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 96bfcc1..01cfabb 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -20,15 +20,20 @@
 package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.SegmentsSplitHintSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest
         null,
         new Period(3600),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -68,6 +74,7 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(config.getSkipOffsetFromLatest(), 
fromJson.getSkipOffsetFromLatest());
     Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
     Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), 
fromJson.getGranularitySpec());
   }
 
   @Test
@@ -80,6 +87,7 @@ public class DataSourceCompactionConfigTest
         30,
         new Period(3600),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -122,6 +130,7 @@ public class DataSourceCompactionConfigTest
             null,
             null
         ),
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -164,6 +173,7 @@ public class DataSourceCompactionConfigTest
             null,
             null
         ),
+        null,
         ImmutableMap.of("key", "val")
     );
 
@@ -217,4 +227,92 @@ public class DataSourceCompactionConfigTest
         OBJECT_MAPPER.readValue(json, 
UserCompactionTaskQueryTuningConfig.class);
     Assert.assertEquals(tuningConfig, fromJson);
   }
+
+  @Test
+  public void testSerdeGranularitySpec() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UniformGranularitySpec(Granularities.HOUR, null, null),
+        ImmutableMap.of("key", "val")
+    );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), 
fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), 
fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), 
fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), 
fromJson.getGranularitySpec());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailIfGranularitySpecContainsNonDefaultQueryGranularity()
+  {
+    new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, 
null),
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailIfGranularitySpecContainsNonDefaultRollup()
+  {
+    new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, 
false, null),
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailIfGranularitySpecContainsNonEmptyInterval()
+  {
+    new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, 
ImmutableList.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z"))),
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailIfGranularitySpecIsNotUniform()
+  {
+    new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new ArbitraryGranularitySpec(null, null, null),
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index c8ef83d..c892810 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -28,16 +28,24 @@ import com.google.common.collect.Lists;
 import junitparams.converters.Nullable;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.indexing.ClientCompactionIOConfig;
+import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import 
org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.client.indexing.HttpIndexingServiceClient;
 import org.apache.druid.client.indexing.IndexingWorker;
 import org.apache.druid.client.indexing.IndexingWorkerInfo;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -47,8 +55,10 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.http.client.Request;
 import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
@@ -81,6 +91,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -89,6 +101,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
@@ -101,6 +114,7 @@ public class CompactSegmentsTest
 {
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
   private static final String DATA_SOURCE_PREFIX = "dataSource_";
+  private static final int PARTITION_PER_TIME_INTERVAL = 4;
   // Each dataSource starts with 440 byte, 44 segments, and 11 intervals 
needing compaction
   private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
   private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
@@ -144,6 +158,7 @@ public class CompactSegmentsTest
   private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory;
 
   private Map<String, VersionedIntervalTimeline<String, DataSegment>> 
dataSources;
+  Map<String, List<DataSegment>> datasourceToSegments = new HashMap<>();
 
   public CompactSegmentsTest(PartitionsSpec partitionsSpec, 
BiFunction<Integer, Integer, ShardSpec> shardSpecFactory)
   {
@@ -154,18 +169,23 @@ public class CompactSegmentsTest
   @Before
   public void setup()
   {
-    List<DataSegment> segments = new ArrayList<>();
+    List<DataSegment> allSegments = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       final String dataSource = DATA_SOURCE_PREFIX + i;
       for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
-        for (int k = 0; k < 4; k++) {
-          segments.add(createSegment(dataSource, j, true, k));
-          segments.add(createSegment(dataSource, j, false, k));
+        for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
+          List<DataSegment> segmentForDatasource = 
datasourceToSegments.computeIfAbsent(dataSource, key -> new ArrayList<>());
+          DataSegment dataSegment = createSegment(dataSource, j, true, k);
+          allSegments.add(dataSegment);
+          segmentForDatasource.add(dataSegment);
+          dataSegment = createSegment(dataSource, j, false, k);
+          allSegments.add(dataSegment);
+          segmentForDatasource.add(dataSegment);
         }
       }
     }
     dataSources = DataSourcesSnapshot
-        .fromUsedSegments(segments, ImmutableMap.of())
+        .fromUsedSegments(allSegments, ImmutableMap.of())
         .getUsedSegmentsTimelinesPerDataSource();
   }
 
@@ -351,17 +371,17 @@ public class CompactSegmentsTest
     String dataSourceName = DATA_SOURCE_PREFIX + 1;
     List<DataSegment> segments = new ArrayList<>();
     for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
-      for (int k = 0; k < 4; k++) {
+      for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
         DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
         DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
         if (j == 3) {
           // Make two intervals on this day compacted (two compacted intervals 
back-to-back)
-          beforeNoon = beforeNoon.withLastCompactionState(new 
CompactionState(partitionsSpec, ImmutableMap.of()));
-          afterNoon = afterNoon.withLastCompactionState(new 
CompactionState(partitionsSpec, ImmutableMap.of()));
+          beforeNoon = beforeNoon.withLastCompactionState(new 
CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
+          afterNoon = afterNoon.withLastCompactionState(new 
CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
         }
         if (j == 1) {
           // Make one interval on this day compacted
-          afterNoon = afterNoon.withLastCompactionState(new 
CompactionState(partitionsSpec, ImmutableMap.of()));
+          afterNoon = afterNoon.withLastCompactionState(new 
CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
         }
         segments.add(beforeNoon);
         segments.add(afterNoon);
@@ -539,6 +559,205 @@ public class CompactSegmentsTest
   }
 
   @Test
+  public void testCompactWithoutGranularitySpec()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, 
mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new 
ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<List<DataSegment>> segmentsCaptor = 
ArgumentCaptor.forClass(List.class);
+    ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> 
granularitySpecArgumentCaptor = 
ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        segmentsCaptor.capture(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any()
+    );
+    // Only the same number of segments as the original PARTITION_PER_TIME_INTERVAL since the segment granularity is the same
+    Assert.assertEquals(PARTITION_PER_TIME_INTERVAL, segmentsCaptor.getValue().size());
+    Assert.assertNull(granularitySpecArgumentCaptor.getValue());
+  }
+
+  @Test
+  public void testCompactWithGranularitySpec()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            new UniformGranularitySpec(Granularities.YEAR, null, null),
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
+    ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        segmentsCaptor.capture(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any()
+    );
+    // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments
+    // are within the same year
+    Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
+    Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
+  }
+
+  @Test
+  public void testCompactWithGranularitySpecConflictWithActiveCompactionTask()
+  {
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    final String conflictTaskId = "taskIdDummy";
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    TaskStatusPlus runningConflictCompactionTask = new TaskStatusPlus(
+        conflictTaskId,
+        "groupId",
+        "compact",
+        DateTimes.EPOCH,
+        DateTimes.EPOCH,
+        TaskState.RUNNING,
+        RunnerTaskState.RUNNING,
+        -1L,
+        TaskLocation.unknown(),
+        dataSource,
+        null
+    );
+    TaskPayloadResponse runningConflictCompactionTaskPayload = new TaskPayloadResponse(
+        conflictTaskId,
+        new ClientCompactionTaskQuery(
+            conflictTaskId,
+            dataSource,
+            new ClientCompactionIOConfig(
+                new ClientCompactionIntervalSpec(
+                    Intervals.of("2000/2099"),
+                    "testSha256OfSortedSegmentIds"
+                )
+            ),
+            null,
+            new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, null, null),
+            null
+        )
+    );
+    Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask));
+    Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload);
+
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            new UniformGranularitySpec(Granularities.YEAR, null, null),
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    // Verify that conflict task was canceled
+    Mockito.verify(mockIndexingServiceClient).cancelTask(conflictTaskId);
+    // The active conflict task has an interval of 2000/2099.
+    // Make sure that we do not skip the conflict task's interval,
+    // since we cancel that task and will have to compact those intervals with the new segmentGranularity
+    ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
+    ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        segmentsCaptor.capture(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any()
+    );
+    // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments
+    // are within the same year
+    Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
+    Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
+  }
+
+  @Test
   public void testRunParallelCompactionMultipleCompactionTaskSlots()
   {
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
@@ -831,6 +1050,7 @@ public class CompactSegmentsTest
                   null,
                   null
               ),
+              null,
               null
           )
       );
@@ -984,7 +1204,8 @@ public class CompactSegmentsTest
                     "lz4",
                     "longEncoding",
                     "longs"
-                )
+                ),
+                ImmutableMap.of()
             ),
             1,
             segmentSize
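
(For context on the new constructor parameter exercised above: a minimal sketch, not part of this patch, of a per-datasource auto-compaction config that sets the new granularitySpec. Argument roles are inferred from the positional calls in CompactSegmentsTest, and the datasource name is hypothetical.)

    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
    import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
    import org.joda.time.Period;

    // Sketch only; argument order assumed from the test calls above:
    // (dataSource, taskPriority, inputSegmentSizeBytes, maxRowsPerSegment,
    //  skipOffsetFromLatest, tuningConfig, granularitySpec, taskContext)
    DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "wikipedia",                                                 // hypothetical datasource
        0,                                                           // taskPriority
        500L,                                                        // inputSegmentSizeBytes
        null,                                                        // maxRowsPerSegment
        new Period("PT0H"),                                          // skipOffsetFromLatest
        null,                                                        // tuningConfig (defaults)
        new UniformGranularitySpec(Granularities.YEAR, null, null),  // new: segmentGranularity
        null                                                         // taskContext
    );
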
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 76a84bc..b32a7b7 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -71,6 +71,7 @@ public class NewestSegmentFirstIteratorTest
     final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
         DateTimes.of("2019-01-01"),
         new Period(72, 0, 0, 0),
+        null,
         ImmutableList.of(
             Intervals.of("2018-12-30/2018-12-31"),
             Intervals.of("2018-12-24/2018-12-25")
@@ -90,6 +91,7 @@ public class NewestSegmentFirstIteratorTest
         null,
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -128,6 +130,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
@@ -166,6 +169,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
@@ -204,6 +208,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
@@ -242,6 +247,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
@@ -280,6 +286,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
@@ -318,6 +325,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
@@ -356,6 +364,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
@@ -394,6 +403,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null
         ),
+        null,
         null
     );
     Assert.assertEquals(
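
(The additional null threaded through sortAndAddSkipIntervalFromLatest above is, per this change, the configured segment granularity. A hypothetical illustration, not the actual NewestSegmentFirstIterator code, of how a skip boundary derived from skipOffsetFromLatest can be widened to the enclosing granularity bucket, which is the behavior the policy tests below assert.)

    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.joda.time.DateTime;
    import org.joda.time.Interval;
    import org.joda.time.Period;

    // Hypothetical sketch: with segmentGranularity = MONTH, a raw skip boundary of
    // 2017-12-02 (latest 2017-12-03 minus skipOffsetFromLatest P1D) is aligned to the
    // start of its month bucket, so segments crossing into December are skipped too.
    DateTime latest = DateTimes.of("2017-12-03T00:00:00");
    DateTime rawSkipFrom = latest.minus(new Period("P1D"));         // 2017-12-02T00:00:00
    Interval monthBucket = Granularities.MONTH.bucket(rawSkipFrom); // 2017-12-01/2018-01-01
    DateTime effectiveSkipFrom = monthBucket.getStart();            // 2017-12-01T00:00:00
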
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 749f9d7..cc23f6a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -22,10 +22,15 @@ package org.apache.druid.server.coordinator.duty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
@@ -58,7 +63,7 @@ public class NewestSegmentFirstPolicyTest
   {
     final Period segmentPeriod = new Period("PT1H");
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)),
         ImmutableMap.of(
             DATA_SOURCE,
             createTimeline(
@@ -83,7 +88,7 @@ public class NewestSegmentFirstPolicyTest
   {
     final Period segmentPeriod = new Period("PT1H");
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)),
         ImmutableMap.of(
             DATA_SOURCE,
             createTimeline(
@@ -116,7 +121,7 @@ public class NewestSegmentFirstPolicyTest
   {
     final Period segmentPeriod = new Period("PT1H");
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)),
         ImmutableMap.of(
             DATA_SOURCE,
             createTimeline(
@@ -149,7 +154,7 @@ public class NewestSegmentFirstPolicyTest
   public void testHugeShard()
   {
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
         ImmutableMap.of(
             DATA_SOURCE,
             createTimeline(
@@ -199,7 +204,7 @@ public class NewestSegmentFirstPolicyTest
   public void testManySegmentsPerShard()
   {
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
         ImmutableMap.of(
             DATA_SOURCE,
             createTimeline(
@@ -259,9 +264,9 @@ public class NewestSegmentFirstPolicyTest
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(
             unknownDataSource,
-            createCompactionConfig(10000, new Period("P2D")),
+            createCompactionConfig(10000, new Period("P2D"), null),
             DATA_SOURCE,
-            createCompactionConfig(10000, new Period("P2D"))
+            createCompactionConfig(10000, new Period("P2D"), null)
         ),
         ImmutableMap.of(
             DATA_SOURCE,
@@ -307,7 +312,7 @@ public class NewestSegmentFirstPolicyTest
         )
     );
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -340,7 +345,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -361,7 +366,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -370,11 +375,87 @@ public class NewestSegmentFirstPolicyTest
   }
 
   @Test
+  public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityEqual()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("P1D")),
+        new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.DAY, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+
+    // We should get all segments except those inside the skip interval (i.e., everything overlapping 2017-10-14/2017-12-02)
+    final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+  }
+
+  @Test
+  public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityLarger()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        // This contains segments that:
+        // - Cross the month boundary into the latest month (start in Nov and end in Dec). These should be skipped
+        // - Lie fully in the latest month (start in Dec and end in Dec). These should be skipped
+        // - Do not overlap the latest month (start in Oct and end in Oct). These should not be skipped
+        new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")),
+        new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+
+    // We should only get segments in Oct
+    final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> actual = iterator.next();
+    Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularitySmaller()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")),
+        new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+
+    // We should only get segments in Oct
+    final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+  }
+
+  @Test
   public void testWithSkipIntervals()
   {
     final Period segmentPeriod = new Period("PT1H");
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
         ImmutableMap.of(
             DATA_SOURCE,
             createTimeline(
@@ -414,7 +495,7 @@ public class NewestSegmentFirstPolicyTest
   {
     final Period segmentPeriod = new Period("PT1H");
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
         ImmutableMap.of(
             DATA_SOURCE,
             createTimeline(
@@ -455,6 +536,113 @@ public class NewestSegmentFirstPolicyTest
     );
   }
 
+  @Test
+  public void testIteratorReturnsSegmentsInConfiguredSegmentGranularity()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        // Segments with day interval from Oct to Dec
+        new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+
+    // We should get all segments in timeline back since skip offset is P0D.
+    // However, we only need to iterate 3 times (once for each month) since the new configured segmentGranularity is MONTH,
+    // and hence the iterator returns all segments bucketed to the configured segmentGranularity
+    // Month of Dec
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-31T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // Month of Nov
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-11-01T00:00:00/2017-12-01T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // Month of Oct
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-11-01T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIteratorReturnsSegmentsInMultipleIntervalIfConfiguredSegmentGranularityCrossBoundary()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(Intervals.of("2020-01-01/2020-01-08"), new Period("P7D")),
+        new SegmentGenerateSpec(Intervals.of("2020-01-28/2020-02-03"), new Period("P7D")),
+        new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // We should get the segment of "2020-01-28/2020-02-03" back twice: when the iterator returns for Feb and when it
+    // returns for Jan.
+
+    // Month of Feb (the iterator returns the newest bucket first)
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> actual = iterator.next();
+    Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+    // Month of Jan
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertTrue(iterator.hasNext());
+    actual = iterator.next();
+    Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIteratorDoesNotReturnCompactedInterval()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+
+    final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(iterator.next()));
+    // The iterator should return only once since all of its "minute" buckets select the same underlying segments
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,
@@ -546,7 +734,8 @@ public class NewestSegmentFirstPolicyTest
 
   private DataSourceCompactionConfig createCompactionConfig(
       long inputSegmentSizeBytes,
-      Period skipOffsetFromLatest
+      Period skipOffsetFromLatest,
+      GranularitySpec granularitySpec
   )
   {
     return new DataSourceCompactionConfig(
@@ -556,6 +745,7 @@ public class NewestSegmentFirstPolicyTest
         null,
         skipOffsetFromLatest,
         null,
+        granularitySpec,
         null
     );
   }
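
(A small, hypothetical sketch of the per-bucket grouping the policy tests above assert, not the exact NewestSegmentFirstIterator implementation: with a configured segmentGranularity, candidate segments are collected per granularity bucket, and a segment that crosses a bucket boundary shows up in every bucket it overlaps.)

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.Partitions;
    import org.apache.druid.timeline.VersionedIntervalTimeline;
    import org.joda.time.Interval;

    // In the tests above the timeline holds the datasource's used segments; empty here.
    VersionedIntervalTimeline<String, DataSegment> timeline =
        VersionedIntervalTimeline.forSegments(Collections.emptyList());
    Interval total = Intervals.of("2020-01-01/2020-03-01");
    for (Interval bucket : Granularities.MONTH.getIterable(total)) {
      // Everything overlapping this month becomes one candidate group for compaction;
      // a segment such as 2020-01-28/2020-02-03 would appear in both the Jan and Feb groups.
      List<DataSegment> candidates = new ArrayList<>(
          timeline.findNonOvershadowedObjectsInInterval(bucket, Partitions.ONLY_COMPLETE)
      );
    }
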
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 61e0a32..ed6599a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -277,7 +277,8 @@ public class SystemSchemaTest extends CalciteTestBase
 
   private final CompactionState expectedCompactionState = new CompactionState(
       new DynamicPartitionsSpec(null, null),
-      Collections.singletonMap("test", "map")
+      Collections.singletonMap("test", "map"),
+      Collections.singletonMap("test2", "map2")
   );
 
   private final DataSegment publishedCompactedSegment1 = new DataSegment(
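
(The SystemSchemaTest update above reflects the widened CompactionState: alongside the partitions spec and the map-encoded index spec, a segment's lastCompactionState now also carries a map for the granularity spec of the last compaction, which auto-compaction can use to detect a changed segmentGranularity. A minimal sketch with placeholder map contents, mirroring the test fixture.)

    import java.util.Collections;
    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.timeline.CompactionState;

    // Sketch: the third map is the new argument added by this change.
    CompactionState state = new CompactionState(
        new DynamicPartitionsSpec(null, null),
        Collections.singletonMap("test", "map"),    // index spec, as a generic map
        Collections.singletonMap("test2", "map2")   // granularity spec, as a generic map (new)
    );
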


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
