This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83e3613fdf85cb7b774724b68df9acd1ff392110
Author: Jiangjie (Becket) Qin <jiangjie...@alibaba-inc.com>
AuthorDate: Sun Oct 25 18:12:34 2020 +0800

    [hotfix] Only close the SourceReader and EventTimeLogic in the SourceOperator if they are not null.
---
 .../org/apache/flink/streaming/api/operators/SourceOperator.java  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index eaf5481..46d4239 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -189,8 +189,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
        @Override
        public void close() throws Exception {
-               sourceReader.close();
-               eventTimeLogic.stopPeriodicWatermarkEmits();
+               if (sourceReader != null) {
+                       sourceReader.close();
+               }
+               if (eventTimeLogic != null) {
+                       eventTimeLogic.stopPeriodicWatermarkEmits();
+               }
                super.close();
        }
 

Reply via email to