This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 90f6b9e  Fix: NPE in pulsar source close (#3246)
90f6b9e is described below

commit 90f6b9ed550347b1e3bfe07c67833987b27087e7
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sat Dec 22 19:24:17 2018 -0800

    Fix: NPE in pulsar source close (#3246)
---
 .../org/apache/pulsar/functions/source/PulsarSource.java   | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 02e56ab..ff41dc8 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -142,12 +142,14 @@ public class PulsarSource<T> extends PushSource<T> 
implements MessageListener<T>
 
     @Override
     public void close() throws Exception {
-        inputConsumers.forEach(consumer -> {
-            try {
-                consumer.close();
-            } catch (PulsarClientException e) {
-            }
-        });
+        if (inputConsumers != null ) {
+            inputConsumers.forEach(consumer -> {
+                try {
+                    consumer.close();
+                } catch (PulsarClientException e) {
+                }
+            });
+        }
     }
 
     @SuppressWarnings("unchecked")

Reply via email to