GitHub user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168529359 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java --- @@ -118,12 +121,38 @@ public void reportTaskStateSnapshots( } } + @Nonnull @Override - public OperatorSubtaskState operatorStates(OperatorID operatorID) { - TaskStateSnapshot taskStateSnapshot = getLastJobManagerTaskStateSnapshot(); - return taskStateSnapshot != null ? taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null; + public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) { + TaskStateSnapshot jmTaskStateSnapshot = getLastJobManagerTaskStateSnapshot(); + TaskStateSnapshot tmTaskStateSnapshot = getLastTaskManagerTaskStateSnapshot(); + + OperatorSubtaskState jmOpState = null; + List<OperatorSubtaskState> tmStateCollection = null; --- End diff -- Changed in a later commit.
---