maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488385897



##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -112,27 +114,38 @@
   }
 
   @Override
-  public Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes()
+  public Map<String, CompactionStatistics> totalRemainingStatistics()
   {
-    final Object2LongOpenHashMap<String> resultMap = new 
Object2LongOpenHashMap<>();
-    resultMap.defaultReturnValue(UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE);
-    for (QueueEntry entry : queue) {
-      final VersionedIntervalTimeline<String, DataSegment> timeline = 
dataSources.get(entry.getDataSource());
-      final Interval interval = new 
Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
-
-      final List<TimelineObjectHolder<String, DataSegment>> holders = 
timeline.lookup(interval);
-
-      long size = 0;
-      for (DataSegment segment : FluentIterable
-          .from(holders)
-          .transformAndConcat(TimelineObjectHolder::getObject)
-          .transform(PartitionChunk::getObject)) {
-        size += segment.getSize();
-      }
+    return remainingSegments;
+  }
+
+  @Override
+  public Map<String, CompactionStatistics> totalProcessedStatistics()
+  {
+    return processedSegments;
+  }
 
-      resultMap.put(entry.getDataSource(), size);
+  @Override
+  public void flushAllSegments()
+  {
+    if (queue.isEmpty()) {
+      return;
+    }
+    QueueEntry entry;
+    while ((entry = queue.poll()) != null) {
+      final List<DataSegment> resultSegments = entry.segments;
+      final String dataSourceName = resultSegments.get(0).getDataSource();
+      // This entry was in the queue, meaning that it was not processed. 
Hence, also aggregates it's
+      // statistic to the remaining segments counts.
+      collectSegmentStatistics(remainingSegments, dataSourceName, new 
SegmentsToCompact(entry.segments));
+      final CompactibleTimelineObjectHolderCursor 
compactibleTimelineObjectHolderCursor = timelineIterators.get(
+          dataSourceName
+      );
+      // WARNING: This iterates the compactibleTimelineObjectHolderCursor.
+      // Since this method is intended to only be use after all necessary 
iteration is done on this iterator

Review comment:
       I mean that this method (`flushAllSegments`), will iterates the 
`compactibleTimelineObjectHolderCursor` in `iterateAllSegments` method call. 
This class `NewestSegmentFirstIterator` when use as a iterator in the next() 
method will also iterate the `compactibleTimelineObjectHolderCursor`.
   Hence, this iterator (`NewestSegmentFirstIterator) cannot be use to iterate 
after this method (`flushAllSegments`) is called.
   
   Basically, you cannot call `flushAllSegments` while iterating the 
NewestSegmentFirstIterator and you cannot call `flushAllSegments` then go back 
to iterating the NewestSegmentFirstIterator




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to