[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r231072668 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + +
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230429154 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + +
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230404317 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. + */ +public final class FlinkDistribution extends ExternalResource { Review comment: This is like an `E2ETestHarness` more than a `FlinkDistribution`, right? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230408989 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + +
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230410636 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.prometheus.tests; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.prometheus.PrometheusReporter; +import org.apache.flink.tests.util.AutoClosableProcess; +import org.apache.flink.tests.util.CommandLineWrapper; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking; +import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking; + +/** + * End-to-end test for the PrometheusReporter. 
+ */ +public class PrometheusReporterEndToEndITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String PROMETHEUS_FILE_NAME = "prometheus-2.4.3.linux-amd64"; + + private static final Pattern LOG_REPORTER_PORT_PATTERN = Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*"); + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testReporter() throws Exception { + LOG.info("starting test"); + dist.copyOptJarsToLib("flink-metrics-prometheus"); + + final Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); + + dist.appendConfiguration(config); + + final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus"); + final Path prometheusArchive = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz"); + final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME); + final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml"); + final Path prometheusBinary = prometheusBinDir.resolve("prometheus"); + Files.createDirectory(tmpPrometheusDir); + + runBlocking( + "Download of Prometheus", + Duration.ofMinutes(5), + CommandLineWrapper + .wget("https://github.com/prometheus/prometheus/releases/download/v2.4.3/" + prometheusArchive.getFileName()) Review comment: Given that the version is already in the path, does it make sense to move the whole path or the part that includes the version to a class variable? This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230409384 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + +
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230408639 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + +
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230401719 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement. + */ +public class AutoClosableProcess implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class); + + private final Process process; + + public AutoClosableProcess(Process process) { + this.process = process; + } + + public static AutoClosableProcess runNonBlocking(String step, String... 
commands) throws IOException { + LOG.info("Step Started: " + step); + Process process = new ProcessBuilder() + .command(commands) + .inheritIO() + .start(); + return new AutoClosableProcess(process); + } + + public static void runBlocking(String step, String... commands) throws IOException { + runBlocking(step, Duration.ofSeconds(30), commands); + } + + public static void runBlocking(String step, Duration timeout, String... commands) throws IOException { + LOG.info("Step started: " + step); + Process process = new ProcessBuilder() + .command(commands) + .inheritIO() + .start(); + + try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) { + final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS); + if (!success) { + throw new TimeoutException(); + } + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(step + " failed due to timeout."); + } + LOG.info("Step complete: " + step); + } + + @Override + public void close() throws IOException { + if (process.isAlive()) { + process.destroy(); + try { + process.waitFor(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + Review comment: Again, why not remove it, given that it is not used, and re-add it if a future test needs it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230407896 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); Review comment: As before I would create the method `restoreDefaultConfig` and also call it here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230405669 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. + */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + Review comment: These can move under the `LOG`, with the rest of the `static final` fields. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230401483 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement. + */ +public class AutoClosableProcess implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class); + + private final Process process; + + public AutoClosableProcess(Process process) { + this.process = process; Review comment: Add `null` checks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230408436 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { + stopFlinkCluster(); + } catch (IOException e) { + LOG.error("Failure while shutting down Flink cluster.", e); + } + + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + + try { + Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOG.error("Failed to restore flink-conf.yaml", e); + } + +
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230410247 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); + final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP); + Files.copy(originalConfig, backupConfig); + filesToDelete.add(new AutoClosablePath(backupConfig)); + } + + @Override + protected void after() { + try { Review comment: I find it a bit counter-intuitive that we stop the cluster in the `after`, although we have not started it in the `before`. I do not know the best solution, maybe setting a flag on `startCluster` and checking it in the `after` and if `true` then stopping the cluster. This is not the best solution but feel free to ignore this comment if you do not agree. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230402861 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for setting up command-line tool usages in a readable fashion. + */ +public enum CommandLineWrapper { + ; + + public static WGetBuilder wget(String url) { + return new WGetBuilder(url); + } + + /** +* Wrapper around wget used for downloading files. +*/ + public static final class WGetBuilder { + + private final String url; + private Path targetDir; + + WGetBuilder(String url) { + this.url = url; + } + + public WGetBuilder targetDir(Path dir) { Review comment: We could rename it to `toTargetDir`, but most importantly, why not impose immutability and return a new `WGetBuilder` instead of mutating the object? This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230403162 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for setting up command-line tool usages in a readable fashion. + */ +public enum CommandLineWrapper { + ; + + public static WGetBuilder wget(String url) { + return new WGetBuilder(url); + } + + /** +* Wrapper around wget used for downloading files. 
+*/ + public static final class WGetBuilder { + + private final String url; + private Path targetDir; + + WGetBuilder(String url) { + this.url = url; + } + + public WGetBuilder targetDir(Path dir) { + this.targetDir = dir; + return this; + } + + public String[] build() { + final List commandsList = new ArrayList<>(5); + commandsList.add("wget"); + commandsList.add("-q"); // silent + //commandsList.add("--show-progress"); // enable progress bar + if (targetDir != null) { + commandsList.add("-P"); + commandsList.add(targetDir.toAbsolutePath().toString()); + } + commandsList.add(url); + return commandsList.toArray(new String[commandsList.size()]); + } + } + + public static SedBuilder sed(final String command, final Path file) { + return new SedBuilder(command, file); + } + + /** +* Wrapper around sed used for processing text. +*/ + public static final class SedBuilder { + + private final String command; + private final Path file; + + private boolean inPlace = false; + + SedBuilder(final String command, final Path file) { + this.command = command; + this.file = file; + } + + public SedBuilder inPlace() { + inPlace = true; Review comment: Same here as in the `WGetBuilder`. Why not impose immutability and go for the "full" builder pattern? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230407576 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A wrapper around a Flink distribution. 
+ */ +public final class FlinkDistribution extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final List filesToDelete = new ArrayList<>(4); + + private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml"); + private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak"); + + private final Path opt; + private final Path lib; + private final Path conf; + private final Path log; + private final Path bin; + + private Configuration defaultConfig; + + public FlinkDistribution() { + final String distDirProperty = System.getProperty("distDir"); + if (distDirProperty == null) { + Assert.fail("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + } + final Path flinkDir = Paths.get(distDirProperty); + bin = flinkDir.resolve("bin"); + opt = flinkDir.resolve("opt"); + lib = flinkDir.resolve("lib"); + conf = flinkDir.resolve("conf"); + log = flinkDir.resolve("log"); + } + + @Override + protected void before() throws IOException { + defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString())); + final Path originalConfig = conf.resolve(FLINK_CONF_YAML); Review comment: I would move lines 95-98 into a separate method (e.g. `backupOriginal/DefaultConfig`) and call it in `before()`, because this method can also be useful on its own. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230403257 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for setting up command-line tool usages in a readable fashion. + */ +public enum CommandLineWrapper { + ; + + public static WGetBuilder wget(String url) { + return new WGetBuilder(url); + } + + /** +* Wrapper around wget used for downloading files. 
+*/ + public static final class WGetBuilder { + + private final String url; + private Path targetDir; + + WGetBuilder(String url) { + this.url = url; + } + + public WGetBuilder targetDir(Path dir) { + this.targetDir = dir; + return this; + } + + public String[] build() { + final List commandsList = new ArrayList<>(5); + commandsList.add("wget"); + commandsList.add("-q"); // silent + //commandsList.add("--show-progress"); // enable progress bar + if (targetDir != null) { + commandsList.add("-P"); + commandsList.add(targetDir.toAbsolutePath().toString()); + } + commandsList.add(url); + return commandsList.toArray(new String[commandsList.size()]); + } + } + + public static SedBuilder sed(final String command, final Path file) { + return new SedBuilder(command, file); + } + + /** +* Wrapper around sed used for processing text. +*/ + public static final class SedBuilder { + + private final String command; + private final Path file; + + private boolean inPlace = false; + + SedBuilder(final String command, final Path file) { + this.command = command; + this.file = file; + } + + public SedBuilder inPlace() { + inPlace = true; + return this; + } + + public String[] build() { + final List commandsList = new ArrayList<>(5); + commandsList.add("sed"); + if (inPlace) { + commandsList.add("-i"); + } + commandsList.add("-e"); + commandsList.add(command); + commandsList.add(file.toAbsolutePath().toString()); + return commandsList.toArray(new String[commandsList.size()]); + } + } + + public static TarBuilder tar(final Path file) { + return new TarBuilder(file); + } + + /** +* Wrapper around tar used for extracting .tar archives. +*/ + public static final class TarBuilder { Review comment: Same as above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230401171 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.util.FileUtils; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Utility class to delete a given {@link Path} when exiting a try-with-resources statement. + */ +public final class AutoClosablePath implements AutoCloseable { + + private final Path file; + + public AutoClosablePath(final Path file) { + this.file = file; + } + + @Override + public void close() throws IOException { + FileUtils.deleteFileOrDirectory(file.toFile()); + } + + public Path getFile() { Review comment: This is not used anywhere, so why not remove it and add it back if a future test needs it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230400796 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.util.FileUtils; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Utility class to delete a given {@link Path} when exiting a try-with-resources statement. + */ +public final class AutoClosablePath implements AutoCloseable { + + private final Path file; + + public AutoClosablePath(final Path file) { Review comment: Add `null` checks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test URL: https://github.com/apache/flink/pull/7003#discussion_r230400941 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.util.FileUtils; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Utility class to delete a given {@link Path} when exiting a try-with-resources statement. + */ +public final class AutoClosablePath implements AutoCloseable { + + private final Path file; Review comment: Rename to `path` or something along these lines, as this is what it is. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services