GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5256#discussion_r160200816

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---
    @@ -63,10 +65,16 @@ public static void bestEffortDiscardAllStateObjects(
         public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
             if (null != stateFuture) {
                 if (!stateFuture.cancel(true)) {
    -                StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
    -                if (null != stateObject) {
    -                    stateObject.discardState();
    +                try {
    +                    // We attempt to get a result, in case the future completed before cancellation.
    +                    StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
    +
    +                    if (null != stateObject) {
    +                        stateObject.discardState();
    +                    }
    +                } catch (CancellationException | ExecutionException ignore) {
    +                    // No result that requires discarding was produced.
    --- End diff --

    Maybe we should log at DEBUG level that the state future could not be completed.
---