afedulov commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r896140006


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static 
org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static 
org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test 
environments. */
+public class FlinkContainersConfig {
+    private final String baseImage;
+    private final int numTaskManagers;
+    private final int numSlotsPerTaskManager;
+    private final Collection<String> jarPaths;
+    private final Configuration flinkConfiguration;
+    private final String taskManagerHostnamePrefix;
+    private final Boolean buildFromFlinkDist;
+    private final String flinkDistLocation;
+    private final String flinkHome;
+    private final String checkpointPath;
+    private final String haStoragePath;
+    private final Boolean zookeeperHA;
+    private final String zookeeperHostname;
+    private final Properties logProperties;
+
+    // Defaults
+    private static final long DEFAULT_METRIC_FETCHER_UPDATE_INTERVAL_MS = 
1000L;
+    private static final long DEFAULT_SLOT_REQUEST_TIMEOUT_MS = 10_000L;
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 5_000L;
+    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000L;
+    private static final MemorySize DEFAULT_TM_TOTAL_PROCESS_MEMORY = 
MemorySize.ofMebiBytes(1728);
+    private static final MemorySize DEFAULT_JM_TOTAL_PROCESS_MEMORY = 
MemorySize.ofMebiBytes(1600);
+
+    private static final int DEFAULT_NUM_TASK_MANAGERS = 1;
+    private static final int DEFAULT_NUM_SLOTS_PER_TASK_MANAGER = 1;
+    private static final String DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX = 
"taskmanager-";
+    private static final String DEFAULT_JOB_MANAGER_HOSTNAME = "jobmanager";
+    private static final String DEFAULT_BIND_ADDRESS = "0.0.0.0";
+    private static final String DEFAULT_FLINK_HOME = "/opt/flink";
+    private static final String DEFAULT_CHECKPOINT_PATH = DEFAULT_FLINK_HOME + 
"/checkpoint";
+    private static final String DEFAULT_HA_STORAGE_PATH = DEFAULT_FLINK_HOME + 
"/recovery";
+
+    private static final String DEFAULT_ZOOKEEPER_HOSTNAME = "zookeeper";
+    private static final String DEFAULT_ZOOKEEPER_QUORUM = 
DEFAULT_ZOOKEEPER_HOSTNAME + ":2181";
+    // --
+
+    private FlinkContainersConfig(Builder builder) { // private: copies every setting from the given builder
+        baseImage = builder.baseImage;
+        numTaskManagers = builder.numTaskManagers;
+        numSlotsPerTaskManager = builder.numSlotsPerTaskManager;
+        jarPaths = builder.jarPaths; // same collection instance as the builder's (no copy is made)
+        flinkConfiguration = builder.flinkConfiguration; // same Configuration instance (no copy is made)
+        taskManagerHostnamePrefix = builder.taskManagerHostnamePrefix;
+        buildFromFlinkDist = builder.buildFromFlinkDist;
+        flinkDistLocation = builder.flinkDistLocation;
+        flinkHome = builder.flinkHome;
+        checkpointPath = builder.checkpointPath;
+        haStoragePath = builder.haStoragePath;
+        zookeeperHA = builder.zookeeperHA;
+        zookeeperHostname = builder.zookeeperHostname;
+        logProperties = builder.logProperties; // same Properties instance (no copy is made)
+    }
+
+    /**
+     * Creates a new {@link Builder} for {@code FlinkContainersConfig}.
+     *
+     * <p>Most builder fields start out with the defaults declared in this class (the
+     * {@code DEFAULT_*} constants); {@code baseImage} and {@code flinkDistLocation} have no
+     * default and stay {@code null} until set.
+     *
+     * @return The builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Creates a {@code FlinkContainersConfig} based on defaults, i.e. the result of building a
+     * fresh {@link Builder} without changing any of its settings.
+     *
+     * @return The Flink containers config.
+     */
+    public static FlinkContainersConfig defaultConfig() {
+        return builder().build();
+    }
+
+    /**
+     * Creates a {@code FlinkContainersConfig} based on the provided Flink configuration, by
+     * passing it to {@code Builder#basedOn} on a fresh builder and building the result.
+     *
+     * <p>NOTE(review): how the builder combines {@code config} with its own defaults is decided
+     * by {@code Builder#basedOn}; confirm there before relying on a particular precedence.
+     *
+     * @param config The Flink configuration to base the containers config on.
+     * @return The Flink containers config.
+     */
+    public static FlinkContainersConfig basedOn(Configuration config) {
+        return builder().basedOn(config).build();
+    }
+
+    /** {@code FlinkContainersConfig} builder static inner class. */
+    public static final class Builder {
+        private String baseImage;
+        private int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+        private int numSlotsPerTaskManager = 
DEFAULT_NUM_SLOTS_PER_TASK_MANAGER;
+        private Collection<String> jarPaths = new ArrayList<>();
+        private Configuration flinkConfiguration = defaultFlinkConfig();
+        private String taskManagerHostnamePrefix = 
DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX;
+        private Boolean buildFromFlinkDist = true;
+        private String flinkDistLocation;
+        private String flinkHome = DEFAULT_FLINK_HOME;
+        private String checkpointPath = DEFAULT_CHECKPOINT_PATH;
+        private String haStoragePath = DEFAULT_HA_STORAGE_PATH;
+        private Boolean zookeeperHA = false;
+        private String zookeeperHostname = DEFAULT_ZOOKEEPER_HOSTNAME;
+        private Properties logProperties = defaultLoggingProperties();
+
+        private Builder() {}
+
+        /**
+         * Sets the {@code baseImage} and returns a reference to this Builder enabling method
+         * chaining.
+         *
+         * <p>As a side effect, this also sets {@code buildFromFlinkDist} to {@code false}
+         * (its initial value in this builder is {@code true}).
+         *
+         * @param baseImage The {@code baseImage} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder baseImage(String baseImage) {
+            this.baseImage = baseImage;
+            this.buildFromFlinkDist = false; // presumably a supplied image replaces building one from flink-dist; confirm
+            return this;
+        }
+
+        /**
+         * Sets the {@code flinkDistLocation} and returns a reference to this 
Builder enabling
+         * method chaining.
+         *
+         * @param flinkDistLocation The {@code flinkDistLocation} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder flinkDistLocation(String flinkDistLocation) {

Review Comment:
   What would be the benefit of requiring users to pass Paths? My idea was to 
keep it simple, since we know that the containers with Flink are all *nix-based. 
In the end, what we care about is that the path is configured correctly for the 
container file system. However, the standard way of creating Paths 
(`Paths.get()`) uses `FileSystems.getDefault()`, which makes the result dependent 
on the user's own system rather than on the container. For this reason, I decided 
it would be less error-prone to ask for Strings with an exact and specific 
interpretation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to