github-advanced-security[bot] commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2343909978


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -175,6 +181,76 @@
     
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
+  @Test
+  @Timeout(600)
+  public void test_ingest20kRows_ofSelfClusterMetricsWithScaleOuts_andVerifyValues()
+  {
+    final int maxRowsPerSegment = 1000;
+    final int expectedSegmentsHandedOff = 20;
+
+    final int taskCount = 1;
+
+    // Submit and start a supervisor
+    final String supervisorId = dataSource + "_supe";
+    AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfigBuilder()
+        .withLagCollectionIntervalMillis(100)
+        .withLagCollectionRangeMillis(100)
+        .withEnableTaskAutoScaler(true)
+        .withScaleActionPeriodMillis(5000)
+        .withScaleActionStartDelayMillis(10000)
+        .withScaleOutThreshold(0)
+        .withScaleInThreshold(10000)
+        .withTriggerScaleOutFractionThreshold(0.001)
+        .withTriggerScaleInFractionThreshold(0.1)
+        .withTaskCountMax(10)
+        .withTaskCountMin(taskCount)
+        .withScaleOutStep(1)
+        .withScaleInStep(0)
+        .withMinTriggerScaleActionFrequencyMillis(5000)
+        .build();
+
+    final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
+        supervisorId,
+        taskCount,
+        maxRowsPerSegment,
+        autoScalerConfig,
+        true
+    );
+
+    Assertions.assertEquals(
+        supervisorId,
+        cluster.callApi().postSupervisor(kafkaSupervisorSpec)
+    );
+
+    overlord.latchableEmitter().waitForEvent(event -> event.hasMetricName("task/autoScaler/requiredCount"));
+
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/autoScaler/scaleActionTime")
+                      .hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource))
+    );
+
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/handoff/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource)),
+        agg -> agg.hasSumAtLeast(expectedSegmentsHandedOff)
+    );
+
+    final int numSegments = Integer.parseInt(
+        cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource = '%s'", dataSource)
+    );

Review Comment:
   ## Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10307)
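
   One way to resolve this finding is to wrap the parse in a small helper so a malformed SQL result fails the test with context instead of an uncaught `NumberFormatException`. This is a sketch only; the helper name and error message are illustrative, not from the PR:

   ```java
   // Hypothetical helper: defensively parse the scalar COUNT(*) result
   // returned by cluster.runSql(...) before asserting on it.
   public class ParseCountSketch
   {
     static int parseCount(String sqlResult)
     {
       try {
         return Integer.parseInt(sqlResult.trim());
       }
       catch (NumberFormatException e) {
         // Surface the raw response so a failing run is easy to diagnose.
         throw new AssertionError("Expected a numeric COUNT(*) result but got [" + sqlResult + "]", e);
       }
     }

     public static void main(String[] args)
     {
       System.out.println(parseCount(" 20 "));
     }
   }
   ```

   The test would then call `parseCount(cluster.runSql(...))` instead of invoking `Integer.parseInt` directly at each call site.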



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -175,6 +181,76 @@
     
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
+  @Test
+  @Timeout(600)
+  public void test_ingest20kRows_ofSelfClusterMetricsWithScaleOuts_andVerifyValues()
+  {
+    final int maxRowsPerSegment = 1000;
+    final int expectedSegmentsHandedOff = 20;
+
+    final int taskCount = 1;
+
+    // Submit and start a supervisor
+    final String supervisorId = dataSource + "_supe";
+    AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfigBuilder()
+        .withLagCollectionIntervalMillis(100)
+        .withLagCollectionRangeMillis(100)
+        .withEnableTaskAutoScaler(true)
+        .withScaleActionPeriodMillis(5000)
+        .withScaleActionStartDelayMillis(10000)
+        .withScaleOutThreshold(0)
+        .withScaleInThreshold(10000)
+        .withTriggerScaleOutFractionThreshold(0.001)
+        .withTriggerScaleInFractionThreshold(0.1)
+        .withTaskCountMax(10)
+        .withTaskCountMin(taskCount)
+        .withScaleOutStep(1)
+        .withScaleInStep(0)
+        .withMinTriggerScaleActionFrequencyMillis(5000)
+        .build();
+
+    final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
+        supervisorId,
+        taskCount,
+        maxRowsPerSegment,
+        autoScalerConfig,
+        true
+    );
+
+    Assertions.assertEquals(
+        supervisorId,
+        cluster.callApi().postSupervisor(kafkaSupervisorSpec)
+    );
+
+    overlord.latchableEmitter().waitForEvent(event -> event.hasMetricName("task/autoScaler/requiredCount"));
+
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/autoScaler/scaleActionTime")
+                      .hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource))
+    );
+
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/handoff/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource)),
+        agg -> agg.hasSumAtLeast(expectedSegmentsHandedOff)
+    );
+
+    final int numSegments = Integer.parseInt(
+        cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource = '%s'", dataSource)
+    );
+    Assertions.assertTrue(numSegments >= expectedSegmentsHandedOff);
+
+    final int numRows = Integer.parseInt(
+        cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)
+    );

Review Comment:
   ## Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10308)



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3522,13 +3821,24 @@
                   try {
                     for (int i = 0; i < results.size(); i++) {
                       if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) {
-                        log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
+                        log.info("Successfully set endOffsets for task[%s]", setEndOffsetTaskIds.get(i));
                       } else {
                         String taskId = setEndOffsetTaskIds.get(i);
                        killTask(taskId, "Failed to set end offsets, killing task");
                         taskGroup.tasks.remove(taskId);
                       }
                     }
+                    if (isDynamicAllocationOngoing.get()) {
+                      checkpointsToWaitFor -= setEndOffsetFutures.size();
+                      if (checkpointsToWaitFor <= 0) {
+                        log.info("All tasks in current task groups have been checkpointed, resuming dynamic allocation");
+                        boolean configUpdateResult = pendingConfigUpdateHook.call();

Review Comment:
   ## Unread local variable
   
   Variable 'boolean configUpdateResult' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10309)
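
   If the hook's return value is meaningful, the alert could be resolved by acting on it rather than discarding it. A minimal, dependency-free sketch of that shape, with hypothetical names (only `pendingConfigUpdateHook` comes from the PR):

   ```java
   import java.util.concurrent.Callable;

   // Hypothetical sketch: consume the hook's boolean result so the local
   // variable is no longer unread, and surface a failed update to the caller.
   public class ConfigUpdateSketch
   {
     static boolean applyPendingUpdate(Callable<Boolean> pendingConfigUpdateHook) throws Exception
     {
       final boolean configUpdateResult = pendingConfigUpdateHook.call();
       if (!configUpdateResult) {
         // The real supervisor would presumably log.warn(...) here;
         // println keeps this sketch free of logging dependencies.
         System.out.println("Pending config update was not applied; dynamic allocation remains paused");
       }
       return configUpdateResult;
     }

     public static void main(String[] args) throws Exception
     {
       System.out.println(applyPendingUpdate(() -> true));
     }
   }
   ```

   Alternatively, if the result is genuinely irrelevant, dropping the variable and calling `pendingConfigUpdateHook.call();` on its own would also silence the scanner.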



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

