[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-06 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r231072668
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230429154
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230404317
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
 
 Review comment:
   This is more like an `E2ETestHarness` than a `FlinkDistribution`, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230408989
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230410636
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus.tests;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.prometheus.PrometheusReporter;
+import org.apache.flink.tests.util.AutoClosableProcess;
+import org.apache.flink.tests.util.CommandLineWrapper;
+import org.apache.flink.tests.util.FlinkDistribution;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
+import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
+
+/**
+ * End-to-end test for the PrometheusReporter.
+ */
+public class PrometheusReporterEndToEndITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private static final String PROMETHEUS_FILE_NAME = 
"prometheus-2.4.3.linux-amd64";
+
+   private static final Pattern LOG_REPORTER_PORT_PATTERN = 
Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");
+
+   @Rule
+   public final FlinkDistribution dist = new FlinkDistribution();
+
+   @Rule
+   public final TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testReporter() throws Exception {
+   LOG.info("starting test");
+   dist.copyOptJarsToLib("flink-metrics-prometheus");
+
+   final Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom.port", "9000-9100");
+
+   dist.appendConfiguration(config);
+
+   final Path tmpPrometheusDir = 
tmp.newFolder().toPath().resolve("prometheus");
+   final Path prometheusArchive = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz");
+   final Path prometheusBinDir = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
+   final Path prometheusConfig = 
prometheusBinDir.resolve("prometheus.yml");
+   final Path prometheusBinary = 
prometheusBinDir.resolve("prometheus");
+   Files.createDirectory(tmpPrometheusDir);
+
+   runBlocking(
+   "Download of Prometheus",
+   Duration.ofMinutes(5),
+   CommandLineWrapper
+       .wget("https://github.com/prometheus/prometheus/releases/download/v2.4.3/" + prometheusArchive.getFileName())
 
 Review comment:
   Given that the version is already in the path, does it make sense to move 
the whole path or the part that includes the version to a class variable?
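
One way to read this suggestion, sketched against the constants already in the test; the names `PROMETHEUS_VERSION` and `PROMETHEUS_DOWNLOAD_URL` are illustrative and not part of the PR:

```java
// Illustrative sketch: factor the version out once and derive both the archive
// name and the download URL from it, so a version bump touches a single constant.
private static final String PROMETHEUS_VERSION = "2.4.3";

private static final String PROMETHEUS_FILE_NAME =
    "prometheus-" + PROMETHEUS_VERSION + ".linux-amd64";

private static final String PROMETHEUS_DOWNLOAD_URL =
    "https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + "/";
```

The download step would then read `.wget(PROMETHEUS_DOWNLOAD_URL + prometheusArchive.getFileName())`.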


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230409384
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230408639
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230401719
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utility class to terminate a given {@link Process} when exiting a 
try-with-resources statement.
+ */
+public class AutoClosableProcess implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AutoClosableProcess.class);
+
+   private final Process process;
+
+   public AutoClosableProcess(Process process) {
+   this.process = process;
+   }
+
+   public static AutoClosableProcess runNonBlocking(String step, String... 
commands) throws IOException {
+   LOG.info("Step Started: " + step);
+   Process process = new ProcessBuilder()
+   .command(commands)
+   .inheritIO()
+   .start();
+   return new AutoClosableProcess(process);
+   }
+
+   public static void runBlocking(String step, String... commands) throws 
IOException {
+   runBlocking(step, Duration.ofSeconds(30), commands);
+   }
+
+   public static void runBlocking(String step, Duration timeout, String... 
commands) throws IOException {
+   LOG.info("Step started: " + step);
+   Process process = new ProcessBuilder()
+   .command(commands)
+   .inheritIO()
+   .start();
+
+   try (AutoClosableProcess autoProcess = new 
AutoClosableProcess(process)) {
+   final boolean success = 
process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
+   if (!success) {
+   throw new TimeoutException();
+   }
+   } catch (TimeoutException | InterruptedException e) {
+   throw new RuntimeException(step + " failed due to 
timeout.");
+   }
+   LOG.info("Step complete: " + step);
+   }
+
+   @Override
+   public void close() throws IOException {
+   if (process.isAlive()) {
+   process.destroy();
+   try {
+   process.waitFor(10, TimeUnit.SECONDS);
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   }
+   }
+   }
+
 
 Review comment:
   Again, why not remove it, given that it is not used, and re-add it if a future test needs it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230407896
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
 
 Review comment:
   As before, I would create a `restoreDefaultConfig` method and also call it here.
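
A minimal sketch of such a helper, extracted from the restore logic already present in `after()` (the method name follows the suggestion above):

```java
private void restoreDefaultConfig() {
    final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
    final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
    try {
        // Put the original flink-conf.yaml back in place, overwriting the modified copy.
        Files.move(backupConfig, originalConfig, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
        LOG.error("Failed to restore flink-conf.yaml", e);
    }
}
```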


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230405669
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
 
 Review comment:
   These can move under the `LOG`, with the rest of the `static final` fields.
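
A sketch of the grouping being suggested; it only reorders declarations, and the element type of `filesToDelete` is inferred from the `AutoClosablePath` instances added to it later in the class:

```java
private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml");
private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak");

// Instance state after the static constants.
private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
```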


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230401483
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utility class to terminate a given {@link Process} when exiting a 
try-with-resources statement.
+ */
+public class AutoClosableProcess implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AutoClosableProcess.class);
+
+   private final Process process;
+
+   public AutoClosableProcess(Process process) {
+   this.process = process;
 
 Review comment:
   Add `null` checks.
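
A minimal sketch of the requested check, assuming the project's `org.apache.flink.util.Preconditions` utility is used (plain `java.util.Objects.requireNonNull` would work as well):

```java
import org.apache.flink.util.Preconditions;

public AutoClosableProcess(Process process) {
    // Fail fast with a clear message instead of a later NullPointerException in close().
    this.process = Preconditions.checkNotNull(process, "process must not be null");
}
```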


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230408436
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
+   stopFlinkCluster();
+   } catch (IOException e) {
+   LOG.error("Failure while shutting down Flink cluster.", 
e);
+   }
+
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+
+   try {
+   Files.move(backupConfig, originalConfig, 
StandardCopyOption.REPLACE_EXISTING);
+   } catch (IOException e) {
+   LOG.error("Failed to restore flink-conf.yaml", e);
+   }
+
+

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230410247
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = 
Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = 
Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new 
UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
+   final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
+   Files.copy(originalConfig, backupConfig);
+   filesToDelete.add(new AutoClosablePath(backupConfig));
+   }
+
+   @Override
+   protected void after() {
+   try {
 
 Review comment:
   I find it a bit counter-intuitive that we stop the cluster in `after()` although we have not started it in `before()`. I do not know the best solution; maybe set a flag in `startCluster` and check it in `after()`, stopping the cluster only if it is `true`. This is not ideal either, so feel free to ignore this comment if you do not agree.
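
A rough sketch of the flag-based idea, assuming the rule has a `startFlinkCluster()` counterpart to the `stopFlinkCluster()` call shown in `after()` (illustrative only):

```java
private boolean clusterStarted = false;

public void startFlinkCluster() throws IOException {
    // ... existing start-up logic ...
    clusterStarted = true;
}

@Override
protected void after() {
    // Only shut down what this rule actually started.
    if (clusterStarted) {
        try {
            stopFlinkCluster();
        } catch (IOException e) {
            LOG.error("Failure while shutting down Flink cluster.", e);
        }
    }
    // ... restore flink-conf.yaml and delete backup files as before ...
}
```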


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230402861
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class for setting up command-line tool usages in a readable fashion.
+ */
+public enum CommandLineWrapper {
+   ;
+
+   public static WGetBuilder wget(String url) {
+   return new WGetBuilder(url);
+   }
+
+   /**
+* Wrapper around wget used for downloading files.
+*/
+   public static final class WGetBuilder {
+
+   private final String url;
+   private Path targetDir;
+
+   WGetBuilder(String url) {
+   this.url = url;
+   }
+
+   public WGetBuilder targetDir(Path dir) {
 
 Review comment:
   We could rename it to `toTargetDir`, but more importantly, why not impose immutability and return a new `WGetBuilder` instead of mutating the object?
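
A rough sketch of the immutable variant; the `toTargetDir` name comes from the comment above, while the private two-argument constructor is an assumption added for the sketch:

```java
public static final class WGetBuilder {

    private final String url;
    private final Path targetDir; // now final as well

    WGetBuilder(String url) {
        this(url, null);
    }

    private WGetBuilder(String url, Path targetDir) {
        this.url = url;
        this.targetDir = targetDir;
    }

    public WGetBuilder toTargetDir(Path dir) {
        // Return a new builder instead of mutating this one.
        return new WGetBuilder(url, dir);
    }

    // build() stays as in the PR, reading the (possibly null) targetDir field.
}
```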


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230403162
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class for setting up command-line tool usages in a readable fashion.
+ */
+public enum CommandLineWrapper {
+   ;
+
+   public static WGetBuilder wget(String url) {
+   return new WGetBuilder(url);
+   }
+
+   /**
+* Wrapper around wget used for downloading files.
+*/
+   public static final class WGetBuilder {
+
+   private final String url;
+   private Path targetDir;
+
+   WGetBuilder(String url) {
+   this.url = url;
+   }
+
+   public WGetBuilder targetDir(Path dir) {
+   this.targetDir = dir;
+   return this;
+   }
+
+   public String[] build() {
+   final List<String> commandsList = new ArrayList<>(5);
+   commandsList.add("wget");
+   commandsList.add("-q"); // silent
+   //commandsList.add("--show-progress"); // enable progress bar
+   if (targetDir != null) {
+   commandsList.add("-P");
+   commandsList.add(targetDir.toAbsolutePath().toString());
+   }
+   commandsList.add(url);
+   return commandsList.toArray(new String[commandsList.size()]);
+   }
+   }
+
+   public static SedBuilder sed(final String command, final Path file) {
+   return new SedBuilder(command, file);
+   }
+
+   /**
+* Wrapper around sed used for processing text.
+*/
+   public static final class SedBuilder {
+
+   private final String command;
+   private final Path file;
+
+   private boolean inPlace = false;
+
+   SedBuilder(final String command, final Path file) {
+   this.command = command;
+   this.file = file;
+   }
+
+   public SedBuilder inPlace() {
+   inPlace = true;
 
 Review comment:
   Same here as in the `WGetBuilder`: why not impose immutability and go for 
the "full" builder pattern?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230407576
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper around a Flink distribution.
+ */
+public final class FlinkDistribution extends ExternalResource {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   private final List<AutoClosablePath> filesToDelete = new ArrayList<>(4);
+
+   private static final Path FLINK_CONF_YAML = Paths.get("flink-conf.yaml");
+   private static final Path FLINK_CONF_YAML_BACKUP = Paths.get("flink-conf.yaml.bak");
+
+   private final Path opt;
+   private final Path lib;
+   private final Path conf;
+   private final Path log;
+   private final Path bin;
+
+   private Configuration defaultConfig;
+
+   public FlinkDistribution() {
+   final String distDirProperty = System.getProperty("distDir");
+   if (distDirProperty == null) {
+   Assert.fail("The distDir property was not set. You can 
set it when running maven via -DdistDir= .");
+   }
+   final Path flinkDir = Paths.get(distDirProperty);
+   bin = flinkDir.resolve("bin");
+   opt = flinkDir.resolve("opt");
+   lib = flinkDir.resolve("lib");
+   conf = flinkDir.resolve("conf");
+   log = flinkDir.resolve("log");
+   }
+
+   @Override
+   protected void before() throws IOException {
+   defaultConfig = new UnmodifiableConfiguration(GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
+   final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
 
 Review comment:
   I would move lines 95-98 into a separate method (e.g. `backupOriginalConfig` 
or `backupDefaultConfig`) and call it from `before()`, because such a method 
can also be useful on its own.
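
   Something along these lines, inside `FlinkDistribution` (the method name is only a suggestion; the bodies are the lines already in this PR):

@Override
protected void before() throws IOException {
	defaultConfig = new UnmodifiableConfiguration(
		GlobalConfiguration.loadConfiguration(conf.toAbsolutePath().toString()));
	backupDefaultConfig();
}

/** Backs up flink-conf.yaml so it can be restored after the test (suggested extraction). */
private void backupDefaultConfig() throws IOException {
	final Path originalConfig = conf.resolve(FLINK_CONF_YAML);
	final Path backupConfig = conf.resolve(FLINK_CONF_YAML_BACKUP);
	Files.copy(originalConfig, backupConfig);
	filesToDelete.add(new AutoClosablePath(backupConfig));
}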


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230403257
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/CommandLineWrapper.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class for setting up command-line tool usages in a readable fashion.
+ */
+public enum CommandLineWrapper {
+   ;
+
+   public static WGetBuilder wget(String url) {
+   return new WGetBuilder(url);
+   }
+
+   /**
+* Wrapper around wget used for downloading files.
+*/
+   public static final class WGetBuilder {
+
+   private final String url;
+   private Path targetDir;
+
+   WGetBuilder(String url) {
+   this.url = url;
+   }
+
+   public WGetBuilder targetDir(Path dir) {
+   this.targetDir = dir;
+   return this;
+   }
+
+   public String[] build() {
+   final List<String> commandsList = new ArrayList<>(5);
+   commandsList.add("wget");
+   commandsList.add("-q"); // silent
+   //commandsList.add("--show-progress"); // enable progress bar
+   if (targetDir != null) {
+   commandsList.add("-P");
+   commandsList.add(targetDir.toAbsolutePath().toString());
+   }
+   commandsList.add(url);
+   return commandsList.toArray(new String[commandsList.size()]);
+   }
+   }
+
+   public static SedBuilder sed(final String command, final Path file) {
+   return new SedBuilder(command, file);
+   }
+
+   /**
+* Wrapper around sed used for processing text.
+*/
+   public static final class SedBuilder {
+
+   private final String command;
+   private final Path file;
+
+   private boolean inPlace = false;
+
+   SedBuilder(final String command, final Path file) {
+   this.command = command;
+   this.file = file;
+   }
+
+   public SedBuilder inPlace() {
+   inPlace = true;
+   return this;
+   }
+
+   public String[] build() {
+   final List<String> commandsList = new ArrayList<>(5);
+   commandsList.add("sed");
+   if (inPlace) {
+   commandsList.add("-i");
+   }
+   commandsList.add("-e");
+   commandsList.add(command);
+   commandsList.add(file.toAbsolutePath().toString());
+   return commandsList.toArray(new String[commandsList.size()]);
+   }
+   }
+
+   public static TarBuilder tar(final Path file) {
+   return new TarBuilder(file);
+   }
+
+   /**
+* Wrapper around tar used for extracting .tar archives.
+*/
+   public static final class TarBuilder {
 
 Review comment:
   Same as above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230401171
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.util.FileUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Utility class to delete a given {@link Path} when exiting a try-with-resources statement.
+ */
+public final class AutoClosablePath implements AutoCloseable {
+
+   private final Path file;
+
+   public AutoClosablePath(final Path file) {
+   this.file = file;
+   }
+
+   @Override
+   public void close() throws IOException {
+   FileUtils.deleteFileOrDirectory(file.toFile());
+   }
+
+   public Path getFile() {
 
 Review comment:
   This is not used anywhere, so why not remove it now and add it back if a 
future test needs it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230400796
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.util.FileUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Utility class to delete a given {@link Path} when exiting a try-with-resources statement.
+ */
+public final class AutoClosablePath implements AutoCloseable {
+
+   private final Path file;
+
+   public AutoClosablePath(final Path file) {
 
 Review comment:
   Add `null` checks.
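
   For example, in the constructor (sketch only; `Preconditions` is Flink's existing utility class):

import org.apache.flink.util.Preconditions;

public AutoClosablePath(final Path file) {
	// fail fast with a clear message instead of a later NullPointerException
	this.file = Preconditions.checkNotNull(file, "file must not be null");
}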


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add E2E test

2018-11-02 Thread GitBox
kl0u commented on a change in pull request #7003: [FLINK-10633][prometheus] Add 
E2E test
URL: https://github.com/apache/flink/pull/7003#discussion_r230400941
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosablePath.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import org.apache.flink.util.FileUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Utility class to delete a given {@link Path} when exiting a try-with-resources statement.
+ */
+public final class AutoClosablePath implements AutoCloseable {
+
+   private final Path file;
 
 Review comment:
   Rename to `path` or something along these lines, as this is what it is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services