maytasm commented on a change in pull request #10843: URL: https://github.com/apache/druid/pull/10843#discussion_r573559553
########## File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java ########## @@ -93,12 +107,53 @@ dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> { final DataSourceCompactionConfig config = compactionConfigs.get(dataSource); - + Granularity configuredSegmentGranularity = null; if (config != null && !timeline.isEmpty()) { + Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new HashMap<>(); + if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) { + Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>(); + configuredSegmentGranularity = config.getGranularitySpec().getSegmentGranularity(); + // Create a new timeline to hold segments in the new configured segment granularity + VersionedIntervalTimeline<String, DataSegment> timelineWithConfiguredSegmentGranularity = new VersionedIntervalTimeline<>(Comparator.naturalOrder()); + Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); + for (DataSegment segment : segments) { + // Convert original segmentGranularity to new granularity buckets based on configuredSegmentGranularity + // For example, if the original is interval of 2020-01-28/2020-02-03 with WEEK granularity + // and the configuredSegmentGranularity is MONTH, the segment will be split into two segments + // of 2020-01/2020-02 and 2020-02/2020-03. 
+ for (Interval interval : configuredSegmentGranularity.getIterable(segment.getInterval())) { + intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>()).add(segment); + } + } + for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : intervalToPartitionMap.entrySet()) { + Interval interval = partitionsPerInterval.getKey(); + int partitionNum = 0; + Set<DataSegment> segmentSet = partitionsPerInterval.getValue(); + int partitions = segmentSet.size(); + for (DataSegment segment : segmentSet) { + DataSegment segmentsForCompact = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions)); + // PartitionHolder can only hold chunks of one partition space Review comment: Done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org