maytasm commented on a change in pull request #10843:
URL: https://github.com/apache/druid/pull/10843#discussion_r573559553



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -93,12 +107,53 @@
 
     dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> {
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
-
+      Granularity configuredSegmentGranularity = null;
       if (config != null && !timeline.isEmpty()) {
+        Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new HashMap<>();
+        if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
+          Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
+          configuredSegmentGranularity = config.getGranularitySpec().getSegmentGranularity();
+          // Create a new timeline to hold segments in the new configured segment granularity
+          VersionedIntervalTimeline<String, DataSegment> timelineWithConfiguredSegmentGranularity = new VersionedIntervalTimeline<>(Comparator.naturalOrder());
+          Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
+          for (DataSegment segment : segments) {
+            // Convert the original segmentGranularity into new granularity buckets based on configuredSegmentGranularity.
+            // For example, if the original segment has the interval 2020-01-28/2020-02-03 with WEEK granularity
+            // and the configuredSegmentGranularity is MONTH, the segment will be split into two segments
+            // of 2020-01/2020-02 and 2020-02/2020-03.
+            for (Interval interval : configuredSegmentGranularity.getIterable(segment.getInterval())) {
+              intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>()).add(segment);
+            }
+          }
+          for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : intervalToPartitionMap.entrySet()) {
+            Interval interval = partitionsPerInterval.getKey();
+            int partitionNum = 0;
+            Set<DataSegment> segmentSet = partitionsPerInterval.getValue();
+            int partitions = segmentSet.size();
+            for (DataSegment segment : segmentSet) {
+              DataSegment segmentsForCompact = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions));
+              // A PartitionHolder can only hold chunks of one partition space

Review comment:
       Done
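
For reference, the bucketing behavior discussed in the diff above — a segment whose interval straddles a bucket boundary of the configured segment granularity gets mapped into every overlapping bucket — can be sketched without any Druid dependencies. This is a simplified illustration, not Druid's actual `Granularity.getIterable` implementation: `monthBuckets` is a hypothetical stand-in that hard-codes MONTH granularity.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

public class GranularityBucketDemo {
    // Hypothetical stand-in for Granularity.getIterable(interval) with MONTH
    // granularity: list every month bucket that overlaps [start, end).
    static List<String> monthBuckets(LocalDate start, LocalDate end) {
        List<String> buckets = new ArrayList<>();
        YearMonth ym = YearMonth.from(start);
        YearMonth last = YearMonth.from(end.minusDays(1)); // end is exclusive
        while (!ym.isAfter(last)) {
            buckets.add(ym.atDay(1) + "/" + ym.plusMonths(1).atDay(1));
            ym = ym.plusMonths(1);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // The WEEK segment 2020-01-28/2020-02-03 overlaps two MONTH buckets,
        // so it appears under both intervals in intervalToPartitionMap.
        System.out.println(monthBuckets(
            LocalDate.of(2020, 1, 28), LocalDate.of(2020, 2, 3)));
        // Prints: [2020-01-01/2020-02-01, 2020-02-01/2020-03-01]
    }
}
```

In the PR's code, each such overlapping segment then becomes one partition of the bucket's `NumberedShardSpec`, which is why a single source segment can contribute partitions to more than one target interval.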




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


