Hi, I am trying to start up a simple consumer that streams from a Kafka topic, using Spark 2.0.0:
- spark-streaming_2.11
- spark-streaming-kafka-0-8_2.11

I was getting the error below until I created the topic in Kafka. When integrating with Spark 1.5, I never hit this check; we were able to start all of our Spark Kafka consumers, then start the producers, and have Kafka automatically create the topics once the first message for a given topic was published.

Is there something I might be doing that causes this topic existence check in KafkaCluster.scala to kick in? I'd much rather not have to pre-create the topics before I start the consumers.

Any thoughts/comments would be appreciated.

Thanks.
- Dmitry

========================================================================

Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Error getting partition metadata for '<topic name>'. Does the topic exist?
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
        at scala.util.Either.fold(Either.scala:98)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)
        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)
        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)