Repository: flink Updated Branches: refs/heads/master db759c530 -> fff04bfbe
[hotfix][metrics] keep the non-null assumption and implement tests properly Previous commit in e908b62ab3 caused a regression leading to unit test failures. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4795ce8f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4795ce8f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4795ce8f Branch: refs/heads/master Commit: 4795ce8f501f3c16e363664e8d9927e4a0aaa6ce Parents: 931ce05 Author: Nico Kruber <n...@data-artisans.com> Authored: Mon Feb 27 17:30:59 2017 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Apr 27 15:14:09 2017 +0200 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutorTest.java | 17 +++++++++++++++-- .../runtime/taskmanager/TaskAsyncCallTest.java | 6 +++++- .../flink/runtime/taskmanager/TaskStopTest.java | 6 +++++- .../apache/flink/runtime/taskmanager/TaskTest.java | 7 +++++-- 4 files changed, 30 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index bc6fe68..1d1840e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -59,6 +59,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -547,10 +548,13 @@ public class TaskExecutorTest extends TestLogger { final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class); + TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class); + when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class)); + when(taskManagerMetricGroup.addTaskForJob( any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class), anyString(), anyInt(), anyInt()) - ).thenReturn(mock(TaskMetricGroup.class)); + ).thenReturn(taskMetricGroup); final HighAvailabilityServices haServices = mock(HighAvailabilityServices.class); when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class)); @@ -1010,6 +1014,15 @@ public class TaskExecutorTest extends TestLogger { jobManagerTable.put(jobId, jobManagerConnection); try { + final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class); + TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class); + when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class)); + + when(taskManagerMetricGroup.addTaskForJob( + any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class), + anyString(), anyInt(), anyInt()) + ).thenReturn(taskMetricGroup); + final TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerConfiguration, @@ -1020,7 +1033,7 @@ public class TaskExecutorTest extends TestLogger { haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), mock(MetricRegistry.class), - mock(TaskManagerMetricGroup.class), + taskManagerMetricGroup, mock(BroadcastVariableManager.class), mock(FileCache.class), taskSlotTable, http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 89ae5da..c6d2fec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateHandles; @@ -157,6 +158,9 @@ public class TaskAsyncCallTest { when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); + TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class); + when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class)); + JobInformation jobInformation = new JobInformation( new JobID(), "Job Name", @@ -194,7 +198,7 @@ public class TaskAsyncCallTest { libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(), - mock(TaskMetricGroup.class), + taskMetricGroup, consumableNotifier, partitionProducerStateChecker, executor); http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index f3ac809..40678de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -68,6 +69,9 @@ public class TaskStopTest { TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class); when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration()); + TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class); + when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class)); + task = new Task( mock(JobInformation.class), new TaskInformation( @@ -95,7 +99,7 @@ public class TaskStopTest { mock(LibraryCacheManager.class), mock(FileCache.class), tmRuntimeInfo, - mock(TaskMetricGroup.class), + taskMetricGroup, mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class), mock(Executor.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index eb59b1b..2522287 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -53,9 +53,9 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -941,6 +941,9 @@ public class TaskTest extends TestLogger { 1, invokable.getName(), new Configuration()); + + TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class); + when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class)); return new Task( jobInformation, @@ -963,7 +966,7 @@ public class TaskTest extends TestLogger { libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(taskManagerConfig), - mock(TaskMetricGroup.class), + taskMetricGroup, consumableNotifier, partitionProducerStateChecker, executor);