github-advanced-security[bot] commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2366700363
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java:
##########
@@ -457,6 +473,209 @@
autoscaler.stop();
}
+ @Test
+ public void test_noInitialState_withPerpetuallyRunningTasks() throws
Exception
+ {
+ final int taskCountMax = 2;
+ final int replicas = 1;
+
+ // Synchronization mechanism for checkpoint coordination
+ final CountDownLatch checkpointTriggeredLatch = new CountDownLatch(1);
+ final AtomicReference<String> checkpointTaskId = new AtomicReference<>();
+
+ KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(
+ null,
+ null
+ )
+ {
+ @Override
+ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
+ final String dataSource,
+ final TaskInfoProvider taskInfoProvider,
+ final SeekableStreamSupervisorTuningConfig tuningConfig,
+ final ScheduledExecutorService connectExec
+ )
+ {
+ Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
+ Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
+ return taskClient;
+ }
+ };
+
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionRangeMillis", 500);
+ autoScalerConfig.put("scaleOutThreshold", 0);
+ autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+ autoScalerConfig.put("scaleInThreshold", 1000000);
+ autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+ autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfig.put("scaleActionPeriodMillis", 250);
+ autoScalerConfig.put("taskCountMax", taskCountMax);
+ autoScalerConfig.put("taskCountMin", 1);
+ autoScalerConfig.put("scaleInStep", 1);
+ autoScalerConfig.put("scaleOutStep", 2);
+ autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+ final Map<String, Object> consumerProperties =
KafkaConsumerConfigs.getConsumerProperties();
+ consumerProperties.put("myCustomKey", "myCustomValue");
+ consumerProperties.put("bootstrap.servers", kafkaHost);
+
+ KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new
KafkaIOConfigBuilder()
+ .withTopic(topic)
+ .withInputFormat(INPUT_FORMAT)
+ .withReplicas(replicas)
+ .withTaskCount(1)
+ .withConsumerProperties(consumerProperties)
+ .withAutoScalerConfig(OBJECT_MAPPER.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class))
+ .withUseEarliestSequenceNumber(true)
+ .build();
+
+ final KafkaSupervisorTuningConfig tuningConfigOri = new
KafkaTuningConfigBuilder()
+ .withIntermediatePersistPeriod(Period.years(1))
+ .withResetOffsetAutomatically(false)
+ .withWorkerThreads(numThreads)
+ .withShutdownTimeout(TEST_SHUTDOWN_TIMEOUT)
+ .withMaxRowsInMemory(1000)
+ .withMaxRowsPerSegment(50000)
+ .withReportParseExceptions(false)
+ .withChatHandlerNumRetries(TEST_CHAT_RETRIES)
+ .withChatHandlerTimeout(TEST_HTTP_TIMEOUT.toStandardDuration())
+ .build();
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ SeekableStreamSupervisorSpec testableSupervisorSpec = new
KafkaSupervisorSpec(
+ null,
+ ingestionSchema,
+ dataSchema,
+ tuningConfigOri,
+ kafkaSupervisorIOConfig,
+ null,
+ false,
+ true,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ new SupervisorStateManagerConfig()
+ );
+
+ supervisor = new TestableKafkaSupervisor(
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ (KafkaSupervisorSpec) testableSupervisorSpec,
+ rowIngestionMetersFactory
+ );
+
+ SupervisorTaskAutoScaler autoscaler =
testableSupervisorSpec.createAutoscaler(supervisor);
+
+
+ final KafkaSupervisorTuningConfig tuningConfig =
supervisor.getTuningConfig();
Review Comment:
## Unread local variable
Variable 'KafkaSupervisorTuningConfig tuningConfig' is never read.
[Show more details](https://github.com/apache/druid/security/code-scanning/10398)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]