gianm commented on code in PR #18302:
URL: https://github.com/apache/druid/pull/18302#discussion_r2238749731


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Runs some basic ingestion tests using {@link DruidContainers}.
+ */
+public class IngestionDockerTest extends EmbeddedClusterTestBase
+{
+  static {
+    System.setProperty("druid.testing.docker.image", 
"apache/druid:35.0.0-SNAPSHOT");
+  }
+
+  // Druid Docker containers
+  protected final DruidContainerResource overlordLeader = 
DruidContainers.newOverlord().withTestImage();
+  protected final DruidContainerResource coordinator = 
DruidContainers.newCoordinator().withTestImage();
+  protected final DruidContainerResource historical = 
DruidContainers.newHistorical().withTestImage();
+  protected final DruidContainerResource broker1 = 
DruidContainers.newBroker().withTestImage();
+  protected final DruidContainerResource middleManager = DruidContainers
+      .newMiddleManager()
+      .withTestImage()
+      .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+  // Follower EmbeddedOverlord to watch segment publish events
+  private final EmbeddedOverlord overlordFollower = new EmbeddedOverlord()
+      .addProperty("druid.plaintextPort", "7090")
+      .addProperty("druid.manager.segments.useIncrementalCache", "always")
+      .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+  // Additional EmbeddedBroker to wait for segments to become queryable
+  private final EmbeddedBroker broker2 = new EmbeddedBroker()
+      .addProperty("druid.plaintextPort", "7082");
+
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withZookeeper();
+
+    return cluster
+        .useDruidContainers()
+        .useLatchableEmitter()
+        // Needed for overlordFollower to recognize the KafkaSupervisor type
+        .addExtension(KafkaIndexTaskModule.class)
+        .addResource(new EmbeddedDerbyMetadataResource())
+        .addResource(new MinIOStorageResource())
+        .addResource(kafkaServer)
+        .addCommonProperty(
+            "druid.extensions.loadList",
+            "[\"druid-s3-extensions\", \"druid-kafka-indexing-service\", 
\"druid-multi-stage-query\"]"
+        )
+        .addResource(coordinator)
+        .addResource(overlordLeader)
+        .addResource(middleManager)
+        .addResource(historical)
+        .addResource(broker1)
+        .addServer(overlordFollower)
+        .addServer(broker2)
+        .addServer(new EmbeddedRouter());
+  }
+
+  @Test
+  public void test_runIndexTask_andKillData()
+  {
+
+  }
+
+  @Test
+  public void test_runIndexParallelTask_andCompactData()
+  {
+
+  }
+
+  @Test
+  public void test_runMsqTask_andQueryData()
+  {
+
+  }
+
+  @Test
+  public void test_runIndexTask()
+  {
+    final String taskId = IdUtils.getRandomId();
+    final Object task = createIndexTaskForInlineData(
+        taskId,
+        StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n")
+    );
+
+    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+    waitForCachedUsedSegmentCount(10);
+    verifyUsedSegmentCount(10);
+  }
+
+  private Object createIndexTaskForInlineData(String taskId, String 
inlineDataCsv)
+  {
+    return EmbeddedClusterApis.createTaskFromPayload(
+        taskId,
+        StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, 
inlineDataCsv, dataSource)
+    );
+  }
+
+  @Test
+  public void test_runKafkaSupervisor()
+  {
+    final String topic = dataSource;
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    kafkaServer.produceRecordsToTopic(
+        generateRecordsForTopic(topic, 10, DateTimes.of("2025-06-01"))
+    );
+
+    // Submit and start a supervisor
+    final String supervisorId = dataSource;
+    final KafkaSupervisorSpec kafkaSupervisorSpec = 
createKafkaSupervisor(supervisorId, topic);
+
+    final Map<String, String> startSupervisorResult = 
cluster.callApi().onLeaderOverlord(
+        o -> o.postSupervisor(kafkaSupervisorSpec)
+    );
+    Assertions.assertEquals(Map.of("id", supervisorId), startSupervisorResult);
+
+    // Wait for the broker to discover the realtime segments
+    broker2.latchableEmitter().waitForEvent(
+        event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
+    );
+
+    SupervisorStatus supervisorStatus = 
cluster.callApi().getSupervisorStatus(supervisorId);
+    Assertions.assertFalse(supervisorStatus.isSuspended());
+    Assertions.assertTrue(supervisorStatus.isHealthy());
+    Assertions.assertEquals("RUNNING", supervisorStatus.getState());
+    Assertions.assertEquals(topic, supervisorStatus.getSource());
+
+    // Get the task statuses
+    List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf(
+        (CloseableIterator<TaskStatusPlus>)
+            cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, 
dataSource, 1))
+    );
+    Assertions.assertEquals(1, taskStatuses.size());
+    Assertions.assertEquals(TaskState.RUNNING, 
taskStatuses.get(0).getStatusCode());
+
+    // Verify the count of rows ingested into the datasource so far
+    // Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));

