Dear all,

I managed to resolve the issue. Since I kept getting the exception 
"org.apache.spark.SparkException: java.nio.channels.ClosedChannelException”,

a reasonable direction was checking the advertised.host.name key which as I’ve 
read from the docs basically sets for the broker the host.name it should 
advertise to the consumers and producers.

By setting this property, I instantly started receiving Kafka log messages.

Nevertheless, thank you all for your help, I appreciate it! 

> On 07 Jun 2016, at 17:44, Dominik Safaric <dominiksafa...@gmail.com> wrote:
> 
> Dear Todd,
> 
> By running bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic 
> <topic_name> --broker-list localhost:9092 --time -1
> 
> I get the following current offset for <topic_name> <topic_name>:0:1760
> 
> But I guess this does not provide as much information. 
> 
> To answer your other question, onto how exactly do I track the offset - 
> implicitly via Spark Streaming, i.e. using the default checkpoints. 
> 
>> On 07 Jun 2016, at 15:46, Todd Nist <tsind...@gmail.com 
>> <mailto:tsind...@gmail.com>> wrote:
>> 
>> Hi Dominik,
>> 
>> Right, and spark 1.6.x uses Kafka v0.8.2.x as I recall.  However, it appears 
>> as though the v.0.8 consumer is compatible with the Kafka v0.9.x broker, but 
>> not the other way around; sorry for the confusion there.
>> 
>> With the direct stream, simple consumer, offsets are tracked by Spark 
>> Streaming within its checkpoints by default.  You can also manage them 
>> yourself if desired.  How are you dealing with offsets ?
>> 
>> Can you verify the offsets on the broker:
>> 
>> kafka-run-class.sh kafka.tools.GetOffsetShell --topic <TOPIC> --broker-list 
>> <BROKER-IP:PORT> --time -1
>> 
>> -Todd
>> 
>> On Tue, Jun 7, 2016 at 8:17 AM, Dominik Safaric <dominiksafa...@gmail.com 
>> <mailto:dominiksafa...@gmail.com>> wrote:
>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
>> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % 
>> "1.6.1"
>> Please take a look at the SBT copy. 
>> 
>> I would rather think that the problem is related to the Zookeeper/Kafka 
>> consumers. 
>> 
>> [2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in 
>> config, running  in standalone mode 
>> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
>> 
>> Any indication onto why the channel connection might be closed? Would it be 
>> Kafka or Zookeeper related? 
>> 
>>> On 07 Jun 2016, at 14:07, Todd Nist <tsind...@gmail.com 
>>> <mailto:tsind...@gmail.com>> wrote:
>>> 
>>> What version of Spark are you using?  I do not believe that 1.6.x is 
>>> compatible with 0.9.0.1 due to changes in the kafka clients between 0.8.2.2 
>>> and 0.9.0.x.  See this for more information:
>>> 
>>> https://issues.apache.org/jira/browse/SPARK-12177 
>>> <https://issues.apache.org/jira/browse/SPARK-12177>
>>> 
>>> -Todd
>>> 
>>> On Tue, Jun 7, 2016 at 7:35 AM, Dominik Safaric <dominiksafa...@gmail.com 
>>> <mailto:dominiksafa...@gmail.com>> wrote:
>>> Hi,
>>> 
>>> Correct, I am using the 0.9.0.1 version. 
>>> 
>>> As already described, the topic contains messages. Those messages are 
>>> produced using the Confluence REST API.
>>> 
>>> However, what I’ve observed is that the problem is not in the Spark 
>>> configuration, but rather Zookeeper or Kafka related. 
>>> 
>>> Take a look at the exception’s stack top item:
>>> 
>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>> org.apache.spark.SparkException: Couldn't find leader offsets for 
>>> Set([<topicname>,0])
>>>     at 
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>     at 
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>     at scala.util.Either.fold(Either.scala:97)
>>>     at 
>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>     at 
>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>     at 
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>     at org.mediasoft.spark.Driver$.main(Driver.scala:22)
>>>     at .<init>(<console>:11)
>>>     at .<clinit>(<console>)
>>>     at .<init>(<console>:7)
>>> 
>>> By listing all active connections using netstat, I’ve also observed that 
>>> both Zookeper and Kafka are running. Zookeeper on port 2181, while Kafka 
>>> 9092. 
>>> 
>>> Furthermore, I am also able to retrieve all log messages using the console 
>>> consumer.
>>> 
>>> Any clue what might be going wrong?
>>> 
>>>> On 07 Jun 2016, at 13:13, Jacek Laskowski <ja...@japila.pl 
>>>> <mailto:ja...@japila.pl>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you? What's 
>>>> the topic name?
>>>> 
>>>> Jacek
>>>> 
>>>> On 7 Jun 2016 11:06 a.m., "Dominik Safaric" <dominiksafa...@gmail.com 
>>>> <mailto:dominiksafa...@gmail.com>> wrote:
>>>> As I am trying to integrate Kafka into Spark, the following exception 
>>>> occurs:
>>>> 
>>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([*<topicName>*,0])
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>>         at scala.util.Either.fold(Either.scala:97)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>>>         at .<init>(<console>:11)
>>>>         at .<clinit>(<console>)
>>>>         at .<init>(<console>:7)
>>>>         at .<clinit>(<console>)
>>>>         at $print(<console>)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>>         at 
>>>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>>>         at 
>>>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>>>         at 
>>>> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>>>         at 
>>>> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>>>         at 
>>>> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>>>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>>>         at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>>>         at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>>>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>>         at
>>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>>>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>>>         at
>>>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>>         at 
>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>> 
>>>> As for the Spark configuration:
>>>> 
>>>>    val conf: SparkConf = new
>>>> SparkConf().setAppName("AppName").setMaster("local[2]")
>>>> 
>>>>     val confParams: Map[String, String] = Map(
>>>>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>>>       "auto.offset.reset" -> "largest"
>>>>     )
>>>> 
>>>>     val topics: Set[String] = Set("<topic_name>")
>>>> 
>>>>     val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>>>>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
>>>> topics)
>>>> 
>>>>     kafkaStream.foreachRDD(rdd => {
>>>>       rdd.collect().foreach(println)
>>>>     })
>>>> 
>>>>     context.awaitTermination()
>>>>     context.start()
>>>> 
>>>> The Kafka topic does exist, Kafka server is up and running and I am able to
>>>> produce messages to that particular topic using the Confluent REST API.
>>>> 
>>>> What might the problem actually be?
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> View this message in context: 
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html
>>>>  
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html>
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com 
>>>> <http://nabble.com/>.
>>>> 
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>>> <mailto:user-h...@spark.apache.org>
>>>> 
>>> 
>>> 
>> 
>> 
> 

Reply via email to