github-advanced-security[bot] commented on code in PR #18843:
URL: https://github.com/apache/druid/pull/18843#discussion_r2616436194


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/BaseConsulDiscoveryDockerTest.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import org.apache.druid.consul.discovery.ConsulDiscoveryModule;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import 
org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedResource;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.consul.ConsulClusterResource;
+import org.apache.druid.testing.embedded.consul.ConsulSecurityMode;
+import org.apache.druid.testing.embedded.emitter.LatchableEmitterModule;
+import org.apache.druid.testing.embedded.indexing.IngestionSmokeTest;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.testing.embedded.psql.PostgreSQLMetadataResource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+
+/**
+ * Base class for Consul discovery integration tests with different security 
modes.
+ * Provides shared test logic that runs for PLAIN, TLS, and mTLS 
configurations.
+ * <p>
+ * Uses embedded Druid servers (running on host) with containerized 
infrastructure:
+ * <ul>
+ *   <li>Consul - for service discovery and leader election</li>
+ *   <li>PostgreSQL - for metadata storage</li>
+ *   <li>MinIO - for deep storage</li>
+ * </ul>
+ */
+@Tag("docker-test")
+@EnabledIfSystemProperty(named = "druid.testing.consul.enabled", matches = 
"true")
+abstract class BaseConsulDiscoveryDockerTest extends IngestionSmokeTest
+{
+  protected ConsulClusterResource consulResource;
+
+  /**
+   * Returns the Consul security mode for this test.
+   * Subclasses must implement this to specify PLAIN, TLS, or MTLS.
+   */
+  protected abstract ConsulSecurityMode getConsulSecurityMode();
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    final PostgreSQLMetadataResource postgreSQLMetadataResource = new 
PostgreSQLMetadataResource();
+    final MinIOStorageResource minIOStorageResource = new 
MinIOStorageResource();
+
+    // Create Consul with the specified security mode
+    consulResource = new ConsulClusterResource(getConsulSecurityMode());
+
+    EmbeddedDruidCluster cluster = EmbeddedDruidCluster
+        .empty()  // NO ZooKeeper - using Consul for everything
+        .useDefaultTimeoutForLatchableEmitter(120)
+        .addExtensions(
+            KafkaIndexTaskModule.class,
+            LatchableEmitterModule.class,
+            PostgreSQLMetadataStorageModule.class,
+            ConsulDiscoveryModule.class
+        );
+
+    // Add containerized infrastructure resources
+    cluster.addResource(postgreSQLMetadataResource)
+           .addResource(minIOStorageResource)
+           .addResource(consulResource)
+           .addResource(kafkaServer);
+
+    // Consul-based discovery and leadership
+    cluster.addCommonProperty("druid.discovery.type", "consul")
+           .addCommonProperty("druid.coordinator.selector.type", "consul")
+           .addCommonProperty("druid.indexer.selector.type", "consul")
+           // Use short watch timeout for faster test shutdown
+           .addCommonProperty("druid.discovery.consul.watch.watchSeconds", 
"PT5S");
+
+    // Disable ZooKeeper entirely
+    cluster.addCommonProperty("druid.zk.service.enabled", "false")
+           .addCommonProperty("druid.zk.service.host", "");
+
+    // HTTP-based serverview and task management (no ZooKeeper needed)
+    cluster.addCommonProperty("druid.serverview.type", "http")
+           .addCommonProperty("druid.coordinator.loadqueuepeon.type", "http")
+           .addCommonProperty("druid.indexer.runner.type", "httpRemote");
+
+    // Configure emitter
+    cluster.addCommonProperty("druid.emitter", "http")
+           .addCommonProperty("druid.emitter.http.recipientBaseUrl", 
eventCollector.getMetricsUrl())
+           .addCommonProperty("druid.emitter.http.flushMillis", "500");
+
+    // Resource to configure embedded servers with container connection details
+    cluster.addResource(new EmbeddedResource()
+    {
+      @Override
+      public void start()
+      {
+      }
+
+      @Override
+      public void stop()
+      {
+      }
+
+      @Override
+      public void beforeStart(EmbeddedDruidCluster cluster)
+      {
+        // Configure all embedded servers to connect to containerized 
infrastructure
+        // via localhost and mapped ports
+        String psqlConnectURI = StringUtils.format(
+            "jdbc:postgresql://localhost:%d/%s",
+            postgreSQLMetadataResource.getContainer().getMappedPort(5432),
+            postgreSQLMetadataResource.getDatabaseName()
+        );
+
+        String consulHost = "localhost";
+        String consulPort = Integer.toString(consulResource.getMappedPort());
+        String minioUrl = minIOStorageResource.getEndpointUrl();
+
+        
cluster.addCommonProperty("druid.metadata.storage.connector.connectURI", 
psqlConnectURI);
+        cluster.addCommonProperty("druid.discovery.consul.connection.host", 
consulHost);
+        cluster.addCommonProperty("druid.discovery.consul.connection.port", 
consulPort);
+        cluster.addCommonProperty("druid.s3.endpoint.url", minioUrl);
+
+        // Configure TLS/mTLS if needed
+        ConsulSecurityMode mode = getConsulSecurityMode();
+        if (mode == ConsulSecurityMode.TLS || mode == ConsulSecurityMode.MTLS) 
{
+          
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.protocol",
 "TLSv1.2");
+          
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.trustStoreType",
 "PKCS12");
+          
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.trustStorePath",
 consulResource.getTrustStorePath());
+          
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.trustStorePassword",
 consulResource.getStorePassword());
+          
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.validateHostnames",
 "false");
