Repository: flink
Updated Branches:
  refs/heads/master db759c530 -> fff04bfbe


[hotfix][metrics] keep the non-null assumption and implement tests properly

The previous change in commit e908b62ab3 caused a regression that led to unit test failures.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4795ce8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4795ce8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4795ce8f

Branch: refs/heads/master
Commit: 4795ce8f501f3c16e363664e8d9927e4a0aaa6ce
Parents: 931ce05
Author: Nico Kruber <n...@data-artisans.com>
Authored: Mon Feb 27 17:30:59 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Apr 27 15:14:09 2017 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutorTest.java     | 17 +++++++++++++++--
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  6 +++++-
 .../flink/runtime/taskmanager/TaskStopTest.java    |  6 +++++-
 .../apache/flink/runtime/taskmanager/TaskTest.java |  7 +++++--
 4 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index bc6fe68..1d1840e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -59,6 +59,7 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -547,10 +548,13 @@ public class TaskExecutorTest extends TestLogger {
 
                final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
 
+               TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+               
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
                when(taskManagerMetricGroup.addTaskForJob(
                                any(JobID.class), anyString(), 
any(JobVertexID.class), any(ExecutionAttemptID.class),
                                anyString(), anyInt(), anyInt())
-                       ).thenReturn(mock(TaskMetricGroup.class));
+                       ).thenReturn(taskMetricGroup);
 
                final HighAvailabilityServices haServices = 
mock(HighAvailabilityServices.class);
                
when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
@@ -1010,6 +1014,15 @@ public class TaskExecutorTest extends TestLogger {
                jobManagerTable.put(jobId, jobManagerConnection);
 
                try {
+                       final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
+                       TaskMetricGroup taskMetricGroup = 
mock(TaskMetricGroup.class);
+                       
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
+                       when(taskManagerMetricGroup.addTaskForJob(
+                               any(JobID.class), anyString(), 
any(JobVertexID.class), any(ExecutionAttemptID.class),
+                               anyString(), anyInt(), anyInt())
+                       ).thenReturn(taskMetricGroup);
+
                        final TaskExecutor taskManager = new TaskExecutor(
                                rpc,
                                taskManagerConfiguration,
@@ -1020,7 +1033,7 @@ public class TaskExecutorTest extends TestLogger {
                                haServices,
                                mock(HeartbeatServices.class, RETURNS_MOCKS),
                                mock(MetricRegistry.class),
-                               mock(TaskManagerMetricGroup.class),
+                               taskManagerMetricGroup,
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
                                taskSlotTable,

http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 89ae5da..c6d2fec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
@@ -157,6 +158,9 @@ public class TaskAsyncCallTest {
                
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
+               TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+               
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
                JobInformation jobInformation = new JobInformation(
                        new JobID(),
                        "Job Name",
@@ -194,7 +198,7 @@ public class TaskAsyncCallTest {
                        libCache,
                        mock(FileCache.class),
                        new TestingTaskManagerRuntimeInfo(),
-                       mock(TaskMetricGroup.class),
+                       taskMetricGroup,
                        consumableNotifier,
                        partitionProducerStateChecker,
                        executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index f3ac809..40678de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -68,6 +69,9 @@ public class TaskStopTest {
                TaskManagerRuntimeInfo tmRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
                when(tmRuntimeInfo.getConfiguration()).thenReturn(new 
Configuration());
 
+               TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+               
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
                task = new Task(
                        mock(JobInformation.class),
                        new TaskInformation(
@@ -95,7 +99,7 @@ public class TaskStopTest {
                        mock(LibraryCacheManager.class),
                        mock(FileCache.class),
                        tmRuntimeInfo,
-                       mock(TaskMetricGroup.class),
+                       taskMetricGroup,
                        mock(ResultPartitionConsumableNotifier.class),
                        mock(PartitionProducerStateChecker.class),
                        mock(Executor.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index eb59b1b..2522287 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -53,9 +53,9 @@ import 
org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -941,6 +941,9 @@ public class TaskTest extends TestLogger {
                        1,
                        invokable.getName(),
                        new Configuration());
+
+               TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+               
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
                
                return new Task(
                        jobInformation,
@@ -963,7 +966,7 @@ public class TaskTest extends TestLogger {
                        libCache,
                        mock(FileCache.class),
                        new TestingTaskManagerRuntimeInfo(taskManagerConfig),
-                       mock(TaskMetricGroup.class),
+                       taskMetricGroup,
                        consumableNotifier,
                        partitionProducerStateChecker,
                        executor);

Reply via email to