Repository: flink
Updated Branches:
  refs/heads/master 6b8f7dc2d -> e22c777f4


[FLINK-7933][metrics] Improve PrometheusReporter tests

This closes #4908.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e22c777f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e22c777f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e22c777f

Branch: refs/heads/master
Commit: e22c777f44b3e24cf9f4509672f981c71d4eb715
Parents: 6b8f7dc
Author: zentol <[email protected]>
Authored: Thu Oct 26 19:04:30 2017 +0200
Committer: zentol <[email protected]>
Committed: Mon Oct 30 09:37:48 2017 +0100

----------------------------------------------------------------------
 .../metrics/prometheus/PrometheusReporter.java  |  9 +++
 .../PrometheusReporterTaskScopeTest.java        | 37 ++++++++----
 .../prometheus/PrometheusReporterTest.java      | 61 ++++++++++----------
 3 files changed, 65 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e22c777f/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
index 1e44ab9..fad3ced 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -32,6 +32,7 @@ import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
 
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
@@ -73,9 +74,16 @@ public class PrometheusReporter implements MetricReporter {
        private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
 
        private HTTPServer httpServer;
+       private int port;
        private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, 
Integer>> collectorsWithCountByMetricName = new HashMap<>();
 
        @VisibleForTesting
+       int getPort() {
+               Preconditions.checkState(httpServer != null, "Server has not been initialized.");
+               return port;
+       }
+
+       @VisibleForTesting
        static String replaceInvalidChars(final String input) {
                // https://prometheus.io/docs/instrumenting/writing_exporters/
                // Only [a-zA-Z0-9:_] are valid in metric names, any other 
characters should be sanitized to an underscore.
@@ -91,6 +99,7 @@ public class PrometheusReporter implements MetricReporter {
                        int port = ports.next();
                        try {
                                httpServer = new HTTPServer(port);
+                               this.port = port;
                                 LOG.info("Started PrometheusReporter HTTP server on port {}.", port);
                                break;
                        } catch (IOException ioe) { //assume port conflict

http://git-wip-us.apache.org/repos/asf/flink/blob/e22c777f/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
index c7d4040..0ae8fc7 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.AbstractID;
 import com.mashape.unirest.http.exceptions.UnirestException;
 import io.prometheus.client.CollectorRegistry;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -62,7 +63,6 @@ public class PrometheusReporterTaskScopeTest {
        private static final int SUBTASK_INDEX_1 = 0;
        private static final int SUBTASK_INDEX_2 = 1;
 
-       private final MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9429")));
 
        private final JobID jobId = new JobID();
        private final JobVertexID taskId1 = new JobVertexID();
@@ -72,10 +72,29 @@ public class PrometheusReporterTaskScopeTest {
        private final AbstractID taskAttemptId2 = new AbstractID();
        private final String[] labelValues2 = {jobId.toString(), 
taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" 
+ ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};
 
-       private final TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
-       private final TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
-       private final TaskMetricGroup taskMetricGroup1 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, 
SUBTASK_INDEX_1, ATTEMPT_NUMBER);
-       private final TaskMetricGroup taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, 
SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+       private TaskMetricGroup taskMetricGroup1;
+       private TaskMetricGroup taskMetricGroup2;
+
+       private MetricRegistry registry;
+       private PrometheusReporter reporter;
+
+       @Before
+       public void setupReporter() {
+               registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+               reporter = (PrometheusReporter) registry.getReporters().get(0);
+
+               TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
+               TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
+               taskMetricGroup1 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, 
ATTEMPT_NUMBER);
+               taskMetricGroup2 = new TaskMetricGroup(registry, 
tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, 
ATTEMPT_NUMBER);
+       }
+
+       @After
+       public void shutdownRegistry() {
+               if (registry != null) {
+                       registry.shutdown();
+               }
+       }
 
        @Test
        public void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws 
UnirestException {
@@ -137,7 +156,7 @@ public class PrometheusReporterTaskScopeTest {
                taskMetricGroup1.histogram("my_histogram", histogram);
                taskMetricGroup2.histogram("my_histogram", histogram);
 
-               final String exportedMetrics = pollMetrics().getBody();
+               final String exportedMetrics = 
pollMetrics(reporter.getPort()).getBody();
                assertThat(exportedMetrics, 
containsString("subtask_index=\"0\",quantile=\"0.5\",} 0.5")); // histogram
                assertThat(exportedMetrics, 
containsString("subtask_index=\"1\",quantile=\"0.5\",} 0.5")); // histogram
 
@@ -179,10 +198,4 @@ public class PrometheusReporterTaskScopeTest {
                labelNames[LABEL_NAMES.length] = element;
                return labelNames;
        }
-
-       @After
-       public void shutdownRegistry() {
-               registry.shutdown();
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e22c777f/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 956339b..0d7be6d 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -40,6 +39,7 @@ import com.mashape.unirest.http.HttpResponse;
 import com.mashape.unirest.http.Unirest;
 import com.mashape.unirest.http.exceptions.UnirestException;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -56,7 +56,6 @@ import static org.junit.Assert.assertThat;
  * Basic test for {@link PrometheusReporter}.
  */
 public class PrometheusReporterTest extends TestLogger {
-       private static final int NON_DEFAULT_PORT = 9429;
 
        private static final String HOST_NAME = "hostname";
        private static final String TASK_MANAGER = "tm";
@@ -70,9 +69,23 @@ public class PrometheusReporterTest extends TestLogger {
        @Rule
        public ExpectedException thrown = ExpectedException.none();
 
-       private final MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "" + NON_DEFAULT_PORT)));
-       private final FrontMetricGroup<TaskManagerMetricGroup> metricGroup = 
new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, 
TASK_MANAGER));
-       private final MetricReporter reporter = registry.getReporters().get(0);
+       private MetricRegistry registry;
+       private FrontMetricGroup<TaskManagerMetricGroup> metricGroup;
+       private PrometheusReporter reporter;
+
+       @Before
+       public void setupReporter() {
+               registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+               metricGroup = new FrontMetricGroup<>(0, new 
TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER));
+               reporter = (PrometheusReporter) registry.getReporters().get(0);
+       }
+
+       @After
+       public void shutdownRegistry() {
+               if (registry != null) {
+                       registry.shutdown();
+               }
+       }
 
        /**
         * {@link io.prometheus.client.Counter} may not decrease, so report 
{@link Counter} as {@link io.prometheus.client.Gauge}.
@@ -145,9 +158,11 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void endpointIsUnavailableAfterReporterIsClosed() throws 
UnirestException {
+               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+               PrometheusReporter reporter = (PrometheusReporter) 
registry.getReporters().get(0);
                reporter.close();
                thrown.expect(UnirestException.class);
-               pollMetrics();
+               pollMetrics(reporter.getPort());
        }
 
        @Test
@@ -229,10 +244,12 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void cannotStartTwoReportersOnSamePort() {
-               final MetricRegistry fixedPort1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "12345")));
-               final MetricRegistry fixedPort2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "12345")));
-
+               final MetricRegistry fixedPort1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
                assertThat(fixedPort1.getReporters(), hasSize(1));
+
+               PrometheusReporter firstReporter = (PrometheusReporter) 
fixedPort1.getReporters().get(0);
+
+               final MetricRegistry fixedPort2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 String.valueOf(firstReporter.getPort()))));
                assertThat(fixedPort2.getReporters(), hasSize(0));
 
                fixedPort1.shutdown();
@@ -241,8 +258,8 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void canStartTwoReportersWhenUsingPortRange() {
-               final MetricRegistry portRange1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9249-9252")));
-               final MetricRegistry portRange2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "9249-9252")));
+               final MetricRegistry portRange1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9200-9300")));
+               final MetricRegistry portRange2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "9200-9300")));
 
                assertThat(portRange1.getReporters(), hasSize(1));
                assertThat(portRange2.getReporters(), hasSize(1));
@@ -251,28 +268,13 @@ public class PrometheusReporterTest extends TestLogger {
                portRange2.shutdown();
        }
 
-       @Test
-       public void cannotStartThreeReportersWhenPortRangeIsTooSmall() {
-               final MetricRegistry smallPortRange1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9253-9254")));
-               final MetricRegistry smallPortRange2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "9253-9254")));
-               final MetricRegistry smallPortRange3 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test3",
 "9253-9254")));
-
-               assertThat(smallPortRange1.getReporters(), hasSize(1));
-               assertThat(smallPortRange2.getReporters(), hasSize(1));
-               assertThat(smallPortRange3.getReporters(), hasSize(0));
-
-               smallPortRange1.shutdown();
-               smallPortRange2.shutdown();
-               smallPortRange3.shutdown();
-       }
-
        private String addMetricAndPollResponse(Metric metric, String 
metricName) throws UnirestException {
                reporter.notifyOfAddedMetric(metric, metricName, metricGroup);
-               return pollMetrics().getBody();
+               return pollMetrics(reporter.getPort()).getBody();
        }
 
-       static HttpResponse<String> pollMetrics() throws UnirestException {
-               return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString();
+       static HttpResponse<String> pollMetrics(int port) throws 
UnirestException {
+               return Unirest.get("http://localhost:" + port + "/metrics").asString();
        }
 
        static Configuration createConfigWithOneReporter(String reporterName, 
String portString) {
@@ -285,7 +287,6 @@ public class PrometheusReporterTest extends TestLogger {
 
        @After
        public void closeReporterAndShutdownRegistry() {
-               reporter.close();
                registry.shutdown();
        }
 }

Reply via email to