This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new eb65655f8ce [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
eb65655f8ce is described below

commit eb65655f8ce39627a6bd28c8bdddd0c92db44d2e
Author: harker2015 <hudmhar...@163.com>
AuthorDate: Tue Sep 20 17:31:44 2022 +0200

    [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
    
    * [FLINK-29324] Fix NPE for Kinesis connector when closing
    
    * [FLINK-29324] Add unit test case
---
 .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java  | 7 ++++++-
 .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java    | 8 ++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b0a729fb11e..488a1f54e85 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -418,7 +418,12 @@ public class FlinkKinesisConsumer<T> extends 
RichParallelSourceFunction<T>
     public void close() throws Exception {
         cancel();
         // safe-guard when the fetcher has been interrupted, make sure to not 
leak resources
-        fetcher.awaitTermination();
+        // application might be stopped before connector subtask has been 
started
+        // so we must check if the fetcher is actually created
+        KinesisDataFetcher fetcher = this.fetcher;
+        if (fetcher != null) {
+            fetcher.awaitTermination();
+        }
         this.fetcher = null;
         super.close();
     }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d48e04ed4e6..6ad94f823e8 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -1194,6 +1194,14 @@ public class FlinkKinesisConsumerTest extends TestLogger 
{
         testHarness.close();
     }
 
+    @Test
+    public void testCloseConnectorBeforeSubtaskStart() throws Exception {
+        Properties config = TestUtils.getStandardProperties();
+        FlinkKinesisConsumer<String> consumer =
+                new FlinkKinesisConsumer<>("fakeStream", new 
SimpleStringSchema(), config);
+        consumer.close();
+    }
+
     private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> 
queue, int count)
             throws Exception {
         Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));

Reply via email to