github-advanced-security[bot] commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2343909978
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -175,6 +181,76 @@
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
+ @Test
+ @Timeout(600)
+ public void
test_ingest20kRows_ofSelfClusterMetricsWithScaleOuts_andVerifyValues()
+ {
+ final int maxRowsPerSegment = 1000;
+ final int expectedSegmentsHandedOff = 20;
+
+ final int taskCount = 1;
+
+ // Submit and start a supervisor
+ final String supervisorId = dataSource + "_supe";
+ AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfigBuilder()
+ .withLagCollectionIntervalMillis(100)
+ .withLagCollectionRangeMillis(100)
+ .withEnableTaskAutoScaler(true)
+ .withScaleActionPeriodMillis(5000)
+ .withScaleActionStartDelayMillis(10000)
+ .withScaleOutThreshold(0)
+ .withScaleInThreshold(10000)
+ .withTriggerScaleOutFractionThreshold(0.001)
+ .withTriggerScaleInFractionThreshold(0.1)
+ .withTaskCountMax(10)
+ .withTaskCountMin(taskCount)
+ .withScaleOutStep(1)
+ .withScaleInStep(0)
+ .withMinTriggerScaleActionFrequencyMillis(5000)
+ .build();
+
+ final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
+ supervisorId,
+ taskCount,
+ maxRowsPerSegment,
+ autoScalerConfig,
+ true
+ );
+
+ Assertions.assertEquals(
+ supervisorId,
+ cluster.callApi().postSupervisor(kafkaSupervisorSpec)
+ );
+
+ overlord.latchableEmitter().waitForEvent(event ->
event.hasMetricName("task/autoScaler/requiredCount"));
+
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/autoScaler/scaleActionTime")
+ .hasDimension(DruidMetrics.DATASOURCE,
List.of(dataSource))
+ );
+
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/handoff/count")
+ .hasDimension(DruidMetrics.DATASOURCE,
List.of(dataSource)),
+ agg -> agg.hasSumAtLeast(expectedSegmentsHandedOff)
+ );
+
+ final int numSegments = Integer.parseInt(
+ cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource =
'%s'", dataSource)
+ );
Review Comment:
## Missing catch of NumberFormatException
Potential uncaught 'java.lang.NumberFormatException'.
[Show more details](https://github.com/apache/druid/security/code-scanning/10307)
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -175,6 +181,76 @@
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
+ @Test
+ @Timeout(600)
+ public void
test_ingest20kRows_ofSelfClusterMetricsWithScaleOuts_andVerifyValues()
+ {
+ final int maxRowsPerSegment = 1000;
+ final int expectedSegmentsHandedOff = 20;
+
+ final int taskCount = 1;
+
+ // Submit and start a supervisor
+ final String supervisorId = dataSource + "_supe";
+ AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfigBuilder()
+ .withLagCollectionIntervalMillis(100)
+ .withLagCollectionRangeMillis(100)
+ .withEnableTaskAutoScaler(true)
+ .withScaleActionPeriodMillis(5000)
+ .withScaleActionStartDelayMillis(10000)
+ .withScaleOutThreshold(0)
+ .withScaleInThreshold(10000)
+ .withTriggerScaleOutFractionThreshold(0.001)
+ .withTriggerScaleInFractionThreshold(0.1)
+ .withTaskCountMax(10)
+ .withTaskCountMin(taskCount)
+ .withScaleOutStep(1)
+ .withScaleInStep(0)
+ .withMinTriggerScaleActionFrequencyMillis(5000)
+ .build();
+
+ final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
+ supervisorId,
+ taskCount,
+ maxRowsPerSegment,
+ autoScalerConfig,
+ true
+ );
+
+ Assertions.assertEquals(
+ supervisorId,
+ cluster.callApi().postSupervisor(kafkaSupervisorSpec)
+ );
+
+ overlord.latchableEmitter().waitForEvent(event ->
event.hasMetricName("task/autoScaler/requiredCount"));
+
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/autoScaler/scaleActionTime")
+ .hasDimension(DruidMetrics.DATASOURCE,
List.of(dataSource))
+ );
+
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/handoff/count")
+ .hasDimension(DruidMetrics.DATASOURCE,
List.of(dataSource)),
+ agg -> agg.hasSumAtLeast(expectedSegmentsHandedOff)
+ );
+
+ final int numSegments = Integer.parseInt(
+ cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource =
'%s'", dataSource)
+ );
+ Assertions.assertTrue(numSegments >= expectedSegmentsHandedOff);
+
+ final int numRows = Integer.parseInt(
+ cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)
+ );
Review Comment:
## Missing catch of NumberFormatException
Potential uncaught 'java.lang.NumberFormatException'.
[Show more details](https://github.com/apache/druid/security/code-scanning/10308)
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3522,13 +3821,24 @@
try {
for (int i = 0; i < results.size(); i++) {
if (results.get(i).isValue() &&
Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) {
- log.info("Successfully set endOffsets for task[%s] and
resumed it", setEndOffsetTaskIds.get(i));
+ log.info("Successfully set endOffsets for task[%s]",
setEndOffsetTaskIds.get(i));
} else {
String taskId = setEndOffsetTaskIds.get(i);
killTask(taskId, "Failed to set end offsets, killing
task");
taskGroup.tasks.remove(taskId);
}
}
+ if (isDynamicAllocationOngoing.get()) {
+ checkpointsToWaitFor -= setEndOffsetFutures.size();
+ if (checkpointsToWaitFor <= 0) {
+ log.info("All tasks in current task groups have been
checkpointed, resuming dynamic allocation");
+ boolean configUpdateResult =
pendingConfigUpdateHook.call();
Review Comment:
## Unread local variable
Variable 'boolean configUpdateResult' is never read.
[Show more details](https://github.com/apache/druid/security/code-scanning/10309)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]