[hotfix] [dist. coordination] Move metrics out of the Execution Graph

ExecutionGraph-based metrics should be in their own package 
'org.apache.flink.runtime.executiongraph.metrics'.
They are instantiated by whoever builds the execution graph, but not by the 
execution graph itself.
This separates concerns more elegantly.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c277ee17
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c277ee17
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c277ee17

Branch: refs/heads/table-retraction
Commit: c277ee17388c85c40d9c3956fe9ac524c3157130
Parents: 85f75a5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 16:54:17 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 17:11:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  45 +----
 .../executiongraph/ExecutionGraphBuilder.java   |   8 +-
 .../metrics/RestartTimeGauge.java               |  80 +++++++++
 .../executiongraph/metrics/package-info.java    |  23 +++
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../ExecutionGraphMetricsTest.java              | 173 ++++++-------------
 .../partitioner/RescalePartitionerTest.java     |   5 +-
 7 files changed, 169 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0564fd0..06b2f9a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,9 +29,6 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -126,8 +123,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        /** The log object used for debugging. */
        static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
 
-       static final String RESTARTING_TIME_METRIC_NAME = "restartingTime";
-
        // 
--------------------------------------------------------------------------------------------
 
        /** The lock used to secure all access to mutable fields, especially 
the tracking of progress
@@ -258,9 +253,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        Collections.<BlobKey>emptyList(),
                        Collections.<URL>emptyList(),
                        slotProvider,
-                       ExecutionGraph.class.getClassLoader(),
-                       new UnregisteredMetricsGroup()
-               );
+                       ExecutionGraph.class.getClassLoader());
        }
 
        public ExecutionGraph(
@@ -275,8 +268,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        List<BlobKey> requiredJarFiles,
                        List<URL> requiredClasspaths,
                        SlotProvider slotProvider,
-                       ClassLoader userClassLoader,
-                       MetricGroup metricGroup) throws IOException {
+                       ClassLoader userClassLoader) throws IOException {
 
                checkNotNull(futureExecutor);
                checkNotNull(jobId);
@@ -315,9 +307,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                this.scheduleAllocationTimeout = checkNotNull(timeout);
 
                this.restartStrategy = restartStrategy;
-
-               metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new 
RestartTimeGauge());
-
                this.kvStateLocationRegistry = new 
KvStateLocationRegistry(jobId, getAllVertices());
        }
 
@@ -1449,36 +1438,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
-       /**
-        * Gauge which returns the last restarting time. Restarting time is the 
time between
-        * JobStatus.RESTARTING and JobStatus.RUNNING or a terminal state if 
JobStatus.RUNNING was not
-        * reached. If the job has not yet reached either of these states, then 
the time is measured
-        * since reaching JobStatus.RESTARTING. If it is still the initial job 
execution, then the
-        * gauge will return 0.
-        */
-       private class RestartTimeGauge implements Gauge<Long> {
-
-               @Override
-               public Long getValue() {
-                       long restartingTimestamp = 
stateTimestamps[JobStatus.RESTARTING.ordinal()];
-
-                       if (restartingTimestamp <= 0) {
-                               // we haven't yet restarted our job
-                               return 0L;
-                       } else if (stateTimestamps[JobStatus.RUNNING.ordinal()] 
>= restartingTimestamp) {
-                               // we have transitioned to RUNNING since the 
last restart
-                               return 
stateTimestamps[JobStatus.RUNNING.ordinal()] - restartingTimestamp;
-                       } else if (state.isTerminalState()) {
-                               // since the last restart we've switched to a 
terminal state without touching
-                               // the RUNNING state (e.g. failing from 
RESTARTING)
-                               return stateTimestamps[state.ordinal()] - 
restartingTimestamp;
-                       } else {
-                               // we're still somwhere between RESTARTING and 
RUNNING
-                               return System.currentTimeMillis() - 
restartingTimestamp;
-                       }
-               }
-       }
-
        @Override
        public ArchivedExecutionGraph archive() {
                Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = 
new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8471178..494b7a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -102,8 +103,7 @@ public class ExecutionGraphBuilder {
                                                jobGraph.getUserJarBlobKeys(),
                                                jobGraph.getClasspaths(),
                                                slotProvider,
-                                               classLoader,
-                                               metrics);
+                                               classLoader);
                } catch (IOException e) {
                        throw new JobException("Could not create the execution 
graph.", e);
                }
@@ -250,6 +250,10 @@ public class ExecutionGraphBuilder {
                                        checkpointStatsTracker);
                }
 
+               // create all the metrics for the Execution Graph
+
+               metrics.gauge(RestartTimeGauge.METRIC_NAME, new 
RestartTimeGauge(executionGraph));
+
                return executionGraph;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