Review Comment:
   Please don't commit commented-out code.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Runs some basic ingestion tests using {@link DruidContainers}.
+ */
+public class IngestionDockerTest extends EmbeddedClusterTestBase
+{
+  static {
+    System.setProperty("druid.testing.docker.image", 
"apache/druid:35.0.0-SNAPSHOT");

Review Comment:
   Setting this in a static initializer doesn't seem right, because it will be 
set when the class is loaded, not necessarily when the test runs. Maybe set it 
in a `@BeforeAll` method instead.
   
   Although, it'd be even better to not have to set the property at all. Like, 
perhaps the property just sets a default, but tests are able to specify their 
own when constructing the `DruidContainer`. That's how it works for other kinds 
of testcontainers.
   
   Also, about the value: do we need to change it when Druid's version 
is upgraded? Is it possible to get it programmatically, such as from 
`DruidContainerResource.class.getPackage().getImplementationVersion()`? That 
should work when running from a jar, but possibly not from an IDE.
   
   Finally, if we are going to keep needing to set this property, better to 
refer to the name by `DruidContainerResource.PROPERTY_TEST_IMAGE` rather than 
hardcoding it.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Runs some basic ingestion tests using {@link DruidContainers}.
+ */
+public class IngestionDockerTest extends EmbeddedClusterTestBase
+{
+  static {
+    System.setProperty("druid.testing.docker.image", 
"apache/druid:35.0.0-SNAPSHOT");
+  }
+
+  // Druid Docker containers
+  protected final DruidContainerResource overlordLeader = 
DruidContainers.newOverlord().withTestImage();
+  protected final DruidContainerResource coordinator = 
DruidContainers.newCoordinator().withTestImage();
+  protected final DruidContainerResource historical = 
DruidContainers.newHistorical().withTestImage();
+  protected final DruidContainerResource broker1 = 
DruidContainers.newBroker().withTestImage();
+  protected final DruidContainerResource middleManager = DruidContainers
+      .newMiddleManager()
+      .withTestImage()
+      .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+  // Follower EmbeddedOverlord to watch segment publish events
+  private final EmbeddedOverlord overlordFollower = new EmbeddedOverlord()
+      .addProperty("druid.plaintextPort", "7090")
+      .addProperty("druid.manager.segments.useIncrementalCache", "always")
+      .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+  // Additional EmbeddedBroker to wait for segments to become queryable
+  private final EmbeddedBroker broker2 = new EmbeddedBroker()
+      .addProperty("druid.plaintextPort", "7082");
+
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withZookeeper();
+
+    return cluster
+        .useDruidContainers()
+        .useLatchableEmitter()
+        // Needed for overlordFollower to recognize the KafkaSupervisor type
+        .addExtension(KafkaIndexTaskModule.class)
+        .addResource(new EmbeddedDerbyMetadataResource())
+        .addResource(new MinIOStorageResource())
+        .addResource(kafkaServer)
+        .addCommonProperty(
+            "druid.extensions.loadList",
+            "[\"druid-s3-extensions\", \"druid-kafka-indexing-service\", 
\"druid-multi-stage-query\"]"
+        )
+        .addResource(coordinator)
+        .addResource(overlordLeader)
+        .addResource(middleManager)
+        .addResource(historical)
+        .addResource(broker1)
+        .addServer(overlordFollower)
+        .addServer(broker2)
+        .addServer(new EmbeddedRouter());
+  }
+
+  @Test
+  public void test_runIndexTask_andKillData()
+  {
+

Review Comment:
   A comment about why this test is empty would be helpful (same goes for the other empty tests).



##########
extensions-core/druid-testcontainers/pom.xml:
##########
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-testcontainers</artifactId>
+  <name>druid-testcontainers</name>
+  <description>Testcontainers for running Apache Druid services in 
tests</description>
+
+  <parent>
+    <groupId>org.apache.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>35.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!-- This module should have minimal dependencies so that it can be used 
independently
+   by libraries and codebases outside Druid to run Testcontainers -->
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>

Review Comment:
   `druid-processing` is a pretty big dependency with lots of transitive 
dependencies. It looks like it's just for `StringUtils` and `Logger`. It would 
be better to avoid those and remove this dependency. For `Logger`, you could 
either refrain from logging stuff, or you could use `org.slf4j.Logger` directly 
and depend on `slf4j-api`.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/DruidContainerResource.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.DruidCommand;
+import org.apache.druid.testing.DruidContainer;
+import org.apache.druid.testing.MountedDir;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.TestcontainerResource;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link TestcontainerResource} to run Druid services.
+ * Currently, only core extensions can be used out-of-the-box with these 
containers
+ * such as {@code druid-s3-extensions} or {@code postgresql-metadata-storage},
+ * simply by adding them to {@code druid.extensions.loadList}.
+ * <p>
+ * {@link DruidContainers} should be used only for testing backward 
compatibility
+ * or a Docker-specific feature. For all other testing needs, use plain old
+ * {@code EmbeddedDruidServer} as they are much faster, allow easy debugging 
and
+ * do not require downloading any images.
+ */
+public class DruidContainerResource extends 
TestcontainerResource<DruidContainer>
+{
+  /**
+   * Java system property to specify the name of the Docker test image.
+   */
+  public static final String PROPERTY_TEST_IMAGE = 
"druid.testing.docker.image";
+
+  private static final Logger log = new Logger(DruidContainerResource.class);
+
+  /**
+   * Forbidden server properties that may be used by EmbeddedDruidServers but
+   * interfere with the functioning of DruidContainer-based services.
+   */
+  private static final Set<String> FORBIDDEN_PROPERTIES = Set.of(
+      "druid.extensions.modulesForEmbeddedTests",
+      "druid.emitter"
+  );
+
+  /**
+   * A static incremental ID is used instead of a random number to ensure that
+   * tests are more deterministic and easier to debug.
+   */
+  private static final AtomicInteger SERVER_ID = new AtomicInteger(0);
+
+  private final String name;
+  private final DruidCommand command;
+  private final Map<String, String> properties = new HashMap<>();
+
+  private DockerImageName imageName;
+  private EmbeddedDruidCluster cluster;
+
+  private File containerDirectory;
+
+  private MountedDir indexerLogsDeepStorageDirectory;
+  private MountedDir serviceLogsDirectory;
+  private MountedDir segmentDeepStorageDirectory;
+
+  DruidContainerResource(DruidCommand command)
+  {
+    this.name = StringUtils.format(
+        "container_%s_%d",
+        command.getName(),
+        SERVER_ID.incrementAndGet()
+    );
+    this.command = command;
+    addProperty("druid.host", EmbeddedDruidCluster.getDefaultHost());
+  }
+
+  public DruidContainerResource withImage(DockerImageName imageName)
+  {
+    this.imageName = imageName;
+    return this;
+  }
+
+  /**
+   * Uses the Docker test image specified by the system property
+   * {@link #PROPERTY_TEST_IMAGE} for this container.
+   */
+  public DruidContainerResource withTestImage()

Review Comment:
   Similar comment as above, IMO `usingTestImage()` is clearer.



##########
services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java:
##########
@@ -231,13 +242,50 @@ public TestFolder getTestFolder()
   }
 
   /**
-   * The embedded Zookeeper server used by this cluster, if any.
-   *
-   * @throws NullPointerException if this cluster has no embedded zookeeper.
+   * Hostname to be used for embedded services (both Druid or external).
+   * Using this hostname ensures that the underlying service is reachable by 
both
+   * EmbeddedDruidServers and DruidContainers.
+   */
+  public String getEmbeddedServiceHostname()
+  {
+    return hasDruidContainers ? getDefaultHost() : "localhost";
+  }
+
+  /**
+   * Hostname for the host machine running the containers. Using this hostname
+   * instead of "localhost" allows all the Druid containers to talk to each
+   * other and also other EmbeddedDruidServers.
    */
-  public EmbeddedZookeeper getZookeeper()
+  public static String getDefaultHost()
   {
-    return Objects.requireNonNull(zookeeper, "No embedded zookeeper configured 
for this cluster");
+    try {
+      return InetAddress.getLocalHost().getHostAddress();
+    }
+    catch (UnknownHostException e) {
+      throw new ISE(e, "Unable to determine host name");
+    }
+  }
+
+  /**
+   * Replaces {@code localhost} or {@code 127.0.0.1} in the given connectUri
+   * with {@link #getEmbeddedServiceHostname()}. Using the embedded URI ensures
+   * that the underlying service is reachable by both EmbeddedDruidServers and
+   * DruidContainers.
+   */
+  public String getEmbeddedConnectUri(String connectUri)
+  {
+    if (!hasDruidContainers) {
+      return connectUri;
+    } else if (connectUri.contains("localhost")) {
+      return StringUtils.replace(connectUri, "localhost", 
getEmbeddedServiceHostname());

Review Comment:
   Seeing URI manipulations like this stresses me out ;P
   
   It's just test code, but it's still better form to manipulate the URI 
properly. That means parsing it with `new URI(connectUri)` and then creating a 
replacement `URI` using the 7-arg constructor (`scheme, userInfo, host, port, 
path, query, fragment`).



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Runs some basic ingestion tests using {@link DruidContainers}.
+ */
+public class IngestionDockerTest extends EmbeddedClusterTestBase
+{
+  static {
+    System.setProperty("druid.testing.docker.image", 
"apache/druid:35.0.0-SNAPSHOT");
+  }
+
+  // Druid Docker containers
+  protected final DruidContainerResource overlordLeader = 
DruidContainers.newOverlord().withTestImage();

Review Comment:
   I don't think we can be sure that this one is always the leader. It starts 
up first, but maybe something happens that causes a leader change. Or maybe for 
some reason the container version fails to gain leadership due to an error, and 
then the `overlordFollower` becomes leader.
   
   It would help to add some check in the tests that confirms this one is 
actually leading. Ideally we check at both the beginning and end of each test.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/DruidContainerResource.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.DruidCommand;
+import org.apache.druid.testing.DruidContainer;
+import org.apache.druid.testing.MountedDir;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.TestcontainerResource;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link TestcontainerResource} to run Druid services.
+ * Currently, only core extensions can be used out-of-the-box with these 
containers
+ * such as {@code druid-s3-extensions} or {@code postgresql-metadata-storage},
+ * simply by adding them to {@code druid.extensions.loadList}.
+ * <p>
+ * {@link DruidContainers} should be used only for testing backward 
compatibility
+ * or a Docker-specific feature. For all other testing needs, use plain old
+ * {@code EmbeddedDruidServer} as they are much faster, allow easy debugging 
and
+ * do not require downloading any images.
+ */
+public class DruidContainerResource extends 
TestcontainerResource<DruidContainer>
+{
+  /**
+   * Java system property to specify the name of the Docker test image.
+   */
+  public static final String PROPERTY_TEST_IMAGE = 
"druid.testing.docker.image";
+
+  private static final Logger log = new Logger(DruidContainerResource.class);
+
+  /**
+   * Forbidden server properties that may be used by EmbeddedDruidServers but
+   * interfere with the functioning of DruidContainer-based services.
+   */
+  private static final Set<String> FORBIDDEN_PROPERTIES = Set.of(
+      "druid.extensions.modulesForEmbeddedTests",
+      "druid.emitter"
+  );
+
+  /**
+   * A static incremental ID is used instead of a random number to ensure that
+   * tests are more deterministic and easier to debug.
+   */
+  private static final AtomicInteger SERVER_ID = new AtomicInteger(0);
+
+  private final String name;
+  private final DruidCommand command;
+  private final Map<String, String> properties = new HashMap<>();
+
+  private DockerImageName imageName;
+  private EmbeddedDruidCluster cluster;
+
+  private File containerDirectory;
+
+  private MountedDir indexerLogsDeepStorageDirectory;
+  private MountedDir serviceLogsDirectory;
+  private MountedDir segmentDeepStorageDirectory;
+
+  DruidContainerResource(DruidCommand command)
+  {
+    this.name = StringUtils.format(
+        "container_%s_%d",
+        command.getName(),
+        SERVER_ID.incrementAndGet()
+    );
+    this.command = command;
+    addProperty("druid.host", EmbeddedDruidCluster.getDefaultHost());
+  }
+
+  public DruidContainerResource withImage(DockerImageName imageName)

Review Comment:
   To me a method named `withX` suggests a copy is being made, and the original 
object is not modified. But here the original object is in fact modified. IMO 
`setImage` or `useImage` would be clearer.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.docker;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Runs some basic ingestion tests using {@link DruidContainers}.
+ */
+public class IngestionDockerTest extends EmbeddedClusterTestBase
+{
+  static {
+    System.setProperty("druid.testing.docker.image", 
"apache/druid:35.0.0-SNAPSHOT");
+  }
+
+  // Druid Docker containers
+  protected final DruidContainerResource overlordLeader = 
DruidContainers.newOverlord().withTestImage();
+  protected final DruidContainerResource coordinator = 
DruidContainers.newCoordinator().withTestImage();
+  protected final DruidContainerResource historical = 
DruidContainers.newHistorical().withTestImage();
+  protected final DruidContainerResource broker1 = 
DruidContainers.newBroker().withTestImage();
+  protected final DruidContainerResource middleManager = DruidContainers
+      .newMiddleManager()
+      .withTestImage()
+      .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+  // Follower EmbeddedOverlord to watch segment publish events
+  private final EmbeddedOverlord overlordFollower = new EmbeddedOverlord()
+      .addProperty("druid.plaintextPort", "7090")
+      .addProperty("druid.manager.segments.useIncrementalCache", "always")
+      .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+  // Additional EmbeddedBroker to wait for segments to become queryable
+  private final EmbeddedBroker broker2 = new EmbeddedBroker()
+      .addProperty("druid.plaintextPort", "7082");
+
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withZookeeper();
+
+    return cluster
+        .useDruidContainers()
+        .useLatchableEmitter()
+        // Needed for overlordFollower to recognize the KafkaSupervisor type
+        .addExtension(KafkaIndexTaskModule.class)
+        .addResource(new EmbeddedDerbyMetadataResource())
+        .addResource(new MinIOStorageResource())
+        .addResource(kafkaServer)
+        .addCommonProperty(
+            "druid.extensions.loadList",
+            "[\"druid-s3-extensions\", \"druid-kafka-indexing-service\", 
\"druid-multi-stage-query\"]"
+        )
+        .addResource(coordinator)
+        .addResource(overlordLeader)
+        .addResource(middleManager)
+        .addResource(historical)
+        .addResource(broker1)
+        .addServer(overlordFollower)

Review Comment:
   A comment about why we mix containers and non-containers in this test would
be helpful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to