abhishekrb19 commented on code in PR #15941:
URL: https://github.com/apache/druid/pull/15941#discussion_r1500196554


##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    
Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           
.when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()
+    configBuilder = new TestDruidCoordinatorConfig.Builder()
+        .withCoordinatorIndexingPeriod(INDEXING_PERIOD)
+        .withCoordinatorKillPeriod(COORDINATOR_KILL_PERIOD)
+        .withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN)
+        .withCoordinatorKillMaxSegments(MAX_SEGMENTS_TO_KILL)
+        .withCoordinatorKillBufferPeriod(BUFFER_PERIOD);
+    paramsBuilder = 
DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+  }
 
-    final DateTime now = DateTimes.nowUtc();
+  /**
+   * The buffer period and duration to retain influence kill behavior.
+   */
+  @Test
+  public void testDefaults()
+  {
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
 
-    yearOldSegment = createSegmentWithEnd(now.minusDays(365));
-    monthOldSegment = createSegmentWithEnd(now.minusDays(30));
-    dayOldSegment = createSegmentWithEnd(now.minusDays(1));
-    hourOldSegment = createSegmentWithEnd(now.minusHours(1));
-    nextDaySegment = createSegmentWithEnd(now.plusDays(1));
-    nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
-
-    final List<DataSegment> unusedSegments = ImmutableList.of(
-        yearOldSegment,
-        monthOldSegment,
-        dayOldSegment,
-        hourOldSegment,
-        nextDaySegment,
-        nextMonthSegment
-    );
-
-    Mockito.when(
-        segmentsMetadataManager.getUnusedSegmentIntervals(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.anyInt(),
-            ArgumentMatchers.any()
-        )
-    ).thenAnswer(invocation -> {
-      DateTime minStartTime = invocation.getArgument(1);
-      DateTime maxEndTime = invocation.getArgument(2);
-      long maxEndMillis = maxEndTime.getMillis();
-      Long minStartMillis = minStartTime != null ? minStartTime.getMillis() : 
null;
-      List<Interval> unusedIntervals =
-          unusedSegments.stream()
-                        .map(DataSegment::getInterval)
-                        .filter(i -> i.getEnd().getMillis() <= maxEndMillis
-                                     && (null == minStartMillis || 
i.getStart().getMillis() >= minStartMillis))
-                        .collect(Collectors.toList());
-
-      int limit = invocation.getArgument(3);
-      return unusedIntervals.size() <= limit ? unusedIntervals : 
unusedIntervals.subList(0, limit);
-    });
-
-    target = new KillUnusedSegments(
+    createAndAddUnusedSegment(DS1, YEAR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, MONTH_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, DAY_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, HOUR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_DAY, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, Intervals.ETERNITY, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        new TestDruidCoordinatorConfig.Builder().build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        firstRun.getCoordinatorStats()
     );
+
+    validateAndResetState(DS1, YEAR_OLD);
   }
 
   @Test
   public void testRunWithNoIntervalShouldNotKillAnySegments()
   {
-    
Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.anyInt(),
-        ArgumentMatchers.any()
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), 
anyInt(), any(DateTime.class));
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
   }
 
   @Test
   public void 
testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
   {
-    Mockito.doReturn(Duration.standardDays(400))
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    
configBuilder.withCoordinatorKillDurationToRetain(Duration.standardDays(400));
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
     );
 
-    // No unused segment is older than the retention period
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), 
anyInt(), any(DateTime.class));
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * The kill period is honored after the first indexing run.
+   */
+  @Test
+  public void testRunsWithKillPeriod()
+  {
+    configBuilder.withCoordinatorKillPeriod(Duration.standardHours(1));
+
+    setupUnusedSegments();
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
+    );
+
+    final DruidCoordinatorRuntimeParams secondRun = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10,  ImmutableMap.of(DS1, 2L)),
+        secondRun.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
+  }
+
+  /**
+   * Similar to {@link #testMultipleRuns()}
+   */
+  @Test
+  public void testMultipleDatasources()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, NEXT_DAY, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 2, 10, ImmutableMap.of(DS1, 2L, DS2, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
MONTH_OLD.getEnd()));
+    validateAndResetState(DS2, new Interval(YEAR_OLD.getStart(), 
DAY_OLD.getEnd()));
+
+    final DruidCoordinatorRuntimeParams secondKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 4, 20,  ImmutableMap.of(DS1, 4L, DS2, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), 
NEXT_DAY.getEnd()));
+    validateAndResetState(DS2, NEXT_DAY);
+
+    final DruidCoordinatorRuntimeParams thirdKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 5, 30, ImmutableMap.of(DS1, 5L, DS2, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * Even though "more recent" segments are also considered as candidates in 
the wide kill interval here,
+   * the kill task will narrow down to clean up only the segments that 
strictly honor the max kill time.
+   * See {@code 
KillUnusedSegmentsTaskTest#testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime}
+   * for example.
+   */
+  @Test
+  public void testSpreadOutLastMaxSegments()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillBufferPeriod(Duration.standardDays(3));
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, HOUR_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(10));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 4L)),
+        firstKill.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
NEXT_MONTH.getEnd()));
+  }
+
+  @Test
+  public void testAddOldVersionsLater()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), 
NEXT_DAY.getEnd()));
+
+    log.info("Second run...");
+    // Add two old unused segments now. These only get killed much later on 
when the kill
+    // duty eventually round robins its way through until the latest time.
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+
+    final DruidCoordinatorRuntimeParams secondKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 2, 30, ImmutableMap.of(DS1, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+
+    log.info("Fourth run...");

Review Comment:
   Ack, removed the test logger altogether.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to