This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 0fa0830  [FLINK-17080][java,table] Fix possible NPE in Utils#CollectHelper
0fa0830 is described below

commit 0fa08304a5bb59744aa378d297e66df180e12519
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Wed Apr 15 15:24:18 2020 +0800

    [FLINK-17080][java,table] Fix possible NPE in Utils#CollectHelper

    This closes #11688
---
 flink-java/src/main/java/org/apache/flink/api/java/Utils.java | 9 +++++++--
 .../src/main/java/org/apache/flink/table/api/TableUtils.java | 6 +++++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 2352833..bb1e5a5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -138,8 +138,13 @@ public final class Utils {
 
 		@Override
 		public void close() {
-			// Important: should only be added in close method to minimize traffic of accumulators
-			getRuntimeContext().addAccumulator(id, accumulator);
+			// when the sink is up but not initialized and the job fails due to other operators,
+			// it is possible that close() is called when open() is not called,
+			// so we have to do this null check
+			if (accumulator != null) {
+				// Important: should only be added in close method to minimize traffic of accumulators
+				getRuntimeContext().addAccumulator(id, accumulator);
+			}
 		}
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java
index 8aaf5c5..caaf45d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java
@@ -84,7 +84,11 @@ public class TableUtils {
 			tEnv.insertInto(sinkName, table);
 			JobExecutionResult executionResult = tEnv.execute(jobName);
 			ArrayList<byte[]> accResult = executionResult.getAccumulatorResult(id);
-			deserializedList = SerializedListAccumulator.deserializeList(accResult, serializer);
+			if (accResult != null) {
+				deserializedList = SerializedListAccumulator.deserializeList(accResult, serializer);
+			} else {
+				throw new RuntimeException("Could not retrieve table result. It is very likely that the job fails.");
+			}
 		} finally {
 			tEnv.dropTemporaryTable(sinkName);
 		}