This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fa0830  [FLINK-17080][java,table] Fix possible NPE in Utils#CollectHelper
0fa0830 is described below

commit 0fa08304a5bb59744aa378d297e66df180e12519
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Wed Apr 15 15:24:18 2020 +0800

    [FLINK-17080][java,table] Fix possible NPE in Utils#CollectHelper
    
    This closes #11688
---
 flink-java/src/main/java/org/apache/flink/api/java/Utils.java    | 9 +++++++--
 .../src/main/java/org/apache/flink/table/api/TableUtils.java     | 6 +++++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 2352833..bb1e5a5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -138,8 +138,13 @@ public final class Utils {
 
                @Override
                public void close() {
-                       // Important: should only be added in close method to minimize traffic of accumulators
-                       getRuntimeContext().addAccumulator(id, accumulator);
+                       // when the sink is up but not initialized and the job fails due to other operators,
+                       // it is possible that close() is called when open() is not called,
+                       // so we have to do this null check
+                       if (accumulator != null) {
+                               // Important: should only be added in close method to minimize traffic of accumulators
+                               getRuntimeContext().addAccumulator(id, 
accumulator);
+                       }
                }
        }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java
index 8aaf5c5..caaf45d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java
@@ -84,7 +84,11 @@ public class TableUtils {
                        tEnv.insertInto(sinkName, table);
                        JobExecutionResult executionResult = 
tEnv.execute(jobName);
                        ArrayList<byte[]> accResult = 
executionResult.getAccumulatorResult(id);
-                       deserializedList = 
SerializedListAccumulator.deserializeList(accResult, serializer);
+                       if (accResult != null) {
+                               deserializedList = 
SerializedListAccumulator.deserializeList(accResult, serializer);
+                       } else {
+                               throw new RuntimeException("Could not retrieve 
table result. It is very likely that the job fails.");
+                       }
                } finally {
                        tEnv.dropTemporaryTable(sinkName);
                }

Reply via email to