This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f933c253c9ff37e4ce588a2495258de4502bcde
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Dec 1 13:19:46 2020 +0100

    [hotfix] Speed up flink container creation in testcontainers
---
 .../util/kafka/SQLClientSchemaRegistryITCase.java  |  4 --
 .../flink/tests/util/flink/FlinkContainer.java     | 80 +++++++++++++++++-----
 2 files changed, 64 insertions(+), 20 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index 34500d3..f479648 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -42,7 +42,6 @@ import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.utility.DockerImageName;
 
-import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
@@ -90,9 +89,6 @@ public class SQLClientSchemaRegistryITCase {
        private final KafkaContainerClient kafkaClient = new 
KafkaContainerClient(kafka);
        private CachedSchemaRegistryClient registryClient;
 
-       public SQLClientSchemaRegistryITCase() throws IOException {
-       }
-
        @Before
        public void setUp() {
                registryClient = new CachedSchemaRegistryClient(
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
index e970025..72a4658 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainer.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.exception.NotFoundException;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.junit.rules.TemporaryFolder;
@@ -54,6 +55,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -243,10 +245,9 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
                        return this;
                }
 
-               public FlinkContainer build() throws IOException {
+               public FlinkContainer build() {
                        try {
                                Path flinkDist = FileUtils.findFlinkDist();
-                               String flinkDistName = 
flinkDist.getFileName().toString();
                                temporaryFolder.create();
                                Path tmp = temporaryFolder.newFolder().toPath();
                                Path workersFile = tmp.resolve("workers");
@@ -256,18 +257,16 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
                                                .mapToObj(i -> "localhost")
                                                .collect(Collectors.toList()));
 
-                               ImageFromDockerfile image = new 
ImageFromDockerfile("flink-dist", true)
-                                       .withDockerfileFromBuilder(
-                                               builder -> {
-                                                       builder.from("openjdk:" 
+ getJavaVersionSuffix())
-                                                               
.copy(flinkDistName, "flink")
-                                                               
.copy(flinkDistName + "/conf/workers", "workers")
-                                                               .cmd(FLINK_BIN 
+ "/start-cluster.sh && tail -f /dev/null")
-                                                               .build();
-                                               }
-                                       )
-                                       .withFileFromPath("workers", 
workersFile)
-                                       .withFileFromPath(flinkDistName, 
flinkDist);
+                               // Building the docker image is split into two 
stages:
+                               // 1. build a base image with an immutable 
flink-dist
+                               // 2. based on the base image add any mutable 
files such as e.g. workers files
+                               //
+                               // This lets us save some time for archiving 
and copying big, immutable files
+                               // between tests runs.
+                               String baseImage = buildBaseImage(flinkDist);
+                               ImageFromDockerfile configuredImage = 
buildConfiguredImage(
+                                       workersFile,
+                                       baseImage);
 
                                Optional<Path> logBackupDirectory = 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
                                if (!logBackupDirectory.isPresent()) {
@@ -275,9 +274,58 @@ public class FlinkContainer extends 
GenericContainer<FlinkContainer> implements
                                                "Property {} not set, logs will 
not be backed up in case of test failures.",
                                                
DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
                                }
-                               return new FlinkContainer(image, 
numTaskManagers, logBackupDirectory.orElse(null));
-                       } finally {
+                               return new FlinkContainer(
+                                       configuredImage,
+                                       numTaskManagers,
+                                       logBackupDirectory.orElse(null));
+                       } catch (Exception e) {
                                temporaryFolder.delete();
+                               throw new RuntimeException("Could not build the 
flink-dist image", e);
+                       }
+               }
+
+               private ImageFromDockerfile buildConfiguredImage(
+                               Path workersFile,
+                               String baseImage) {
+                       return new ImageFromDockerfile(
+                               "flink-dist-configured")
+                               .withDockerfileFromBuilder(
+                                       builder -> builder.from(baseImage)
+                                               .copy("workers", 
"flink/conf/workers")
+                                               .cmd(FLINK_BIN + 
"/start-cluster.sh && tail -f /dev/null")
+                                               .build()
+                               )
+                               .withFileFromPath("workers", workersFile);
+               }
+
+               @Nonnull
+               private String buildBaseImage(Path flinkDist)
+                               throws java.util.concurrent.TimeoutException {
+                       String baseImage = "flink-dist-base";
+                       if (!imageExists(baseImage)) {
+                               new ImageFromDockerfile(baseImage)
+                                       .withDockerfileFromBuilder(
+                                               builder -> builder
+                                                       .from("openjdk:" + 
getJavaVersionSuffix())
+                                                       .copy("flink", "flink")
+                                                       .build()
+                                       )
+                                       .withFileFromPath("flink", flinkDist)
+                                       .get(1, TimeUnit.MINUTES);
+                       }
+                       return baseImage;
+               }
+
+               private boolean imageExists(String baseImage) {
+                       try {
+                               DockerClientFactory
+                                       .instance()
+                                       .client()
+                                       .inspectImageCmd(baseImage)
+                                       .exec();
+                               return true;
+                       } catch (NotFoundException e) {
+                               return false;
                        }
                }
 

Reply via email to