Repository: spark Updated Branches: refs/heads/branch-1.5 3298fb69f -> 62ab2a4c6
[SPARK-9780] [STREAMING] [KAFKA] prevent NPE if KafkaRDD instantiation ⦠â¦fails Author: cody koeninger <c...@koeninger.org> Closes #8133 from koeninger/SPARK-9780 and squashes the following commits: 406259d [cody koeninger] [SPARK-9780][Streaming][Kafka] prevent NPE if KafkaRDD instantiation fails (cherry picked from commit 8ce60963cb0928058ef7b6e29ff94eb69d1143af) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ab2a4c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ab2a4c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ab2a4c Branch: refs/heads/branch-1.5 Commit: 62ab2a4c6b4b0cf4875ac1291562660b4b77cac4 Parents: 3298fb6 Author: cody koeninger <c...@koeninger.org> Authored: Wed Aug 12 17:44:16 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Aug 12 17:44:27 2015 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/62ab2a4c/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 1a9d78c..ea5f842 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -197,7 +197,11 @@ class KafkaRDD[ .dropWhile(_.offset < requestOffset) } - override def close(): Unit = consumer.close() + override def close(): Unit = { + if (consumer != null) { + consumer.close() + } + } override def getNext(): R = { if (iter == null || !iter.hasNext) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org