Hi,

I am trying to start up a simple consumer that streams from a Kafka topic
using Spark 2.0.0 with these dependencies:

   - spark-streaming_2.11
   - spark-streaming-kafka-0-8_2.11

I was getting the error below until I created the topic in Kafka. When we
integrated against Spark 1.5, we never hit this check: we could start all
of our Spark Kafka consumers, then start the producers, and let Kafka
automatically create each topic when its first message was published.
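
For context, the consumer is created more or less like the minimal sketch
below (the class name, broker list, topic name, and batch interval are
placeholders, not our actual code); the createDirectStream call is the one
that ends up in KafkaUtils.getFromOffsets in the trace further down.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SimpleDirectStreamConsumer {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SimpleDirectStreamConsumer");
        // Batch interval is arbitrary here.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Placeholder broker list and topic name.
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
        Set<String> topics = new HashSet<>(Arrays.asList("my-topic"));

        // This is the call that fetches partition metadata for the topic on the
        // driver and fails with "Does the topic exist?" when the topic is absent.
        JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
                jssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                topics);

        stream.print();

        jssc.start();
        jssc.awaitTermination();
    }
}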

Is there something I might be doing to cause this topic existence check in
KafkaCluster.scala to kick in? I'd much rather not have to pre-create the
topics before starting the consumers. Any thoughts/comments would be
appreciated.

Thanks.
- Dmitry

========================================================================

Exception in thread "main" org.apache.spark.SparkException:
java.nio.channels.ClosedChannelException

java.nio.channels.ClosedChannelException

org.apache.spark.SparkException: Error getting partition metadata for
'<topic name>'. Does the topic exist?

        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
        at scala.util.Either.fold(Either.scala:98)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)
        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)
        at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
