Re: RESOLVED - Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Mich Talebzadeh
OK, so this was a Kafka issue?

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 7 June 2016 at 16:55, Dominik Safaric  wrote:

> Dear all,
>
> I managed to resolve the issue. Since I kept getting the exception
> "org.apache.spark.SparkException: java.nio.channels.ClosedChannelException",
>
> a reasonable direction was to check the advertised.host.name key, which, as
> I've read in the docs, sets the host.name the broker should advertise to
> consumers and producers.
>
> By setting this property, I instantly started receiving Kafka log messages.
>
> Nevertheless, thank you all for your help, I appreciate it!

RESOLVED - Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
Dear all,

I managed to resolve the issue. Since I kept getting the exception
"org.apache.spark.SparkException: java.nio.channels.ClosedChannelException",
a reasonable direction was to check the advertised.host.name key, which, as I've
read in the docs, sets the host.name the broker should advertise to consumers
and producers.

By setting this property, I instantly started receiving Kafka log messages.

Nevertheless, thank you all for your help, I appreciate it! 
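
Concretely, the change is a single line in the broker's server.properties (the
hostname below is a placeholder; it must be an address the Spark driver can
actually resolve and reach):

advertised.host.name=broker.example.com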


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
Dear Todd,

By running bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic 
 --broker-list localhost:9092 --time -1

I get the following current offset for  :0:1760

But I guess this does not tell us much on its own.

To answer your other question about how exactly I track the offsets: implicitly
via Spark Streaming, i.e. using the default checkpoints.
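
For reference, the checkpoint-based tracking I rely on looks roughly like this
(a minimal sketch against the Spark 1.6 / Kafka 0.8 direct stream API; the
checkpoint directory, broker address and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedStream {
  val checkpointDir = "/tmp/spark-checkpoints" // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedStream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The direct stream's offsets are persisted here along with the DStream graph
    ssc.checkpoint(checkpointDir)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("mytopic"))
    stream.map(_._2).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart, the context (including Kafka offsets) is recovered from the checkpoint
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}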


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Todd Nist
Hi Dominik,

Right, and Spark 1.6.x uses Kafka v0.8.2.x as I recall.  However, it
appears as though the v0.8 consumer is compatible with the Kafka v0.9.x
broker, but not the other way around; sorry for the confusion there.

With the direct stream (simple consumer), offsets are tracked by Spark
Streaming within its checkpoints by default.  You can also manage them
yourself if desired, as in the sketch below.  How are you dealing with offsets?
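
Something along these lines for the manual route (a rough sketch against the
0.8 direct stream API; the broker address and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ManualOffsets {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ManualOffsets").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("mytopic"))

    stream.foreachRDD { rdd =>
      // Each RDD of a direct stream carries its own offset ranges; persist these
      // to a store of your choice instead of (or in addition to) checkpoints.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}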

Can you verify the offsets on the broker:

kafka-run-class.sh kafka.tools.GetOffsetShell --topic  --broker-list
 --time -1

-Todd


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"

Please take a look at the SBT excerpt above.
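
Side note: the artifacts above mix 1.6.0 and 1.6.1. A build.sbt sketch that pins
all three to a single release, 1.6.1 assumed here, avoids a mixed-version
classpath:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.1",
  "org.apache.spark" %% "spark-streaming"       % "1.6.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
)
// %% derives the _2.10 suffix from scalaVersion, so scalaVersion := "2.10.6" must be set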

I would rather think that the problem is related to the Zookeeper/Kafka 
consumers. 

[2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)

Any indication as to why the channel connection might be closed? Would it be
Kafka or Zookeeper related?


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Todd Nist
What version of Spark are you using?  I do not believe that 1.6.x is
compatible with 0.9.0.1 due to changes in the Kafka clients between 0.8.2.2
and 0.9.0.x.  See this for more information:

https://issues.apache.org/jira/browse/SPARK-12177

-Todd


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
Hi,

Correct, I am using the 0.9.0.1 version. 

As already described, the topic contains messages. Those messages are produced
using the Confluent REST API.

However, what I’ve observed is that the problem is not in the Spark
configuration, but rather Zookeeper or Kafka related.

Take a look at the top of the exception’s stack trace:

org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([,0])
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
  at scala.util.Either.fold(Either.scala:97)
  at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
  at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
  at org.mediasoft.spark.Driver$.main(Driver.scala:22)
  at .(:11)
  at .()
  at .(:7)

By listing all active connections using netstat, I’ve also observed that both
Zookeeper and Kafka are running: Zookeeper on port 2181, Kafka on 9092.

Furthermore, I am also able to retrieve all log messages using the console 
consumer.

Any clue what might be going wrong?
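
Since the error is literally about not finding a partition leader, I can also
probe the broker's metadata directly from the driver side (a rough sketch using
the old 0.8 SimpleConsumer API; host, port and topic name are placeholders):

import kafka.javaapi.TopicMetadataRequest
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

object LeaderProbe {
  def main(args: Array[String]): Unit = {
    // Placeholders: broker host/port and topic name
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "leader-probe")
    try {
      val response = consumer.send(new TopicMetadataRequest(Seq("mytopic").asJava))
      for {
        tm <- response.topicsMetadata.asScala
        pm <- tm.partitionsMetadata.asScala
      } {
        // A missing leader here is exactly what Spark's KafkaCluster complains about
        val leader = Option(pm.leader).map(b => s"${b.host}:${b.port}").getOrElse("NONE")
        println(s"topic=${tm.topic} partition=${pm.partitionId} leader=$leader")
      }
    } finally consumer.close()
  }
}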


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Jacek Laskowski
Hi,

What's the version of Spark? You're using Kafka 0.9.0.1, ain't you? What's
the topic name?

Jacek
On 7 Jun 2016 11:06 a.m., "Dominik Safaric" 
wrote:

> As I am trying to integrate Kafka into Spark, the following exception
> occurs:
>
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for Set([**,0])
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>   at scala.util.Either.fold(Either.scala:97)
>   at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>   at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>   at .(:11)
>   at .()
>   at .(:7)
>   at .()
>   at $print()
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>   at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>   at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>   at org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>
> As for the Spark configuration:
>
>    val conf: SparkConf = new SparkConf().setAppName("AppName").setMaster("local[2]")
>
>    val confParams: Map[String, String] = Map(
>      "metadata.broker.list" -> ":9092",
>      "auto.offset.reset" -> "largest"
>    )
>
>    val topics: Set[String] = Set("")
>
>    val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>      StringDecoder](context, confParams, topics)
>
>    kafkaStream.foreachRDD(rdd => {
>      rdd.collect().foreach(println)
>    })
>
>    // start() must be called before awaitTermination()
>    context.start()
>    context.awaitTermination()
>
> The Kafka topic does exist, the Kafka server is up and running, and I am able
> to produce messages to that particular topic using the Confluent REST API.
>
> What might the problem actually be?


Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Mich Talebzadeh
For now you can move away from Spark and look at the cause of your Kafka
publishing.

Also check that Zookeeper is running:

jps
17102 QuorumPeerMain

It runs on the default port 2181:

netstat -plten | grep 2181
tcp        0      0 :::2181      :::*      LISTEN      1005      8765628    17102/java
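
You can also check the broker registration programmatically (a small sketch with
the zkclient library that Kafka itself ships; the connect string is a placeholder):

import org.I0Itec.zkclient.ZkClient

object ZkBrokerCheck {
  def main(args: Array[String]): Unit = {
    // 6000 ms session and connection timeouts; placeholder connect string
    val zk = new ZkClient("localhost:2181", 6000, 6000)
    try {
      // A healthy broker registers an ephemeral node under /brokers/ids, e.g. [1]
      println(zk.getChildren("/brokers/ids"))
    } finally zk.close()
  }
}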

HTH

Dr Mich Talebzadeh




Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
> Sounds like the issue is with the Kafka channel; it is closing.

I came to the same conclusion. I’ve even tried further refining the
configuration files:

Zookeeper properties:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a 
non-production config
maxClientCnxns=20

Kafka server properties:

# see kafka.server.KafkaConfig for additional details and defaults

# Server Basics #

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# Socket Server Settings #

listeners=PLAINTEXT://:9092

# The port the socket server listens on
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
host.name=0.0.0.0

# Hostname the broker will advertise to producers and consumers. If not set, it 
uses the
# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection 
against OOM)
socket.request.max.bytes=104857600


# Log Basics #

# A comma-separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at 
startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs 
located in RAID array.
num.recovery.threads.per.data.dir=1

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
# the OS cache lazily. The following configurations control the flush of data 
to disk.
# There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using replication.
#2. Latency: Very large flush intervals may lead to latency spikes when the 
flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data 
after a period of time or
# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Mich Talebzadeh
Sounds like the issue is with the Kafka channel; it is closing:

  Reconnect due to socket error: java.nio.channels.ClosedChannelException

Can you relax that:

val ssc = new StreamingContext(sparkConf, Seconds(20))

Also, how are you getting your source data? You can have both Spark and the
consumer below running at the same time to see the exact cause of it:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 \
  --from-beginning --topic newtopic





Dr Mich Talebzadeh




Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
Unfortunately, even with this Spark configuration and Kafka parameters, the 
same exception keeps occurring:

16/06/07 12:26:11 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([,0])
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
  at scala.util.Either.fold(Either.scala:97)
  at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
  at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)

If it helps for troubleshooting, here are the logs of the Kafka server:

[2016-06-07 10:24:58,349] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@4e05faa7 (org.apache.zookeeper.ZooKeeper)
[2016-06-07 10:24:58,365] INFO Opening socket connection to server 
localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2016-06-07 10:24:58,365] INFO Waiting for keeper state SyncConnected 
(org.I0Itec.zkclient.ZkClient)
[2016-06-07 10:24:58,375] INFO Socket connection established to 
localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-06-07 10:24:58,405] INFO Session establishment complete on server 
localhost/127.0.0.1:2181, sessionid = 0x1552a64a9a8, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2016-06-07 10:24:58,408] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-06-07 10:24:58,562] INFO Loading logs. (kafka.log.LogManager)
[2016-06-07 10:24:58,608] INFO Completed load of log -0 with log 
end offset 15 (kafka.log.Log)
[2016-06-07 10:24:58,614] INFO Completed load of log _schemas-0 with log end 
offset 1 (kafka.log.Log)
[2016-06-07 10:24:58,617] INFO Completed load of log -0 with log 
end offset 5 (kafka.log.Log)
[2016-06-07 10:24:58,620] INFO Completed load of log -0 with log 
end offset 2 (kafka.log.Log)
[2016-06-07 10:24:58,629] INFO Completed load of log -0 with log 
end offset 1759 (kafka.log.Log)
[2016-06-07 10:24:58,635] INFO Logs loading complete. (kafka.log.LogManager)
[2016-06-07 10:24:58,737] INFO Starting log cleanup with a period of 30 ms. 
(kafka.log.LogManager)
[2016-06-07 10:24:58,739] INFO Starting log flusher with a default period of 
9223372036854775807 ms. (kafka.log.LogManager)
[2016-06-07 10:24:58,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
(kafka.network.Acceptor)
[2016-06-07 10:24:58,809] INFO [Socket Server on Broker 1], Started 1 acceptor 
threads (kafka.network.SocketServer)
[2016-06-07 10:24:58,849] INFO [ExpirationReaper-1], Starting  
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:58,850] INFO [ExpirationReaper-1], Starting  
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:58,953] INFO Creating /controller (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:58,973] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:58,974] INFO 1 successfully elected as leader 
(kafka.server.ZookeeperLeaderElector)
[2016-06-07 10:24:59,180] INFO [GroupCoordinator 1]: Starting up. 
(kafka.coordinator.GroupCoordinator)
[2016-06-07 10:24:59,191] INFO [ExpirationReaper-1], Starting  
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:59,194] INFO New leader is 1 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-06-07 10:24:59,198] INFO [Group Metadata Manager on Broker 1]: Removed 0 
expired offsets in 16 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-06-07 10:24:59,195] INFO [ExpirationReaper-1], Starting  
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-07 10:24:59,195] INFO [GroupCoordinator 1]: Startup complete. 
(kafka.coordinator.GroupCoordinator)
[2016-06-07 10:24:59,215] INFO [ThrottledRequestReaper-Produce], Starting  
(kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-06-07 10:24:59,217] INFO [ThrottledRequestReaper-Fetch], Starting  
(kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-06-07 10:24:59,220] INFO Will not load MX4J, mx4j-tools.jar is not in the 
classpath (kafka.utils.Mx4jLoader$)
[2016-06-07 10:24:59,230] INFO Creating /brokers/ids/1 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:59,244] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2016-06-07 10:24:59,245] INFO Registered broker 1 at path /brokers/ids/1 with 
addresses: PLAINTEXT -> EndPoint(,9092,PLAINTEXT) 
(kafka.utils.ZkUtils)
[2016-06-07 10:24:59,257] INFO Kafka version : 0.9.0.1
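
A note on the registration line above: the host in the advertised endpoint,
EndPoint(,9092,PLAINTEXT), is the address every client, including the Spark
driver, will use to reach the broker, so it must resolve from wherever the
driver runs. One way to see exactly what got registered is to read the znode
directly; a quick check, assuming broker id 1 and the local Zookeeper from
these logs:

bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/1

If the host in the returned JSON is unresolvable from the Spark driver's
machine, metadata requests will fail even though console tools run on the
broker itself work fine.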

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Mich Talebzadeh
OK, that is good.

Yours is basically simple streaming with Kafka (a published topic) into your
Spark Streaming application. Use the following as a blueprint:

// Imports assumed by this snippet (Spark 1.6 with the Kafka 0.8 direct stream)
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Create a local StreamingContext with two working threads and a batch
// interval of 2 seconds.
val sparkConf = new SparkConf().
 setAppName("CEP_streaming").
 setMaster("local[2]").
 set("spark.executor.memory", "4G").
 set("spark.cores.max", "2").
 set("spark.streaming.concurrentJobs", "2").
 set("spark.driver.allowMultipleContexts", "true").
 set("spark.hadoop.validateOutputSpecs", "false")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "CEP_streaming")
val topics = Set("newtopic")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topics)
dstream.cache()

val lines = dstream.map(_._2)
// Take the third comma-separated field (the price) from each message.
val price = lines.map(_.split(',').view(2)).map(_.toFloat)
// Window length: the duration of the window; it must be a multiple of the
// batch interval n in StreamingContext(sparkConf, Seconds(n)).
val windowLength = 4
// Sliding interval: the interval at which the window operation is performed,
// i.e. the data is collected within this "previous interval". Keep it equal
// to the batch interval for continuous streaming; you are aggregating the
// data collected over the batch window.
val slidingInterval = 2
val countByValueAndWindow = price.filter(_ > 95.0).
  countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()

ssc.start()
ssc.awaitTermination()
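
If you would rather pin the starting position yourself instead of relying on
"auto.offset.reset" or the checkpoint, createDirectStream also has an overload
that takes explicit per-partition offsets. A minimal sketch, reusing the ssc
and kafkaParams above and assuming "newtopic" has a single partition 0 to be
read from offset 0 (both placeholders):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Start partition 0 of "newtopic" at offset 0; in practice you would load
// these offsets from wherever you last stored them.
val fromOffsets = Map(TopicAndPartition("newtopic", 0) -> 0L)
val pinnedStream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))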

HTH

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 June 2016 at 10:58, Dominik Safaric  wrote:

> Dear Mich,
>
> Thank you for the reply.
>
> By running the following command in the command line:
>
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
>  --from-beginning
>
> I do indeed retrieve all messages of a topic.
>
> Any indication as to what might cause the issue?
>
> An important note: I’m using the default configuration of both
> Kafka and Zookeeper.
>
> On 07 Jun 2016, at 11:39, Mich Talebzadeh 
> wrote:
>
> I assume your Zookeeper is up and running.
>
> Can you confirm that you are getting topics from Kafka independently, for
> example on the command line:
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> --from-beginning --topic newtopic
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 7 June 2016 at 10:06, Dominik Safaric  wrote:
>
>> As I am trying to integrate Kafka into Spark, the following exception
>> occurs:
>>
>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([**,0])
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> at scala.util.Either.fold(Either.scala:97)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>> at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>> at .(:11)
>> at .()
>> at .(:7)
>> at .()
>> at $print()
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:483)
>> at
>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>> at
>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>> at
>> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>> at
>> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>> at
>> scal

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
Dear Mich,

Thank you for the reply.

By running the following command in the command line:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic  
--from-beginning

I do indeed retrieve all messages of a topic. 

Any indication as to what might cause the issue?

An important note: I’m using the default configuration of both Kafka
and Zookeeper.

> On 07 Jun 2016, at 11:39, Mich Talebzadeh  wrote:
> 
> I assume your Zookeeper is up and running.
> 
> Can you confirm that you are getting topics from Kafka independently, for 
> example on the command line:
> 
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 
> --from-beginning --topic newtopic
> 
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 7 June 2016 at 10:06, Dominik Safaric  wrote:
> As I am trying to integrate Kafka into Spark, the following exception occurs:
> 
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([**,0])
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> at org.mediasoft.spark.Driver$.main(Driver.scala:42)
> at .(:11)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at 
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> at 
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> at 
> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> at 
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
> at
> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at 
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 
> As for the Spark configuration:
> 
>val conf: SparkConf = new
> SparkConf().setAppName("AppName").setMaster("local[2]")
> 
> val confParams: Map[String, String] = Map(
>   "metadata.broker.list" -> ":9092",
>   "auto.offset.reset" -> "largest"
> )
> 
> val topics: Set[String] = Set("")
> 
> val context: StreamingContext = new StreamingContext(conf, Seconds(1))
> val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
> topics)
> 
> kafkaStream.foreachRDD(rdd => {
>   rdd.collect().foreach(println)
> })
> 
> // Note: start() must be called before awaitTermination(), otherwise the
> // streaming context never starts.
> context.start()
> context.awaitTermination()
> 
> The Kafka topic does exist, Kafka server is up and running and I am abl

Re: Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Mich Talebzadeh
I assume your Zookeeper is up and running.

Can you confirm that you are getting topics from Kafka independently, for
example on the command line:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--from-beginning --topic newtopic






Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 June 2016 at 10:06, Dominik Safaric  wrote:

> As I am trying to integrate Kafka into Spark, the following exception
> occurs:
>
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([**,0])
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
>
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> at
>
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> at org.mediasoft.spark.Driver$.main(Driver.scala:42)
> at .(:11)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> at
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> at
> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> at
> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> at
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> at
>
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
>
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
> at
>
> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>
> As for the Spark configuration:
>
>val conf: SparkConf = new
> SparkConf().setAppName("AppName").setMaster("local[2]")
>
> val confParams: Map[String, String] = Map(
>   "metadata.broker.list" -> ":9092",
>   "auto.offset.reset" -> "largest"
> )
>
> val topics: Set[String] = Set("")
>
> val context: StreamingContext = new StreamingContext(conf, Seconds(1))
> val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
> topics)
>
> kafkaStream.foreachRDD(rdd => {
>   rdd.collect().foreach(println)
> })
>
> // Note: start() must be called before awaitTermination(), otherwise the
> // streaming context never starts.
> context.start()
> context.awaitTermination()
>
> The Kafka topic does exist, Kafka server is up and running and I am able to
> produce messages to that particular topic using the Confluent REST API.
>
> What might the problem actually be?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.a

Apache Spark Kafka Integration - org.apache.spark.SparkException: Couldn't find leader offsets for Set()

2016-06-07 Thread Dominik Safaric
As I am trying to integrate Kafka into Spark, the following exception occurs:

org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([**,0])
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.mediasoft.spark.Driver$.main(Driver.scala:42)
at .(:11)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
at 
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
at
org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

As for the Spark configuration:

val conf: SparkConf = new
  SparkConf().setAppName("AppName").setMaster("local[2]")

val confParams: Map[String, String] = Map(
  "metadata.broker.list" -> ":9092",
  "auto.offset.reset" -> "largest"
)

val topics: Set[String] = Set("")

val context: StreamingContext = new StreamingContext(conf, Seconds(1))
val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
topics)

kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(println)
})

// Note: start() must be called before awaitTermination(), otherwise the
// streaming context never starts.
context.start()
context.awaitTermination()

The Kafka topic does exist, Kafka server is up and running and I am able to
produce messages to that particular topic using the Confluent REST API. 

What might the problem actually be? 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org