This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new 22086c67a6a [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) 22086c67a6a is described below commit 22086c67a6a97148eb74ed32b281eec393721738 Author: harker2015 <hudmhar...@163.com> AuthorDate: Tue Sep 20 17:31:44 2022 +0200 [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) * [FLINK-29324] Fix NPE for Kinesis connector when closing * [FLINK-29324] Add unit test case --- .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java | 7 ++++++- .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java | 8 ++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11e..488a1f54e85 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -418,7 +418,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> public void close() throws Exception { cancel(); // safe-guard when the fetcher has been interrupted, make sure to not leak resources - fetcher.awaitTermination(); + // application might be stopped before connector subtask has been started + // so we must check if the fetcher is actually created + KinesisDataFetcher fetcher = this.fetcher; + if (fetcher != null) { + fetcher.awaitTermination(); + } this.fetcher = null; super.close(); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index deef7b38057..51367a6a110 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1184,6 +1184,14 @@ public class FlinkKinesisConsumerTest extends TestLogger { testHarness.close(); } + @Test + public void testCloseConnectorBeforeSubtaskStart() throws Exception { + Properties config = TestUtils.getStandardProperties(); + FlinkKinesisConsumer<String> consumer = + new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); + consumer.close(); + } + private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));