This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8f933c253c9ff37e4ce588a2495258de4502bcde Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Dec 1 13:19:46 2020 +0100 [hotfix] Speed up flink container creation in testcontainers --- .../util/kafka/SQLClientSchemaRegistryITCase.java | 4 -- .../flink/tests/util/flink/FlinkContainer.java | 80 +++++++++++++++++----- 2 files changed, 64 insertions(+), 20 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java index 34500d3..f479648 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java @@ -42,7 +42,6 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.utility.DockerImageName; -import java.io.IOException; import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; @@ -90,9 +89,6 @@ public class SQLClientSchemaRegistryITCase { private final KafkaContainerClient kafkaClient = new KafkaContainerClient(kafka); private CachedSchemaRegistryClient registryClient; - public SQLClientSchemaRegistryITCase() throws IOException { - } - @Before public void setUp() { registryClient = new CachedSchemaRegistryClient( diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java index e970025..72a4658 100644 --- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java @@ -27,6 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.exception.NotFoundException; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.junit.rules.TemporaryFolder; @@ -54,6 +55,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -243,10 +245,9 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements return this; } - public FlinkContainer build() throws IOException { + public FlinkContainer build() { try { Path flinkDist = FileUtils.findFlinkDist(); - String flinkDistName = flinkDist.getFileName().toString(); temporaryFolder.create(); Path tmp = temporaryFolder.newFolder().toPath(); Path workersFile = tmp.resolve("workers"); @@ -256,18 +257,16 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements .mapToObj(i -> "localhost") .collect(Collectors.toList())); - ImageFromDockerfile image = new ImageFromDockerfile("flink-dist", true) - .withDockerfileFromBuilder( - builder -> { - builder.from("openjdk:" + getJavaVersionSuffix()) - .copy(flinkDistName, "flink") - .copy(flinkDistName + "/conf/workers", "workers") - .cmd(FLINK_BIN + "/start-cluster.sh && tail -f /dev/null") - .build(); - } - ) - .withFileFromPath("workers", workersFile) - .withFileFromPath(flinkDistName, flinkDist); + // Building the docker 
image is split into two stages: + // 1. build a base image with an immutable flink-dist + // 2. based on the base image add any mutable files such as workers files + // + // This lets us save some time for archiving and copying big, immutable files + // between test runs. + String baseImage = buildBaseImage(flinkDist); + ImageFromDockerfile configuredImage = buildConfiguredImage( + workersFile, + baseImage); Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); if (!logBackupDirectory.isPresent()) { @@ -275,9 +274,58 @@ public class FlinkContainer extends GenericContainer<FlinkContainer> implements "Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName()); } - return new FlinkContainer(image, numTaskManagers, logBackupDirectory.orElse(null)); - } finally { + return new FlinkContainer( + configuredImage, + numTaskManagers, + logBackupDirectory.orElse(null)); + } catch (Exception e) { temporaryFolder.delete(); + throw new RuntimeException("Could not build the flink-dist image", e); + } + } + + private ImageFromDockerfile buildConfiguredImage( + Path workersFile, + String baseImage) { + return new ImageFromDockerfile( + "flink-dist-configured") + .withDockerfileFromBuilder( + builder -> builder.from(baseImage) + .copy("workers", "flink/conf/workers") + .cmd(FLINK_BIN + "/start-cluster.sh && tail -f /dev/null") + .build() + ) + .withFileFromPath("workers", workersFile); + } + + @Nonnull + private String buildBaseImage(Path flinkDist) + throws java.util.concurrent.TimeoutException { + String baseImage = "flink-dist-base"; + if (!imageExists(baseImage)) { + new ImageFromDockerfile(baseImage) + .withDockerfileFromBuilder( + builder -> builder + .from("openjdk:" + getJavaVersionSuffix()) + .copy("flink", "flink") + .build() + ) + .withFileFromPath("flink", flinkDist) + .get(1, TimeUnit.MINUTES); + } + return baseImage; + } + + private boolean imageExists(String baseImage) { 
+ try { + DockerClientFactory + .instance() + .client() + .inspectImageCmd(baseImage) + .exec(); + return true; + } catch (NotFoundException e) { + return false; } }