+
+          if (mode == ConsulSecurityMode.MTLS) {
+            // Client certificate for mTLS
+            
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.keyStoreType",
 "PKCS12");
+            
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.keyStorePath",
 consulResource.getKeyStorePath());
+            
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.certAlias",
 "client");
+            
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.keyStorePassword",
 consulResource.getStorePassword());
+            
cluster.addCommonProperty("druid.discovery.consul.connection.sslClientConfig.keyManagerPassword",
 consulResource.getStorePassword());
+          }
+        }
+      }
+
+      @Override
+      public void onStarted(EmbeddedDruidCluster cluster)
+      {
+        // No container configuration needed - all Druid services are embedded
+      }
+    });
+
+    // Add all embedded servers
+    cluster = addServers(cluster);
+
+    return cluster;
+  }
+
+  @Override
+  protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
+  {
+    return cluster
+        .addServer(eventCollector)
+        .addServer(new EmbeddedCoordinator())
+        .addServer(overlord)
+        .addServer(indexer)
+        .addServer(broker)
+        .addServer(new EmbeddedHistorical())
+        .addServer(new EmbeddedRouter());
+  }
+
+  @BeforeEach
+  public void verifyOverlordLeader()
+  {
+    // Verify that the embedded Overlord is leader (only overlord in the 
cluster)
+    Assertions.assertTrue(
+        overlord.bindings().overlordLeaderSelector().isLeader(),
+        "Embedded Overlord should be the leader"
+    );
+  }
+
+  /**
+   * Tests that Consul exposes leadership key/values for Coordinator and 
Overlord.
+   * This verifies that leader election is working correctly.
+   */
+  @Test
+  public void testConsulExposesLeadershipKVs() throws Exception
+  {
+    final String coordinatorLeader = 
fetchConsulRawValue("druid/leader/coordinator");
+    final String overlordLeader = fetchConsulRawValue("druid/leader/overlord");
+
+    // Leader values are URLs with the service's default port
+    // Coordinator uses port 8081, Overlord uses port 8090
+    Assertions.assertTrue(coordinatorLeader.contains(":8081"), 
coordinatorLeader);
+    Assertions.assertTrue(overlordLeader.contains(":8090"), overlordLeader);
+  }
+
+  /**
+   * Fetches a raw value from Consul's KV store.
+   * Uses an HTTP client that trusts self-signed certificates for TLS/mTLS 
modes.
+   */
+  protected String fetchConsulRawValue(String key) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          HttpClient client = createHttpClient();
+          final HttpRequest request = HttpRequest.newBuilder(
+                  
consulResource.getHttpUri(StringUtils.format("/v1/kv/%s?raw", key))
+              )
+              .timeout(Duration.ofSeconds(5))
+              .GET()
+              .build();
+          final HttpResponse<String> response = client.send(request, 
HttpResponse.BodyHandlers.ofString());
+          if (response.statusCode() != 200) {
+            throw new IllegalStateException(
+                StringUtils.format("Consul returned status[%d] for key[%s]", 
response.statusCode(), key)
+            );
+          }
+          return response.body();
+        },
+        ex -> true,
+        10
+    );
+  }
+
+  /**
+   * Creates an HTTP client that trusts all certificates (for testing with 
self-signed certs).
+   */
+  private HttpClient createHttpClient() throws Exception
+  {
+    if (getConsulSecurityMode() == ConsulSecurityMode.PLAIN) {
+      return HttpClient.newBuilder()
+                       .connectTimeout(Duration.ofSeconds(5))
+                       .build();
+    }
+
+    // For TLS/mTLS, create a client that trusts all certificates
+    TrustManager[] trustAllCerts = new TrustManager[]{
+        new X509TrustManager()
+        {
+          @Override
+          public X509Certificate[] getAcceptedIssuers()
+          {
+            return new X509Certificate[0];
+          }
+
+          @Override
+          public void checkClientTrusted(X509Certificate[] certs, String 
authType)
+          {
+          }
+
+          @Override
+          public void checkServerTrusted(X509Certificate[] certs, String 
authType)
+          {
+          }
+        }
+    };
+
+    SSLContext sslContext = SSLContext.getInstance("TLS");
+    sslContext.init(null, trustAllCerts, new SecureRandom());

Review Comment:
   ## `TrustManager` that accepts all certificates
   
   This uses a `TrustManager`, defined as an anonymous class in `BaseConsulDiscoveryDockerTest`, that trusts any certificate.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10589)



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/DefaultConsulApiClient.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.health.model.HealthService;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default implementation of {@link ConsulApiClient} using the Ecwid Consul 
client library.
+ */
+public class DefaultConsulApiClient implements ConsulApiClient
+{
+  private static final Logger LOGGER = new 
Logger(DefaultConsulApiClient.class);
+
+  // Consul service metadata has a limit of 512 characters per value
+  // Use a safe limit to avoid edge cases (450 chars leaves room for key name 
overhead)
+  private static final int MAX_METADATA_VALUE_SIZE = 450;
+  private static final long MIN_SESSION_TTL_SECONDS = 30;
+  private static final long MIN_HEALTH_CHECK_INTERVAL_SECONDS = 1;
+
+  private final ConsulClient consulClient;
+  private final ConsulDiscoveryConfig config;
+  private final ObjectMapper jsonMapper;
+
+  public DefaultConsulApiClient(
+      ConsulClient consulClient,
+      ConsulDiscoveryConfig config,
+      ObjectMapper jsonMapper
+  )
+  {
+    this.consulClient = Preconditions.checkNotNull(consulClient, 
"consulClient");
+    this.config = Preconditions.checkNotNull(config, "config");
+    this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
+
+    LOGGER.info(
+        "Created DefaultConsulApiClient for [%s:%d] with service prefix [%s]",
+        config.getConnection().getHost(),
+        config.getConnection().getPort(),
+        config.getService().getServicePrefix()
+    );
+  }
+
+  @Override
+  public void registerService(DiscoveryDruidNode node) throws Exception
+  {
+    String serviceId = ConsulServiceIds.serviceId(config, node);
+    String serviceName = ConsulServiceIds.serviceName(config, 
node.getNodeRole());
+
+    NewService service = new NewService();
+    service.setId(serviceId);
+    service.setName(serviceName);
+    service.setAddress(node.getDruidNode().getHost());
+    service.setPort(node.getDruidNode().getPortToUse());
+
+    List<String> tags = new ArrayList<>();
+    tags.add("druid");
+    tags.add("role:" + node.getNodeRole().getJsonName());
+    if (config.getService().getServiceTags() != null) {
+      for (Map.Entry<String, String> e : 
config.getService().getServiceTags().entrySet()) {
+        if (e.getKey() != null && e.getValue() != null) {
+          tags.add(e.getKey() + ":" + e.getValue());
+        }
+      }
+    }
+    service.setTags(tags);
+
+    // Serialize the full DiscoveryDruidNode as metadata
+    String nodeJson = jsonMapper.writeValueAsString(node);
+
+    // Consul service metadata has a 512 character limit per value
+    // If the JSON is too large, store it in Consul KV and reference it from 
metadata
+    Map<String, String> meta = new HashMap<>();
+    if (nodeJson.length() <= MAX_METADATA_VALUE_SIZE) {
+      // Small enough - store directly in metadata
+      meta.put("druid_node", nodeJson);
+    } else {
+      // Too large - store in KV and reference it
+      String kvKey = ConsulServiceIds.nodeKvKey(config, serviceId);
+      PutParams putParams = new PutParams();  // No session locking for simple 
storage
+      consulClient.setKVValue(kvKey, nodeJson, config.getAuth().getAclToken(), 
putParams, buildQueryParams());
+      meta.put("druid_node_kv", kvKey);
+      LOGGER.debug(
+          "Node metadata for [%s] is too large (%d chars), stored in KV at 
[%s]",
+          serviceId,
+          nodeJson.length(),
+          kvKey
+      );
+    }
+    service.setMeta(meta);
+
+    NewService.Check check = new NewService.Check();
+    long intervalSeconds = Math.max(MIN_HEALTH_CHECK_INTERVAL_SECONDS, 
config.getService().getHealthCheckInterval().getStandardSeconds());
+    long ttlSeconds = Math.max(MIN_SESSION_TTL_SECONDS, intervalSeconds * 3);
+    check.setTtl(StringUtils.format("%ds", ttlSeconds));
+    check.setDeregisterCriticalServiceAfter(
+        StringUtils.format("%ds", 
config.getService().getDeregisterAfter().getStandardSeconds())
+    );
+    service.setCheck(check);
+
+    consulClient.agentServiceRegister(service, config.getAuth().getAclToken());
+    LOGGER.info("Registered service [%s] with Consul", serviceId);
+
+    try {
+      consulClient.agentCheckPass("service:" + serviceId, "Druid node is 
healthy", config.getAuth().getAclToken());
+    }
+    catch (Exception e) {
+      // Log but don't fail - the periodic health check will eventually mark 
it as passing
+      LOGGER.warn(e, "Failed to immediately mark service [%s] as healthy, will 
retry via periodic health check", serviceId);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("RedundantThrows")
+  public void deregisterService(String serviceId) throws Exception
+  {
+    try {
+      String kvKey = ConsulServiceIds.nodeKvKey(config, serviceId);
+      consulClient.deleteKVValue(kvKey, config.getAuth().getAclToken());
+    }
+    catch (Exception e) {
+      LOGGER.debug(e, "Failed to delete KV entry for service [%s] during 
deregistration", serviceId);
+    }
+
+    consulClient.agentServiceDeregister(serviceId, 
config.getAuth().getAclToken());
+    LOGGER.info("Deregistered service [%s] from Consul", serviceId);
+  }
+
+  @Override
+  @SuppressWarnings("RedundantThrows")
+  public void passTtlCheck(String serviceId, String note) throws Exception
+  {
+    consulClient.agentCheckPass("service:" + serviceId, note, 
config.getAuth().getAclToken());
+  }
+
+  @Override
+  @SuppressWarnings({"deprecation", "RedundantThrows"})
+  public List<DiscoveryDruidNode> getHealthyServices(NodeRole nodeRole) throws 
Exception
+  {
+    String serviceName = makeServiceName(nodeRole);
+    Response<List<HealthService>> response = consulClient.getHealthServices(
+        serviceName,
+        true, // only healthy
+        buildQueryParams(),
+        config.getAuth().getAclToken()
+    );
+
+    return parseHealthServices(response.getValue());
+  }
+
+  @Override
+  @SuppressWarnings({"deprecation", "RedundantThrows"})
+  public ConsulWatchResult watchServices(NodeRole nodeRole, long lastIndex, 
long waitSeconds) throws Exception
+  {
+    String serviceName = makeServiceName(nodeRole);
+
+    Response<List<HealthService>> response = consulClient.getHealthServices(
+        serviceName,
+        true, // only healthy
+        buildQueryParams(waitSeconds, lastIndex),
+        config.getAuth().getAclToken()
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `ConsulClient.getHealthServices` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10592)



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/DefaultConsulApiClient.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.health.model.HealthService;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default implementation of {@link ConsulApiClient} using the Ecwid Consul 
client library.
+ */
+public class DefaultConsulApiClient implements ConsulApiClient
+{
+  private static final Logger LOGGER = new 
Logger(DefaultConsulApiClient.class);
+
+  // Consul service metadata has a limit of 512 characters per value
+  // Use a safe limit to avoid edge cases (450 chars leaves room for key name 
overhead)
+  private static final int MAX_METADATA_VALUE_SIZE = 450;
+  private static final long MIN_SESSION_TTL_SECONDS = 30;
+  private static final long MIN_HEALTH_CHECK_INTERVAL_SECONDS = 1;
+
+  private final ConsulClient consulClient;
+  private final ConsulDiscoveryConfig config;
+  private final ObjectMapper jsonMapper;
+
+  public DefaultConsulApiClient(
+      ConsulClient consulClient,
+      ConsulDiscoveryConfig config,
+      ObjectMapper jsonMapper
+  )
+  {
+    this.consulClient = Preconditions.checkNotNull(consulClient, 
"consulClient");
+    this.config = Preconditions.checkNotNull(config, "config");
+    this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
+
+    LOGGER.info(
+        "Created DefaultConsulApiClient for [%s:%d] with service prefix [%s]",
+        config.getConnection().getHost(),
+        config.getConnection().getPort(),
+        config.getService().getServicePrefix()
+    );
+  }
+
+  @Override
+  public void registerService(DiscoveryDruidNode node) throws Exception
+  {
+    String serviceId = ConsulServiceIds.serviceId(config, node);
+    String serviceName = ConsulServiceIds.serviceName(config, 
node.getNodeRole());
+
+    NewService service = new NewService();
+    service.setId(serviceId);
+    service.setName(serviceName);
+    service.setAddress(node.getDruidNode().getHost());
+    service.setPort(node.getDruidNode().getPortToUse());
+
+    List<String> tags = new ArrayList<>();
+    tags.add("druid");
+    tags.add("role:" + node.getNodeRole().getJsonName());
+    if (config.getService().getServiceTags() != null) {
+      for (Map.Entry<String, String> e : 
config.getService().getServiceTags().entrySet()) {
+        if (e.getKey() != null && e.getValue() != null) {
+          tags.add(e.getKey() + ":" + e.getValue());
+        }
+      }
+    }
+    service.setTags(tags);
+
+    // Serialize the full DiscoveryDruidNode as metadata
+    String nodeJson = jsonMapper.writeValueAsString(node);
+
+    // Consul service metadata has a 512 character limit per value
+    // If the JSON is too large, store it in Consul KV and reference it from 
metadata
+    Map<String, String> meta = new HashMap<>();
+    if (nodeJson.length() <= MAX_METADATA_VALUE_SIZE) {
+      // Small enough - store directly in metadata
+      meta.put("druid_node", nodeJson);
+    } else {
+      // Too large - store in KV and reference it
+      String kvKey = ConsulServiceIds.nodeKvKey(config, serviceId);
+      PutParams putParams = new PutParams();  // No session locking for simple 
storage
+      consulClient.setKVValue(kvKey, nodeJson, config.getAuth().getAclToken(), 
putParams, buildQueryParams());
+      meta.put("druid_node_kv", kvKey);
+      LOGGER.debug(
+          "Node metadata for [%s] is too large (%d chars), stored in KV at 
[%s]",
+          serviceId,
+          nodeJson.length(),
+          kvKey
+      );
+    }
+    service.setMeta(meta);
+
+    NewService.Check check = new NewService.Check();
+    long intervalSeconds = Math.max(MIN_HEALTH_CHECK_INTERVAL_SECONDS, 
config.getService().getHealthCheckInterval().getStandardSeconds());
+    long ttlSeconds = Math.max(MIN_SESSION_TTL_SECONDS, intervalSeconds * 3);
+    check.setTtl(StringUtils.format("%ds", ttlSeconds));
+    check.setDeregisterCriticalServiceAfter(
+        StringUtils.format("%ds", 
config.getService().getDeregisterAfter().getStandardSeconds())
+    );
+    service.setCheck(check);
+
+    consulClient.agentServiceRegister(service, config.getAuth().getAclToken());
+    LOGGER.info("Registered service [%s] with Consul", serviceId);
+
+    try {
+      consulClient.agentCheckPass("service:" + serviceId, "Druid node is 
healthy", config.getAuth().getAclToken());
+    }
+    catch (Exception e) {
+      // Log but don't fail - the periodic health check will eventually mark 
it as passing
+      LOGGER.warn(e, "Failed to immediately mark service [%s] as healthy, will 
retry via periodic health check", serviceId);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("RedundantThrows")
+  public void deregisterService(String serviceId) throws Exception
+  {
+    try {
+      String kvKey = ConsulServiceIds.nodeKvKey(config, serviceId);
+      consulClient.deleteKVValue(kvKey, config.getAuth().getAclToken());
+    }
+    catch (Exception e) {
+      LOGGER.debug(e, "Failed to delete KV entry for service [%s] during 
deregistration", serviceId);
+    }
+
+    consulClient.agentServiceDeregister(serviceId, 
config.getAuth().getAclToken());
+    LOGGER.info("Deregistered service [%s] from Consul", serviceId);
+  }
+
+  @Override
+  @SuppressWarnings("RedundantThrows")
+  public void passTtlCheck(String serviceId, String note) throws Exception
+  {
+    consulClient.agentCheckPass("service:" + serviceId, note, 
config.getAuth().getAclToken());
+  }
+
+  @Override
+  @SuppressWarnings({"deprecation", "RedundantThrows"})
+  public List<DiscoveryDruidNode> getHealthyServices(NodeRole nodeRole) throws 
Exception
+  {
+    String serviceName = makeServiceName(nodeRole);
+    Response<List<HealthService>> response = consulClient.getHealthServices(
+        serviceName,
+        true, // only healthy
+        buildQueryParams(),
+        config.getAuth().getAclToken()
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `ConsulClient.getHealthServices` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10591)



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/ConsulDiscoveryConfig.java:
##########
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration for Consul-based service discovery.
+ */
+public class ConsulDiscoveryConfig
+{
+  private static final Logger LOGGER = new Logger(ConsulDiscoveryConfig.class);
+  private static final long MIN_LEADER_SESSION_TTL_SECONDS = 10;
+
+  @JsonProperty("connection")
+  private final ConnectionConfig connection;
+
+  @JsonProperty("auth")
+  private final AuthConfig auth;
+
+  @JsonProperty("service")
+  private final ServiceConfig service;
+
+  @JsonProperty("leader")
+  private final LeaderElectionConfig leader;
+
+  @JsonProperty("watch")
+  private final WatchConfig watch;
+
+  @JsonCreator
+  public static ConsulDiscoveryConfig create(
+      @JsonProperty("connection") @Nullable ConnectionConfig connection,
+      @JsonProperty("auth") @Nullable AuthConfig auth,
+      @JsonProperty("service") ServiceConfig service,
+      @JsonProperty("leader") @Nullable LeaderElectionConfig leader,
+      @JsonProperty("watch") @Nullable WatchConfig watch
+  )
+  {
+    if (service == null) {
+      throw new IAE("service cannot be null");
+    }
+
+    LeaderElectionConfig finalLeader = computeLeaderElectionConfig(leader, 
service.getHealthCheckInterval());
+    return new ConsulDiscoveryConfig(connection, auth, service, finalLeader, 
watch);
+  }
+
+  private static LeaderElectionConfig computeLeaderElectionConfig(
+      @Nullable LeaderElectionConfig leader,
+      Duration healthCheckInterval
+  )
+  {
+    if (leader != null) {
+      // Compute default TTL based on health check interval when not 
explicitly set
+      if (leader.getLeaderSessionTtl() == null) {
+        return new LeaderElectionConfig(
+            leader.getCoordinatorLeaderLockPath(),
+            leader.getOverlordLeaderLockPath(),
+            null,
+            leader.getLeaderMaxErrorRetries(),
+            leader.getLeaderRetryBackoffMax(),
+            healthCheckInterval
+        );
+      } else {
+        return leader;
+      }
+    } else {
+      return new LeaderElectionConfig(null, null, null, null, null, 
healthCheckInterval);
+    }
+  }
+
+  private ConsulDiscoveryConfig(
+      ConnectionConfig connection,
+      AuthConfig auth,
+      ServiceConfig service,
+      LeaderElectionConfig leader,
+      WatchConfig watch
+  )
+  {
+    this.connection = connection == null ? new ConnectionConfig(null, null, 
null, null, null, null, null) : connection;
+    this.auth = auth == null ? new AuthConfig(null, null, null, null) : auth;
+    this.service = service;
+    this.leader = leader;
+    this.watch = watch == null ? new WatchConfig(null, null, null, null) : 
watch;
+
+    validateCrossFieldConstraints();
+  }
+
+  private void validateCrossFieldConstraints()
+  {
+    // Socket timeout must exceed watch timeout to avoid premature disconnects
+    if (connection.getSocketTimeout().compareTo(watch.getWatchSeconds()) <= 0) 
{
+      throw new IAE(
+          StringUtils.format(
+              "socketTimeout [%s] must be greater than watchSeconds [%s]",
+              connection.getSocketTimeout(),
+              watch.getWatchSeconds()
+          )
+      );
+    }
+
+    long serviceTtlSeconds = Math.max(30, 
service.getHealthCheckInterval().getStandardSeconds() * 3);
+    if (service.getDeregisterAfter().getStandardSeconds() < serviceTtlSeconds) 
{
+      throw new IAE(
+          StringUtils.format(
+              "deregisterAfter (%ds) must be >= service TTL (%ds = 3 × 
healthCheckInterval)",
+              service.getDeregisterAfter().getStandardSeconds(),
+              serviceTtlSeconds
+          )
+      );
+    }
+
+    // Large watchSeconds relative to session TTL can delay failure detection
+    if (watch.getWatchSeconds().getStandardSeconds() > 
leader.getLeaderSessionTtl().getStandardSeconds() * 2) {
+      LOGGER.warn(
+          "watchSeconds (%ds) is much larger than leaderSessionTtl (%ds): 
delayed failure detection possible",
+          watch.getWatchSeconds().getStandardSeconds(),
+          leader.getLeaderSessionTtl().getStandardSeconds()
+      );
+    }
+  }
+
+  public ConnectionConfig getConnection()
+  {
+    return connection;
+  }
+
+  public AuthConfig getAuth()
+  {
+    return auth;
+  }
+
+  public ServiceConfig getService()
+  {
+    return service;
+  }
+
+  public LeaderElectionConfig getLeader()
+  {
+    return leader;
+  }
+
+  public WatchConfig getWatch()
+  {
+    return watch;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ConsulDiscoveryConfig that = (ConsulDiscoveryConfig) o;
+    return Objects.equals(connection, that.connection) &&
+           Objects.equals(auth, that.auth) &&
+           Objects.equals(service, that.service) &&
+           Objects.equals(leader, that.leader) &&
+           Objects.equals(watch, that.watch);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(connection, auth, service, leader, watch);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ConsulDiscoveryConfig{" +
+           "connection=" + connection +
+           ", auth=" + auth +
+           ", service=" + service +
+           ", leader=" + leader +
+           ", watch=" + watch +
+           '}';
+  }
+
+  public static class ConnectionConfig
+  {
+    private static final long DEFAULT_CONNECT_TIMEOUT_MS = 10_000;
+    private static final long DEFAULT_SOCKET_TIMEOUT_MS = 75_000;
+    private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 50;
+    private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 20;
+
+    private final String host;
+    private final int port;
+    private final Duration connectTimeout;
+    private final Duration socketTimeout;
+    @Nullable
+    private final ConsulSSLConfig sslClientConfig;
+    private final int maxTotalConnections;
+    private final int maxConnectionsPerRoute;
+
+    @JsonCreator
+    public ConnectionConfig(
+        @JsonProperty("host") @Nullable String host,
+        @JsonProperty("port") @Nullable Integer port,
+        @JsonProperty("connectTimeout") @Nullable Duration connectTimeout,
+        @JsonProperty("socketTimeout") @Nullable Duration socketTimeout,
+        @JsonProperty("sslClientConfig") @Nullable ConsulSSLConfig 
sslClientConfig,
+        @JsonProperty("maxTotalConnections") @Nullable Integer 
maxTotalConnections,
+        @JsonProperty("maxConnectionsPerRoute") @Nullable Integer 
maxConnectionsPerRoute
+    )
+    {
+      this.host = host == null ? "localhost" : host;
+      this.port = validatePort(port);
+      this.connectTimeout = validatePositive(connectTimeout, 
DEFAULT_CONNECT_TIMEOUT_MS, "connectTimeout");
+      this.socketTimeout = validatePositive(socketTimeout, 
DEFAULT_SOCKET_TIMEOUT_MS, "socketTimeout");
+      this.sslClientConfig = sslClientConfig;
+      this.maxTotalConnections = validateConnectionPoolSize(
+          maxTotalConnections,
+          DEFAULT_MAX_TOTAL_CONNECTIONS,
+          "maxTotalConnections"
+      );
+      this.maxConnectionsPerRoute = validateConnectionPoolSize(
+          maxConnectionsPerRoute,
+          DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
+          "maxConnectionsPerRoute"
+      );
+    }
+
+    private static int validatePort(Integer port)
+    {
+      int portValue = port == null ? 8500 : port;
+      if (portValue < 1 || portValue > 65535) {
+        throw new IllegalArgumentException("port must be between 1 and 65535");
+      }
+      return portValue;
+    }
+
+    private static int validateConnectionPoolSize(Integer value, int 
defaultValue, String name)
+    {
+      int result = value == null ? defaultValue : value;
+      if (result <= 0) {
+        throw new IAE(name + " must be positive");
+      }
+      return result;
+    }
+
+    @JsonProperty
+    public String getHost()
+    {
+      return host;
+    }
+
+    @JsonProperty
+    public int getPort()
+    {
+      return port;
+    }
+
+    @JsonProperty
+    public Duration getConnectTimeout()
+    {
+      return connectTimeout;
+    }
+
+    @JsonProperty
+    public Duration getSocketTimeout()
+    {
+      return socketTimeout;
+    }
+
+    @JsonProperty
+    @Nullable
+    public ConsulSSLConfig getSslClientConfig()
+    {
+      return sslClientConfig;
+    }
+
+    @JsonProperty
+    public int getMaxTotalConnections()
+    {
+      return maxTotalConnections;
+    }
+
+    @JsonProperty
+    public int getMaxConnectionsPerRoute()
+    {
+      return maxConnectionsPerRoute;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ConnectionConfig that = (ConnectionConfig) o;
+      return port == that.getPort() &&
+             maxTotalConnections == that.getMaxTotalConnections() &&
+             maxConnectionsPerRoute == that.getMaxConnectionsPerRoute() &&
+             Objects.equals(host, that.getHost()) &&
+             Objects.equals(connectTimeout, that.getConnectTimeout()) &&
+             Objects.equals(socketTimeout, that.getSocketTimeout()) &&
+             Objects.equals(sslClientConfig, that.getSslClientConfig());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(host, port, connectTimeout, socketTimeout, 
sslClientConfig, maxTotalConnections, maxConnectionsPerRoute);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ConnectionConfig{host='" + host + "', port=" + port +
+             ", connectTimeout=" + connectTimeout + ", socketTimeout=" + 
socketTimeout +
+             ", maxTotalConnections=" + maxTotalConnections + ", 
maxConnectionsPerRoute=" + maxConnectionsPerRoute + '}';
+    }
+  }
+
+  public static class AuthConfig
+  {
+    @Nullable
+    private final String aclToken;
+    @Nullable
+    private final String basicAuthUser;
+    @Nullable
+    private final String basicAuthPassword;
+    private final boolean allowBasicAuthOverHttp;
+
+    @JsonCreator
+    public AuthConfig(
+        @JsonProperty("aclToken") @Nullable String aclToken,
+        @JsonProperty("basicAuthUser") @Nullable String basicAuthUser,
+        @JsonProperty("basicAuthPassword") @Nullable String basicAuthPassword,
+        @JsonProperty("allowBasicAuthOverHttp") @Nullable Boolean 
allowBasicAuthOverHttp
+    )
+    {
+      this.aclToken = aclToken;
+      this.basicAuthUser = basicAuthUser;
+      this.basicAuthPassword = basicAuthPassword;
+      this.allowBasicAuthOverHttp = allowBasicAuthOverHttp != null ? 
allowBasicAuthOverHttp : false;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getAclToken()
+    {
+      return aclToken;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getBasicAuthUser()
+    {
+      return basicAuthUser;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getBasicAuthPassword()
+    {
+      return basicAuthPassword;
+    }
+
+    @JsonProperty
+    public boolean getAllowBasicAuthOverHttp()
+    {
+      return allowBasicAuthOverHttp;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      AuthConfig that = (AuthConfig) o;
+      return allowBasicAuthOverHttp == that.getAllowBasicAuthOverHttp() &&
+             Objects.equals(aclToken, that.getAclToken()) &&
+             Objects.equals(basicAuthUser, that.getBasicAuthUser()) &&
+             Objects.equals(basicAuthPassword, that.getBasicAuthPassword());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(aclToken, basicAuthUser, basicAuthPassword, 
allowBasicAuthOverHttp);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "AuthConfig{aclToken=" + mask(aclToken) +
+             ", basicAuthUser=" + mask(basicAuthUser) +
+             ", basicAuthPassword=" + mask(basicAuthPassword) +
+             ", allowBasicAuthOverHttp=" + allowBasicAuthOverHttp + '}';
+    }
+
+    private static String mask(String value)
+    {
+      if (value == null) {
+        return String.valueOf(value);
+      }
+      return "*****";
+    }
+  }
+
+  public static class ServiceConfig
+  {
+    private static final long DEFAULT_HEALTH_CHECK_INTERVAL_MS = 10_000;
+    private static final long DEFAULT_DEREGISTER_AFTER_MS = 90_000;
+
+    private final String servicePrefix;
+    @Nullable
+    private final String datacenter;
+    @Nullable
+    private final Map<String, String> serviceTags;
+    private final Duration healthCheckInterval;
+    private final Duration deregisterAfter;
+
+    @JsonCreator
+    public ServiceConfig(
+        @JsonProperty("servicePrefix") String servicePrefix,
+        @JsonProperty("datacenter") @Nullable String datacenter,
+        @JsonProperty("serviceTags") @Nullable Map<String, String> serviceTags,
+        @JsonProperty("healthCheckInterval") @Nullable Duration 
healthCheckInterval,
+        @JsonProperty("deregisterAfter") @Nullable Duration deregisterAfter
+    )
+    {
+      if (servicePrefix == null || servicePrefix.isEmpty()) {
+        throw new IAE("servicePrefix cannot be null or empty");
+      }
+      this.servicePrefix = servicePrefix;
+      this.datacenter = datacenter;
+      this.serviceTags = serviceTags == null
+                         ? null
+                         : Collections.unmodifiableMap(new 
LinkedHashMap<>(serviceTags));
+      this.healthCheckInterval = validatePositive(healthCheckInterval, 
DEFAULT_HEALTH_CHECK_INTERVAL_MS, "healthCheckInterval");
+      this.deregisterAfter = validateNonNegative(deregisterAfter, 
DEFAULT_DEREGISTER_AFTER_MS, "deregisterAfter");
+    }
+
+    @JsonProperty
+    public String getServicePrefix()
+    {
+      return servicePrefix;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getDatacenter()
+    {
+      return datacenter;
+    }
+
+    @JsonProperty
+    @Nullable
+    public Map<String, String> getServiceTags()

Review Comment:
   ## Exposing internal representation
   
   getServiceTags exposes the internal representation stored in field 
serviceTags. The value may be modified [after this call to getServiceTags](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10590)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Reply via email to