new file mode 100644
index 0000000..e0a22e3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge which returns the last restarting time.
+ * 
+ * <p>Restarting time is the time between {@link JobStatus#RESTARTING} and 
{@link JobStatus#RUNNING},
+ * or a terminal state if {@link JobStatus#RUNNING} was not reached.
+ * 
+ * <p>If the job has not yet reached either of these states, then the time is 
measured since reaching
+ * {@link JobStatus#RESTARTING}. If it is still the initial job execution, 
then the gauge will return 0.
+ */
+public class RestartTimeGauge implements Gauge<Long> {
+
+       public static final String METRIC_NAME = "restartingTime";
+
+       // 
------------------------------------------------------------------------
+
+       private final ExecutionGraph eg;
+
+       public RestartTimeGauge(ExecutionGraph executionGraph) {
+               this.eg = checkNotNull(executionGraph);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public Long getValue() {
+               final JobStatus status = eg.getState();
+
+               final long restartingTimestamp = 
eg.getStatusTimestamp(JobStatus.RESTARTING);
+
+               final long switchToRunningTimestamp;
+               final long lastRestartTime;
+
+               if (restartingTimestamp <= 0) {
+                       // we haven't yet restarted our job
+                       return 0L;
+               }
+               else if ((switchToRunningTimestamp = 
eg.getStatusTimestamp(JobStatus.RUNNING)) >= restartingTimestamp) {
+                       // we have transitioned to RUNNING since the last 
restart
+                       lastRestartTime = switchToRunningTimestamp - 
restartingTimestamp;
+               }
+               else if (status.isTerminalState()) {
+                       // since the last restart we've switched to a terminal 
state without touching
+                       // the RUNNING state (e.g. failing from RESTARTING)
+                       lastRestartTime = eg.getStatusTimestamp(status) - 
restartingTimestamp;
+               }
+               else {
+                       // we're still somewhere between RESTARTING and RUNNING
+                       lastRestartTime  = System.currentTimeMillis() - 
restartingTimestamp;
+               }
+
+               // we guard this with 'Math.max' to avoid negative timestamps 
when clocks re-sync 
+               return Math.max(lastRestartTime, 0);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java
new file mode 100644
index 0000000..8b9d205
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes implementing various metrics for the job 
execution,
+ * based on the {@link org.apache.flink.runtime.executiongraph.ExecutionGraph 
Execution Graph}.
+ */
+package org.apache.flink.runtime.executiongraph.metrics;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 81162b6..0ab031e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -95,8 +94,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        Collections.<BlobKey>emptyList(),
                        Collections.<URL>emptyList(),
                        new Scheduler(TestingUtils.defaultExecutionContext()),
-                       ClassLoader.getSystemClassLoader(),
-                       new UnregisteredMetricsGroup());
+                       ClassLoader.getSystemClassLoader());
 
                executionGraph.enableCheckpointing(
                                100,

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 5496e35..97127c7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -20,13 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -35,13 +29,9 @@ import 
org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -49,9 +39,11 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -63,17 +55,16 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ExecutionGraphMetricsTest extends TestLogger {
 
@@ -86,32 +77,14 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                try {
                        // setup execution graph with mocked scheduling logic
                        int parallelism = 1;
-       
+
                        JobVertex jobVertex = new JobVertex("TestVertex");
                        jobVertex.setParallelism(parallelism);
                        jobVertex.setInvokableClass(NoOpInvokable.class);
                        JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
-       
-                       Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
-                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
-       
+
                        Configuration jobConfig = new Configuration();
-       
                        Time timeout = Time.seconds(10L);
-       
-                       MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-       
-                       assertTrue(metricRegistry.getReporters().size() == 1);
-       
-                       MetricReporter reporter = 
metricRegistry.getReporters().get(0);
-       
-                       assertTrue(reporter instanceof TestingReporter);
-       
-                       TestingReporter testingReporter = (TestingReporter) 
reporter;
-       
-                       MetricGroup metricGroup = new 
JobManagerMetricGroup(metricRegistry, "localhost");
-       
                        Scheduler scheduler = mock(Scheduler.class);
 
                        ResourceID taskManagerId = ResourceID.generate();
@@ -163,163 +136,127 @@ public class ExecutionGraphMetricsTest extends 
TestLogger {
                                Collections.<BlobKey>emptyList(),
                                Collections.<URL>emptyList(),
                                scheduler,
-                               getClass().getClassLoader(),
-                               metricGroup);
-       
-                       // get restarting time metric
-                       Metric metric = 
testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
-       
-                       assertNotNull(metric);
-                       assertTrue(metric instanceof Gauge);
-       
-                       @SuppressWarnings("unchecked")
-                       Gauge<Long> restartingTime = (Gauge<Long>) metric;
-       
+                               getClass().getClassLoader());
+
+                       RestartTimeGauge restartingTime = new 
RestartTimeGauge(executionGraph);
+
                        // check that the restarting time is 0 since it's the 
initial start
-                       assertTrue(0L == restartingTime.getValue());
-       
+                       assertEquals(0L, restartingTime.getValue().longValue());
+
                        
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-       
+
                        // start execution
-               executionGraph.scheduleForExecution();
+                       executionGraph.scheduleForExecution();
+                       assertEquals(0L, restartingTime.getValue().longValue());
 
-                       assertTrue(0L == restartingTime.getValue());
-       
                        List<ExecutionAttemptID> executionIDs = new 
ArrayList<>();
-       
+
                        for (ExecutionVertex executionVertex: 
executionGraph.getAllExecutionVertices()) {
                                
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
                        }
-       
+
                        // tell execution graph that the tasks are in state 
running --> job status switches to state running
                        for (ExecutionAttemptID executionID : executionIDs) {
                                executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
                        }
-       
+
                        assertEquals(JobStatus.RUNNING, 
executionGraph.getState());
-       
-                       assertTrue(0L == restartingTime.getValue());
-       
+                       assertEquals(0L, restartingTime.getValue().longValue());
+
                        // fail the job so that it goes into state restarting
                        for (ExecutionAttemptID executionID : executionIDs) {
                                executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new 
Exception()));
                        }
-       
+
                        assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
-       
+
                        long firstRestartingTimestamp = 
executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-       
+
                        // wait some time so that the restarting time gauge 
shows a value different from 0
                        Thread.sleep(50);
-       
+
                        long previousRestartingTime = restartingTime.getValue();
-       
+
                        // check that the restarting time is monotonically 
increasing
                        for (int i = 0; i < 10; i++) {
                                long currentRestartingTime = 
restartingTime.getValue();
-       
+
                                assertTrue(currentRestartingTime >= 
previousRestartingTime);
                                previousRestartingTime = currentRestartingTime;
                        }
-       
+
                        // check that we have measured some restarting time
                        assertTrue(previousRestartingTime > 0);
-       
+
                        // restart job
                        testingRestartStrategy.restartExecutionGraph();
-       
+
                        executionIDs.clear();
-       
+
                        for (ExecutionVertex executionVertex: 
executionGraph.getAllExecutionVertices()) {
                                
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
                        }
-       
+
                        for (ExecutionAttemptID executionID : executionIDs) {
                                executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
                        }
-       
+
                        assertEquals(JobStatus.RUNNING, 
executionGraph.getState());
-       
+
                        assertTrue(firstRestartingTimestamp != 0);
-       
+
                        previousRestartingTime = restartingTime.getValue();
-       
+
                        // check that the restarting time does not increase 
after we've reached the running state
                        for (int i = 0; i < 10; i++) {
                                long currentRestartingTime = 
restartingTime.getValue();
-       
+
                                assertTrue(currentRestartingTime == 
previousRestartingTime);
                                previousRestartingTime = currentRestartingTime;
                        }
-       
+
                        // fail job again
                        for (ExecutionAttemptID executionID : executionIDs) {
                                executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new 
Exception()));
                        }
-       
+
                        assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
-       
+
                        long secondRestartingTimestamp = 
executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-       
+
                        assertTrue(firstRestartingTimestamp != 
secondRestartingTimestamp);
-       
+
                        Thread.sleep(50);
-       
+
                        previousRestartingTime = restartingTime.getValue();
-       
+
                        // check that the restarting time is increasing again
                        for (int i = 0; i < 10; i++) {
                                long currentRestartingTime = 
restartingTime.getValue();
-       
+
                                assertTrue(currentRestartingTime >= 
previousRestartingTime);
                                previousRestartingTime = currentRestartingTime;
                        }
-       
+
                        assertTrue(previousRestartingTime > 0);
-       
+
                        // now lets fail the job while it is in restarting and 
see whether the restarting time then stops to increase
                        // for this to work, we have to use a 
SuppressRestartException
                        executionGraph.fail(new SuppressRestartsException(new 
Exception()));
-       
+
                        assertEquals(JobStatus.FAILED, 
executionGraph.getState());
-       
+
                        previousRestartingTime = restartingTime.getValue();
-       
+
                        for (int i = 0; i < 10; i++) {
                                long currentRestartingTime = 
restartingTime.getValue();
-       
+
                                assertTrue(currentRestartingTime == 
previousRestartingTime);
                                previousRestartingTime = currentRestartingTime;
                        }
                } finally {
                        executor.shutdownNow();
                }
-
-       }
-
-       public static class TestingReporter implements MetricReporter {
-
-               private final Map<String, Metric> metrics = new HashMap<>();
-
-               @Override
-               public void open(MetricConfig config) {}
-
-               @Override
-               public void close() {}
-
-               @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       metrics.put(metricName, metric);
-               }
-
-               @Override
-               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
-                       metrics.remove(metricName);
-               }
-
-               Metric getMetric(String metricName) {
-                       return metrics.get(metricName);
-               }
        }
 
        static class TestingRestartStrategy implements RestartStrategy {

http://git-wip-us.apache.org/repos/asf/flink/blob/c277ee17/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 43fe169..d72c37b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -44,6 +43,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Before;
 import org.junit.Test;
 
@@ -150,8 +150,7 @@ public class RescalePartitionerTest extends TestLogger {
                        new ArrayList<BlobKey>(),
                        new ArrayList<URL>(),
                        new Scheduler(TestingUtils.defaultExecutionContext()),
-                       ExecutionGraph.class.getClassLoader(),
-                       new UnregisteredMetricsGroup());
+                       ExecutionGraph.class.getClassLoader());
                try {
                        eg.attachJobGraph(jobVertices);
                }

Reply via email to