gianm commented on code in PR #18302: URL: https://github.com/apache/druid/pull/18302#discussion_r2238749731
########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.docker; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import 
org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource; +import org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.timeline.DataSegment; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Runs some basic ingestion tests using {@link DruidContainers}. 
+ */ +public class IngestionDockerTest extends EmbeddedClusterTestBase +{ + static { + System.setProperty("druid.testing.docker.image", "apache/druid:35.0.0-SNAPSHOT"); + } + + // Druid Docker containers + protected final DruidContainerResource overlordLeader = DruidContainers.newOverlord().withTestImage(); + protected final DruidContainerResource coordinator = DruidContainers.newCoordinator().withTestImage(); + protected final DruidContainerResource historical = DruidContainers.newHistorical().withTestImage(); + protected final DruidContainerResource broker1 = DruidContainers.newBroker().withTestImage(); + protected final DruidContainerResource middleManager = DruidContainers + .newMiddleManager() + .withTestImage() + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + // Follower EmbeddedOverlord to watch segment publish events + private final EmbeddedOverlord overlordFollower = new EmbeddedOverlord() + .addProperty("druid.plaintextPort", "7090") + .addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + // Additional EmbeddedBroker to wait for segments to become queryable + private final EmbeddedBroker broker2 = new EmbeddedBroker() + .addProperty("druid.plaintextPort", "7082"); + + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + public EmbeddedDruidCluster createCluster() + { + final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withZookeeper(); + + return cluster + .useDruidContainers() + .useLatchableEmitter() + // Needed for overlordFollower to recognize the KafkaSupervisor type + .addExtension(KafkaIndexTaskModule.class) + .addResource(new EmbeddedDerbyMetadataResource()) + .addResource(new MinIOStorageResource()) + .addResource(kafkaServer) + .addCommonProperty( + "druid.extensions.loadList", + "[\"druid-s3-extensions\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\"]" + ) + .addResource(coordinator) + 
.addResource(overlordLeader) + .addResource(middleManager) + .addResource(historical) + .addResource(broker1) + .addServer(overlordFollower) + .addServer(broker2) + .addServer(new EmbeddedRouter()); + } + + @Test + public void test_runIndexTask_andKillData() + { + + } + + @Test + public void test_runIndexParallelTask_andCompactData() + { + + } + + @Test + public void test_runMsqTask_andQueryData() + { + + } + + @Test + public void test_runIndexTask() + { + final String taskId = IdUtils.getRandomId(); + final Object task = createIndexTaskForInlineData( + taskId, + StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n") + ); + + cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task)); + waitForCachedUsedSegmentCount(10); + verifyUsedSegmentCount(10); + } + + private Object createIndexTaskForInlineData(String taskId, String inlineDataCsv) + { + return EmbeddedClusterApis.createTaskFromPayload( + taskId, + StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, inlineDataCsv, dataSource) + ); + } + + @Test + public void test_runKafkaSupervisor() + { + final String topic = dataSource; + kafkaServer.createTopicWithPartitions(topic, 2); + + kafkaServer.produceRecordsToTopic( + generateRecordsForTopic(topic, 10, DateTimes.of("2025-06-01")) + ); + + // Submit and start a supervisor + final String supervisorId = dataSource; + final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(supervisorId, topic); + + final Map<String, String> startSupervisorResult = cluster.callApi().onLeaderOverlord( + o -> o.postSupervisor(kafkaSupervisorSpec) + ); + Assertions.assertEquals(Map.of("id", supervisorId), startSupervisorResult); + + // Wait for the broker to discover the realtime segments + broker2.latchableEmitter().waitForEvent( + event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource) + ); + + SupervisorStatus supervisorStatus = cluster.callApi().getSupervisorStatus(supervisorId); + Assertions.assertFalse(supervisorStatus.isSuspended()); 
+ Assertions.assertTrue(supervisorStatus.isHealthy()); + Assertions.assertEquals("RUNNING", supervisorStatus.getState()); + Assertions.assertEquals(topic, supervisorStatus.getSource()); + + // Get the task statuses + List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf( + (CloseableIterator<TaskStatusPlus>) + cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, dataSource, 1)) + ); + Assertions.assertEquals(1, taskStatuses.size()); + Assertions.assertEquals(TaskState.RUNNING, taskStatuses.get(0).getStatusCode()); + + // Verify the count of rows ingested into the datasource so far + // Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)); Review Comment: Please don't commit commented-out code. ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.docker; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource; +import org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.timeline.DataSegment; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import 
java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Runs some basic ingestion tests using {@link DruidContainers}. + */ +public class IngestionDockerTest extends EmbeddedClusterTestBase +{ + static { + System.setProperty("druid.testing.docker.image", "apache/druid:35.0.0-SNAPSHOT"); Review Comment: Setting this in a static initializer doesn't seem right, because it will be set when the class is loaded, not necessarily when the test runs. Maybe with `@BeforeAll`. Although, it'd be even better to not have to set the property at all. Like, perhaps the property just sets a default, but tests are able to specify their own when constructing the `DruidContainer`. That's how it works for other kinds of testcontainers. Also- about the value- do we need to change the value when Druid's version is upgraded? Is it possible to get it programmatically, such as from `DruidContainerResource.class.getPackage().getImplementationVersion()`? Should work when running from a jar, possibly not from an IDE. Finally, if we are going to keep needing to set this property, better to refer to the name by `DruidContainerResource.PROPERTY_TEST_IMAGE` rather than hardcoding it. ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.docker; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource; +import 
org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.timeline.DataSegment; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Runs some basic ingestion tests using {@link DruidContainers}. + */ +public class IngestionDockerTest extends EmbeddedClusterTestBase +{ + static { + System.setProperty("druid.testing.docker.image", "apache/druid:35.0.0-SNAPSHOT"); + } + + // Druid Docker containers + protected final DruidContainerResource overlordLeader = DruidContainers.newOverlord().withTestImage(); + protected final DruidContainerResource coordinator = DruidContainers.newCoordinator().withTestImage(); + protected final DruidContainerResource historical = DruidContainers.newHistorical().withTestImage(); + protected final DruidContainerResource broker1 = DruidContainers.newBroker().withTestImage(); + protected final DruidContainerResource middleManager = DruidContainers + .newMiddleManager() + .withTestImage() + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + // Follower EmbeddedOverlord to watch segment publish events + private final EmbeddedOverlord overlordFollower = new EmbeddedOverlord() + .addProperty("druid.plaintextPort", "7090") + .addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + // Additional EmbeddedBroker to wait for segments to become queryable + private final EmbeddedBroker broker2 = new EmbeddedBroker() + .addProperty("druid.plaintextPort", "7082"); + + private final KafkaResource kafkaServer = new KafkaResource(); 
+ + @Override + public EmbeddedDruidCluster createCluster() + { + final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withZookeeper(); + + return cluster + .useDruidContainers() + .useLatchableEmitter() + // Needed for overlordFollower to recognize the KafkaSupervisor type + .addExtension(KafkaIndexTaskModule.class) + .addResource(new EmbeddedDerbyMetadataResource()) + .addResource(new MinIOStorageResource()) + .addResource(kafkaServer) + .addCommonProperty( + "druid.extensions.loadList", + "[\"druid-s3-extensions\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\"]" + ) + .addResource(coordinator) + .addResource(overlordLeader) + .addResource(middleManager) + .addResource(historical) + .addResource(broker1) + .addServer(overlordFollower) + .addServer(broker2) + .addServer(new EmbeddedRouter()); + } + + @Test + public void test_runIndexTask_andKillData() + { + Review Comment: a comment about why this is empty would be helpful (same goes for the others) ########## extensions-core/druid-testcontainers/pom.xml: ########## @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.druid.extensions</groupId> + <artifactId>druid-testcontainers</artifactId> + <name>druid-testcontainers</name> + <description>Testcontainers for running Apache Druid services in tests</description> + + <parent> + <groupId>org.apache.druid</groupId> + <artifactId>druid</artifactId> + <version>35.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <!-- This module should have minimal dependencies so that it can be used independently + by libraries and codebases outside Druid to run Testcontainers --> + <dependencies> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-processing</artifactId> Review Comment: `druid-processing` is a pretty big dependency with lots of transitive dependencies. It looks like it's just for `StringUtils` and `Logger`. It would be better to avoid those and remove this dependency. For `Logger`, you could either refrain from logging stuff, or you could use `org.slf4j.Logger` directly and depend on `slf4j-api`. ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/DruidContainerResource.java: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.docker; + +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.DruidCommand; +import org.apache.druid.testing.DruidContainer; +import org.apache.druid.testing.MountedDir; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.TestcontainerResource; +import org.testcontainers.utility.DockerImageName; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * {@link TestcontainerResource} to run Druid services. + * Currently, only core extensions can be used out-of-the-box with these containers + * such as {@code druid-s3-extensions} or {@code postgresql-metadata-storage}, + * simply by adding them to {@code druid.extensions.loadList}. + * <p> + * {@link DruidContainers} should be used only for testing backward compatibility + * or a Docker-specific feature. For all other testing needs, use plain old + * {@code EmbeddedDruidServer} as they are much faster, allow easy debugging and + * do not require downloading any images. 
+ */ +public class DruidContainerResource extends TestcontainerResource<DruidContainer> +{ + /** + * Java system property to specify the name of the Docker test image. + */ + public static final String PROPERTY_TEST_IMAGE = "druid.testing.docker.image"; + + private static final Logger log = new Logger(DruidContainerResource.class); + + /** + * Forbidden server properties that may be used by EmbeddedDruidServers but + * interfere with the functioning of DruidContainer-based services. + */ + private static final Set<String> FORBIDDEN_PROPERTIES = Set.of( + "druid.extensions.modulesForEmbeddedTests", + "druid.emitter" + ); + + /** + * A static incremental ID is used instead of a random number to ensure that + * tests are more deterministic and easier to debug. + */ + private static final AtomicInteger SERVER_ID = new AtomicInteger(0); + + private final String name; + private final DruidCommand command; + private final Map<String, String> properties = new HashMap<>(); + + private DockerImageName imageName; + private EmbeddedDruidCluster cluster; + + private File containerDirectory; + + private MountedDir indexerLogsDeepStorageDirectory; + private MountedDir serviceLogsDirectory; + private MountedDir segmentDeepStorageDirectory; + + DruidContainerResource(DruidCommand command) + { + this.name = StringUtils.format( + "container_%s_%d", + command.getName(), + SERVER_ID.incrementAndGet() + ); + this.command = command; + addProperty("druid.host", EmbeddedDruidCluster.getDefaultHost()); + } + + public DruidContainerResource withImage(DockerImageName imageName) + { + this.imageName = imageName; + return this; + } + + /** + * Uses the Docker test image specified by the system property + * {@link #PROPERTY_TEST_IMAGE} for this container. + */ + public DruidContainerResource withTestImage() Review Comment: Similar comment as above, IMO `usingTestImage()` is clearer. 
########## services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java: ########## @@ -231,13 +242,50 @@ public TestFolder getTestFolder() } /** - * The embedded Zookeeper server used by this cluster, if any. - * - * @throws NullPointerException if this cluster has no embedded zookeeper. + * Hostname to be used for embedded services (both Druid or external). + * Using this hostname ensures that the underlying service is reachable by both + * EmbeddedDruidServers and DruidContainers. + */ + public String getEmbeddedServiceHostname() + { + return hasDruidContainers ? getDefaultHost() : "localhost"; + } + + /** + * Hostname for the host machine running the containers. Using this hostname + * instead of "localhost" allows all the Druid containers to talk to each + * other and also other EmbeddedDruidServers. */ - public EmbeddedZookeeper getZookeeper() + public static String getDefaultHost() { - return Objects.requireNonNull(zookeeper, "No embedded zookeeper configured for this cluster"); + try { + return InetAddress.getLocalHost().getHostAddress(); + } + catch (UnknownHostException e) { + throw new ISE(e, "Unable to determine host name"); + } + } + + /** + * Replaces {@code localhost} or {@code 127.0.0.1} in the given connectUri + * with {@link #getEmbeddedServiceHostname()}. Using the embedded URI ensures + * that the underlying service is reachable by both EmbeddedDruidServers and + * DruidContainers. + */ + public String getEmbeddedConnectUri(String connectUri) + { + if (!hasDruidContainers) { + return connectUri; + } else if (connectUri.contains("localhost")) { + return StringUtils.replace(connectUri, "localhost", getEmbeddedServiceHostname()); Review Comment: Seeing URI manipulations like this stresses me out ;P It's just test code, but it's still better form to manipulate the URL properly. That means parsing it with `new URI(connectUri)` and then creating a replacement `URI` using the 7-arg constructor. 
########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.docker; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import 
org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource; +import org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.timeline.DataSegment; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Runs some basic ingestion tests using {@link DruidContainers}. + */ +public class IngestionDockerTest extends EmbeddedClusterTestBase +{ + static { + System.setProperty("druid.testing.docker.image", "apache/druid:35.0.0-SNAPSHOT"); Review Comment: I don't think we can be sure that this one is always the leader. It starts up first, but maybe something happens that causes a leader change. Or maybe for some reason the container version fails to gain leadership due to an error, and then the `overlordFollower` becomes leader. It would help to add some check in the tests that confirms this one is actually leading. Ideally we check at both the beginning and end of each test. 
########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/DruidContainerResource.java: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.docker; + +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.DruidCommand; +import org.apache.druid.testing.DruidContainer; +import org.apache.druid.testing.MountedDir; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.TestcontainerResource; +import org.testcontainers.utility.DockerImageName; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * {@link TestcontainerResource} to run Druid services. 
+ * Currently, only core extensions can be used out-of-the-box with these containers + * such as {@code druid-s3-extensions} or {@code postgresql-metadata-storage}, + * simply by adding them to {@code druid.extensions.loadList}. + * <p> + * {@link DruidContainers} should be used only for testing backward compatibility + * or a Docker-specific feature. For all other testing needs, use plain old + * {@code EmbeddedDruidServer} as they are much faster, allow easy debugging and + * do not require downloading any images. + */ +public class DruidContainerResource extends TestcontainerResource<DruidContainer> +{ + /** + * Java system property to specify the name of the Docker test image. + */ + public static final String PROPERTY_TEST_IMAGE = "druid.testing.docker.image"; + + private static final Logger log = new Logger(DruidContainerResource.class); + + /** + * Forbidden server properties that may be used by EmbeddedDruidServers but + * interfere with the functioning of DruidContainer-based services. + */ + private static final Set<String> FORBIDDEN_PROPERTIES = Set.of( + "druid.extensions.modulesForEmbeddedTests", + "druid.emitter" + ); + + /** + * A static incremental ID is used instead of a random number to ensure that + * tests are more deterministic and easier to debug. 
+ */ + private static final AtomicInteger SERVER_ID = new AtomicInteger(0); + + private final String name; + private final DruidCommand command; + private final Map<String, String> properties = new HashMap<>(); + + private DockerImageName imageName; + private EmbeddedDruidCluster cluster; + + private File containerDirectory; + + private MountedDir indexerLogsDeepStorageDirectory; + private MountedDir serviceLogsDirectory; + private MountedDir segmentDeepStorageDirectory; + + DruidContainerResource(DruidCommand command) + { + this.name = StringUtils.format( + "container_%s_%d", + command.getName(), + SERVER_ID.incrementAndGet() + ); + this.command = command; + addProperty("druid.host", EmbeddedDruidCluster.getDefaultHost()); + } + + public DruidContainerResource withImage(DockerImageName imageName) Review Comment: To me a method named `withX` suggests a copy is being made, and the original object is not modified. But here the original object is in fact modified. IMO `setImage` or `useImage` would be clearer. ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.docker; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.derby.EmbeddedDerbyMetadataResource; +import org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.timeline.DataSegment; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import 
java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Runs some basic ingestion tests using {@link DruidContainers}. + */ +public class IngestionDockerTest extends EmbeddedClusterTestBase +{ + static { + System.setProperty("druid.testing.docker.image", "apache/druid:35.0.0-SNAPSHOT"); + } + + // Druid Docker containers + protected final DruidContainerResource overlordLeader = DruidContainers.newOverlord().withTestImage(); + protected final DruidContainerResource coordinator = DruidContainers.newCoordinator().withTestImage(); + protected final DruidContainerResource historical = DruidContainers.newHistorical().withTestImage(); + protected final DruidContainerResource broker1 = DruidContainers.newBroker().withTestImage(); + protected final DruidContainerResource middleManager = DruidContainers + .newMiddleManager() + .withTestImage() + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + // Follower EmbeddedOverlord to watch segment publish events + private final EmbeddedOverlord overlordFollower = new EmbeddedOverlord() + .addProperty("druid.plaintextPort", "7090") + .addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + // Additional EmbeddedBroker to wait for segments to become queryable + private final EmbeddedBroker broker2 = new EmbeddedBroker() + .addProperty("druid.plaintextPort", "7082"); + + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + public EmbeddedDruidCluster createCluster() + { + final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withZookeeper(); + + return cluster + .useDruidContainers() + .useLatchableEmitter() + // Needed for overlordFollower to recognize the KafkaSupervisor type + .addExtension(KafkaIndexTaskModule.class) + .addResource(new EmbeddedDerbyMetadataResource()) + .addResource(new MinIOStorageResource()) + 
.addResource(kafkaServer) + .addCommonProperty( + "druid.extensions.loadList", + "[\"druid-s3-extensions\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\"]" + ) + .addResource(coordinator) + .addResource(overlordLeader) + .addResource(middleManager) + .addResource(historical) + .addResource(broker1) + .addServer(overlordFollower) Review Comment: a comment about why we mix containers and noncontainers in this test would be helpful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
