This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 90f6b9e Fix: NPE in pulsar source close (#3246) 90f6b9e is described below commit 90f6b9ed550347b1e3bfe07c67833987b27087e7 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sat Dec 22 19:24:17 2018 -0800 Fix: NPE in pulsar source close (#3246) --- .../org/apache/pulsar/functions/source/PulsarSource.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 02e56ab..ff41dc8 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -142,12 +142,14 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> @Override public void close() throws Exception { - inputConsumers.forEach(consumer -> { - try { - consumer.close(); - } catch (PulsarClientException e) { - } - }); + if (inputConsumers != null ) { + inputConsumers.forEach(consumer -> { + try { + consumer.close(); + } catch (PulsarClientException e) { + } + }); + } } @SuppressWarnings("unchecked")