github-advanced-security[bot] commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2366700363


##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java:
##########
@@ -457,6 +473,209 @@
     autoscaler.stop();
   }
 
+  @Test
+  public void test_noInitialState_withPerpetuallyRunningTasks() throws 
Exception
+  {
+    final int taskCountMax = 2;
+    final int replicas = 1;
+
+    // Synchronization mechanism for checkpoint coordination
+    final CountDownLatch checkpointTriggeredLatch = new CountDownLatch(1);
+    final AtomicReference<String> checkpointTaskId = new AtomicReference<>();
+
+    KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory(
+        null,
+        null
+    )
+    {
+      @Override
+      public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
+          final String dataSource,
+          final TaskInfoProvider taskInfoProvider,
+          final SeekableStreamSupervisorTuningConfig tuningConfig,
+          final ScheduledExecutorService connectExec
+      )
+      {
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
+        Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
+        return taskClient;
+      }
+    };
+
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionRangeMillis", 500);
+    autoScalerConfig.put("scaleOutThreshold", 0);
+    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+    autoScalerConfig.put("scaleInThreshold", 1000000);
+    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfig.put("scaleActionPeriodMillis", 250);
+    autoScalerConfig.put("taskCountMax", taskCountMax);
+    autoScalerConfig.put("taskCountMin", 1);
+    autoScalerConfig.put("scaleInStep", 1);
+    autoScalerConfig.put("scaleOutStep", 2);
+    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+    final Map<String, Object> consumerProperties = 
KafkaConsumerConfigs.getConsumerProperties();
+    consumerProperties.put("myCustomKey", "myCustomValue");
+    consumerProperties.put("bootstrap.servers", kafkaHost);
+
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new 
KafkaIOConfigBuilder()
+        .withTopic(topic)
+        .withInputFormat(INPUT_FORMAT)
+        .withReplicas(replicas)
+        .withTaskCount(1)
+        .withConsumerProperties(consumerProperties)
+        .withAutoScalerConfig(OBJECT_MAPPER.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class))
+        .withUseEarliestSequenceNumber(true)
+        .build();
+
+    final KafkaSupervisorTuningConfig tuningConfigOri = new 
KafkaTuningConfigBuilder()
+        .withIntermediatePersistPeriod(Period.years(1))
+        .withResetOffsetAutomatically(false)
+        .withWorkerThreads(numThreads)
+        .withShutdownTimeout(TEST_SHUTDOWN_TIMEOUT)
+        .withMaxRowsInMemory(1000)
+        .withMaxRowsPerSegment(50000)
+        .withReportParseExceptions(false)
+        .withChatHandlerNumRetries(TEST_CHAT_RETRIES)
+        .withChatHandlerTimeout(TEST_HTTP_TIMEOUT.toStandardDuration())
+        .build();
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    SeekableStreamSupervisorSpec testableSupervisorSpec = new 
KafkaSupervisorSpec(
+        null,
+        ingestionSchema,
+        dataSchema,
+        tuningConfigOri,
+        kafkaSupervisorIOConfig,
+        null,
+        false,
+        true,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        new NoopServiceEmitter(),
+        new DruidMonitorSchedulerConfig(),
+        rowIngestionMetersFactory,
+        new SupervisorStateManagerConfig()
+    );
+
+    supervisor = new TestableKafkaSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        (KafkaSupervisorSpec) testableSupervisorSpec,
+        rowIngestionMetersFactory
+    );
+
+    SupervisorTaskAutoScaler autoscaler = 
testableSupervisorSpec.createAutoscaler(supervisor);
+
+
+    final KafkaSupervisorTuningConfig tuningConfig = 
supervisor.getTuningConfig();

Review Comment:
   ## Unread local variable
   
   Variable 'KafkaSupervisorTuningConfig tuningConfig' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10398)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to