Re: Spark streaming with Kafka

2020-11-03 Thread Kevin Pis
Hi,

This is my Word Count demo: https://github.com/kevincmchen/wordcount

MohitAbbi  wrote on Wed, Nov 4, 2020 at 3:32 AM:

> Hi,
>
> Can you please share the correct versions of JAR files which you used to
> resolve the issue. I'm also facing the same issue.
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best,

Kevin Pis


Re: Spark streaming with Kafka

2020-11-03 Thread MohitAbbi
Hi,

Can you please share the correct versions of the JAR files which you used to
resolve the issue? I'm facing the same issue as well.

Thanks




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread Sean Owen
What supports Python in (Kafka?) 0.8? I don't think Spark ever had a
specific Python-Kafka integration. But you have always been able to
use it to read DataFrames as in Structured Streaming.
Kafka 0.8 support is deprecated (and gone in 3.0), but 0.10 means 0.10+, so it
works with the latest 2.x.
What is the issue?
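For reference, reading Kafka as a streaming DataFrame looks roughly like this
(a Scala sketch - the same options work from PySpark; the broker, topic, and
the spark-sql-kafka-0-10 package version are placeholders to match your
Spark/Scala build):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-structured-sketch").getOrCreate()

// needs the connector on the classpath, e.g.
//   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .option("startingOffsets", "latest")
  .load()

// key and value arrive as binary; cast before processing
val lines = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

lines.writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()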

On Wed, Aug 12, 2020 at 7:53 AM German Schiavon
 wrote:
>
> Hey,
>
> Maybe I'm missing some restriction with EMR, but have you tried to use 
> Structured Streaming instead of Spark Streaming?
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
>
> Regards
>
> On Wed, 12 Aug 2020 at 14:12, Hamish Whittal  
> wrote:
>>
>> Hi folks,
>>
>> Thought I would ask here because it's somewhat confusing. I'm using Spark 
>> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>>
>> The version of Scala used is 2.11.12. I'm using this version of the 
>> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>>
>> Now I'm wanting to read from Kafka topics using Python (I need to stick to 
>> Python specifically).
>>
>> What seems confusing is that 0.8 has Python support, but 0.10 does not. Then 
>> 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using 2.4.5 
>> then clearly I'm going to hit a roadblock here.
>>
>> Can someone clarify these things for me? Have I got this right?
>>
>> Thanks in advance,
>> Hamish

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread German Schiavon
Hey,

Maybe I'm missing some restriction with EMR, but have you tried to use
Structured Streaming instead of Spark Streaming?

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

Regards

On Wed, 12 Aug 2020 at 14:12, Hamish Whittal 
wrote:

> Hi folks,
>
> Thought I would ask here because it's somewhat confusing. I'm using Spark
> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>
> The version of Scala used is 2.11.12. I'm using this version of the
> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>
> Now I'm wanting to read from Kafka topics using Python (I need to stick to
> Python specifically).
>
> What seems confusing is that 0.8 has Python support, but 0.10 does not.
> Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
> 2.4.5 then clearly I'm going to hit a roadblock here.
>
> Can someone clarify these things for me? Have I got this right?
>
> Thanks in advance,
> Hamish
>


Re: Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi

I was able to correct the issue. It was due to the wrong version of the JAR
files I had used. I removed those JAR files, copied the correct
versions, and the error went away.

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming with Kafka

2020-07-02 Thread Jungtaek Lim
I can't reproduce it. Could you please make sure you're running spark-shell
from the official Spark 3.0.0 distribution? Please try changing the
directory and using a relative path like "./spark-shell".

On Thu, Jul 2, 2020 at 9:59 PM dwgw  wrote:

> Hi
> I am trying to stream a Kafka topic from the Spark shell but I am getting the
> following error.
> I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
> *Java 1.8.0_212*)
>
> *[spark@hdp-dev ~]$ spark-shell --packages
> org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
> Ivy Default Cache set to: /home/spark/.ivy2/cache
> The jars for the packages stored in: /home/spark/.ivy2/jars
> :: loading settings :: url =
>
> jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
> org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
> :: resolving dependencies ::
>
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
> confs: [default]
> found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
> found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0
> in
> central
> found org.apache.kafka#kafka-clients;2.4.1 in central
> found com.github.luben#zstd-jni;1.4.4-3 in central
> found org.lz4#lz4-java;1.7.1 in central
> found org.xerial.snappy#snappy-java;1.1.7.5 in central
> found org.slf4j#slf4j-api;1.7.30 in central
> found org.spark-project.spark#unused;1.0.0 in central
> found org.apache.commons#commons-pool2;2.6.2 in central
> :: resolution report :: resolve 502ms :: artifacts dl 10ms
> :: modules in use:
> com.github.luben#zstd-jni;1.4.4-3 from central in [default]
> org.apache.commons#commons-pool2;2.6.2 from central in [default]
> org.apache.kafka#kafka-clients;2.4.1 from central in [default]
> org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
> [default]
> org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
> central in [default]
> org.lz4#lz4-java;1.7.1 from central in [default]
> org.slf4j#slf4j-api;1.7.30 from central in [default]
> org.spark-project.spark#unused;1.0.0 from central in [default]
> org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
>
> -
> |  |modules||   artifacts
> |
> |   conf   | number| search|dwnlded|evicted||
> number|dwnlded|
>
> -
> |  default |   9   |   0   |   0   |   0   ||   9   |   0
> |
>
> -
> :: retrieving ::
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
> confs: [default]
> 0 artifacts copied, 9 already retrieved (0kB/13ms)
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> Spark context Web UI available at http://hdp-dev.infodetics.com:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1593620640299_0015).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
>       /_/
>
> Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_212)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
>
> scala> val df = spark.
>  | readStream.
>  | format("kafka").
>  | option("kafka.bootstrap.servers", "XXX").
>  | option("subscribe", "XXX").
>  | option("kafka.sasl.mechanisms", "XXX").
>  | option("kafka.security.protocol", "XXX").
>  | option("kafka.sasl.username","XXX").
>  | option("kafka.sasl.password", "XXX").
>  | option("startingOffsets", "earliest").
>  | load
> java.lang.AbstractMethodError: Method
>
> org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
> is abstract
>   at
>
> org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
>   at
>
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
>   at
>
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
>   ... 57 elided
>
> Looking forward for a response.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Cody Koeninger
If you're looking for some kind of instrumentation finer than at batch
boundaries, you'd have to do something with the individual messages
yourself.  You have full access to the individual messages including
offset.
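For example, a rough sketch with the 0-10 direct stream (`stream` being the
DStream of ConsumerRecords returned by KafkaUtils.createDirectStream):

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { record =>
      // each ConsumerRecord carries its own coordinates and timestamp
      println(s"${record.topic}-${record.partition} offset=${record.offset} ts=${record.timestamp}")
    }
  }
}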

On Thu, Apr 27, 2017 at 1:27 PM, Dominik Safaric
 wrote:
> Of course I am not asking to commit for every message. But instead of, 
> seeking to commit the last consumed offset at a given interval. For example, 
> from the 1st until the 5th second, messages until offset 100.000 of the 
> partition 10 were consumed, then from the 6th until the 10th second of 
> executing the last consumed offset of the same partition was 200.000 - and so 
> forth. This is the information I seek to get.
>
>> On 27 Apr 2017, at 20:11, Cody Koeninger  wrote:
>>
>> Are you asking for commits for every message?  Because that will kill
>> performance.
>>
>> On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
>>  wrote:
>>> Indeed I have. But, even when storing the offsets in Spark and committing 
>>> offsets upon completion of an output operation within the foreachRDD call 
>>> (as pointed in the example), the only offset that Spark’s Kafka 
>>> implementation commits to Kafka is the offset of the last message. For 
>>> example, if I have 100 million messages, then Spark will commit only the 
>>> 100 millionth offset, and the offsets of the intermediate batches - and 
>>> hence the questions.
>>>
 On 26 Apr 2017, at 21:42, Cody Koeninger  wrote:

 have you read

 http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself

 On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
  wrote:
> The reason why I want to obtain this information, i.e. <topic, partition,
> offset, timestamp> tuples, is to relate the consumption with the
> production rates using the __consumer_offsets Kafka internal topic.
> Interestingly, Spark’s KafkaConsumer implementation does not auto
> commit the offsets upon offset commit expiration, because as seen in the
> logs, Spark overrides the enable.auto.commit property to false.
>
> Any idea on how to use the KafkaConsumer’s auto offset commits? Keep in
> mind that I do not care about exactly-once, hence having messages
> replayed is perfectly fine.
>
>> On 26 Apr 2017, at 19:26, Cody Koeninger  wrote:
>>
>> What is it you're actually trying to accomplish?
>>
>> You can get topic, partition, and offset bounds from an offset range like
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>
>> Timestamp isn't really a meaningful idea for a range of offsets.
>>
>>
>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>  wrote:
>>> Hi all,
>>>
>>> Because the Spark Streaming direct Kafka consumer maps offsets for a 
>>> given
>>> Kafka topic and a partition internally while having enable.auto.commit 
>>> set
>>> to false, how can I retrieve the offset of each made consumer’s poll 
>>> call
>>> using the offset ranges of an RDD? More precisely, the information I 
>>> seek to
>>> get after each poll call is the following: >> partition>.
>>>
>>> Thanks in advance,
>>> Dominik
>>>
>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Dominik Safaric
Of course I am not asking to commit for every message. Instead, I am seeking
to commit the last consumed offset at a given interval. For example, from the
1st until the 5th second, messages up to offset 100.000 of partition 10
were consumed; then from the 6th until the 10th second of execution, the last
consumed offset of the same partition was 200.000 - and so forth. This is the
information I seek to get.
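The closest built-in mechanism I have found is committing each batch's offset
ranges back to Kafka, which gives per-batch-interval commits rather than
per-second ones - a sketch, assuming `stream` is the DStream from
createDirectStream:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch / run the output operation ...

  // commits the highest offsets consumed in this batch back to Kafka,
  // asynchronously and once per batch interval rather than per message
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}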

> On 27 Apr 2017, at 20:11, Cody Koeninger  wrote:
> 
> Are you asking for commits for every message?  Because that will kill
> performance.
> 
> On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
>  wrote:
>> Indeed I have. But, even when storing the offsets in Spark and committing 
>> offsets upon completion of an output operation within the foreachRDD call 
>> (as pointed in the example), the only offset that Spark’s Kafka 
>> implementation commits to Kafka is the offset of the last message. For 
>> example, if I have 100 million messages, then Spark will commit only the 100 
>> millionth offset, and the offsets of the intermediate batches - and hence 
>> the questions.
>> 
>>> On 26 Apr 2017, at 21:42, Cody Koeninger  wrote:
>>> 
>>> have you read
>>> 
>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>>> 
>>> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>>>  wrote:
 The reason why I want to obtain this information, i.e.  tuples is to relate the consumption with the production rates 
 using the __consumer_offsets Kafka internal topic. Interestedly, the 
 Spark’s KafkaConsumer implementation does not auto commit the offsets upon 
 offset commit expiration, because as seen in the logs, Spark overrides the 
 enable.auto.commit property to false.
 
 Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
 mind that I do not care about exactly-once, hence having messages replayed 
 is perfectly fine.
 
> On 26 Apr 2017, at 19:26, Cody Koeninger  wrote:
> 
> What is it you're actually trying to accomplish?
> 
> You can get topic, partition, and offset bounds from an offset range like
> 
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
> 
> Timestamp isn't really a meaningful idea for a range of offsets.
> 
> 
> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>  wrote:
>> Hi all,
>> 
>> Because the Spark Streaming direct Kafka consumer maps offsets for a 
>> given
>> Kafka topic and a partition internally while having enable.auto.commit 
>> set
>> to false, how can I retrieve the offset of each made consumer’s poll call
>> using the offset ranges of an RDD? More precisely, the information I 
>> seek to
>> get after each poll call is the following: > partition>.
>> 
>> Thanks in advance,
>> Dominik
>> 
 
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Cody Koeninger
Are you asking for commits for every message?  Because that will kill
performance.

On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
 wrote:
> Indeed I have. But, even when storing the offsets in Spark and committing 
> offsets upon completion of an output operation within the foreachRDD call (as 
> pointed in the example), the only offset that Spark’s Kafka implementation 
> commits to Kafka is the offset of the last message. For example, if I have 
> 100 million messages, then Spark will commit only the 100 millionth offset, 
> and the offsets of the intermediate batches - and hence the questions.
>
>> On 26 Apr 2017, at 21:42, Cody Koeninger  wrote:
>>
>> have you read
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>>
>> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>>  wrote:
>>> The reason why I want to obtain this information, i.e. >> timestamp> tuples is to relate the consumption with the production rates 
>>> using the __consumer_offsets Kafka internal topic. Interestedly, the 
>>> Spark’s KafkaConsumer implementation does not auto commit the offsets upon 
>>> offset commit expiration, because as seen in the logs, Spark overrides the 
>>> enable.auto.commit property to false.
>>>
>>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>>> mind that I do not care about exactly-once, hence having messages replayed 
>>> is perfectly fine.
>>>
 On 26 Apr 2017, at 19:26, Cody Koeninger  wrote:

 What is it you're actually trying to accomplish?

 You can get topic, partition, and offset bounds from an offset range like

 http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets

 Timestamp isn't really a meaningful idea for a range of offsets.


 On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
  wrote:
> Hi all,
>
> Because the Spark Streaming direct Kafka consumer maps offsets for a given
> Kafka topic and a partition internally while having enable.auto.commit set
> to false, how can I retrieve the offset of each made consumer’s poll call
> using the offset ranges of an RDD? More precisely, the information I seek 
> to
> get after each poll call is the following: <timestamp, offset, topic, partition>.
>
> Thanks in advance,
> Dominik
>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Dominik Safaric
Indeed I have. But even when storing the offsets in Spark and committing
offsets upon completion of an output operation within the foreachRDD call (as
shown in the example), the only offset that Spark’s Kafka implementation
commits to Kafka is the offset of the last message. For example, if I have 100
million messages, then Spark will commit only the 100 millionth offset, and not the
offsets of the intermediate batches - hence the questions.

> On 26 Apr 2017, at 21:42, Cody Koeninger  wrote:
> 
> have you read
> 
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
> 
> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>  wrote:
>> The reason why I want to obtain this information, i.e. > timestamp> tuples is to relate the consumption with the production rates 
>> using the __consumer_offsets Kafka internal topic. Interestedly, the Spark’s 
>> KafkaConsumer implementation does not auto commit the offsets upon offset 
>> commit expiration, because as seen in the logs, Spark overrides the 
>> enable.auto.commit property to false.
>> 
>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>> mind that I do not care about exactly-once, hence having messages replayed 
>> is perfectly fine.
>> 
>>> On 26 Apr 2017, at 19:26, Cody Koeninger  wrote:
>>> 
>>> What is it you're actually trying to accomplish?
>>> 
>>> You can get topic, partition, and offset bounds from an offset range like
>>> 
>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>> 
>>> Timestamp isn't really a meaningful idea for a range of offsets.
>>> 
>>> 
>>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>>  wrote:
 Hi all,
 
 Because the Spark Streaming direct Kafka consumer maps offsets for a given
 Kafka topic and a partition internally while having enable.auto.commit set
 to false, how can I retrieve the offset of each made consumer’s poll call
 using the offset ranges of an RDD? More precisely, the information I seek 
 to
 get after each poll call is the following: .
 
 Thanks in advance,
 Dominik
 
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Cody Koeninger
have you read

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself

On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
 wrote:
> The reason why I want to obtain this information, i.e. <topic, partition, offset,
> timestamp> tuples, is to relate the consumption with the production rates
> using the __consumer_offsets Kafka internal topic. Interestingly, Spark’s
> KafkaConsumer implementation does not auto commit the offsets upon offset
> commit expiration, because as seen in the logs, Spark overrides the
> enable.auto.commit property to false.
>
> Any idea on how to use the KafkaConsumer’s auto offset commits? Keep in
> mind that I do not care about exactly-once, hence having messages replayed is
> perfectly fine.
>
>> On 26 Apr 2017, at 19:26, Cody Koeninger  wrote:
>>
>> What is it you're actually trying to accomplish?
>>
>> You can get topic, partition, and offset bounds from an offset range like
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>
>> Timestamp isn't really a meaningful idea for a range of offsets.
>>
>>
>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>  wrote:
>>> Hi all,
>>>
>>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>>> Kafka topic and a partition internally while having enable.auto.commit set
>>> to false, how can I retrieve the offset of each made consumer’s poll call
>>> using the offset ranges of an RDD? More precisely, the information I seek to
>>> get after each poll call is the following: .
>>>
>>> Thanks in advance,
>>> Dominik
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Dominik Safaric
The reason why I want to obtain this information, i.e. <topic, partition, offset,
timestamp> tuples, is to relate the consumption with the production rates using
the __consumer_offsets Kafka internal topic. Interestingly, Spark’s
KafkaConsumer implementation does not auto commit the offsets upon offset
commit expiration, because as seen in the logs, Spark overrides the
enable.auto.commit property to false.

Any idea on how to use the KafkaConsumer’s auto offset commits? Keep in mind
that I do not care about exactly-once, hence having messages replayed is
perfectly fine.
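For context, these are roughly the consumer parameters the direct stream is
created with (a sketch with placeholder values) - as noted above, whatever
enable.auto.commit is set to here, the 0-10 integration overrides it to false
and leaves committing to the application (e.g. via commitAsync or an external
store):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",            // placeholder broker
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "offset-instrumentation-demo", // placeholder group
  "auto.offset.reset"  -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)     // forced to false by Spark anyway
)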

> On 26 Apr 2017, at 19:26, Cody Koeninger  wrote:
> 
> What is it you're actually trying to accomplish?
> 
> You can get topic, partition, and offset bounds from an offset range like
> 
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
> 
> Timestamp isn't really a meaningful idea for a range of offsets.
> 
> 
> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>  wrote:
>> Hi all,
>> 
>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>> Kafka topic and a partition internally while having enable.auto.commit set
>> to false, how can I retrieve the offset of each made consumer’s poll call
>> using the offset ranges of an RDD? More precisely, the information I seek to
>> get after each poll call is the following: .
>> 
>> Thanks in advance,
>> Dominik
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Cody Koeninger
What is it you're actually trying to accomplish?

You can get topic, partition, and offset bounds from an offset range like

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets

Timestamp isn't really a meaningful idea for a range of offsets.
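A condensed sketch of the pattern behind that link (assuming `stream` is the
direct stream):

import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  // one OffsetRange per topic-partition in this batch: [fromOffset, untilOffset)
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}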


On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
 wrote:
> Hi all,
>
> Because the Spark Streaming direct Kafka consumer maps offsets for a given
> Kafka topic and a partition internally while having enable.auto.commit set
> to false, how can I retrieve the offset of each made consumer’s poll call
> using the offset ranges of an RDD? More precisely, the information I seek to
> get after each poll call is the following: <timestamp, offset, topic, partition>.
>
> Thanks in advance,
> Dominik
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming to kafka exactly once

2017-03-23 Thread Maurin Lenglart
Ok,
Thanks for your answers

On 3/22/17, 1:34 PM, "Cody Koeninger"  wrote:

If you're talking about reading the same message multiple times in a
failure situation, see

https://github.com/koeninger/kafka-exactly-once

If you're talking about producing the same message multiple times in a
failure situation, keep an eye on


https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

If you're talking about producers just misbehaving and sending
different copies of what is essentially the same message from a domain
perspective, you have to dedupe that with your own logic.

On Wed, Mar 22, 2017 at 2:52 PM, Matt Deaver  wrote:
> You have to handle de-duplication upstream or downstream. It might
> technically be possible to handle this in Spark but you'll probably have a
> better time handling duplicates in the service that reads from Kafka.
>
> On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
> wrote:
>>
>> Hi,
>> we are trying to build a spark streaming solution that subscribe and push
>> to kafka.
>>
>> But we are running into the problem of duplicates events.
>>
>> Right now, I am doing a “forEachRdd” and loop over the message of each
>> partition and send those message to kafka.
>>
>>
>>
>> Is there any good way of solving that issue?
>>
>>
>>
>> thanks
>
>
>
>
> --
> Regards,
>
> Matt
> Data Engineer
> https://www.linkedin.com/in/mdeaver
> http://mattdeav.pythonanywhere.com/




Re: Spark streaming to kafka exactly once

2017-03-22 Thread Cody Koeninger
If you're talking about reading the same message multiple times in a
failure situation, see

https://github.com/koeninger/kafka-exactly-once

If you're talking about producing the same message multiple times in a
failure situation, keep an eye on

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

If you're talking about producers just misbehaving and sending
different copies of what is essentially the same message from a domain
perspective, you have to dedupe that with your own logic.
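As a sketch of the last case (not a fix for exactly-once on the Kafka side,
just the usual at-least-once producing pattern): create one producer per
partition inside foreachRDD and give each message a stable key, so a replayed
batch produces the same keys and the downstream reader can drop duplicates.
Here `stream`, the broker, the output topic, and stableKey are all placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { messages =>
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    messages.foreach { msg =>
      // stableKey is a placeholder: derive it from the message (or its source offset)
      // so that a replayed batch produces the same keys and the reader can dedupe
      producer.send(new ProducerRecord[String, String]("output-topic", stableKey(msg), msg.toString))
    }
    producer.close()
  }
}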

On Wed, Mar 22, 2017 at 2:52 PM, Matt Deaver  wrote:
> You have to handle de-duplication upstream or downstream. It might
> technically be possible to handle this in Spark but you'll probably have a
> better time handling duplicates in the service that reads from Kafka.
>
> On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
> wrote:
>>
>> Hi,
>> we are trying to build a spark streaming solution that subscribe and push
>> to kafka.
>>
>> But we are running into the problem of duplicates events.
>>
>> Right now, I am doing a “forEachRdd” and loop over the message of each
>> partition and send those message to kafka.
>>
>>
>>
>> Is there any good way of solving that issue?
>>
>>
>>
>> thanks
>
>
>
>
> --
> Regards,
>
> Matt
> Data Engineer
> https://www.linkedin.com/in/mdeaver
> http://mattdeav.pythonanywhere.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming to kafka exactly once

2017-03-22 Thread Matt Deaver
You have to handle de-duplication upstream or downstream. It might
technically be possible to handle this in Spark but you'll probably have a
better time handling duplicates in the service that reads from Kafka.

On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
wrote:

> Hi,
> we are trying to build a spark streaming solution that subscribe and push
> to kafka.
>
> But we are running into the problem of duplicates events.
>
> Right now, I am doing a “forEachRdd”, looping over the messages of each
> partition and sending those messages to Kafka.
>
>
>
> Is there any good way of solving that issue?
>
>
>
> thanks
>



-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-21 Thread shyla deshpande
Thanks TD.

On Tue, Mar 14, 2017 at 4:37 PM, Tathagata Das  wrote:

> This setting allows multiple spark jobs generated through multiple
> foreachRDD to run concurrently, even if they are across batches. So output
> op2 from batch X, can run concurrently with op1 of batch X+1
> This is not safe because it breaks the checkpointing logic in subtle ways.
> Note that this was never documented in the spark online docs.
>
> On Tue, Mar 14, 2017 at 2:29 PM, shyla deshpande  > wrote:
>
>> Thanks TD for the response. Can you please provide more explanation. I am
>>  having multiple streams in the spark streaming application (Spark 2.0.2
>> using DStreams).  I know many people using this setting. So your
>> explanation will help a lot of people.
>>
>> Thanks
>>
>> On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das 
>> wrote:
>>
>>> That config I not safe. Please do not use it.
>>>
>>> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
>>> wrote:
>>>
 I have a spark streaming application which processes 3 kafka streams
 and has 5 output operations.

 Not sure what should be the setting for spark.streaming.concurrentJobs.

 1. If the concurrentJobs setting is 4 does that mean 2 output
 operations will be run sequentially?

 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
 this situation?

 I appreciate your input. Thanks

>>>
>>
>


Re: Spark Streaming from Kafka, deal with initial heavy load.

2017-03-20 Thread Cody Koeninger
You want spark.streaming.kafka.maxRatePerPartition for the direct stream.
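For example, a sketch of the relevant conf (the cap value is a placeholder to
tune for your cluster):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-backlog-job")
  // let the rate controller adapt batch sizes once a few batches have completed
  .set("spark.streaming.backpressure.enabled", "true")
  // hard cap in records/sec per Kafka partition for the direct stream;
  // this also bounds the very first batch, which backpressure alone does not
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // placeholder value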

On Sat, Mar 18, 2017 at 3:37 PM, Mal Edwin  wrote:
>
> Hi,
> You can enable backpressure to handle this.
>
> spark.streaming.backpressure.enabled
> spark.streaming.receiver.maxRate
>
> Thanks,
> Edwin
>
> On Mar 18, 2017, 12:53 AM -0400, sagarcasual . ,
> wrote:
>
> Hi, we have spark 1.6.1 streaming from Kafka (0.10.1) topic using direct
> approach. The streaming part works fine but when we initially start the job,
> we have to deal with really huge Kafka message backlog, millions of
> messages, and that first batch runs for over 40 hours,  and after 12 hours
> or so it becomes very very slow, it keeps crunching messages, but at a very
> low speed. Any idea how to overcome this issue? Once the job is all caught
> up, subsequent batches are quick and fast since the load is really tiny to
> process. So any idea how to avoid this problem?
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming from Kafka, deal with initial heavy load.

2017-03-18 Thread Mal Edwin

Hi,
You can enable backpressure to handle this.

spark.streaming.backpressure.enabled
spark.streaming.receiver.maxRate

Thanks,
Edwin

On Mar 18, 2017, 12:53 AM -0400, sagarcasual . , wrote:
> Hi, we have spark 1.6.1 streaming from Kafka (0.10.1) topic using direct 
> approach. The streaming part works fine but when we initially start the job, 
> we have to deal with really huge Kafka message backlog, millions of messages, 
> and that first batch runs for over 40 hours,  and after 12 hours or so it 
> becomes very very slow, it keeps crunching messages, but at a very low speed. 
> Any idea how to overcome this issue? Once the job is all caught up, 
> subsequent batches are quick and fast since the load is really tiny to 
> process. So any idea how to avoid this problem?




Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread Tathagata Das
This setting allows multiple Spark jobs generated through multiple
foreachRDD calls to run concurrently, even if they are across batches. So output
op2 from batch X can run concurrently with op1 of batch X+1.
This is not safe because it breaks the checkpointing logic in subtle ways.
Note that this was never documented in the Spark online docs.

On Tue, Mar 14, 2017 at 2:29 PM, shyla deshpande 
wrote:

> Thanks TD for the response. Can you please provide more explanation. I am
>  having multiple streams in the spark streaming application (Spark 2.0.2
> using DStreams).  I know many people using this setting. So your
> explanation will help a lot of people.
>
> Thanks
>
> On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das 
> wrote:
>
>> That config I not safe. Please do not use it.
>>
>> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
>> wrote:
>>
>>> I have a spark streaming application which processes 3 kafka streams and
>>> has 5 output operations.
>>>
>>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>>
>>> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
>>> will be run sequentially?
>>>
>>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>>> this situation?
>>>
>>> I appreciate your input. Thanks
>>>
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread shyla deshpande
Thanks TD for the response. Can you please provide more explanation? I have
multiple streams in the Spark Streaming application (Spark 2.0.2
using DStreams). I know many people use this setting, so your
explanation will help a lot of people.

Thanks

On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das  wrote:

> That config I not safe. Please do not use it.
>
> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
> wrote:
>
>> I have a spark streaming application which processes 3 kafka streams and
>> has 5 output operations.
>>
>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>
>> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
>> will be run sequentially?
>>
>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>> this situation?
>>
>> I appreciate your input. Thanks
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread Tathagata Das
That config is not safe. Please do not use it.

On Mar 10, 2017 10:03 AM, "shyla deshpande" 
wrote:

> I have a spark streaming application which processes 3 kafka streams and
> has 5 output operations.
>
> Not sure what should be the setting for spark.streaming.concurrentJobs.
>
> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
> will be run sequentially?
>
> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
> this situation?
>
> I appreciate your input. Thanks
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
If you haven't looked at the offset ranges in the logs for the time period
in question, I'd start there.

On Jan 24, 2017 2:51 PM, "Hakan İlter"  wrote:

Sorry for misunderstanding. When I said that, I meant there are no lag in
consumer. Kafka Manager shows each consumer's coverage and lag status.

On Tue, Jan 24, 2017 at 10:45 PM, Cody Koeninger  wrote:

> When you said " I check the offset ranges from Kafka Manager and don't
> see any significant deltas.", what were you comparing it against?  The
> offset ranges printed in spark logs?
>
> On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter  wrote:
> > First of all, I can both see the "Input Rate" from Spark job's statistics
> > page and Kafka producer message/sec from Kafka manager. The numbers are
> > different when I have the problem. Normally these numbers are very near.
> >
> > Besides, the job is an ETL job, it writes the results to Elastic Search.
> An
> > another legacy app also writes the same results to a database. There are
> > huge difference between DB and ES. I know how many records we process
> daily.
> >
> > Everything works fine if I run a job instance for each topic.
> >
> > On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger 
> wrote:
> >>
> >> I'm confused, if you don't see any difference between the offsets the
> >> job is processing and the offsets available in kafka, then how do you
> >> know it's processing less than all of the data?
> >>
> >> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter 
> >> wrote:
> >> > I'm using DirectStream as one stream for all topics. I check the
> offset
> >> > ranges from Kafka Manager and don't see any significant deltas.
> >> >
> >> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> Are you using receiver-based or direct stream?
> >> >>
> >> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >> >>
> >> >> If you're using the direct stream, the actual topics and offset
> ranges
> >> >> should be visible in the logs, so you should be able to see more
> >> >> detail about what's happening (e.g. all topics are still being
> >> >> processed but offsets are significantly behind, vs only certain
> topics
> >> >> being processed but keeping up with latest offsets)
> >> >>
> >> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter 
> >> >> wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
> >> >> > from
> >> >> > multiple kafka topics. After starting the job, everything works
> fine
> >> >> > first
> >> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> >> > starts
> >> >> > processing only some part of the data (like 350 req/sec). When I
> >> >> > check
> >> >> > the
> >> >> > kafka topics, I can see that there are still 700 req/sec coming to
> >> >> > the
> >> >> > topics. I don't see any errors, exceptions or any other problem.
> The
> >> >> > job
> >> >> > works fine when I start the same code with just single kafka topic.
> >> >> >
> >> >> > Do you have any idea or a clue to understand the problem?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > View this message in context:
> >> >> >
> >> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-st
> reaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> >> > Sent from the Apache Spark User List mailing list archive at
> >> >> > Nabble.com.
> >> >> >
> >> >> > 
> -
> >> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >
> >> >
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Hakan İlter
Sorry for the misunderstanding. When I said that, I meant there is no lag in the
consumer. Kafka Manager shows each consumer's coverage and lag status.

On Tue, Jan 24, 2017 at 10:45 PM, Cody Koeninger  wrote:

> When you said " I check the offset ranges from Kafka Manager and don't
> see any significant deltas.", what were you comparing it against?  The
> offset ranges printed in spark logs?
>
> On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter  wrote:
> > First of all, I can both see the "Input Rate" from Spark job's statistics
> > page and Kafka producer message/sec from Kafka manager. The numbers are
> > different when I have the problem. Normally these numbers are very near.
> >
> > Besides, the job is an ETL job, it writes the results to Elastic Search.
> An
> > another legacy app also writes the same results to a database. There are
> > huge difference between DB and ES. I know how many records we process
> daily.
> >
> > Everything works fine if I run a job instance for each topic.
> >
> > On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger 
> wrote:
> >>
> >> I'm confused, if you don't see any difference between the offsets the
> >> job is processing and the offsets available in kafka, then how do you
> >> know it's processing less than all of the data?
> >>
> >> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter 
> >> wrote:
> >> > I'm using DirectStream as one stream for all topics. I check the
> offset
> >> > ranges from Kafka Manager and don't see any significant deltas.
> >> >
> >> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> Are you using receiver-based or direct stream?
> >> >>
> >> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >> >>
> >> >> If you're using the direct stream, the actual topics and offset
> ranges
> >> >> should be visible in the logs, so you should be able to see more
> >> >> detail about what's happening (e.g. all topics are still being
> >> >> processed but offsets are significantly behind, vs only certain
> topics
> >> >> being processed but keeping up with latest offsets)
> >> >>
> >> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter 
> >> >> wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
> >> >> > from
> >> >> > multiple kafka topics. After starting the job, everything works
> fine
> >> >> > first
> >> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> >> > starts
> >> >> > processing only some part of the data (like 350 req/sec). When I
> >> >> > check
> >> >> > the
> >> >> > kafka topics, I can see that there are still 700 req/sec coming to
> >> >> > the
> >> >> > topics. I don't see any errors, exceptions or any other problem.
> The
> >> >> > job
> >> >> > works fine when I start the same code with just single kafka topic.
> >> >> >
> >> >> > Do you have any idea or a clue to understand the problem?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > View this message in context:
> >> >> >
> >> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> >> > Sent from the Apache Spark User List mailing list archive at
> >> >> > Nabble.com.
> >> >> >
> >> >> > 
> -
> >> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >
> >> >
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
When you said " I check the offset ranges from Kafka Manager and don't
see any significant deltas.", what were you comparing it against?  The
offset ranges printed in spark logs?

On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter  wrote:
> First of all, I can both see the "Input Rate" from Spark job's statistics
> page and Kafka producer message/sec from Kafka manager. The numbers are
> different when I have the problem. Normally these numbers are very near.
>
> Besides, the job is an ETL job, it writes the results to Elastic Search. An
> another legacy app also writes the same results to a database. There are
> huge difference between DB and ES. I know how many records we process daily.
>
> Everything works fine if I run a job instance for each topic.
>
> On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger  wrote:
>>
>> I'm confused, if you don't see any difference between the offsets the
>> job is processing and the offsets available in kafka, then how do you
>> know it's processing less than all of the data?
>>
>> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter 
>> wrote:
>> > I'm using DirectStream as one stream for all topics. I check the offset
>> > ranges from Kafka Manager and don't see any significant deltas.
>> >
>> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger 
>> > wrote:
>> >>
>> >> Are you using receiver-based or direct stream?
>> >>
>> >> Are you doing 1 stream per topic, or 1 stream for all topics?
>> >>
>> >> If you're using the direct stream, the actual topics and offset ranges
>> >> should be visible in the logs, so you should be able to see more
>> >> detail about what's happening (e.g. all topics are still being
>> >> processed but offsets are significantly behind, vs only certain topics
>> >> being processed but keeping up with latest offsets)
>> >>
>> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter 
>> >> wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
>> >> > from
>> >> > multiple kafka topics. After starting the job, everything works fine
>> >> > first
>> >> > (like 700 req/sec) but after a while (couples of days or a week) it
>> >> > starts
>> >> > processing only some part of the data (like 350 req/sec). When I
>> >> > check
>> >> > the
>> >> > kafka topics, I can see that there are still 700 req/sec coming to
>> >> > the
>> >> > topics. I don't see any errors, exceptions or any other problem. The
>> >> > job
>> >> > works fine when I start the same code with just single kafka topic.
>> >> >
>> >> > Do you have any idea or a clue to understand the problem?
>> >> >
>> >> > Thanks.
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> >
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> >> > Nabble.com.
>> >> >
>> >> > -
>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Hakan İlter
First of all, I can see both the "Input Rate" on the Spark job's statistics
page and the Kafka producer messages/sec in Kafka Manager. The numbers are
different when I have the problem. Normally these numbers are very close.

Besides, the job is an ETL job; it writes the results to Elasticsearch.
Another legacy app also writes the same results to a database. There is a
huge difference between DB and ES. I know how many records we process daily.

Everything works fine if I run a job instance for each topic.

On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger  wrote:

> I'm confused, if you don't see any difference between the offsets the
> job is processing and the offsets available in kafka, then how do you
> know it's processing less than all of the data?
>
> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter 
> wrote:
> > I'm using DirectStream as one stream for all topics. I check the offset
> > ranges from Kafka Manager and don't see any significant deltas.
> >
> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger 
> wrote:
> >>
> >> Are you using receiver-based or direct stream?
> >>
> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >>
> >> If you're using the direct stream, the actual topics and offset ranges
> >> should be visible in the logs, so you should be able to see more
> >> detail about what's happening (e.g. all topics are still being
> >> processed but offsets are significantly behind, vs only certain topics
> >> being processed but keeping up with latest offsets)
> >>
> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter 
> wrote:
> >> > Hi everyone,
> >> >
> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> >> > multiple kafka topics. After starting the job, everything works fine
> >> > first
> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> > starts
> >> > processing only some part of the data (like 350 req/sec). When I check
> >> > the
> >> > kafka topics, I can see that there are still 700 req/sec coming to the
> >> > topics. I don't see any errors, exceptions or any other problem. The
> job
> >> > works fine when I start the same code with just single kafka topic.
> >> >
> >> > Do you have any idea or a clue to understand the problem?
> >> >
> >> > Thanks.
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
I'm confused, if you don't see any difference between the offsets the
job is processing and the offsets available in kafka, then how do you
know it's processing less than all of the data?

On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter  wrote:
> I'm using DirectStream as one stream for all topics. I check the offset
> ranges from Kafka Manager and don't see any significant deltas.
>
> On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger  wrote:
>>
>> Are you using receiver-based or direct stream?
>>
>> Are you doing 1 stream per topic, or 1 stream for all topics?
>>
>> If you're using the direct stream, the actual topics and offset ranges
>> should be visible in the logs, so you should be able to see more
>> detail about what's happening (e.g. all topics are still being
>> processed but offsets are significantly behind, vs only certain topics
>> being processed but keeping up with latest offsets)
>>
>> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter  wrote:
>> > Hi everyone,
>> >
>> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
>> > multiple kafka topics. After starting the job, everything works fine
>> > first
>> > (like 700 req/sec) but after a while (couples of days or a week) it
>> > starts
>> > processing only some part of the data (like 350 req/sec). When I check
>> > the
>> > kafka topics, I can see that there are still 700 req/sec coming to the
>> > topics. I don't see any errors, exceptions or any other problem. The job
>> > works fine when I start the same code with just single kafka topic.
>> >
>> > Do you have any idea or a clue to understand the problem?
>> >
>> > Thanks.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Hakan İlter
I'm using DirectStream as one stream for all topics. I check the offset
ranges from Kafka Manager and don't see any significant deltas.

On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger  wrote:

> Are you using receiver-based or direct stream?
>
> Are you doing 1 stream per topic, or 1 stream for all topics?
>
> If you're using the direct stream, the actual topics and offset ranges
> should be visible in the logs, so you should be able to see more
> detail about what's happening (e.g. all topics are still being
> processed but offsets are significantly behind, vs only certain topics
> being processed but keeping up with latest offsets)
>
> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter  wrote:
> > Hi everyone,
> >
> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> > multiple kafka topics. After starting the job, everything works fine
> first
> > (like 700 req/sec) but after a while (couples of days or a week) it
> starts
> > processing only some part of the data (like 350 req/sec). When I check
> the
> > kafka topics, I can see that there are still 700 req/sec coming to the
> > topics. I don't see any errors, exceptions or any other problem. The job
> > works fine when I start the same code with just single kafka topic.
> >
> > Do you have any idea or a clue to understand the problem?
> >
> > Thanks.
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-streaming-multiple-kafka-
> topic-doesn-t-work-at-least-once-tp28334.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Cody Koeninger
Are you using receiver-based or direct stream?

Are you doing 1 stream per topic, or 1 stream for all topics?

If you're using the direct stream, the actual topics and offset ranges
should be visible in the logs, so you should be able to see more
detail about what's happening (e.g. all topics are still being
processed but offsets are significantly behind, vs only certain topics
being processed but keeping up with latest offsets)
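As a rough sketch with the 0-8 direct stream on Spark 1.6 (broker list, topic
names, and ssc are placeholders), printing the ranges per batch makes that
comparison easy:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092") // placeholder brokers
val topics = Set("topicA", "topicB", "topicC")                               // all topics in one stream

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
  // one range per topic-partition per batch; comparing untilOffset with the latest
  // offsets in Kafka shows whether a particular topic is falling behind
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
    println(s"${o.topic}-${o.partition}: ${o.fromOffset}..${o.untilOffset}")
  }
}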

On Mon, Jan 23, 2017 at 3:14 PM, hakanilter  wrote:
> Hi everyone,
>
> I have a Spark (1.6.0-cdh5.7.1) streaming job which receives data from
> multiple Kafka topics. After starting the job, everything works fine at first
> (like 700 req/sec) but after a while (a couple of days or a week) it starts
> processing only part of the data (like 350 req/sec). When I check the
> Kafka topics, I can see that there are still 700 req/sec coming to the
> topics. I don't see any errors, exceptions or any other problem. The job
> works fine when I start the same code with just a single Kafka topic.
>
> Do you have any idea or a clue to understand the problem?
>
> Thanks.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka

2016-12-12 Thread Anton Okolnychyi
Thanks for all your replies; now I have a complete picture.



2016-12-12 16:49 GMT+01:00 Cody Koeninger :

> http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html#creating-a-direct-stream
>
> Use a separate group id for each stream, like the docs say.
>
> If you're doing multiple output operations, and aren't caching, spark
> is going to read from kafka again each time, and if some of those
> reads are happening for the same group and same topicpartition, it's
> not going to work.
>
> On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
>  wrote:
> > Hi Anton,
> >
> > What is the command you run your spark app with? Why not working with
> data
> > instead of stream on your second stage operation? Can you provide logs
> with
> > the issue?
> >
> > ConcurrentModificationException is not a spark issue, it means that you
> use
> > the same Kafka consumer instance from more than one thread.
> >
> > Additionally,
> >
> > 1) As I understand new kafka consumer is created every time when you call
> > KafkaUtils.createDirectStream.
> > 2) If you assign the same group id to several consumer instances then all
> > the consumers will get different set of messages on the same topic. This
> is
> > a kind of load balancing which kafka provides with its Consumer API.
> >
> > Oleksii
> >
> > On 11 December 2016 at 18:46, Anton Okolnychyi <
> anton.okolnyc...@gmail.com>
> > wrote:
> >>
> >> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> >> nothing custom.
> >>
> >>
> >> I will try restate the initial question. Let's consider an example.
> >>
> >> 1. I create a stream and subscribe to a certain topic.
> >>
> >> val stream = KafkaUtils.createDirectStream(...)
> >>
> >> 2. I extract the actual data from the stream. For instance, word counts.
> >>
> >> val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >> 3. Then I compute something and output the result to console.
> >>
> >> val firstResult = stream.reduceByWindow(...)
> >> firstResult.print()
> >>
> >> Once that is done, I would like to perform another computation on top of
> >> wordCounts and output that result again to console. In my current
> >> understanding, I cannot just reuse wordCounts from Step 2 and should
> create
> >> a new stream with another group id and then define the second
> computation.
> >> Am I correct that if add the next part, then I can get
> >> "ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access"?
> >>
> >> // another computation on wordCounts
> >> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> >> secondResult.output()
> >>
> >> Thanks,
> >> Anton
> >>
> >> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
> >>>
> >>> Hi,
> >>> Usual general questions are:
> >>> -- what is your Spark version?
> >>> -- what is your Kafka version?
> >>> -- do you use "standard" Kafka consumer or try to implement something
> >>> custom (your own multi-threaded consumer)?
> >>>
> >>> The freshest docs
> >>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html
> >>>
> >>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
> >>> !!!)
> 
>  kafkaParams.put("group.id", "use_a_separate_group_id_for_
> each_stream");
> >>>
> >>>
> >>>
> >>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
> >>>  wrote:
> 
>  Hi,
> 
>  I am experimenting with Spark Streaming and Kafka. I will appreciate
> if
>  someone can say whether the following assumption is correct.
> 
>  If I have multiple computations (each with its own output) on one
> stream
>  (created as KafkaUtils.createDirectStream), then there is a chance
> to have
>  ConcurrentModificationException: KafkaConsumer is not safe for
>  multi-threaded access.  To solve this problem, I should create a new
> stream
>  with different "group.id" for each computation.
> 
>  Am I right?
> 
>  Best regards,
>  Anton
> >>>
> >>>
> >>
> >
>


Re: Spark Streaming with Kafka

2016-12-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream

Use a separate group id for each stream, like the docs say.

If you're doing multiple output operations, and aren't caching, spark
is going to read from kafka again each time, and if some of those
reads are happening for the same group and same topicpartition, it's
not going to work.
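Roughly like this (a sketch - ssc, the topic, and the group id are
placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",            // placeholder broker
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "wordcount-stream-1"          // one group id per created stream
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("words"), kafkaParams))

// cache the derived DStream so that both output operations below reuse the
// same batch data instead of triggering a second read from Kafka
val wordCounts = stream.map(record => (record.value, 1)).cache()

wordCounts.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60)).print()
wordCounts.count().print()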

On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
 wrote:
> Hi Anton,
>
> What is the command you run your spark app with? Why not working with data
> instead of stream on your second stage operation? Can you provide logs with
> the issue?
>
> ConcurrentModificationException is not a spark issue, it means that you use
> the same Kafka consumer instance from more than one thread.
>
> Additionally,
>
> 1) As I understand new kafka consumer is created every time when you call
> KafkaUtils.createDirectStream.
> 2) If you assign the same group id to several consumer instances then all
> the consumers will get different set of messages on the same topic. This is
> a kind of load balancing which kafka provides with its Consumer API.
>
> Oleksii
>
> On 11 December 2016 at 18:46, Anton Okolnychyi 
> wrote:
>>
>> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
>> nothing custom.
>>
>>
>> I will try restate the initial question. Let's consider an example.
>>
>> 1. I create a stream and subscribe to a certain topic.
>>
>> val stream = KafkaUtils.createDirectStream(...)
>>
>> 2. I extract the actual data from the stream. For instance, word counts.
>>
>> val wordCounts = stream.map(record => (record.value(), 1))
>>
>> 3. Then I compute something and output the result to console.
>>
>> val firstResult = stream.reduceByWindow(...)
>> firstResult.print()
>>
>> Once that is done, I would like to perform another computation on top of
>> wordCounts and output that result again to console. In my current
>> understanding, I cannot just reuse wordCounts from Step 2 and should create
>> a new stream with another group id and then define the second computation.
>> Am I correct that if add the next part, then I can get
>> "ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access"?
>>
>> // another computation on wordCounts
>> val secondResult = wordCounts.reduceByKeyAndWindow(...)
>> secondResult.output()
>>
>> Thanks,
>> Anton
>>
>> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
>>>
>>> Hi,
>>> Usual general questions are:
>>> -- what is your Spark version?
>>> -- what is your Kafka version?
>>> -- do you use "standard" Kafka consumer or try to implement something
>>> custom (your own multi-threaded consumer)?
>>>
>>> The freshest docs
>>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>>> !!!)

 kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>>
>>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
>>>  wrote:

 Hi,

 I am experimenting with Spark Streaming and Kafka. I will appreciate if
 someone can say whether the following assumption is correct.

 If I have multiple computations (each with its own output) on one stream
 (created as KafkaUtils.createDirectStream), then there is a chance to have
 ConcurrentModificationException: KafkaConsumer is not safe for
 multi-threaded access.  To solve this problem, I should create a new stream
 with different "group.id" for each computation.

 Am I right?

 Best regards,
 Anton
>>>
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka

2016-12-11 Thread Oleksii Dukhno
Hi Anton,

What is the command you run your spark app with? Why not work with the data
instead of the stream in your second-stage operation? Can you provide logs
showing the issue?

ConcurrentModificationException is not a spark issue, it means that you use
the same Kafka consumer instance from more than one thread.

Additionally,

1) As I understand it, a new Kafka consumer is created every time you call
KafkaUtils.createDirectStream.
2) If you assign the same group id to several consumer instances, then the
consumers will each get a different subset of the messages on the topic. This is
the kind of load balancing which Kafka provides with its Consumer API.

Oleksii

On 11 December 2016 at 18:46, Anton Okolnychyi 
wrote:

> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> nothing custom.
>
>
> I will try restate the initial question. Let's consider an example.
>
> 1. I create a stream and subscribe to a certain topic.
>
> val stream = KafkaUtils.createDirectStream(...)
>
> 2. I extract the actual data from the stream. For instance, word counts.
>
> val wordCounts = stream.map(record => (record.value(), 1))
>
> 3. Then I compute something and output the result to console.
>
> val firstResult = stream.reduceByWindow(...)
> firstResult.print()
>
> Once that is done, I would like to perform another computation on top of
> wordCounts and output that result again to console. In my current
> understanding, I cannot just reuse wordCounts from Step 2 and should create
> a new stream with another group id and then define the second computation.
> Am I correct that if add the next part, then I can get "
> ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access"?
>
> // another computation on wordCounts
> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> secondResult.output()
>
> Thanks,
> Anton
>
> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
>
>> Hi,
>> Usual general questions are:
>> -- what is your Spark version?
>> -- what is your Kafka version?
>> -- do you use "standard" Kafka consumer or try to implement something
>> custom (your own multi-threaded consumer)?
>>
>> The freshest docs https://spark.apache.org/docs/
>> latest/streaming-kafka-0-10-integration.html
>>
>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>> !!!)
>>
>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>
>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
>> anton.okolnyc...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>>> someone can say whether the following assumption is correct.
>>>
>>> If I have multiple computations (each with its own output) on one stream
>>> (created as KafkaUtils.createDirectStream), then there is a chance to
>>> have ConcurrentModificationException: KafkaConsumer is not safe for
>>> multi-threaded access.  To solve this problem, I should create a new stream
>>> with different "group.id" for each computation.
>>>
>>> Am I right?
>>>
>>> Best regards,
>>> Anton
>>>
>>
>>
>


Re: Spark Streaming with Kafka

2016-12-11 Thread Anton Okolnychyi
sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
nothing custom.


I will try to restate the initial question. Let's consider an example.

1. I create a stream and subscribe to a certain topic.

val stream = KafkaUtils.createDirectStream(...)

2. I extract the actual data from the stream. For instance, word counts.

val wordCounts = stream.map(record => (record.value(), 1))

3. Then I compute something and output the result to console.

val firstResult = stream.reduceByWindow(...)
firstResult.print()

Once that is done, I would like to perform another computation on top of
wordCounts and output that result again to console. In my current
understanding, I cannot just reuse wordCounts from Step 2 and should create
a new stream with another group id and then define the second computation.
Am I correct that if I add the next part, then I can get "
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access"?

// another computation on wordCounts
val secondResult = wordCounts.reduceByKeyAndWindow(...)
secondResult.output()

Thanks,
Anton

2016-12-11 17:11 GMT+01:00 Timur Shenkao :

> Hi,
> Usual general questions are:
> -- what is your Spark version?
> -- what is your Kafka version?
> -- do you use "standard" Kafka consumer or try to implement something
> custom (your own multi-threaded consumer)?
>
> The freshest docs https://spark.apache.org/docs/
> latest/streaming-kafka-0-10-integration.html
>
> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)
>
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>
>>
>
> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
> anton.okolnyc...@gmail.com> wrote:
>
>> Hi,
>>
>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>> someone can say whether the following assumption is correct.
>>
>> If I have multiple computations (each with its own output) on one stream
>> (created as KafkaUtils.createDirectStream), then there is a chance to
>> have ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access.  To solve this problem, I should create a new stream
>> with different "group.id" for each computation.
>>
>> Am I right?
>>
>> Best regards,
>> Anton
>>
>
>


Re: Spark Streaming with Kafka

2016-12-11 Thread Timur Shenkao
Hi,
Usual general questions are:
-- what is your Spark version?
-- what is your Kafka version?
-- do you use "standard" Kafka consumer or try to implement something
custom (your own multi-threaded consumer)?

The freshest docs
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)

> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>

On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
anton.okolnyc...@gmail.com> wrote:

> Hi,
>
> I am experimenting with Spark Streaming and Kafka. I will appreciate if
> someone can say whether the following assumption is correct.
>
> If I have multiple computations (each with its own output) on one stream
> (created as KafkaUtils.createDirectStream), then there is a chance to
> have ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access.  To solve this problem, I should create a new stream
> with different "group.id" for each computation.
>
> Am I right?
>
> Best regards,
> Anton
>


Re: Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data

2016-10-18 Thread Sean Owen
Try adding the spark-streaming_2.11 artifact as a dependency too. You will
be directly depending on it.
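
For example, something like this next to the two dependencies quoted below (the
version here is only chosen to match them; use whatever Spark version you are on):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
</dependency>

Also note that all Spark artifacts should share the same Scala suffix (_2.11 vs
_2.10), otherwise you will hit binary incompatibilities.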

On Tue, Oct 18, 2016 at 2:16 PM Furkan KAMACI 
wrote:

> Hi,
>
> I have a search application and want to monitor queries per second for it.
> I have Kafka at my backend which acts like a bus for messages. Whenever a
> search request is done I publish the nano time of the current system. I
> want to use Spark Streaming to aggregate such data but I am so new to it.
>
> I wanted to follow that example:
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> I've added that dependencies:
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>     <version>2.0.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.10</artifactId>
>     <version>2.0.1</version>
> </dependency>
>
> However I cannot see even Duration class at my dependencies. On the other
> hand given documentation is missing and when you click Java there is no
> code at tabs.
>
> Could you guide me how can I implement monitoring such a metric?
>
> Kind Regards,
> Furkan KAMACI
>


Re: Spark Streaming with Kafka

2016-05-24 Thread Rasika Pohankar
Hi firemonk9,

Sorry, it's been too long but I just saw this. I hope you were able to
resolve it. FWIW, we were able to solve this with the help of the Low Level
Kafka Consumer, instead of the inbuilt Kafka consumer in Spark, from here:
https://github.com/dibbhatt/kafka-spark-consumer/.

Regards,
Rasika.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p27014.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming from Kafka best fit

2016-03-07 Thread pratik khadloya
Would using mapPartitions instead of map help here?

~Pratik
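
For reference, a rough sketch of what that would look like on the map/filter stage
of the code quoted below (lineStream stands for one of the DStreams being unioned;
ParseStringToLogRecord, isGoodRecord and the url field are from the original code).
mapPartitions mainly helps when there is per-record setup cost that can be paid
once per partition instead:

val urls = lineStream.mapPartitions { records =>
  // anything expensive to construct can be built once here, per partition
  records.map(ParseStringToLogRecord)
         .filter(record => isGoodRecord(record))
         .map(record => record.url)
}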

On Tue, Mar 1, 2016 at 10:07 AM Cody Koeninger  wrote:

> You don't need an equal number of executor cores to partitions.  An
> executor can and will work on multiple partitions within a batch, one after
> the other.  The real issue is whether you are able to keep your processing
> time under your batch time, so that delay doesn't increase.
>
> On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar 
> wrote:
>
>> Thanks Cody!
>>
>> I understand what you said and if I am correct it will be using 224
>> executor cores just for fetching + stage-1 processing of 224 partitions. I
>> will obviously need more cores for processing further stages and fetching
>> next batch.
>>
>> I will start with higher number of executor cores and see how it goes.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger 
>> wrote:
>>
>>> > "How do I keep a balance of executors which receive data from Kafka
>>> and which process data"
>>>
>>> I think you're misunderstanding how the direct stream works.  The
>>> executor which receives data is also the executor which processes data,
>>> there aren't separate receivers.  If it's a single stage worth of work
>>> (e.g. straight map / filter), the processing of a given partition is going
>>> to be done by the executor that read it from kafka.  If you do something
>>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>>> processing.  The question of which executor works on which tasks is up to
>>> the scheduler (and getPreferredLocations, which only matters if you're
>>> running spark on the same nodes as kafka)
>>>
>>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
 Hello all,

 I see that there are as of today 3 ways one can read from Kafka in
 spark streaming:
 1. KafkaUtils.createStream() (here
 )
 2. KafkaUtils.createDirectStream() (here
 )
 3. Kafka-spark-consumer (here
 )

 My spark streaming application has to read from 1 kafka topic with
 around 224 partitions, consuming data at around 150MB/s (~90,000
 messages/sec) which reduces to around 3MB/s (~1400 messages/sec) after
 filtering. After filtering I need to maintain top 1 URL counts. I don't
 really care about exactly once semantics as I am interested in rough
 estimate.

 Code:

 sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
 sparkConf.setAppName("KafkaReader")
 val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
 createStreamingContext)

 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 val kafkaParams = Map[String, String](
   "metadata.broker.list" -> "kafka.server.ip:9092",
   "group.id" -> consumer_group
 )

 val lineStreams = (1 to N).map{ _ =>
   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
 ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
 }

 ssc.union(
   lineStreams.map(stream => {
   stream.map(ParseStringToLogRecord)
 .filter(record => isGoodRecord(record))
 .map(record => record.url)
   })
 ).window(Seconds(120), Seconds(120))  // 2 Minute window
   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
 moving window, 28 will probably help in parallelism
   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
   .mapPartitions(iter => {
 iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
 1000).iterator
   }, true)
   .foreachRDD((latestRDD, rddTime) => {
   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
 record._1)).sortByKey(false).take(1000))
   })

 ssc.start()
 ssc.awaitTermination()

 Questions:

 a) I used #2 but I found that I couldn't control how many executors
 will be actually fetching from Kafka. How do I keep a balance of executors
 which receive data from Kafka and which process data? Do they keep changing
 for every batch?

 b) Now I am trying to use #1 creating multiple DStreams, filtering them
 and then doing a union. I don't understand why would the number of events
 processed per 120 seconds batch will change drastically. PFA the events/sec
 graph while running with 1 receiver. How to debug this?

 c) What will be the most suitable method to integrate with Kafka from
 above 3? Any recommendations for getting maximum performance, running the
 streaming application reliably in production 

Re: Spark streaming from Kafka best fit

2016-03-01 Thread Cody Koeninger
You don't need an equal number of executor cores to partitions.  An
executor can and will work on multiple partitions within a batch, one after
the other.  The real issue is whether you are able to keep your processing
time under your batch time, so that delay doesn't increase.

On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar 
wrote:

> Thanks Cody!
>
> I understand what you said and if I am correct it will be using 224
> executor cores just for fetching + stage-1 processing of 224 partitions. I
> will obviously need more cores for processing further stages and fetching
> next batch.
>
> I will start with higher number of executor cores and see how it goes.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger  wrote:
>
>> > "How do I keep a balance of executors which receive data from Kafka
>> and which process data"
>>
>> I think you're misunderstanding how the direct stream works.  The
>> executor which receives data is also the executor which processes data,
>> there aren't separate receivers.  If it's a single stage worth of work
>> (e.g. straight map / filter), the processing of a given partition is going
>> to be done by the executor that read it from kafka.  If you do something
>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>> processing.  The question of which executor works on which tasks is up to
>> the scheduler (and getPreferredLocations, which only matters if you're
>> running spark on the same nodes as kafka)
>>
>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> Hello all,
>>>
>>> I see that there are as of today 3 ways one can read from Kafka in spark
>>> streaming:
>>> 1. KafkaUtils.createStream() (here
>>> )
>>> 2. KafkaUtils.createDirectStream() (here
>>> )
>>> 3. Kafka-spark-consumer (here
>>> )
>>>
>>> My spark streaming application has to read from 1 kafka topic with
>>> around 224 partitions, consuming data at around 150MB/s (~90,000
>>> messages/sec) which reduces to around 3MB/s (~1400 messages/sec) after
>>> filtering. After filtering I need to maintain top 1 URL counts. I don't
>>> really care about exactly once semantics as I am interested in rough
>>> estimate.
>>>
>>> Code:
>>>
>>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>>> sparkConf.setAppName("KafkaReader")
>>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
>>> createStreamingContext)
>>>
>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>> val kafkaParams = Map[String, String](
>>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>>   "group.id" -> consumer_group
>>> )
>>>
>>> val lineStreams = (1 to N).map{ _ =>
>>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>>> }
>>>
>>> ssc.union(
>>>   lineStreams.map(stream => {
>>>   stream.map(ParseStringToLogRecord)
>>> .filter(record => isGoodRecord(record))
>>> .map(record => record.url)
>>>   })
>>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
>>> moving window, 28 will probably help in parallelism
>>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>>   .mapPartitions(iter => {
>>> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
>>> 1000).iterator
>>>   }, true)
>>>   .foreachRDD((latestRDD, rddTime) => {
>>>   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
>>> record._1)).sortByKey(false).take(1000))
>>>   })
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> Questions:
>>>
>>> a) I used #2 but I found that I couldn't control how many executors will
>>> be actually fetching from Kafka. How do I keep a balance of executors which
>>> receive data from Kafka and which process data? Do they keep changing for
>>> every batch?
>>>
>>> b) Now I am trying to use #1 creating multiple DStreams, filtering them
>>> and then doing a union. I don't understand why would the number of events
>>> processed per 120 seconds batch will change drastically. PFA the events/sec
>>> graph while running with 1 receiver. How to debug this?
>>>
>>> c) What will be the most suitable method to integrate with Kafka from
>>> above 3? Any recommendations for getting maximum performance, running the
>>> streaming application reliably in production environment?
>>>
>>> --
>>> Thanks
>>> Jatin Kumar
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark streaming from Kafka best fit

2016-03-01 Thread Jatin Kumar
Thanks Cody!

I understand what you said and if I am correct it will be using 224
executor cores just for fetching + stage-1 processing of 224 partitions. I
will obviously need more cores for processing further stages and fetching
next batch.

I will start with higher number of executor cores and see how it goes.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger  wrote:

> > "How do I keep a balance of executors which receive data from Kafka and
> which process data"
>
> I think you're misunderstanding how the direct stream works.  The executor
> which receives data is also the executor which processes data, there aren't
> separate receivers.  If it's a single stage worth of work (e.g. straight
> map / filter), the processing of a given partition is going to be done by
> the executor that read it from kafka.  If you do something involving a
> shuffle (e.g. reduceByKey), other executors will do additional processing.
> The question of which executor works on which tasks is up to the scheduler
> (and getPreferredLocations, which only matters if you're running spark on
> the same nodes as kafka)
>
> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> Hello all,
>>
>> I see that there are as of today 3 ways one can read from Kafka in spark
>> streaming:
>> 1. KafkaUtils.createStream() (here
>> )
>> 2. KafkaUtils.createDirectStream() (here
>> )
>> 3. Kafka-spark-consumer (here
>> )
>>
>> My spark streaming application has to read from 1 kafka topic with around
>> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec)
>> which reduces to around 3MB/s (~1400 messages/sec) after filtering. After
>> filtering I need to maintain top 1 URL counts. I don't really care
>> about exactly once semantics as I am interested in rough estimate.
>>
>> Code:
>>
>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>> sparkConf.setAppName("KafkaReader")
>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
>> createStreamingContext)
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>> val kafkaParams = Map[String, String](
>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>   "group.id" -> consumer_group
>> )
>>
>> val lineStreams = (1 to N).map{ _ =>
>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>> }
>>
>> ssc.union(
>>   lineStreams.map(stream => {
>>   stream.map(ParseStringToLogRecord)
>> .filter(record => isGoodRecord(record))
>> .map(record => record.url)
>>   })
>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
>> moving window, 28 will probably help in parallelism
>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>   .mapPartitions(iter => {
>> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
>> 1000).iterator
>>   }, true)
>>   .foreachRDD((latestRDD, rddTime) => {
>>   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
>> record._1)).sortByKey(false).take(1000))
>>   })
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> Questions:
>>
>> a) I used #2 but I found that I couldn't control how many executors will
>> be actually fetching from Kafka. How do I keep a balance of executors which
>> receive data from Kafka and which process data? Do they keep changing for
>> every batch?
>>
>> b) Now I am trying to use #1 creating multiple DStreams, filtering them
>> and then doing a union. I don't understand why would the number of events
>> processed per 120 seconds batch will change drastically. PFA the events/sec
>> graph while running with 1 receiver. How to debug this?
>>
>> c) What will be the most suitable method to integrate with Kafka from
>> above 3? Any recommendations for getting maximum performance, running the
>> streaming application reliably in production environment?
>>
>> --
>> Thanks
>> Jatin Kumar
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


Re: Spark streaming from Kafka best fit

2016-03-01 Thread Cody Koeninger
> "How do I keep a balance of executors which receive data from Kafka and
which process data"

I think you're misunderstanding how the direct stream works.  The executor
which receives data is also the executor which processes data, there aren't
separate receivers.  If it's a single stage worth of work (e.g. straight
map / filter), the processing of a given partition is going to be done by
the executor that read it from kafka.  If you do something involving a
shuffle (e.g. reduceByKey), other executors will do additional processing.
The question of which executor works on which tasks is up to the scheduler
(and getPreferredLocations, which only matters if you're running spark on
the same nodes as kafka)
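
For comparison with the createStream version quoted below, a bare-bones direct
(receiver-less) stream for the same setup would look roughly like this (the topic
name is a placeholder; ssc, consumer_group and the broker list are from the quoted
code):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val directKafkaParams = Map[String, String](
  "metadata.broker.list" -> "kafka.server.ip:9092",
  "group.id" -> consumer_group)

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, directKafkaParams, Set("mytopic")).map(_._2)

// each batch's RDD has one partition per Kafka topic-partition (224 here), and the
// executor that fetches a given partition also runs the map/filter on it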

On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello all,
>
> I see that there are as of today 3 ways one can read from Kafka in spark
> streaming:
> 1. KafkaUtils.createStream() (here
> )
> 2. KafkaUtils.createDirectStream() (here
> )
> 3. Kafka-spark-consumer (here
> )
>
> My spark streaming application has to read from 1 kafka topic with around
> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec)
> which reduces to around 3MB/s (~1400 messages/sec) after filtering. After
> filtering I need to maintain top 1 URL counts. I don't really care
> about exactly once semantics as I am interested in rough estimate.
>
> Code:
>
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
> sparkConf.setAppName("KafkaReader")
> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
> val kafkaParams = Map[String, String](
>   "metadata.broker.list" -> "kafka.server.ip:9092",
>   "group.id" -> consumer_group
> )
>
> val lineStreams = (1 to N).map{ _ =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
> }
>
> ssc.union(
>   lineStreams.map(stream => {
>   stream.map(ParseStringToLogRecord)
> .filter(record => isGoodRecord(record))
> .map(record => record.url)
>   })
> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute moving 
> window, 28 will probably help in parallelism
>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>   .mapPartitions(iter => {
> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
> 1000).iterator
>   }, true)
>   .foreachRDD((latestRDD, rddTime) => {
>   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
> record._1)).sortByKey(false).take(1000))
>   })
>
> ssc.start()
> ssc.awaitTermination()
>
> Questions:
>
> a) I used #2 but I found that I couldn't control how many executors will
> be actually fetching from Kafka. How do I keep a balance of executors which
> receive data from Kafka and which process data? Do they keep changing for
> every batch?
>
> b) Now I am trying to use #1 creating multiple DStreams, filtering them
> and then doing a union. I don't understand why would the number of events
> processed per 120 seconds batch will change drastically. PFA the events/sec
> graph while running with 1 receiver. How to debug this?
>
> c) What will be the most suitable method to integrate with Kafka from
> above 3? Any recommendations for getting maximum performance, running the
> streaming application reliably in production environment?
>
> --
> Thanks
> Jatin Kumar
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
If by smaller block interval you mean the value in seconds passed to the
streaming context constructor, no.  You'll still get everything from the
starting offset until now in the first batch.
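
In other words, the knob for that first oversized batch is the per-partition rate
cap on the SparkConf used to build the streaming context; the figure below is only
an example value:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // caps every batch, including the first
  .set("spark.streaming.backpressure.enabled", "true")      // adjusts the rate once later batches complete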

On Thu, Feb 18, 2016 at 10:02 AM, praveen S  wrote:

> Sorry.. Rephrasing :
> Can this issue be resolved by having a smaller block interval?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:30, "praveen S"  wrote:
>
>> Can having a smaller block interval only resolve this?
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>>
>>> Backpressure won't help you with the first batch, you'd need 
>>> spark.streaming.kafka.maxRatePerPartition
>>> for that
>>>
>>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>>
 Have a look at

 spark.streaming.backpressure.enabled
 Property

 Regards,
 Praveen
 On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am
> trying to find a solution for a particular use case when my application 
> has
> a downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process 
> that.
>
> Is there any workaround so that when my streaming application starts
> it starts taking data for 1-2 hours, process it , then take the data for
> next 1 hour process it. Now when its done processing of previous 5 hours
> data which missed, normal streaming should start with the given slide
> interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>

>>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Sorry.. Rephrasing :
Can this issue be resolved by having a smaller block interval?

Regards,
Praveen
On 18 Feb 2016 21:30, "praveen S"  wrote:

> Can having a smaller block interval only resolve this?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>
>> Backpressure won't help you with the first batch, you'd need 
>> spark.streaming.kafka.maxRatePerPartition
>> for that
>>
>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>
>>> Have a look at
>>>
>>> spark.streaming.backpressure.enabled
>>> Property
>>>
>>> Regards,
>>> Praveen
>>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>>
 I have a spark streaming application running in production. I am trying
 to find a solution for a particular use case when my application has a
 downtime of say 5 hours and is restarted. Now, when I start my streaming
 application after 5 hours there would be considerable amount of data then
 in the Kafka and my cluster would be unable to repartition and process 
 that.

 Is there any workaround so that when my streaming application starts it
 starts taking data for 1-2 hours, process it , then take the data for next
 1 hour process it. Now when its done processing of previous 5 hours data
 which missed, normal streaming should start with the given slide interval.

 Please suggest any ideas and feasibility of this.


 Thanks !!
 Abhi

>>>
>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Can having a smaller block interval only resolve this?

Regards,
Praveen
On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:

> Backpressure won't help you with the first batch, you'd need 
> spark.streaming.kafka.maxRatePerPartition
> for that
>
> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>
>> Have a look at
>>
>> spark.streaming.backpressure.enabled
>> Property
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>
>>> I have a spark streaming application running in production. I am trying
>>> to find a solution for a particular use case when my application has a
>>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>>> application after 5 hours there would be considerable amount of data then
>>> in the Kafka and my cluster would be unable to repartition and process that.
>>>
>>> Is there any workaround so that when my streaming application starts it
>>> starts taking data for 1-2 hours, process it , then take the data for next
>>> 1 hour process it. Now when its done processing of previous 5 hours data
>>> which missed, normal streaming should start with the given slide interval.
>>>
>>> Please suggest any ideas and feasibility of this.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
Backpressure won't help you with the first batch, you'd need
spark.streaming.kafka.maxRatePerPartition
for that

On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:

> Have a look at
>
> spark.streaming.backpressure.enabled
> Property
>
> Regards,
> Praveen
> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>
>> I have a spark streaming application running in production. I am trying
>> to find a solution for a particular use case when my application has a
>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>> application after 5 hours there would be considerable amount of data then
>> in the Kafka and my cluster would be unable to repartition and process that.
>>
>> Is there any workaround so that when my streaming application starts it
>> starts taking data for 1-2 hours, process it , then take the data for next
>> 1 hour process it. Now when its done processing of previous 5 hours data
>> which missed, normal streaming should start with the given slide interval.
>>
>> Please suggest any ideas and feasibility of this.
>>
>>
>> Thanks !!
>> Abhi
>>
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Have a look at

spark.streaming.backpressure.enabled
Property

Regards,
Praveen
On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am trying to
> find a solution for a particular use case when my application has a
> downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process that.
>
> Is there any workaround so that when my streaming application starts it
> starts taking data for 1-2 hours, process it , then take the data for next
> 1 hour process it. Now when its done processing of previous 5 hours data
> which missed, normal streaming should start with the given slide interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>


Re: Spark Streaming with Kafka DirectStream

2016-02-17 Thread Cody Koeninger
You can print whatever you want wherever you want, it's just a question of
whether it's going to show up on the driver or the various executors logs
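
A small illustration of that split, sketched in Scala on the stream returned by
createDirectStream (the same distinction applies to the Python API):

stream.foreachRDD { rdd =>
  // runs on the driver: shows up in the driver's console / log
  println(s"batch has ${rdd.count()} records")

  rdd.foreachPartition { records =>
    // runs on an executor: shows up in that executor's stdout/stderr, not the driver's
    println(s"this partition has ${records.size} records")
  }
}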

On Wed, Feb 17, 2016 at 5:50 AM, Cyril Scetbon 
wrote:

> I don't think we can print an integer value in a spark streaming process
> As opposed to a spark job. I think I can print the content of an rdd but
> not debug messages. Am I wrong ?
>
> Cyril Scetbon
>
> On Feb 17, 2016, at 12:51 AM, ayan guha  wrote:
>
> Hi
>
> You can always use RDD properties, which already has partition information.
>
>
> https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
>
>
> On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon 
> wrote:
>
>> Your understanding is the right one (having re-read the documentation).
>> Still wondering how I can verify that 5 partitions have been created. My
>> job is reading from a topic in Kafka that has 5 partitions and sends the
>> data to E/S. I can see that when there is one task to read from Kafka there
>> are 5 tasks writing to E/S. So I'm supposing that the task reading from
>> Kafka does it in // using 5 partitions and that's why there are then 5
>> tasks to write to E/S. But I'm supposing ...
>>
>> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>>
>> I have a slightly different understanding.
>>
>> Direct stream generates 1 RDD per batch, however, number of partitions in
>> that RDD = number of partitions in kafka topic.
>>
>> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
>> wrote:
>>
>>> Hi guys,
>>>
>>> I'm making some tests with Spark and Kafka using a Python script. I use
>>> the second method that doesn't need any receiver (Direct Approach). It
>>> should adapt the number of RDDs to the number of partitions in the topic.
>>> I'm trying to verify it. What's the easiest way to verify it ? I also tried
>>> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
>>> on the leaders of partitions in a topic, and they are not. Can you confirm
>>> that RDDs are not created depending on the location of partitions and that
>>> co-locating Kafka with Spark is not a must-have or that Spark does not take
>>> advantage of it ?
>>>
>>> As the parallelism is simplified (by creating as many RDDs as there are
>>> partitions) I suppose that the biggest part of the tuning is playing with
>>> KafKa partitions (not talking about network configuration or management of
>>> Spark resources) ?
>>>
>>> Thank you
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


Re: Spark Streaming with Kafka DirectStream

2016-02-17 Thread Cyril Scetbon
I don't think we can print an integer value in a Spark Streaming process, as
opposed to a Spark job. I think I can print the content of an RDD but not debug
messages. Am I wrong?

Cyril Scetbon

> On Feb 17, 2016, at 12:51 AM, ayan guha  wrote:
> 
> Hi
> 
> You can always use RDD properties, which already has partition information.
> 
> https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
>  
> 
>> On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon  wrote:
>> Your understanding is the right one (having re-read the documentation). 
>> Still wondering how I can verify that 5 partitions have been created. My job 
>> is reading from a topic in Kafka that has 5 partitions and sends the data to 
>> E/S. I can see that when there is one task to read from Kafka there are 5 
>> tasks writing to E/S. So I'm supposing that the task reading from Kafka does 
>> it in // using 5 partitions and that's why there are then 5 tasks to write 
>> to E/S. But I'm supposing ...
>> 
>>> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>>> 
>>> I have a slightly different understanding. 
>>> 
>>> Direct stream generates 1 RDD per batch, however, number of partitions in 
>>> that RDD = number of partitions in kafka topic.
>>> 
 On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon  
 wrote:
 Hi guys,
 
 I'm making some tests with Spark and Kafka using a Python script. I use 
 the second method that doesn't need any receiver (Direct Approach). It 
 should adapt the number of RDDs to the number of partitions in the topic. 
 I'm trying to verify it. What's the easiest way to verify it ? I also 
 tried to co-locate Yarn, Spark and Kafka to check if RDDs are created 
 depending on the leaders of partitions in a topic, and they are not. Can 
 you confirm that RDDs are not created depending on the location of 
 partitions and that co-locating Kafka with Spark is not a must-have or 
 that Spark does not take advantage of it ?
 
 As the parallelism is simplified (by creating as many RDDs as there are 
 partitions) I suppose that the biggest part of the tuning is playing with 
 KafKa partitions (not talking about network configuration or management of 
 Spark resources) ?
 
 Thank you
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>>> 
>>> -- 
>>> Best Regards,
>>> Ayan Guha
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha


Re: Spark Streaming with Kafka Use Case

2016-02-17 Thread Cody Koeninger
Just use a kafka rdd in a batch job or two, then start your streaming job.
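
Roughly along these lines (topic, partitions and offsets below are placeholders,
and sc is a plain SparkContext): consume the backlog with KafkaUtils.createRDD
first, then start the streaming job from where the batch left off.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val backlog = Array(
  OffsetRange("mytopic", 0, 0L, 500000L),   // topic, partition, fromOffset, untilOffset
  OffsetRange("mytopic", 1, 0L, 500000L))

val backlogRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, backlog)

println(s"backlog records to catch up on: ${backlogRdd.count()}")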

On Wed, Feb 17, 2016 at 12:57 AM, Abhishek Anand 
wrote:

> I have a spark streaming application running in production. I am trying to
> find a solution for a particular use case when my application has a
> downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process that.
>
> Is there any workaround so that when my streaming application starts it
> starts taking data for 1-2 hours, process it , then take the data for next
> 1 hour process it. Now when its done processing of previous 5 hours data
> which missed, normal streaming should start with the given slide interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
Hi

You can always use RDD properties, which already has partition information.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
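
For example, on the stream itself (stream being whatever createDirectStream
returned), each batch's partition count can be printed like this:

stream.foreachRDD { rdd =>
  println(s"partitions in this batch: ${rdd.partitions.size}")
}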


On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon 
wrote:

> Your understanding is the right one (having re-read the documentation).
> Still wondering how I can verify that 5 partitions have been created. My
> job is reading from a topic in Kafka that has 5 partitions and sends the
> data to E/S. I can see that when there is one task to read from Kafka there
> are 5 tasks writing to E/S. So I'm supposing that the task reading from
> Kafka does it in // using 5 partitions and that's why there are then 5
> tasks to write to E/S. But I'm supposing ...
>
> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>
> I have a slightly different understanding.
>
> Direct stream generates 1 RDD per batch, however, number of partitions in
> that RDD = number of partitions in kafka topic.
>
> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
> wrote:
>
>> Hi guys,
>>
>> I'm making some tests with Spark and Kafka using a Python script. I use
>> the second method that doesn't need any receiver (Direct Approach). It
>> should adapt the number of RDDs to the number of partitions in the topic.
>> I'm trying to verify it. What's the easiest way to verify it ? I also tried
>> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
>> on the leaders of partitions in a topic, and they are not. Can you confirm
>> that RDDs are not created depending on the location of partitions and that
>> co-locating Kafka with Spark is not a must-have or that Spark does not take
>> advantage of it ?
>>
>> As the parallelism is simplified (by creating as many RDDs as there are
>> partitions) I suppose that the biggest part of the tuning is playing with
>> KafKa partitions (not talking about network configuration or management of
>> Spark resources) ?
>>
>> Thank you
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread Cyril Scetbon
Your understanding is the right one (having re-read the documentation). Still
wondering how I can verify that 5 partitions have been created. My job is
reading from a topic in Kafka that has 5 partitions and sends the data to E/S.
I can see that when there is one task to read from Kafka there are 5 tasks
writing to E/S. So I'm supposing that the task reading from Kafka does it in
parallel using 5 partitions, and that's why there are then 5 tasks to write to
E/S. But I'm supposing ...

> On Feb 16, 2016, at 21:12, ayan guha  wrote:
> 
> I have a slightly different understanding. 
> 
> Direct stream generates 1 RDD per batch, however, number of partitions in 
> that RDD = number of partitions in kafka topic.
> 
> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon  > wrote:
> Hi guys,
> 
> I'm making some tests with Spark and Kafka using a Python script. I use the 
> second method that doesn't need any receiver (Direct Approach). It should 
> adapt the number of RDDs to the number of partitions in the topic. I'm trying 
> to verify it. What's the easiest way to verify it ? I also tried to co-locate 
> Yarn, Spark and Kafka to check if RDDs are created depending on the leaders 
> of partitions in a topic, and they are not. Can you confirm that RDDs are not 
> created depending on the location of partitions and that co-locating Kafka 
> with Spark is not a must-have or that Spark does not take advantage of it ?
> 
> As the parallelism is simplified (by creating as many RDDs as there are 
> partitions) I suppose that the biggest part of the tuning is playing with 
> KafKa partitions (not talking about network configuration or management of 
> Spark resources) ?
> 
> Thank you
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
I have a slightly different understanding.

Direct stream generates 1 RDD per batch, however, number of partitions in
that RDD = number of partitions in kafka topic.

On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
wrote:

> Hi guys,
>
> I'm making some tests with Spark and Kafka using a Python script. I use
> the second method that doesn't need any receiver (Direct Approach). It
> should adapt the number of RDDs to the number of partitions in the topic.
> I'm trying to verify it. What's the easiest way to verify it ? I also tried
> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
> on the leaders of partitions in a topic, and they are not. Can you confirm
> that RDDs are not created depending on the location of partitions and that
> co-locating Kafka with Spark is not a must-have or that Spark does not take
> advantage of it ?
>
> As the parallelism is simplified (by creating as many RDDs as there are
> partitions) I suppose that the biggest part of the tuning is playing with
> KafKa partitions (not talking about network configuration or management of
> Spark resources) ?
>
> Thank you
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-12 Thread p pathiyil
Thanks Sebastian.

I was indeed trying out FAIR scheduling with a high value for
concurrentJobs today.

It does improve the latency seen by the non-hot partitions, even if it does
not provide complete isolation. So it might be an acceptable middle ground.
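
For anyone reading this later, the combination being described is roughly the
following (the concurrentJobs value is just an example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")          // FAIR instead of the default FIFO
  .set("spark.streaming.concurrentJobs", "4")   // how many streaming jobs may run at once

// optionally, route work to a pool defined in fairscheduler.xml:
// sc.setLocalProperty("spark.scheduler.pool", "default")
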
On 12 Feb 2016 12:18, "Sebastian Piu"  wrote:

> Have you tried using fair scheduler and queues
> On 12 Feb 2016 4:24 a.m., "p pathiyil"  wrote:
>
>> With this setting, I can see that the next job is being executed before
>> the previous one is finished. However, the processing of the 'hot'
>> partition eventually hogs all the concurrent jobs. If there was a way to
>> restrict jobs to be one per partition, then this setting would provide the
>> per-partition isolation.
>>
>> Is there anything in the framework which would give control over that
>> aspect ?
>>
>> Thanks.
>>
>>
>> On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger 
>> wrote:
>>
>>> spark.streaming.concurrentJobs
>>>
>>>
>>> see e.g. 
>>> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>>>
>>>
>>> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>>>
 Thanks for the response Cody.

 The producers are out of my control, so can't really balance the
 incoming content across the various topics and partitions. The number of
 topics and partitions are quite large and the volume across then not very
 well known ahead of time. So it is quite hard to segregate low and high
 volume topics in to separate driver programs.

 Will look at shuffle / repartition.

 Could you share the setting for starting another batch in parallel ? It
 might be ok to call the 'save' of the processed messages out of order if
 that is the only consequence of this setting.

 When separate DStreams are created per partition (and if union() is not
 called on them), what aspect of the framework still ties the scheduling of
 jobs across the partitions together ? Asking this to see if creating
 multiple threads in the driver and calling createDirectStream per partition
 in those threads can provide isolation.



 On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
 wrote:

> The real way to fix this is by changing partitioning, so you don't
> have a hot partition.  It would be better to do this at the time you're
> producing messages, but you can also do it with a shuffle / repartition
> during consuming.
>
> There is a setting to allow another batch to start in parallel, but
> that's likely to have unintended consequences.
>
> On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil 
> wrote:
>
>> Hi,
>>
>> I am looking at a way to isolate the processing of messages from each
>> Kafka partition within the same driver.
>>
>> Scenario: A DStream is created with the createDirectStream call by
>> passing in a few partitions. Let us say that the streaming context is
>> defined to have a time duration of 2 seconds. If the processing of 
>> messages
>> from a single partition takes more than 2 seconds (while all the others
>> finish much quicker), it seems that the next set of jobs get scheduled 
>> only
>> after the processing of that last partition. This means that the delay is
>> effective for all partitions and not just the partition that was truly 
>> the
>> cause of the delay. What I would like to do is to have the delay only
>> impact the 'slow' partition.
>>
>> Tried to create one DStream per partition and then do a union of all
>> partitions, (similar to the sample in
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
>> but that didn't seem to help.
>>
>> Please suggest the correct approach to solve this issue.
>>
>> Thanks,
>> Praveen.
>>
>
>

>>>
>>


Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread Sebastian Piu
Have you tried using fair scheduler and queues
On 12 Feb 2016 4:24 a.m., "p pathiyil"  wrote:

> With this setting, I can see that the next job is being executed before
> the previous one is finished. However, the processing of the 'hot'
> partition eventually hogs all the concurrent jobs. If there was a way to
> restrict jobs to be one per partition, then this setting would provide the
> per-partition isolation.
>
> Is there anything in the framework which would give control over that
> aspect ?
>
> Thanks.
>
>
> On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger 
> wrote:
>
>> spark.streaming.concurrentJobs
>>
>>
>> see e.g. 
>> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>>
>>
>> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>>
>>> Thanks for the response Cody.
>>>
>>> The producers are out of my control, so can't really balance the
>>> incoming content across the various topics and partitions. The number of
>>> topics and partitions are quite large and the volume across then not very
>>> well known ahead of time. So it is quite hard to segregate low and high
>>> volume topics in to separate driver programs.
>>>
>>> Will look at shuffle / repartition.
>>>
>>> Could you share the setting for starting another batch in parallel ? It
>>> might be ok to call the 'save' of the processed messages out of order if
>>> that is the only consequence of this setting.
>>>
>>> When separate DStreams are created per partition (and if union() is not
>>> called on them), what aspect of the framework still ties the scheduling of
>>> jobs across the partitions together ? Asking this to see if creating
>>> multiple threads in the driver and calling createDirectStream per partition
>>> in those threads can provide isolation.
>>>
>>>
>>>
>>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
>>> wrote:
>>>
 The real way to fix this is by changing partitioning, so you don't have
 a hot partition.  It would be better to do this at the time you're
 producing messages, but you can also do it with a shuffle / repartition
 during consuming.

 There is a setting to allow another batch to start in parallel, but
 that's likely to have unintended consequences.

 On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil  wrote:

> Hi,
>
> I am looking at a way to isolate the processing of messages from each
> Kafka partition within the same driver.
>
> Scenario: A DStream is created with the createDirectStream call by
> passing in a few partitions. Let us say that the streaming context is
> defined to have a time duration of 2 seconds. If the processing of 
> messages
> from a single partition takes more than 2 seconds (while all the others
> finish much quicker), it seems that the next set of jobs get scheduled 
> only
> after the processing of that last partition. This means that the delay is
> effective for all partitions and not just the partition that was truly the
> cause of the delay. What I would like to do is to have the delay only
> impact the 'slow' partition.
>
> Tried to create one DStream per partition and then do a union of all
> partitions, (similar to the sample in
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
> but that didn't seem to help.
>
> Please suggest the correct approach to solve this issue.
>
> Thanks,
> Praveen.
>


>>>
>>
>


Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread p pathiyil
With this setting, I can see that the next job is being executed before the
previous one is finished. However, the processing of the 'hot' partition
eventually hogs all the concurrent jobs. If there was a way to restrict
jobs to be one per partition, then this setting would provide the
per-partition isolation.

Is there anything in the framework which would give control over that
aspect ?

Thanks.


On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger  wrote:

> spark.streaming.concurrentJobs
>
>
> see e.g. 
> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>
>
> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>
>> Thanks for the response Cody.
>>
>> The producers are out of my control, so can't really balance the incoming
>> content across the various topics and partitions. The number of topics and
>> partitions are quite large and the volume across then not very well known
>> ahead of time. So it is quite hard to segregate low and high volume topics
>> in to separate driver programs.
>>
>> Will look at shuffle / repartition.
>>
>> Could you share the setting for starting another batch in parallel ? It
>> might be ok to call the 'save' of the processed messages out of order if
>> that is the only consequence of this setting.
>>
>> When separate DStreams are created per partition (and if union() is not
>> called on them), what aspect of the framework still ties the scheduling of
>> jobs across the partitions together ? Asking this to see if creating
>> multiple threads in the driver and calling createDirectStream per partition
>> in those threads can provide isolation.
>>
>>
>>
>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
>> wrote:
>>
>>> The real way to fix this is by changing partitioning, so you don't have
>>> a hot partition.  It would be better to do this at the time you're
>>> producing messages, but you can also do it with a shuffle / repartition
>>> during consuming.
>>>
>>> There is a setting to allow another batch to start in parallel, but
>>> that's likely to have unintended consequences.
>>>
>>> On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil  wrote:
>>>
 Hi,

 I am looking at a way to isolate the processing of messages from each
 Kafka partition within the same driver.

 Scenario: A DStream is created with the createDirectStream call by
 passing in a few partitions. Let us say that the streaming context is
 defined to have a time duration of 2 seconds. If the processing of messages
 from a single partition takes more than 2 seconds (while all the others
 finish much quicker), it seems that the next set of jobs get scheduled only
 after the processing of that last partition. This means that the delay is
 effective for all partitions and not just the partition that was truly the
 cause of the delay. What I would like to do is to have the delay only
 impact the 'slow' partition.

 Tried to create one DStream per partition and then do a union of all
 partitions, (similar to the sample in
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
 but that didn't seem to help.

 Please suggest the correct approach to solve this issue.

 Thanks,
 Praveen.

>>>
>>>
>>
>


RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread Mohammed Guller
You may have better luck with this question on the Spark Cassandra Connector 
mailing list.



One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with 
Spark



-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of two DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to enrich the 
messages with data from Cassandra. The Kafka messages

(Protobufs) are decoded into DataFrames and then joined with a (supposedly 
pre-filtered) DF from Cassandra. The relation of (Kafka) streaming batch size 
to raw C* data is [several streaming messages to millions of C* rows], BUT the 
join always yields exactly ONE result [1:1] per message. After the join the 
resulting DF is eventually stored to another C* table.



Problem:

- Even though I'm joining the two DFs on the full Cassandra primary key and 
pushing the corresponding filter to C*, it seems that Spark is loading the 
whole C* data-set into memory before actually joining (which I'd like to 
prevent by using the filter/predicate pushdown).

This leads to a lot of shuffling and tasks being spawned, hence the "simple" 
join takes forever...



Could anyone shed some light on this? In my perception this should be a 
prime-example for DFs and Spark Streaming.



Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")



 // Initialise Kafka

 val kafkaTopics = Set[String]("xxx")

 val kafkaParams = Map[String, String](

   "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",

   "auto.offset.reset" -> "smallest")



 // Kafka stream

 val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, 
MyMsgDecoder](ssc, kafkaParams, kafkaTopics)



 // Executed on the driver

 messages.foreachRDD { rdd =>



   // Create an instance of SQLContext

   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)

   import sqlContext.implicits._



   // Map MyMsg RDD

   val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}



   // Convert RDD[MyMsg] to DataFrame

   val MyMsgDf = MyMsgRdd.toDF()

.select(

 $"prim1Id" as 'prim1_id,

 $"prim2Id" as 'prim2_id,

 $...

   )



   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



   // Inner join on prim1Id and prim2Id

   val joinedDf = MyMsgDf.join(base_data,

 MyMsgDf("prim1_id") === base_data("prim1_id") &&

 MyMsgDf("prim2_id") === base_data("prim2_id"), "left")

 .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))

 && base_data("prim2_id").isin(MyMsgDf("prim2_id")))



   joinedDf.show()

   joinedDf.printSchema()



   // Select relevant fields



   // Persist



 }



 // Start the computation

 ssc.start()

 ssc.awaitTermination()

}



SO:

http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p







-

To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org For 
additional commands, e-mail: 
user-h...@spark.apache.org




Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread bernhard

Hi Mohammed

I'm aware of that documentation, what are you hinting at specifically?  
I'm pushing all elements of the partition key, so that should work. As  
user zero323 on SO pointed out, the problem is most probably related
to the dynamic nature of the predicate elements (two distributed  
collections per filter per join).


The statement "To push down partition keys, all of them must be  
included, but not more than one predicate per partition key, otherwise  
nothing is pushed down."


Does not apply IMO?

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to  
Cassandra" sections on this page:


https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md


Mohammed
Author: Big Data Analytics with Spark

-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
 if (instance == null) {
   // Load DataFrame with C* data-source
   instance = sqlContext.read
 .format("org.apache.spark.sql.cassandra")
 .options(Map("table" -> "cf", "keyspace" -> "ks"))
 .load()
 }
 instance
   }
}

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


You may have better luck with this question on the Spark Cassandra
Connector mailing list.



One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/
1484209656/>



-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of two
DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to
enrich the messages with data from Cassandra. The Kafka messages

(Protobufs) are decoded into DataFrames and then joined with a
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)
streaming batch size to raw C* data is [several streaming messages to
millions of C* rows], BUT the join always yields exactly ONE result
[1:1] per message. After the join the resulting DF is eventually
stored to another C* table.



Problem:

- Even though I'm joining the two DFs on the full Cassandra primary
key and pushing the corresponding filter to C*, it seems that Spark is
loading the whole C* data-set into memory before actually joining
(which I'd like to prevent by using the filter/predicate pushdown).

This leads to a lot of shuffling and tasks being spawned, hence the
"simple" join takes forever...



Could anyone shed some light on this? In my perception this should be
a prime-example for DFs and Spark Streaming.



Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")



 // Initialise Kafka

 val kafkaTopics = Set[String]("xxx")

 val kafkaParams = Map[String, String](

   "metadata.broker.list" ->
"xxx:32000,xxx:32000,xxx:32000,xxx:32000",

   "auto.offset.reset" -> "smallest")



 // Kafka stream

 val messages = KafkaUtils.createDirectStream[String, MyMsg,
StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)



 // Executed on the driver

 messages.foreachRDD { rdd =>



   // Create an instance of SQLContext

   val sqlContext =
SQLContextSingleton.getInstance(rdd.sparkContext)

   import sqlContext.implicits._



   // Map MyMsg RDD

   val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}



   // Convert RDD[MyMsg] to DataFrame

   val MyMsgDf = MyMsgRdd.toDF()

.select(

 $"prim1

Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread bernhard
The filter in the join is re-arranged in the DAG (from what I can tell  
--> explain/UI) and should therefore be pushed accordingly. I also  
made experiments applying the filter to base_data before the join  
explicitly, effectively creating a new DF, but no luck either.
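
For completeness, a rough and untested sketch of that variant: collect this
micro-batch's keys on the driver so the isin filter is built from literal values
the connector can try to push down. DataFrame and column names follow the
original example; it assumes string keys, a small per-batch key set, and
import sqlContext.implicits._ as in the original code.

// Collect the (small) set of keys present in this micro-batch
val keyRows   = MyMsgDf.select($"prim1_id", $"prim2_id").collect()
val prim1Keys = keyRows.map(_.getAs[String]("prim1_id")).distinct
val prim2Keys = keyRows.map(_.getAs[String]("prim2_id")).distinct

// Filter base_data with literal values before the join, so the Cassandra
// source gets a chance to push the predicate down
val filteredBase = base_data
  .filter($"prim1_id".isin(prim1Keys: _*) && $"prim2_id".isin(prim2Keys: _*))

val joinedDf = MyMsgDf.join(filteredBase,
  MyMsgDf("prim1_id") === filteredBase("prim1_id") &&
  MyMsgDf("prim2_id") === filteredBase("prim2_id"))

filteredBase.explain(true)   // check the physical plan for pushed-down filters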



Quoting Mohammed Guller <moham...@glassbeam.com>:

Moving the spark mailing list to BCC since this is not really  
related to Spark.


May be I am missing something, but where are you calling the filter  
method on the base_data DF to push down the predicates to Cassandra  
before calling the join method?


Mohammed
Author: Big Data Analytics with Spark


-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:47 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

I'm aware of that documentation, what are you hinting at specifically?
I'm pushing all elements of the partition key, so that should work.  
As user zero323 on SO pointed out it the problem is most probably  
related to the dynamic nature of the predicate elements (two  
distributed collections per filter per join).


The statement "To push down partition keys, all of them must be  
included, but not more than one predicate per partition key,  
otherwise nothing is pushed down."


Does not apply IMO?

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to
Cassandra" sections on this page:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/
14_data_frames.md


Mohammed
Author: Big Data Analytics with Spark

-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
 if (instance == null) {
   // Load DataFrame with C* data-source
   instance = sqlContext.read
 .format("org.apache.spark.sql.cassandra")
 .options(Map("table" -> "cf", "keyspace" -> "ks"))
 .load()
 }
 instance
   }
}

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


You may have better luck with this question on the Spark Cassandra
Connector mailing list.



One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp
/
1484209656/>



-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of
two DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to
enrich the messages with data from Cassandra. The Kafka messages

(Protobufs) are decoded into DataFrames and then joined with a
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)
streaming batch size to raw C* data is [several streaming messages to
millions of C* rows], BUT the join always yields exactly ONE result
[1:1] per message. After the join the resulting DF is eventually
stored to another C* table.



Problem:

- Even though I'm joining the two DFs on the full Cassandra primary
key and pushing the corresponding filter to C*, it seems that Spark
is loading the whole C* data-set into memory before actually joining
(which I'd like to prevent by using the filter/predicate pushdown).

This leads to a lot of shuffling and tasks being spawned, hence the
"simple" join takes forever...



Could anyone shed some light on this? In my perception this should be
a prime-example for DFs and Spark Streaming.



Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")




RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread Mohammed Guller
Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to Cassandra" 
sections on this page:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md


Mohammed
Author: Big Data Analytics with Spark

-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch] 
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
 if (instance == null) {
   // Load DataFrame with C* data-source
   instance = sqlContext.read
 .format("org.apache.spark.sql.cassandra")
 .options(Map("table" -> "cf", "keyspace" -> "ks"))
 .load()
 }
 instance
   }
}

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

> You may have better luck with this question on the Spark Cassandra 
> Connector mailing list.
>
>
>
> One quick question about this code from your email:
>
>// Load DataFrame from C* data-source
>
>val base_data = base_data_df.getInstance(sqlContext)
>
>
>
> What exactly is base_data_df and how are you creating it?
>
> Mohammed
> Author: Big Data Analytics with
> Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/
> 1484209656/>
>
>
>
> -Original Message-
> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
> Sent: Tuesday, February 9, 2016 6:58 AM
> To: user@spark.apache.org
> Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>
>
>
> All,
>
>
>
> I'm new to Spark and I'm having a hard time doing a simple join of two 
> DFs
>
>
>
> Intent:
>
> -  I'm receiving data from Kafka via direct stream and would like to 
> enrich the messages with data from Cassandra. The Kafka messages
>
> (Protobufs) are decoded into DataFrames and then joined with a 
> (supposedly pre-filtered) DF from Cassandra. The relation of (Kafka) 
> streaming batch size to raw C* data is [several streaming messages to 
> millions of C* rows], BUT the join always yields exactly ONE result 
> [1:1] per message. After the join the resulting DF is eventually 
> stored to another C* table.
>
>
>
> Problem:
>
> - Even though I'm joining the two DFs on the full Cassandra primary 
> key and pushing the corresponding filter to C*, it seems that Spark is 
> loading the whole C* data-set into memory before actually joining 
> (which I'd like to prevent by using the filter/predicate pushdown).
>
> This leads to a lot of shuffling and tasks being spawned, hence the 
> "simple" join takes forever...
>
>
>
> Could anyone shed some light on this? In my perception this should be 
> a prime-example for DFs and Spark Streaming.
>
>
>
> Environment:
>
> - Spark 1.6
>
> - Cassandra 2.1.12
>
> - Cassandra-Spark-Connector 1.5-RC1
>
> - Kafka 0.8.2.2
>
>
>
> Code:
>
>
>
> def main(args: Array[String]) {
>
>  val conf = new SparkConf()
>
>.setAppName("test")
>
>.set("spark.cassandra.connection.host", "xxx")
>
>.set("spark.cassandra.connection.keep_alive_ms", "3")
>
>.setMaster("local[*]")
>
>
>
>  val ssc = new StreamingContext(conf, Seconds(10))
>
>  ssc.sparkContext.setLogLevel("INFO")
>
>
>
>  // Initialise Kafka
>
>  val kafkaTopics = Set[String]("xxx")
>
>  val kafkaParams = Map[String, String](
>
>"metadata.broker.list" -> 
> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
>
>"auto.offset.reset" -> "smallest")
>
>
>
>  // Kafka stream
>
>  val messages = KafkaUtils.createDirectStream[String, MyMsg, 
> StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)
>
>
>
>  // Executed on the driver
>
>  messages.foreachRDD { rdd =>
>
>
>
>// Create an instance of SQLContext
>
>val sqlContext = 
> SQLContextSingleton.getInstance(rdd.sparkContext)
>
>import sqlContext.implicits._
>
>
>
>// Map MyMsg RDD
>
>val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}
>
>
>
>// Convert RDD[MyMsg] to DataFrame
>
>  

RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread Mohammed Guller
Moving the spark mailing list to BCC since this is not really related to Spark.

Maybe I am missing something, but where are you calling the filter method on
the base_data DF to push down the predicates to Cassandra before calling the 
join method? 

Mohammed
Author: Big Data Analytics with Spark


-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch] 
Sent: Tuesday, February 9, 2016 10:47 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

I'm aware of that documentation, what are you hinting at specifically?  
I'm pushing all elements of the partition key, so that should work. As user 
zero323 on SO pointed out it the problem is most probably related to the 
dynamic nature of the predicate elements (two distributed collections per 
filter per join).

The statement "To push down partition keys, all of them must be included, but 
not more than one predicate per partition key, otherwise nothing is pushed 
down."

Does not apply IMO?

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

> Hi Bernhard,
>
> Take a look at the examples shown under the "Pushing down clauses to 
> Cassandra" sections on this page:
>
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/
> 14_data_frames.md
>
>
> Mohammed
> Author: Big Data Analytics with Spark
>
> -Original Message-
> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
> Sent: Tuesday, February 9, 2016 10:05 PM
> To: Mohammed Guller
> Cc: user@spark.apache.org
> Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>
> Hi Mohammed
>
> Thanks for hint, I should probably do that :)
>
> As for the DF singleton:
>
> /**
>   * Lazily instantiated singleton instance of base_data DataFrame
>   */
> object base_data_df {
>
>@transient private var instance: DataFrame = _
>
>def getInstance(sqlContext: SQLContext): DataFrame = {
>  if (instance == null) {
>// Load DataFrame with C* data-source
>instance = sqlContext.read
>  .format("org.apache.spark.sql.cassandra")
>  .options(Map("table" -> "cf", "keyspace" -> "ks"))
>  .load()
>  }
>  instance
>}
> }
>
> Bernhard
>
> Quoting Mohammed Guller <moham...@glassbeam.com>:
>
>> You may have better luck with this question on the Spark Cassandra 
>> Connector mailing list.
>>
>>
>>
>> One quick question about this code from your email:
>>
>>// Load DataFrame from C* data-source
>>
>>val base_data = base_data_df.getInstance(sqlContext)
>>
>>
>>
>> What exactly is base_data_df and how are you creating it?
>>
>> Mohammed
>> Author: Big Data Analytics with
>> Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp
>> /
>> 1484209656/>
>>
>>
>>
>> -Original Message-
>> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
>> Sent: Tuesday, February 9, 2016 6:58 AM
>> To: user@spark.apache.org
>> Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>>
>>
>>
>> All,
>>
>>
>>
>> I'm new to Spark and I'm having a hard time doing a simple join of 
>> two DFs
>>
>>
>>
>> Intent:
>>
>> -  I'm receiving data from Kafka via direct stream and would like to 
>> enrich the messages with data from Cassandra. The Kafka messages
>>
>> (Protobufs) are decoded into DataFrames and then joined with a 
>> (supposedly pre-filtered) DF from Cassandra. The relation of (Kafka) 
>> streaming batch size to raw C* data is [several streaming messages to 
>> millions of C* rows], BUT the join always yields exactly ONE result 
>> [1:1] per message. After the join the resulting DF is eventually 
>> stored to another C* table.
>>
>>
>>
>> Problem:
>>
>> - Even though I'm joining the two DFs on the full Cassandra primary 
>> key and pushing the corresponding filter to C*, it seems that Spark 
>> is loading the whole C* data-set into memory before actually joining 
>> (which I'd like to prevent by using the filter/predicate pushdown).
>>
>> This leads to a lot of shuffling and tasks being spawned, hence the 
>> "simple" join takes forever...
>>
>>
>>
>> Could anyone shed some light on this? In my perception this should be 
>> a prime-example for DFs and Spark Streaming.
>>
>>
>>
>> Environment:
>>
>> - Spa

Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread bernhard

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
 * Lazily instantiated singleton instance of base_data DataFrame
 */
object base_data_df {

  @transient private var instance: DataFrame = _

  def getInstance(sqlContext: SQLContext): DataFrame = {
if (instance == null) {
  // Load DataFrame with C* data-source
  instance = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "cf", "keyspace" -> "ks"))
.load()
}
instance
  }
}

Bernhard

Quoting Mohammed Guller :

You may have better luck with this question on the Spark Cassandra  
Connector mailing list.




One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with  
Spark




-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of two DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to  
enrich the messages with data from Cassandra. The Kafka messages


(Protobufs) are decoded into DataFrames and then joined with a  
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)  
streaming batch size to raw C* data is [several streaming messages  
to millions of C* rows], BUT the join always yields exactly ONE  
result [1:1] per message. After the join the resulting DF is  
eventually stored to another C* table.




Problem:

- Even though I'm joining the two DFs on the full Cassandra primary  
key and pushing the corresponding filter to C*, it seems that Spark  
is loading the whole C* data-set into memory before actually joining  
(which I'd like to prevent by using the filter/predicate pushdown).


This leads to a lot of shuffling and tasks being spawned, hence the  
"simple" join takes forever...




Could anyone shed some light on this? In my perception this should  
be a prime-example for DFs and Spark Streaming.




Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")



 // Initialise Kafka

 val kafkaTopics = Set[String]("xxx")

 val kafkaParams = Map[String, String](

   "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",

   "auto.offset.reset" -> "smallest")



 // Kafka stream

 val messages = KafkaUtils.createDirectStream[String, MyMsg,  
StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)




 // Executed on the driver

 messages.foreachRDD { rdd =>



   // Create an instance of SQLContext

   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)

   import sqlContext.implicits._



   // Map MyMsg RDD

   val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}



   // Convert RDD[MyMsg] to DataFrame

   val MyMsgDf = MyMsgRdd.toDF()

.select(

 $"prim1Id" as 'prim1_id,

 $"prim2Id" as 'prim2_id,

 $...

   )



   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



   // Inner join on prim1Id and prim2Id

   val joinedDf = MyMsgDf.join(base_data,

 MyMsgDf("prim1_id") === base_data("prim1_id") &&

 MyMsgDf("prim2_id") === base_data("prim2_id"), "left")

 .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))

 && base_data("prim2_id").isin(MyMsgDf("prim2_id")))



   joinedDf.show()

   joinedDf.printSchema()



   // Select relevant fields



   // Persist



 }



 // Start the computation

 ssc.start()

 ssc.awaitTermination()

}



SO:

http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p







-

To unsubscribe, e-mail:  
user-unsubscr...@spark.apache.org  
For additional commands, e-mail:  
user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

Re: Spark Streaming: My kafka receivers are not consuming in parallel

2016-02-03 Thread Jorge Rodriguez
Please ignore this question, as I've figured out what my problem was.

In the case that anyone else runs into something similar, the problem was
on the kafka side.  I was using the console producer to generate the
messages going into the kafka logs.  This producer will send all of the
messages to the same partition, unless you specify the "--new-producer"
parameter.

Thanks,
Jorge

On Wed, Feb 3, 2016 at 12:44 PM, Jorge Rodriguez 
wrote:

> Hello Spark users,
>
> We are setting up our first batch of spark streaming pipelines.  And I am
> running into an issue which I am not sure how to resolve, but seems like
> should be fairly trivial.
>
> I am using receiver-mode Kafka consumer that comes with Spark, and running
> in standalone mode.  I've setup two receivers, which are consuming from a 4
> broker, 4 partition kafka topic.
>
> If you will look at the image below, you will see that* even though I
> have two receivers, only one of them ever consumes data at a time*.  I
> believe this to be my current bottleneck for scaling.
>
> What am I missing?
>
> To me, order of events consumed is not important.  I just want to optimize
> for maximum throughput.
>
>
> [image: Inline image 1]
>
> Thanks in advance for any help or tips!
>
> Jorge
>


Re: Spark Streaming with Kafka - batch DStreams in memory

2016-02-02 Thread Cody Koeninger
It's possible you could (ab)use updateStateByKey or mapWithState for this.

But honestly it's probably a lot more straightforward to just choose a
reasonable batch size that gets you a reasonable file size for most of your
keys, then use filecrush or something similar to deal with the hdfs small
file problem.
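
For what it's worth, a rough and untested sketch of the mapWithState idea
(Spark 1.6+, requires a checkpoint directory via ssc.checkpoint(...); assumes a
keyed DStream[(String, String)] named keyedStream and a made-up flush threshold):

import org.apache.spark.streaming.{State, StateSpec}

val flushThresholdBytes = 64L * 1024 * 1024   // hypothetical target, roughly an HDFS block

// Buffer values per key in state; emit the buffered batch once it is big
// enough, otherwise keep buffering.
def accumulate(key: String, value: Option[String],
               state: State[Vector[String]]): Option[(String, Vector[String])] = {
  val buffered = state.getOption().getOrElse(Vector.empty[String]) ++ value
  if (buffered.map(_.length.toLong).sum >= flushThresholdBytes) {
    state.remove()                 // clear the buffer for this key
    Some(key -> buffered)          // emit the accumulated records
  } else {
    state.update(buffered)         // keep buffering
    None
  }
}

val flushed = keyedStream
  .mapWithState(StateSpec.function(accumulate _))
  .flatMap(_.toList)               // only the keys that hit the threshold this batch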

On Mon, Feb 1, 2016 at 10:11 PM, p pathiyil  wrote:

> Hi,
>
> Are there any ways to store DStreams / RDD read from Kafka in memory to be
> processed at a later time ? What we need to do is to read data from Kafka,
> process it to be keyed by some attribute that is present in the Kafka
> messages, and write out the data related to each key when we have
> accumulated enough data for that key to write out a file that is close to
> the HDFS block size, say 64MB. We are looking at ways to avoid writing out
> some file of the entire Kafka content periodically and then later run a
> second job to read those files and split them out to another set of files
> as necessary.
>
> Thanks.
>


Re: Spark Streaming 1.5.2+Kafka+Python. Strange reading

2015-12-24 Thread Akhil Das
Would you mind posting the relevant code snippet?

Thanks
Best Regards

On Wed, Dec 23, 2015 at 7:33 PM, Vyacheslav Yanuk 
wrote:

> Hi.
> I have very strange situation with direct reading from Kafka.
> For example.
> I have 1000 messages in Kafka.
> After submitting my application I read this data and process it.
> As I process the data I have accumulated 10 new entries.
> In next reading from Kafka I read only 3 records, but not 10!!!
> Why???
> I don't understand...
> Explain to me please!
>
> --
> WBR, Vyacheslav Yanuk
> Codeminders.com
>


Re: Spark Streaming 1.5.2+Kafka+Python (docs)

2015-12-23 Thread Cody Koeninger
Read the documentation
spark.apache.org/docs/latest/streaming-kafka-integration.html
If you still have questions, read the resources linked from
https://github.com/koeninger/kafka-exactly-once
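
In short, with the direct stream the Scala pattern from that documentation is to
cast each batch's RDD to HasOffsetRanges (the Python API exposes the same
information but differs in the details); roughly:

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // Only works on the RDD produced directly by createDirectStream, before any
  // transformation that changes partitioning
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
  // ...update Zookeeper or your own offset store here if needed
}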

On Wed, Dec 23, 2015 at 7:24 AM, Vyacheslav Yanuk 
wrote:

> Colleagues
> Documents written about  createDirectStream that
>
> "This does not use Zookeeper to store offsets. The consumed offsets are
> tracked by the stream itself. For interoperability with Kafka monitoring
> tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself
> from the streaming application. You can access the offsets used in each
> batch from the generated RDDs (see   "
>
> My question is.
> How I can access the offsets used in each batch ???
> What I should SEE???
>
> --
> WBR, Vyacheslav Yanuk
> Codeminders.com
>


Re: Spark Streaming Specify Kafka Partition

2015-12-04 Thread Cody Koeninger
So createDirectStream will give you a JavaInputDStream of R, where R is the
return type you chose for your message handler.

If you want a JavaPairInputDStream, you may have to call .mapToPair in
order to convert the stream, even if the type you chose for R was already
Tuple2

(note that I try to stay as far away from Java as possible, so this answer
is untested, possibly inaccurate, may throw checked exceptions etc etc)

On Thu, Dec 3, 2015 at 5:21 PM, Alan Braithwaite 
wrote:

> One quick newbie question since I got another chance to look at this
> today.  We're using java for our spark applications.  The
> createDirectStream we were using previously [1] returns a
> JavaPairInputDStream, but the createDirectStream with fromOffsets expects
> an argument recordClass to pass into the generic constructor for
> createDirectStream.
>
> In the code for the first function signature (without fromOffsets) it's
> being constructed in Scala as just a tuple (K, V).   How do I pass this
> same class/type information from java as the record class to get a 
> JavaPairInputDStream V>?
>
> I understand this might be a question more fit for a scala mailing list
> but google is failing me at the moment for hints on the interoperability of
> scala and java generics.
>
> [1] The original createDirectStream:
> https://github.com/apache/spark/blob/branch-1.5/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L395-L423
>
> Thanks,
> - Alan
>
> On Tue, Dec 1, 2015 at 8:12 AM, Cody Koeninger  wrote:
>
>> I actually haven't tried that, since I tend to do the offset lookups if
>> necessary.
>>
>> It's possible that it will work, try it and let me know.
>>
>> Be aware that if you're doing a count() or take() operation directly on
>> the rdd it'll definitely give you the wrong result if you're using -1 for
>> one of the offsets.
>>
>>
>>
>> On Tue, Dec 1, 2015 at 9:58 AM, Alan Braithwaite 
>> wrote:
>>
>>> Neat, thanks.  If I specify something like -1 as the offset, will it
>>> consume from the latest offset or do I have to instrument that manually?
>>>
>>> - Alan
>>>
>>> On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger 
>>> wrote:
>>>
 Yes, there is a version of createDirectStream that lets you specify
 fromOffsets: Map[TopicAndPartition, Long]

 On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
 wrote:

> Is there any mechanism in the kafka streaming source to specify the
> exact partition id that we want a streaming job to consume from?
>
> If not, is there a workaround besides writing our a custom receiver?
>
> Thanks,
> - Alan
>


>>>
>>
>


Re: Spark Streaming Specify Kafka Partition

2015-12-03 Thread Alan Braithwaite
One quick newbie question since I got another chance to look at this
today.  We're using java for our spark applications.  The createDirectStream
we were using previously [1] returns a JavaPairInputDStream, but the
createDirectStream with fromOffsets expects an argument recordClass to pass
into the generic constructor for createDirectStream.

In the code for the first function signature (without fromOffsets) it's
being constructed in Scala as just a tuple (K, V).   How do I pass this
same class/type information from java as the record class to get a
JavaPairInputDStream?

I understand this might be a question more fit for a scala mailing list but
google is failing me at the moment for hints on the interoperability of
scala and java generics.

[1] The original createDirectStream:
https://github.com/apache/spark/blob/branch-1.5/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L395-L423

Thanks,
- Alan

On Tue, Dec 1, 2015 at 8:12 AM, Cody Koeninger  wrote:

> I actually haven't tried that, since I tend to do the offset lookups if
> necessary.
>
> It's possible that it will work, try it and let me know.
>
> Be aware that if you're doing a count() or take() operation directly on
> the rdd it'll definitely give you the wrong result if you're using -1 for
> one of the offsets.
>
>
>
> On Tue, Dec 1, 2015 at 9:58 AM, Alan Braithwaite 
> wrote:
>
>> Neat, thanks.  If I specify something like -1 as the offset, will it
>> consume from the latest offset or do I have to instrument that manually?
>>
>> - Alan
>>
>> On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger 
>> wrote:
>>
>>> Yes, there is a version of createDirectStream that lets you specify
>>> fromOffsets: Map[TopicAndPartition, Long]
>>>
>>> On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
>>> wrote:
>>>
 Is there any mechanism in the kafka streaming source to specify the
 exact partition id that we want a streaming job to consume from?

 If not, is there a workaround besides writing our a custom receiver?

 Thanks,
 - Alan

>>>
>>>
>>
>


Re: Spark Streaming Specify Kafka Partition

2015-12-01 Thread Alan Braithwaite
Neat, thanks.  If I specify something like -1 as the offset, will it
consume from the latest offset or do I have to instrument that manually?

- Alan

On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger  wrote:

> Yes, there is a version of createDirectStream that lets you specify
> fromOffsets: Map[TopicAndPartition, Long]
>
> On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
> wrote:
>
>> Is there any mechanism in the kafka streaming source to specify the exact
>> partition id that we want a streaming job to consume from?
>>
>> If not, is there a workaround besides writing our a custom receiver?
>>
>> Thanks,
>> - Alan
>>
>
>


Re: Spark Streaming Specify Kafka Partition

2015-12-01 Thread Cody Koeninger
Yes, there is a version of createDirectStream that lets you specify
fromOffsets: Map[TopicAndPartition, Long]
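
In Scala that overload looks roughly like this (topic name and offsets are
example values; ssc and kafkaParams are assumed to be defined as usual):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Start partition 0 at offset 0 and partition 3 at offset 1234 (example values)
val fromOffsets = Map(
  TopicAndPartition("mytopic", 0) -> 0L,
  TopicAndPartition("mytopic", 3) -> 1234L
)

// The message handler determines the record type R of the resulting stream
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)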

On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
wrote:

> Is there any mechanism in the kafka streaming source to specify the exact
> partition id that we want a streaming job to consume from?
>
> If not, is there a workaround besides writing our a custom receiver?
>
> Thanks,
> - Alan
>


Re: Spark Streaming Specify Kafka Partition

2015-12-01 Thread Cody Koeninger
I actually haven't tried that, since I tend to do the offset lookups if
necessary.

It's possible that it will work, try it and let me know.

Be aware that if you're doing a count() or take() operation directly on the
rdd it'll definitely give you the wrong result if you're using -1 for one
of the offsets.



On Tue, Dec 1, 2015 at 9:58 AM, Alan Braithwaite 
wrote:

> Neat, thanks.  If I specify something like -1 as the offset, will it
> consume from the latest offset or do I have to instrument that manually?
>
> - Alan
>
> On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger  wrote:
>
>> Yes, there is a version of createDirectStream that lets you specify
>> fromOffsets: Map[TopicAndPartition, Long]
>>
>> On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
>> wrote:
>>
>>> Is there any mechanism in the kafka streaming source to specify the
>>> exact partition id that we want a streaming job to consume from?
>>>
>>> If not, is there a workaround besides writing our a custom receiver?
>>>
>>> Thanks,
>>> - Alan
>>>
>>
>>
>


Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Adrian Tanase
We do - using Spark streaming, Kafka, HDFS all collocated on the same nodes. 
Works great so far.


Spark picks up the location information and reads data from the partitions 
hosted by the local broker, showing up as NODE_LOCAL in the UI.

You also need to look at the locality options in the config (spark.locality.wait
and friends) - just to make sure you're not wasting time if the kafka cluster
becomes unbalanced and there are fewer cores than partitions on a particular node
- you want to get to RACK_LOCAL as quickly as possible; we've set this to 500
millis instead of the default of 3 seconds.
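
For illustration, setting this on the SparkConf looks roughly like the following
(values are the ones mentioned above; adjust for your cluster):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "500ms")   // default is 3s
  // finer-grained overrides also exist: spark.locality.wait.process / .node / .rack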

-adrian


From: Cody Koeninger <c...@koeninger.org>
Sent: Monday, September 21, 2015 10:19 PM
To: Ashish Soni
Cc: user
Subject: Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

The direct stream already uses the kafka leader for a given partition as the 
preferred location.

I don't run kafka on the same nodes as spark, and I don't know anyone who does, 
so that situation isn't particularly well tested.

On Mon, Sep 21, 2015 at 1:15 PM, Ashish Soni 
<asoni.le...@gmail.com<mailto:asoni.le...@gmail.com>> wrote:
Hi All ,

Just wanted to find out if there is an benefits to installing  kafka brokers 
and spark nodes on the same machine ?

is it possible that spark can pull data from kafka if it is local to the node 
i.e. the broker or partition is on the same machine.

Thanks,
Ashish



Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Cody Koeninger
The direct stream already uses the kafka leader for a given partition as
the preferred location.

I don't run kafka on the same nodes as spark, and I don't know anyone who
does, so that situation isn't particularly well tested.

On Mon, Sep 21, 2015 at 1:15 PM, Ashish Soni  wrote:

> Hi All ,
>
> Just wanted to find out if there is an benefits to installing  kafka
> brokers and spark nodes on the same machine ?
>
> is it possible that spark can pull data from kafka if it is local to the
> node i.e. the broker or partition is on the same machine.
>
> Thanks,
> Ashish
>


Re: spark streaming 1.3 kafka topic error

2015-08-31 Thread Cody Koeninger
You can't set it to less than 1

Just set it to max int if that's really what you want to do
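
Assuming the retry setting being discussed is spark.streaming.kafka.maxRetries
(used by the 1.3 direct stream when refreshing leader offsets), a minimal
illustration:

import org.apache.spark.SparkConf

// maxRetries cannot be negative, so use a very large value instead of -1;
// retry.backoff.ms in the kafka params controls the wait between attempts
val conf = new SparkConf()
  .setAppName("kafka-direct")
  .set("spark.streaming.kafka.maxRetries", Int.MaxValue.toString)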

On Mon, Aug 31, 2015 at 6:00 AM, Shushant Arora 
wrote:

> Say if my cluster takes long time for rebalance for some reason
> intermittently . So to handle that Can I have infinite retries instead of
> killing the app? What should be the value of retries (-1) will work or
> something else ?
>
> On Thu, Aug 27, 2015 at 6:46 PM, Cody Koeninger 
> wrote:
>
>> Your kafka broker died or you otherwise had a rebalance.
>>
>> Normally spark retries take care of that.
>>
>> Is there something going on with your kafka installation, that rebalance
>> is taking especially long?
>>
>> Yes, increasing backoff / max number of retries will "help", but it's
>> better to figure out what's going on with kafka.
>>
>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> My streaming application gets killed with below error
>>>
>>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>> [testtopic,193]))
>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>> for time 144062612 ms
>>> org.apache.spark.SparkException:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,115]))
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>> at
>>>
>>>
>>>
>>> Kafka params in job logs printed are :
>>>  value.serializer = class
>>> org.apache.kafka.common.serialization.StringSerializer
>>> key.serializer = class
>>> org.apache.kafka.common.serialization.StringSerializer
>>> block.on.buffer.full = true
>>> retry.backoff.ms = 100
>>> buffer.memory = 1048576
>>> batch.size = 16384
>>> metrics.sample.window.ms = 3
>>> metadata.max.age.ms = 30
>>> receive.buffer.bytes = 32768
>>> timeout.ms = 3
>>> max.in.flight.requests.per.connection = 5
>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>> metric.reporters = []
>>> client.id =
>>> compression.type = none
>>> retries = 0
>>> max.request.size = 1048576
>>> send.buffer.bytes = 131072
>>> acks = all
>>> reconnect.backoff.ms = 10
>>> linger.ms = 0
>>> metrics.num.samples = 2
>>> metadata.fetch.timeout.ms = 6
>>>
>>>
>>> Is it kafka broker getting down and job is getting killed ? Whats the
>>> best way to handle it ?
>>> Increasing retries and backoff time  wil help and to what values those
>>> should be set to never have streaming application failure - rather it keep
>>> on retrying after few seconds and send a event so that my custom code can
>>> send notification of kafka broker down if its because of that.
>>>
>>>
>>> Thanks
>>>
>>>
>>
>


Re: spark streaming 1.3 kafka topic error

2015-08-31 Thread Shushant Arora
Say my cluster intermittently takes a long time to rebalance for some reason. To
handle that, can I have infinite retries instead of killing the app? What should
the value of retries be: will (-1) work, or something else?

On Thu, Aug 27, 2015 at 6:46 PM, Cody Koeninger  wrote:

> Your kafka broker died or you otherwise had a rebalance.
>
> Normally spark retries take care of that.
>
> Is there something going on with your kafka installation, that rebalance
> is taking especially long?
>
> Yes, increasing backoff / max number of retries will "help", but it's
> better to figure out what's going on with kafka.
>
> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora  > wrote:
>
>> Hi
>>
>> My streaming application gets killed with below error
>>
>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>> [testtopic,193]))
>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
>> time 144062612 ms
>> org.apache.spark.SparkException:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,115]))
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>>
>>
>>
>> Kafka params in job logs printed are :
>>  value.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> key.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> block.on.buffer.full = true
>> retry.backoff.ms = 100
>> buffer.memory = 1048576
>> batch.size = 16384
>> metrics.sample.window.ms = 3
>> metadata.max.age.ms = 30
>> receive.buffer.bytes = 32768
>> timeout.ms = 3
>> max.in.flight.requests.per.connection = 5
>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>> metric.reporters = []
>> client.id =
>> compression.type = none
>> retries = 0
>> max.request.size = 1048576
>> send.buffer.bytes = 131072
>> acks = all
>> reconnect.backoff.ms = 10
>> linger.ms = 0
>> metrics.num.samples = 2
>> metadata.fetch.timeout.ms = 6
>>
>>
>> Is it kafka broker getting down and job is getting killed ? Whats the
>> best way to handle it ?
>> Increasing retries and backoff time  wil help and to what values those
>> should be set to never have streaming application failure - rather it keep
>> on retrying after few seconds and send a event so that my custom code can
>> send notification of kafka broker down if its because of that.
>>
>>
>> Thanks
>>
>>
>


Re: spark streaming 1.3 kafka topic error

2015-08-27 Thread Ahmed Nawar
Dears,

I need to commit a DB transaction for each partition, not for each row.
The code below didn't work for me.


rdd.mapPartitions(partitionOfRecords => {
  DBConnectionInit()
  val results = partitionOfRecords.map(..)
  DBConnection.commit()
})



Best regards,

Ahmed Atef Nawwar

Data Management  Big Data Consultant






On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger c...@koeninger.org wrote:

 Your kafka broker died or you otherwise had a rebalance.

 Normally spark retries take care of that.

 Is there something going on with your kafka installation, that rebalance
 is taking especially long?

 Yes, increasing backoff / max number of retries will help, but it's
 better to figure out what's going on with kafka.

 On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Hi

 My streaming application gets killed with below error

 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
 [testtopic,193]))
 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
 time 144062612 ms
 org.apache.spark.SparkException:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,115]))
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at



 Kafka params in job logs printed are :
  value.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 key.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 block.on.buffer.full = true
 retry.backoff.ms = 100
 buffer.memory = 1048576
 batch.size = 16384
 metrics.sample.window.ms = 3
 metadata.max.age.ms = 30
 receive.buffer.bytes = 32768
 timeout.ms = 3
 max.in.flight.requests.per.connection = 5
 bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
 metric.reporters = []
 client.id =
 compression.type = none
 retries = 0
 max.request.size = 1048576
 send.buffer.bytes = 131072
 acks = all
 reconnect.backoff.ms = 10
 linger.ms = 0
 metrics.num.samples = 2
 metadata.fetch.timeout.ms = 6


 Is it kafka broker getting down and job is getting killed ? Whats the
 best way to handle it ?
 Increasing retries and backoff time  wil help and to what values those
 should be set to never have streaming application failure - rather it keep
 on retrying after few seconds and send a event so that my custom code can
 send notification of kafka broker down if its because of that.


 Thanks





Re: spark streaming 1.3 kafka topic error

2015-08-27 Thread Cody Koeninger
Map is lazy.  You need an actual action, or nothing will happen.  Use
foreachPartition, or do an empty foreach after the map.
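
A rough sketch of the foreachPartition variant, reusing the hypothetical
DBConnectionInit / DBConnection helpers from the original question:

rdd.foreachPartition { partitionOfRecords =>
  DBConnectionInit()                    // initialise the connection once per partition
  partitionOfRecords.foreach { record =>
    // process and stage each record here
  }
  DBConnection.commit()                 // commit once per partition
}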

On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar ahmed.na...@gmail.com wrote:

 Dears,

 I needs to commit DB Transaction for each partition,Not for each row.
 below didn't work for me.


 rdd.mapPartitions(partitionOfRecords = {

 DBConnectionInit()

 val results = partitionOfRecords.map(..)

 DBConnection.commit()


 })



 Best regards,

 Ahmed Atef Nawwar

 Data Management  Big Data Consultant






 On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Your kafka broker died or you otherwise had a rebalance.

 Normally spark retries take care of that.

 Is there something going on with your kafka installation, that rebalance
 is taking especially long?

 Yes, increasing backoff / max number of retries will help, but it's
 better to figure out what's going on with kafka.

 On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 My streaming application gets killed with below error

 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
 [testtopic,193]))
 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
 for time 144062612 ms
 org.apache.spark.SparkException:
 ArrayBuffer(kafka.common.NotLeaderForPartitionException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([testtopic,115]))
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at



 Kafka params in job logs printed are :
  value.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 key.serializer = class
 org.apache.kafka.common.serialization.StringSerializer
 block.on.buffer.full = true
 retry.backoff.ms = 100
 buffer.memory = 1048576
 batch.size = 16384
 metrics.sample.window.ms = 3
 metadata.max.age.ms = 30
 receive.buffer.bytes = 32768
 timeout.ms = 3
 max.in.flight.requests.per.connection = 5
 bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
 metric.reporters = []
 client.id =
 compression.type = none
 retries = 0
 max.request.size = 1048576
 send.buffer.bytes = 131072
 acks = all
 reconnect.backoff.ms = 10
 linger.ms = 0
 metrics.num.samples = 2
 metadata.fetch.timeout.ms = 6


 Is it kafka broker getting down and job is getting killed ? Whats the
 best way to handle it ?
 Increasing retries and backoff time  wil help and to what values those
 should be set to never have streaming application failure - rather it keep
 on retrying after few seconds and send a event so that my custom code can
 send notification of kafka broker down if its because of that.


 Thanks






Re: spark streaming 1.3 kafka buffer size

2015-08-27 Thread Cody Koeninger
As it stands currently, no.

If you're already overriding the dstream, it would be pretty
straightforward to change the kafka parameters used when creating the rdd
for the next batch though

On Wed, Aug 26, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Can I change this param fetch.message.max.bytes  or 
 spark.streaming.kafka.maxRatePerPartition
 at run time across batches.
 Say I detected some fail condition in my system and I decided to sonsume i
 next batch interval only 10 messages per partition and if that succeed I
 reset the max limit to unlimited again .

 On Wed, Aug 26, 2015 at 9:32 PM, Cody Koeninger c...@koeninger.org
 wrote:

 see http://kafka.apache.org/documentation.html#consumerconfigs

 fetch.message.max.bytes

 in the kafka params passed to the constructor


 On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 whats the default buffer in spark streaming 1.3 for  kafka messages.

 Say In this run it has to fetch messages from offset 1 to 1. will it
 fetch all in one go or internally it fetches messages in  few messages
 batch.

 Is there any setting to configure this no of offsets fetched in one
 batch?






Re: spark streaming 1.3 kafka buffer size

2015-08-26 Thread Cody Koeninger
see http://kafka.apache.org/documentation.html#consumerconfigs

fetch.message.max.bytes

in the kafka params passed to the constructor
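
i.e. something along these lines (broker list is a placeholder):

val kafkaParams = Map[String, String](
  "metadata.broker.list"    -> "broker1:9092,broker2:9092",
  // raise the maximum bytes fetched per partition per request (default is ~1 MB)
  "fetch.message.max.bytes" -> (8 * 1024 * 1024).toString
)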


On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 whats the default buffer in spark streaming 1.3 for  kafka messages.

 Say In this run it has to fetch messages from offset 1 to 1. will it
 fetch all in one go or internally it fetches messages in  few messages
 batch.

 Is there any setting to configure this no of offsets fetched in one batch?



Re: spark streaming 1.3 kafka buffer size

2015-08-26 Thread Shushant Arora
Can I change the params fetch.message.max.bytes or
spark.streaming.kafka.maxRatePerPartition at run time, across batches?
Say I detected some fail condition in my system and decided to consume only 10
messages per partition in the next batch interval; if that succeeds, I reset the
max limit to unlimited again.

On Wed, Aug 26, 2015 at 9:32 PM, Cody Koeninger c...@koeninger.org wrote:

 see http://kafka.apache.org/documentation.html#consumerconfigs

 fetch.message.max.bytes

 in the kafka params passed to the constructor


 On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 whats the default buffer in spark streaming 1.3 for  kafka messages.

 Say In this run it has to fetch messages from offset 1 to 1. will it
 fetch all in one go or internally it fetches messages in  few messages
 batch.

 Is there any setting to configure this no of offsets fetched in one batch?





Re: spark streaming 1.3 kafka error

2015-08-22 Thread Shushant Arora
The exception comes when the client also has many connections to another
external server.
So I think the exception is due to a client-side issue only - on the server
side there is no issue.


I want to understand: does the executor (simple consumer) not make a new connection
to the kafka broker at the start of each task? Or is it created only once, and is that
connection somehow getting closed?

On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 it comes at start of each tasks when there is new data inserted in kafka.(
 data inserted is very few)
 kafka topic has 300 partitions - data inserted is ~10 MB.

 Tasks gets failed and it retries which succeed and after certain no of
 fail tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 That looks like you are choking your kafka machine. Do a top on the kafka
 machines and see the workload, it may happen that you are spending too much
 time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports (note
 that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the job
 is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of view,
 post a minimal reproducible code sample that demonstrates the issue, so it
 can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka 
 using directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
 stage 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
 task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 
 130.0 (TID 16348)
 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic 
 test_hbrealtimeevents, partition 75 

Re: spark streaming 1.3 kafka error

2015-08-22 Thread Shushant Arora
When trying the consumer without external connections, or with a low number of
external connections, it works fine -

so the doubt is how the socket got closed -

java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.



On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you try some other consumer and see if the issue still exists?
 On Aug 22, 2015 12:47 AM, Shushant Arora shushantaror...@gmail.com
 wrote:

 Exception comes when client has so many connections to some another
 external server also.
 So I think Exception is coming because of client side issue only- server
 side there is no issue.


 Want to understand is executor(simple consumer) not making new connection
 to kafka broker at start of each task ? Or is it created once only and that
 is getting closed somehow ?

 On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 it comes at start of each tasks when there is new data inserted in
 kafka.( data inserted is very few)
 kafka topic has 300 partitions - data inserted is ~10 MB.

 Tasks gets failed and it retries which succeed and after certain no of
 fail tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 That looks like you are choking your kafka machine. Do a top on the
 kafka machines and see the workload, it may happen that you are spending
 too much time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports
 (note that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the
 job is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of
 view, post a minimal reproducible code sample that demonstrates the issue,
 so it can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka 
 using directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
 stage 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
  at kafka.utils.Utils$.read(Utils.scala:376)
  at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
  at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
  at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
  at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
  at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
  at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
  at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at 
 

Re: spark streaming 1.3 kafka error

2015-08-22 Thread Shushant Arora
When trying the consumer without external connections, or with a low
number of external connections, it works fine -

so the doubt is how the socket got closed -


15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in
stage 130.0 (TID 16332)
java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)


Does the executor (simple consumer) not make a new connection to the kafka broker at
the start of each task? Or is it created only once, and is that connection somehow
getting closed?

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 On trying the consumer without external connections  or with low number of
 external conections its working fine -

 so doubt is how  socket got closed -

 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.



 On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you try some other consumer and see if the issue still exists?
 On Aug 22, 2015 12:47 AM, Shushant Arora shushantaror...@gmail.com
 wrote:

 Exception comes when client has so many connections to some another
 external server also.
 So I think Exception is coming because of client side issue only- server
 side there is no issue.


 Want to understand is executor(simple consumer) not making new
 connection to kafka broker at start of each task ? Or is it created once
 only and that is getting closed somehow ?

 On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 it comes at start of each tasks when there is new data inserted in
 kafka.( data inserted is very few)
 kafka topic has 300 partitions - data inserted is ~10 MB.

 Tasks gets failed and it retries which succeed and after certain no of
 fail tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 That looks like you are choking your kafka machine. Do a top on the
 kafka machines and see the workload, it may happen that you are spending
 too much time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports
 (note that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the
 job is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of
 view, post a minimal reproducible code sample that demonstrates the 
 issue,
 so it can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka 
 using directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
 stage 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
 at kafka.utils.Utils$.read(Utils.scala:376)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at 
 kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at 
 kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
 at 
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
 at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
 at 

Re: spark streaming 1.3 kafka error

2015-08-22 Thread Akhil Das
Can you try some other consumer and see if the issue still exists?
On Aug 22, 2015 12:47 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Exception comes when client has so many connections to some another
 external server also.
 So I think Exception is coming because of client side issue only- server
 side there is no issue.


 Want to understand is executor(simple consumer) not making new connection
 to kafka broker at start of each task ? Or is it created once only and that
 is getting closed somehow ?

 On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 it comes at start of each tasks when there is new data inserted in
 kafka.( data inserted is very few)
 kafka topic has 300 partitions - data inserted is ~10 MB.

 Tasks gets failed and it retries which succeed and after certain no of
 fail tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 That looks like you are choking your kafka machine. Do a top on the
 kafka machines and see the workload, it may happen that you are spending
 too much time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports
 (note that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the
 job is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of
 view, post a minimal reproducible code sample that demonstrates the issue,
 so it can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka 
 using directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
 stage 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
   at kafka.utils.Utils$.read(Utils.scala:376)
   at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
   at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
   at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
   at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
   at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
   at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got 
 assigned task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in 

Re: spark streaming 1.3 kafka error

2015-08-22 Thread Dibyendu Bhattacharya
I think you can also give this consumer a try in your environment:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
It has been running fine for topics with a large number of Kafka
partitions (  200 ) like yours without any issue - no connection issues, as this
consumer re-uses kafka connections, and it can also recover from failures
(network loss, the Kafka leader going down, ZK down, etc.).


Regards,
Dibyendu

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 On trying the consumer without external connections  or with low number of
 external conections its working fine -

 so doubt is how  socket got closed -

 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.



 On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you try some other consumer and see if the issue still exists?
 On Aug 22, 2015 12:47 AM, Shushant Arora shushantaror...@gmail.com
 wrote:

 Exception comes when client has so many connections to some another
 external server also.
 So I think Exception is coming because of client side issue only- server
 side there is no issue.


 Want to understand is executor(simple consumer) not making new
 connection to kafka broker at start of each task ? Or is it created once
 only and that is getting closed somehow ?

 On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 it comes at start of each tasks when there is new data inserted in
 kafka.( data inserted is very few)
 kafka topic has 300 partitions - data inserted is ~10 MB.

 Tasks gets failed and it retries which succeed and after certain no of
 fail tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 That looks like you are choking your kafka machine. Do a top on the
 kafka machines and see the workload, it may happen that you are spending
 too much time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports
 (note that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the
 job is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of
 view, post a minimal reproducible code sample that demonstrates the 
 issue,
 so it can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka 
 using directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
 stage 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
 at kafka.utils.Utils$.read(Utils.scala:376)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at 
 kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at 
 kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
 at 
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
 at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
 at 
 

Re: spark streaming 1.3 kafka error

2015-08-22 Thread Cody Koeninger
To be perfectly clear, the direct kafka stream will also recover from any
failures, because it does the simplest thing possible - fail the task and
let spark retry it.

If you're consistently having socket closed problems on one task after
another, there's probably something else going on in your environment.
Shushant, none of your responses have indicated whether you've tried any of
the system level troubleshooting suggestions that have been made by various
people.

Also, if you have 300 partitions, and only 10mb of data, that is completely
unnecessary.  You're probably going to have lots of empty partitions, which
will have a negative effect on your runtime.

On Sat, Aug 22, 2015 at 11:28 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 I think you also can give a try to this consumer :
 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer in your
 environment. This has been running fine for topic with large number of
 Kafka partition (  200 ) like yours without any issue.. no issue with
 connection as this consumer re-use kafka connection , and also can recover
 from any failures ( network loss , Kafka leader goes down, ZK down etc ..).


 Regards,
 Dibyendu

 On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 On trying the consumer without external connections  or with low number
 of external conections its working fine -

 so doubt is how  socket got closed -

 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.



 On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you try some other consumer and see if the issue still exists?
 On Aug 22, 2015 12:47 AM, Shushant Arora shushantaror...@gmail.com
 wrote:

 Exception comes when client has so many connections to some another
 external server also.
 So I think Exception is coming because of client side issue only-
 server side there is no issue.


 Want to understand is executor(simple consumer) not making new
 connection to kafka broker at start of each task ? Or is it created once
 only and that is getting closed somehow ?

 On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 it comes at start of each tasks when there is new data inserted in
 kafka.( data inserted is very few)
 kafka topic has 300 partitions - data inserted is ~10 MB.

 Tasks gets failed and it retries which succeed and after certain no of
 fail tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 That looks like you are choking your kafka machine. Do a top on the
 kafka machines and see the workload, it may happen that you are spending
 too much time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports
 (note that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach 
 it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the
 job is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of
 view, post a minimal reproducible code sample that demonstrates the 
 issue,
 so it can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka 
 using directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
 stage 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket 
 has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at 
 kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at 
 kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at 
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at 
 

Re: spark streaming 1.3 kafka error

2015-08-21 Thread Shushant Arora
It comes at the start of each task when there is new data inserted in kafka
(the data inserted is very little).
The kafka topic has 300 partitions - the data inserted is ~10 MB.

Tasks fail, the retries succeed, and after a certain number of failed
tasks it kills the job.




On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 That looks like you are choking your kafka machine. Do a top on the kafka
 machines and see the workload, it may happen that you are spending too much
 time on disk io etc.
 On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports (note
 that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker
 nodes, ie telnet or netcat to it.  It looks like your driver can reach it
 (since there's partition info in the logs), but that doesn't mean the
 worker can.

 Use lsof / netstat to see what's going on with those ports while the job
 is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of view,
 post a minimal reproducible code sample that demonstrates the issue, so it
 can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka using 
 directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 
 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
 at kafka.utils.Utils$.read(Utils.scala:376)
 at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
 at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
 at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
 at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
 task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 
 (TID 16348)
 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic 
 test_hbrealtimeevents, partition 75 offsets 4701 - 4718
 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




 Thanks





Re: spark streaming 1.3 kafka error

2015-08-21 Thread Cody Koeninger
Sounds like that's happening consistently, not an occasional network
problem?

Look at the Kafka broker logs

Make sure you've configured the correct kafka broker hosts / ports (note
that direct stream does not use zookeeper host / port).

Make sure that host / port is reachable from your driver and worker nodes,
ie telnet or netcat to it.  It looks like your driver can reach it (since
there's partition info in the logs), but that doesn't mean the worker can.

Use lsof / netstat to see what's going on with those ports while the job is
running, or tcpdump if you need to.

If you can't figure out what's going on from a networking point of view,
post a minimal reproducible code sample that demonstrates the issue, so it
can be tested in a different environment.





On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka using 
 directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 
 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
   at kafka.utils.Utils$.read(Utils.scala:376)
   at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
   at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
   at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
   at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
   at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
   at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
   at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
 task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 
 (TID 16348)
 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, 
 partition 75 offsets 4701 - 4718
 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




 Thanks




Re: spark streaming 1.3 kafka error

2015-08-21 Thread Akhil Das
That looks like you are choking your kafka machine. Do a top on the kafka
machines and check the workload; it may be that you are spending too much
time on disk I/O, etc.
On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote:

 Sounds like that's happening consistently, not an occasional network
 problem?

 Look at the Kafka broker logs

 Make sure you've configured the correct kafka broker hosts / ports (note
 that direct stream does not use zookeeper host / port).

 Make sure that host / port is reachable from your driver and worker nodes,
 ie telnet or netcat to it.  It looks like your driver can reach it (since
 there's partition info in the logs), but that doesn't mean the worker can.

 Use lsof / netstat to see what's going on with those ports while the job
 is running, or tcpdump if you need to.

 If you can't figure out what's going on from a networking point of view,
 post a minimal reproducible code sample that demonstrates the issue, so it
 can be tested in a different environment.





 On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Hi


 Getting below error in spark streaming 1.3 while consuming from kafka using 
 directkafka stream. Few of tasks are getting failed in each run.


 What is the reason /solution of this error?


 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 
 130.0 (TID 16332)
 java.io.EOFException: Received -1 when reading from channel, socket has 
 likely been closed.
  at kafka.utils.Utils$.read(Utils.scala:376)
  at 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
  at 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
  at 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
  at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
  at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at 
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
  at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
  at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
 task 16348
 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 
 (TID 16348)
 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic 
 test_hbrealtimeevents, partition 75 offsets 4701 - 4718
 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties




 Thanks





Re: Spark Streaming: Change Kafka topics on runtime

2015-08-14 Thread Cody Koeninger
There's a long recent thread on this list about stopping apps; the subject was
"stopping spark stream app".

At a 1 second batch interval I wouldn't run repeated rdds, no.

I'd take a look at subclassing, personally (you'll have to rebuild the
streaming kafka project since a lot of it is private), but if topic changes don't
happen that often, restarting the app when they do should be fine.

On Fri, Aug 14, 2015 at 6:34 AM, Nisrina Luthfiyati 
nisrina.luthfiy...@gmail.com wrote:

 Hi Cody,

 by start/stopping, do you mean the streaming context or the app entirely?
 From what I understand once a streaming context has been stopped it cannot
 be restarted, but I also haven't found a way to stop the app
 programmatically.

 The batch duration will probably be around 1-10 seconds. I think this is
 small enough to not make it a batch job?

 Thanks again

 On Thu, Aug 13, 2015 at 10:15 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The current kafka stream implementation assumes the set of topics doesn't
 change during operation.

 You could either take a crack at writing a subclass that does what you
 need; stop/start; or if your batch duration isn't too small, you could run
 it as a series of RDDs (using the existing KafkaUtils.createRDD) where the
 set of topics is determined before each rdd.

 On Thu, Aug 13, 2015 at 4:38 AM, Nisrina Luthfiyati 
 nisrina.luthfiy...@gmail.com wrote:

 Hi all,

 I want to write a Spark Streaming program that listens to Kafka for a
 list of topics.
 The list of topics that I want to consume is stored in a DB and might
 change dynamically. I plan to periodically refresh this list of topics in
 the Spark Streaming app.

 My question is is it possible to add/remove a Kafka topic that is
 consumed by a stream, or probably create a new stream at runtime?
 Would I need to stop/start the program or is there any other way to do
 this?

 Thanks!
 Nisrina





 --
 Nisrina Luthfiyati - Ilmu Komputer Fasilkom UI 2010
 http://www.facebook.com/nisrina.luthfiyati
 http://id.linkedin.com/in/nisrina




Re: Spark Streaming: Change Kafka topics on runtime

2015-08-14 Thread Nisrina Luthfiyati
Hi Cody,

by start/stopping, do you mean the streaming context or the app entirely?
From what I understand once a streaming context has been stopped it cannot
be restarted, but I also haven't found a way to stop the app
programmatically.

The batch duration will probably be around 1-10 seconds. I think this is
small enough to not make it a batch job?

Thanks again

On Thu, Aug 13, 2015 at 10:15 PM, Cody Koeninger c...@koeninger.org wrote:

 The current kafka stream implementation assumes the set of topics doesn't
 change during operation.

 You could either take a crack at writing a subclass that does what you
 need; stop/start; or if your batch duration isn't too small, you could run
 it as a series of RDDs (using the existing KafkaUtils.createRDD) where the
 set of topics is determined before each rdd.

 On Thu, Aug 13, 2015 at 4:38 AM, Nisrina Luthfiyati 
 nisrina.luthfiy...@gmail.com wrote:

 Hi all,

 I want to write a Spark Streaming program that listens to Kafka for a
 list of topics.
 The list of topics that I want to consume is stored in a DB and might
 change dynamically. I plan to periodically refresh this list of topics in
 the Spark Streaming app.

 My question is is it possible to add/remove a Kafka topic that is
 consumed by a stream, or probably create a new stream at runtime?
 Would I need to stop/start the program or is there any other way to do
 this?

 Thanks!
 Nisrina





-- 
Nisrina Luthfiyati - Ilmu Komputer Fasilkom UI 2010
http://www.facebook.com/nisrina.luthfiyati
http://id.linkedin.com/in/nisrina


Re: Spark Streaming: Change Kafka topics on runtime

2015-08-13 Thread Cody Koeninger
The current kafka stream implementation assumes the set of topics doesn't
change during operation.

You could either take a crack at writing a subclass that does what you
need; stop/start; or if your batch duration isn't too small, you could run
it as a series of RDDs (using the existing KafkaUtils.createRDD) where the
set of topics is determined before each rdd.
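
A rough sketch of that series-of-RDDs approach (not code from this thread):
currentTopics(), offsetsFor() and saveOffsets() below are hypothetical helpers backed
by your own DB / offset store, and the loop stands in for whatever scheduling you use:

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val sc = new SparkContext(new SparkConf().setAppName("RefreshableTopicConsumer"))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")        // placeholder broker

    // hypothetical helpers backed by your own DB / offset store
    def currentTopics(): Seq[String] = Seq("mytopic")                      // e.g. read from the DB
    def offsetsFor(topics: Seq[String]): Array[OffsetRange] =
      topics.toArray.map(t => OffsetRange(t, 0, 0L, 100L))                 // placeholder ranges
    def saveOffsets(ranges: Array[OffsetRange]): Unit = ()                 // persist untilOffsets

    while (true) {
      val ranges = offsetsFor(currentTopics())   // topic set may change between iterations
      val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc, kafkaParams, ranges)
      rdd.foreach { case (key, value) => println(value) }   // process the micro-batch here
      saveOffsets(ranges)                        // next iteration starts from these untilOffsets
      Thread.sleep(10000)                        // stand-in for the batch interval
    }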

On Thu, Aug 13, 2015 at 4:38 AM, Nisrina Luthfiyati 
nisrina.luthfiy...@gmail.com wrote:

 Hi all,

 I want to write a Spark Streaming program that listens to Kafka for a list
 of topics.
 The list of topics that I want to consume is stored in a DB and might
 change dynamically. I plan to periodically refresh this list of topics in
 the Spark Streaming app.

 My question is is it possible to add/remove a Kafka topic that is consumed
 by a stream, or probably create a new stream at runtime?
 Would I need to stop/start the program or is there any other way to do
 this?

 Thanks!
 Nisrina



Re: spark streaming get kafka individual message's offset and partition no

2015-07-28 Thread Cody Koeninger
You don't have to use some other package in order to get access to the
offsets.

Shushant, have you read the available documentation at

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

or watched

https://www.youtube.com/watch?v=fXnNEq1v3VA

The kafka partition number is the same as the spark partition number.  The
messages for a given partition are in offset order without gaps, so you can
use the offset range to determine the offset for a given message.  Or you
can use the messageHandler argument to KafkaUtils.createDirectStream to get
access to all of the MessageAndMetadata, including partition and offset, on
a per-message basis.
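
For the second option, a minimal sketch (Scala API; the topic, broker and starting
offsets are placeholders) of a messageHandler that keeps the partition and offset
with each record:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    case class Record(partition: Int, offset: Long, key: String, value: String)

    val ssc = new StreamingContext(new SparkConf().setAppName("PerMessageOffsets"), Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")     // placeholder broker
    val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)        // placeholder start offsets

    // keep the partition and offset next to each decoded message
    val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
      Record(mmd.partition, mmd.offset, mmd.key(), mmd.message())

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Record](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.mapPartitions { records =>
      records.map(r => s"partition=${r.partition} offset=${r.offset} value=${r.value}")
    }.print()

    ssc.start()
    ssc.awaitTermination()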

On Tue, Jul 28, 2015 at 7:48 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi

 I am processing kafka messages using spark streaming 1.3.

 I am using the mapPartitions function to process kafka messages.
  How can I access the offset no of each individual message being processed?


  JavaPairInputDStream<byte[], byte[]> directKafkaStream =
      KafkaUtils.createDirectStream(..);

  directKafkaStream.mapPartitions(
      new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() {
        public Iterable<String> call(Iterator<Tuple2<byte[], byte[]>> t)
            throws Exception {
          while (t.hasNext()) {
            Tuple2<byte[], byte[]> tuple = t.next();
            byte[] key = tuple._1();
            byte[] msg = tuple._2();
            // how to get kafka partition no and offset of this message?
          }
        }
      });







Re: spark streaming with kafka reset offset

2015-07-14 Thread Cody Koeninger
You have access to the offset ranges for a given rdd in the stream by
typecasting to HasOffsetRanges.  You can then store the offsets wherever
you need to.
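
A minimal sketch of that pattern (broker and topic are placeholders); the ranges could
just as well be written to a DB table or ZooKeeper so a REST endpoint can report
consumption progress:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    val ssc = new StreamingContext(new SparkConf().setAppName("OffsetTracking"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))

    stream.foreachRDD { rdd =>
      // each batch's RDD carries the exact Kafka offset range it covers
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { o =>
        // replace println with a write to whatever backs your REST endpoint / offset store
        println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()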

On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com wrote:

 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed to
 HDFS and it is understandable by Spark Streaming job. Is there a way to
 expose the offsets via a REST api to end users. Or alternatively, is there
 a way to have offsets committed to Kafka Offset Manager so users can query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data consumption
 of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1
 per receiver since consuming from kafka is not cpu bound rather NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
 topic has 300 partions , does kafkaRDD created on 5 executors will have 60
 partitions per executor (total 300 one to one mapping) and if some of kafka
 partitions are empty say offset of last checkpoint to current is same for
 partitons P123, still it will create empty partition in kafkaRDD ? So we
 should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include spark-streaming_2.10
 (version 1.3) and spark-core_2.10(version 1.3) in my application but my
 cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole code
 is written in one function(main function in driver) so how it determined
 kafka streams receivers  not to be registered in each batch only processing
 to be done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody and
 TD will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation,
 and since you are having 2 receivers, you are left with 2 threads to
 process ((0 + 2) -- This 2 is your 2 threads.) And the other /2
 means you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around the
 contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Few doubts :

 In 1.2 streaming when I use union of streams , my streaming
 application getting hanged sometimes and nothing gets printed on driver.


 [Stage 2:

 (0 + 2) / 2]
  Whats is 0+2/2 here signifies.



 1.Does no of streams in topicsMap.put(testSparkPartitioned, 3); be
 same as numstreams=2 ? in unioned stream ?

 2. I launched app on yarnRM with num-executors as 5 . It created 2
 receivers and 5 execuots . As in stream receivers nodes get fixed at 
 start
 of app throughout its lifetime . Does executors gets allicated at start 
 of
 each job on 1s batch interval? If yes, how does its fast to allocate
 resources. I mean if i increase num-executors to 50 , it will negotiate 
 50
 executors from yarnRM at start of each job so does it takes more 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
A follow-up question.

When using the createDirectStream approach, the offsets are checkpointed to
HDFS and are understood by the Spark Streaming job. Is there a way to
expose the offsets via a REST API to end users? Or alternatively, is there
a way to have offsets committed to the Kafka Offset Manager so users can query
them from a consumer programmatically?

Essentially, all I need to do is monitor the progress of data consumption
of the Kafka topic.


On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1 per
 receiver since consuming from kafka is not cpu bound rather NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
 topic has 300 partions , does kafkaRDD created on 5 executors will have 60
 partitions per executor (total 300 one to one mapping) and if some of kafka
 partitions are empty say offset of last checkpoint to current is same for
 partitons P123, still it will create empty partition in kafkaRDD ? So we
 should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include spark-streaming_2.10
 (version 1.3) and spark-core_2.10(version 1.3) in my application but my
 cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of kafka
 partitions to consume from? Say I have 300 kafka partitions in kafka topic
 and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
 I will read from 6 out of 300 partitions only and for rest 294 partitions
 data is lost?


 2.One more doubt in spark streaming how is it decided which part of main
 function of driver will run at each batch interval ? Since whole code is
 written in one function(main function in driver) so how it determined kafka
 streams receivers  not to be registered in each batch only processing to be
 done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody and TD
 will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation,
 and since you are having 2 receivers, you are left with 2 threads to
 process ((0 + 2) -- This 2 is your 2 threads.) And the other /2 means
 you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around the
 contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Few doubts :

 In 1.2 streaming when I use union of streams , my streaming
 application getting hanged sometimes and nothing gets printed on driver.


 [Stage 2:

   (0 + 2) / 2]
  Whats is 0+2/2 here signifies.



 1.Does no of streams in topicsMap.put(testSparkPartitioned, 3); be
 same as numstreams=2 ? in unioned stream ?

 2. I launched app on yarnRM with num-executors as 5 . It created 2
 receivers and 5 execuots . As in stream receivers nodes get fixed at start
 of app throughout its lifetime . Does executors gets allicated at start of
 each job on 1s batch interval? If yes, how does its fast to allocate
 resources. I mean if i increase num-executors to 50 , it will negotiate 50
 executors from yarnRM at start of each job so does it takes more time in
 allocating executors than batch interval(here 1s , say if 500ms).? Can i
 fixed processing executors also throughout the app?




 SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
 JavaStreamingContext jssc = new
 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
Of course, exactly-once receiving is not the same as exactly-once. In the case
of the direct kafka stream, the data may actually be pulled multiple times. But
even if the data of a batch is pulled twice because of some failure, the
final result (that is, the transformed data accessed through foreachRDD) will
always be the same even if recomputed. In other words, the data in
partition x of the RDD of time t will always be the same even if that
partition gets recomputed. Now, to get end-to-end exactly-once, you will
also have to push data out to external data stores in an exactly-once manner
- either the updates are idempotent, or you can use the unique id [(batch
time, partition ID)] to update the store transactionally (such that each
partition is inserted into the data store only once).

This is also explained in my talk. -
https://www.youtube.com/watch?v=d5UJonrruHk
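
[Editor's note: a minimal sketch of the transactional variant described above, not from the thread. It assumes a hypothetical JDBC sink with a unique key on (batch_time, partition_id, key); the table name and the Postgres-style upsert are assumptions.]

import java.sql.DriverManager

import org.apache.spark.TaskContext
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Writes each partition's output keyed by (batch time, partition id), so a
// recomputed partition re-inserts the same rows and the conflict clause makes it a no-op.
def saveExactlyOnce(transformed: DStream[(String, Long)], jdbcUrl: String): Unit = {
  transformed.foreachRDD { (rdd, time: Time) =>
    rdd.foreachPartition { records =>
      val partitionId = TaskContext.get.partitionId()
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        val stmt = conn.prepareStatement(
          "INSERT INTO batch_output (batch_time, partition_id, key, value) VALUES (?, ?, ?, ?) " +
            "ON CONFLICT (batch_time, partition_id, key) DO NOTHING")
        records.foreach { case (k, v) =>
          stmt.setLong(1, time.milliseconds)
          stmt.setInt(2, partitionId)
          stmt.setString(3, k)
          stmt.setLong(4, v)
          stmt.executeUpdate()
        }
      } finally {
        conn.close()
      }
    }
  }
}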

On Tue, Jul 14, 2015 at 8:18 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks TD.

 As for 1), if timing is not guaranteed, how does exactly once semantics
 supported? It feels like exactly once receiving is not necessarily exactly
 once processing.

 Chen

 On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das t...@databricks.com
 wrote:



 On Tue, Jul 14, 2015 at 6:42 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks TD and Cody. I saw that.

 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
 on HDFS at the end of each batch interval?


 The timing is not guaranteed.


 2. In the code, if I first apply transformations and actions on the
 directKafkaStream and then use foreachRDD on the original KafkaDStream to
 commit offsets myself, will offsets commits always happen after
 transformation and action?

 What do you mean by original KafkaDStream? if you meant the
 directKafkaStream? If yes, then yes, output operations like foreachRDD is
 executed in each batch in the same order as they are defined.

 dstream1.foreachRDD { rdd => func1(rdd) }
 dstream2.foreachRDD { rdd => func2(rdd) }

 In every batch interval, func1 will be executed before func2.




 Chen

 On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das t...@databricks.com
 wrote:

 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

  directKafkaStream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   // offsetRanges.length = # of Kafka partitions being consumed
   ...
   }


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com
 wrote:

 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed
 to HDFS and it is understandable by Spark Streaming job. Is there a way 
 to
 expose the offsets via a REST api to end users. Or alternatively, is 
 there
 a way to have offsets committed to Kafka Offset Manager so users can 
 query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs
 your cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if 
 you
 have no messages you will get an empty partition.  It's up to you 
 whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 is no of parallel consumer threads per receiver , means
 in total we have 2*3=6 consumer in same consumer group consuming from 
 all
 300 partitions.
 3 is just parallelism on same receiver and recommendation is to use
 1 per receiver since consuming from kafka is not cpu bound rather
 NIC(network bound)  increasing consumer thread on one receiver won't 
 make
 it parallel in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and
 kafka topic has 300 partions , does kafkaRDD created on 5 executors 
 will
 have 60 partitions per executor (total 300 one to one mapping) and if 
 some
 of kafka partitions are empty say offset of last checkpoint to current 
 is
 same for partitons P123, still it will create empty partition in 
 kafkaRDD ?
 So we should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in 
 my
 application but my cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD and Cody. I saw that.

1. By doing that (foreachRDD), does the KafkaDStream checkpoint its offsets to
HDFS at the end of each batch interval?
2. In the code, if I first apply transformations and actions on the
directKafkaStream and then use foreachRDD on the original KafkaDStream to
commit offsets myself, will the offset commits always happen after the
transformations and actions?

Chen

On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das t...@databricks.com wrote:

 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

 directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
  }


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com
 wrote:

 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed to
 HDFS and it is understandable by Spark Streaming job. Is there a way to
 expose the offsets via a REST api to end users. Or alternatively, is there
 a way to have offsets committed to Kafka Offset Manager so users can query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1
 per receiver since consuming from kafka is not cpu bound rather 
 NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
 topic has 300 partions , does kafkaRDD created on 5 executors will have 60
 partitions per executor (total 300 one to one mapping) and if some of 
 kafka
 partitions are empty say offset of last checkpoint to current is same for
 partitons P123, still it will create empty partition in kafkaRDD ? So we
 should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
 application but my cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in 
 kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole 
 code
 is written in one function(main function in driver) so how it determined
 kafka streams receivers  not to be registered in each batch only 
 processing
 to be done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com
 wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody and
 TD will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation,
 and since you are having 2 receivers, you are left with 2 threads
 to process ((0 + 2) -- This 2 is your 2 threads.) And the other /2
 means you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around
 the contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD.

As for 1), if the timing is not guaranteed, how are exactly-once semantics
supported? It feels like exactly-once receiving is not necessarily exactly-once
processing.

Chen

On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das t...@databricks.com wrote:



 On Tue, Jul 14, 2015 at 6:42 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks TD and Cody. I saw that.

 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
 on HDFS at the end of each batch interval?


 The timing is not guaranteed.


 2. In the code, if I first apply transformations and actions on the
 directKafkaStream and then use foreachRDD on the original KafkaDStream to
 commit offsets myself, will offsets commits always happen after
 transformation and action?

 What do you mean by original KafkaDStream? if you meant the
 directKafkaStream? If yes, then yes, output operations like foreachRDD is
 executed in each batch in the same order as they are defined.

 dstream1.foreachRDD { rdd => func1(rdd) }
 dstream2.foreachRDD { rdd => func2(rdd) }

 In every batch interval, func1 will be executed before func2.




 Chen

 On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das t...@databricks.com
 wrote:

 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

  directKafkaStream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   // offsetRanges.length = # of Kafka partitions being consumed
   ...
   }


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com
 wrote:

 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed
 to HDFS and it is understandable by Spark Streaming job. Is there a way to
 expose the offsets via a REST api to end users. Or alternatively, is there
 a way to have offsets committed to Kafka Offset Manager so users can query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if 
 you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 
 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use
 1 per receiver since consuming from kafka is not cpu bound rather
 NIC(network bound)  increasing consumer thread on one receiver won't 
 make
 it parallel in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and
 kafka topic has 300 partions , does kafkaRDD created on 5 executors will
 have 60 partitions per executor (total 300 one to one mapping) and if 
 some
 of kafka partitions are empty say offset of last checkpoint to current 
 is
 same for partitons P123, still it will create empty partition in 
 kafkaRDD ?
 So we should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in 
 my
 application but my cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in 
 kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then 
 is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole 
 code
 is written in one function(main function in driver) so how it 
 determined
 kafka streams receivers  not to be registered in each batch only 
 processing
 to be done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com
 wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody
 and TD will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread
 from Akhil D:
 Since you set local[4] you will have 4 threads for your
 computation, and since you are having 2 receivers, you 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
Relevant documentation -
https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
towards the end.

directKafkaStream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 // offsetRanges.length = # of Kafka partitions being consumed
 ...
 }
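
[Editor's note: complementing the snippet above, a sketch (not from the thread) of writing the per-partition offsets to the ZooKeeper path used by Kafka 0.8 high-level consumers, so that standard offset-monitoring tools can pick them up. Apache Curator is an assumed dependency; the group id and ZK connect string are placeholders.]

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.streaming.kafka.OffsetRange

// Writes offsets under /consumers/<group>/offsets/<topic>/<partition>.
def commitToZookeeper(zkConnect: String, group: String, ranges: Array[OffsetRange]): Unit = {
  val client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  client.start()
  try {
    ranges.foreach { r =>
      val path = s"/consumers/$group/offsets/${r.topic}/${r.partition}"
      val data = r.untilOffset.toString.getBytes("UTF-8")
      if (client.checkExists().forPath(path) == null) {
        client.create().creatingParentsIfNeeded().forPath(path, data)
      } else {
        client.setData().forPath(path, data)
      }
    }
  } finally {
    client.close()
  }
}

// Usage inside the foreachRDD above, after the batch has been processed:
// commitToZookeeper("zkhost:2181", "my-consumer-group", offsetRanges)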


On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger c...@koeninger.org wrote:

 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com wrote:

 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed to
 HDFS and it is understandable by Spark Streaming job. Is there a way to
 expose the offsets via a REST api to end users. Or alternatively, is there
 a way to have offsets committed to Kafka Offset Manager so users can query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data consumption
 of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1
 per receiver since consuming from kafka is not cpu bound rather NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
 topic has 300 partions , does kafkaRDD created on 5 executors will have 60
 partitions per executor (total 300 one to one mapping) and if some of kafka
 partitions are empty say offset of last checkpoint to current is same for
 partitons P123, still it will create empty partition in kafkaRDD ? So we
 should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
 application but my cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole code
 is written in one function(main function in driver) so how it determined
 kafka streams receivers  not to be registered in each batch only 
 processing
 to be done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com
 wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody and
 TD will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation,
 and since you are having 2 receivers, you are left with 2 threads to
 process ((0 + 2) -- This 2 is your 2 threads.) And the other /2
 means you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around the
 contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Few doubts :

 In 1.2 streaming when I use union of streams , my streaming
 application getting hanged sometimes and nothing gets printed on driver.


 [Stage 2:

 (0 + 2) / 2]
  Whats is 0+2/2 here signifies.



 1.Does no of streams in topicsMap.put(testSparkPartitioned, 3);
 be same as numstreams=2 ? in unioned stream ?

 2. I launched app on yarnRM with num-executors as 5 . It created 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
On Tue, Jul 14, 2015 at 6:42 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks TD and Cody. I saw that.

 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
 on HDFS at the end of each batch interval?


The timing is not guaranteed.


 2. In the code, if I first apply transformations and actions on the
 directKafkaStream and then use foreachRDD on the original KafkaDStream to
 commit offsets myself, will offsets commits always happen after
 transformation and action?

 What do you mean by the original KafkaDStream? Did you mean the
directKafkaStream? If yes, then yes: output operations like foreachRDD are
executed in each batch in the same order as they are defined.

dstream1.foreachRDD { rdd => func1(rdd) }
dstream2.foreachRDD { rdd => func2(rdd) }

In every batch interval, func1 will be executed before func2.




 Chen

 On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das t...@databricks.com
 wrote:

 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

  directKafkaStream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   // offsetRanges.length = # of Kafka partitions being consumed
   ...
   }


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com
 wrote:

 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed to
 HDFS and it is understandable by Spark Streaming job. Is there a way to
 expose the offsets via a REST api to end users. Or alternatively, is there
 a way to have offsets committed to Kafka Offset Manager so users can query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 
 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1
 per receiver since consuming from kafka is not cpu bound rather 
 NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and
 kafka topic has 300 partions , does kafkaRDD created on 5 executors will
 have 60 partitions per executor (total 300 one to one mapping) and if 
 some
 of kafka partitions are empty say offset of last checkpoint to current is
 same for partitons P123, still it will create empty partition in 
 kafkaRDD ?
 So we should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
 application but my cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in 
 kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then 
 is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole 
 code
 is written in one function(main function in driver) so how it determined
 kafka streams receivers  not to be registered in each batch only 
 processing
 to be done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com
 wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody
 and TD will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your
 computation, and since you are having 2 receivers, you are left
 with 2 threads to process ((0 + 2) -- This 2 is your 2 threads.)
 And the other /2 means you are having 2 tasks in that stage (with
 id 0).

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.
 2. How does 

Re: spark streaming with kafka reset offset

2015-06-30 Thread Shushant Arora
Is this 3 the number of parallel consumer threads per receiver, meaning in total
we have 2*3 = 6 consumers in the same consumer group consuming from all 300
partitions?
Or is 3 just the parallelism on the same receiver, and the recommendation is to use 1 per
receiver since consuming from kafka is not CPU bound but rather NIC (network)
bound, so increasing the consumer threads on one receiver won't make it parallel
in the ideal sense?

In the non-receiver-based consumer in spark 1.3, if I use 5 executors and the kafka
topic has 300 partitions, will the kafkaRDD created on the 5 executors have 60
partitions per executor (300 in total, one-to-one mapping)? And if some of the kafka
partitions are empty, say the offset from the last checkpoint to the current one is the same for
partitions P123, will it still create empty partitions in the kafkaRDD? So we
should call coalesce on the kafkaRDD?


And is there any incompatibility issue when I include spark-streaming_2.10
(version 1.3) and spark-core_2.10 (version 1.3) in my application but my
cluster has spark version 1.2?






On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 1. Here you are basically creating 2 receivers and asking each of them to
 consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of kafka
 partitions to consume from? Say I have 300 kafka partitions in kafka topic
 and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
 I will read from 6 out of 300 partitions only and for rest 294 partitions
 data is lost?


 2.One more doubt in spark streaming how is it decided which part of main
 function of driver will run at each batch interval ? Since whole code is
 written in one function(main function in driver) so how it determined kafka
 streams receivers  not to be registered in each batch only processing to be
 done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody and TD
 will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from Akhil
 D:
 Since you set local[4] you will have 4 threads for your computation, and
 since you are having 2 receivers, you are left with 2 threads to process
 ((0 + 2) -- This 2 is your 2 threads.) And the other /2 means you are
 having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of them to
 consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around the
 contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Few doubts :

 In 1.2 streaming when I use union of streams , my streaming application
 getting hanged sometimes and nothing gets printed on driver.


 [Stage 2:

 (0 + 2) / 2]
  Whats is 0+2/2 here signifies.



 1.Does no of streams in topicsMap.put(testSparkPartitioned, 3); be
 same as numstreams=2 ? in unioned stream ?

 2. I launched app on yarnRM with num-executors as 5 . It created 2
 receivers and 5 execuots . As in stream receivers nodes get fixed at start
 of app throughout its lifetime . Does executors gets allicated at start of
 each job on 1s batch interval? If yes, how does its fast to allocate
 resources. I mean if i increase num-executors to 50 , it will negotiate 50
 executors from yarnRM at start of each job so does it takes more time in
 allocating executors than batch interval(here 1s , say if 500ms).? Can i
 fixed processing executors also throughout the app?




 SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
 JavaStreamingContext jssc = new
 JavaStreamingContext(conf, Durations.milliseconds(1000));

 Map<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put("zookeeper.connect", "ipadd:2181");
 kafkaParams.put("group.id", "testgroup");
 kafkaParams.put("zookeeper.session.timeout.ms", "1");
 Map<String, Integer> topicsMap = new HashMap<String, Integer>();
 topicsMap.put("testSparkPartitioned", 3);
 int numStreams = 2;
 List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
 ArrayList<JavaPairDStream<byte[], byte[]>>();
 for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
 byte[].class, kafka.serializer.DefaultDecoder.class,
 kafka.serializer.DefaultDecoder.class,
 kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
 }
 JavaPairDStream<byte[], byte[]> directKafkaStream =
 jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
 kafkaStreams.size()));
 JavaDStream<String> lines = directKafkaStream.map(new
 Function<Tuple2<byte[], byte[]>, String>() {

 public String 

Re: spark streaming with kafka reset offset

2015-06-30 Thread Cody Koeninger
You can't use different versions of spark in your application vs your
cluster.

For the direct stream, it's not 60 partitions per executor, it's 300
partitions, and executors work on them as they are scheduled.  Yes, if you
have no messages you will get an empty partition.  It's up to you whether
it's worthwhile to call coalesce or not.
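
[Editor's note: to make the partition mapping described above concrete, a sketch in Scala against the Spark 1.3 spark-streaming-kafka (0.8) direct API. The broker list, topic name, and the choice to coalesce are placeholders, not from the thread.]

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // One RDD partition per Kafka partition: a 300-partition topic yields
    // 300-partition RDDs, scheduled across however many executors are available.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker
    val topics = Set("testSparkPartitioned")                        // placeholder topic

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    directStream.foreachRDD { rdd =>
      // Empty Kafka partitions still show up as (empty) RDD partitions.
      // Coalescing is optional; without a shuffle it simply reduces the number of tasks.
      val compacted = rdd.coalesce(rdd.sparkContext.defaultParallelism)
      println(s"partitions before=${rdd.partitions.length} after=${compacted.partitions.length}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}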

On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Is this 3 is no of parallel consumer threads per receiver , means in total
 we have 2*3=6 consumer in same consumer group consuming from all 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1 per
 receiver since consuming from kafka is not cpu bound rather NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
 topic has 300 partions , does kafkaRDD created on 5 executors will have 60
 partitions per executor (total 300 one to one mapping) and if some of kafka
 partitions are empty say offset of last checkpoint to current is same for
 partitons P123, still it will create empty partition in kafkaRDD ? So we
 should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include spark-streaming_2.10
 (version 1.3) and spark-core_2.10(version 1.3) in my application but my
 cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 1. Here you are basically creating 2 receivers and asking each of them to
 consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of kafka
 partitions to consume from? Say I have 300 kafka partitions in kafka topic
 and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
 I will read from 6 out of 300 partitions only and for rest 294 partitions
 data is lost?


 2.One more doubt in spark streaming how is it decided which part of main
 function of driver will run at each batch interval ? Since whole code is
 written in one function(main function in driver) so how it determined kafka
 streams receivers  not to be registered in each batch only processing to be
 done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Let me take ashot at your questions. (I am sure people like Cody and TD
 will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation, and
 since you are having 2 receivers, you are left with 2 threads to
 process ((0 + 2) -- This 2 is your 2 threads.) And the other /2 means
 you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around the
 contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Few doubts :

 In 1.2 streaming when I use union of streams , my streaming application
 getting hanged sometimes and nothing gets printed on driver.


 [Stage 2:

   (0 + 2) / 2]
  Whats is 0+2/2 here signifies.



 1.Does no of streams in topicsMap.put(testSparkPartitioned, 3); be
 same as numstreams=2 ? in unioned stream ?

 2. I launched app on yarnRM with num-executors as 5 . It created 2
 receivers and 5 execuots . As in stream receivers nodes get fixed at start
 of app throughout its lifetime . Does executors gets allicated at start of
 each job on 1s batch interval? If yes, how does its fast to allocate
 resources. I mean if i increase num-executors to 50 , it will negotiate 50
 executors from yarnRM at start of each job so does it takes more time in
 allocating executors than batch interval(here 1s , say if 500ms).? Can i
 fixed processing executors also throughout the app?




 SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
 JavaStreamingContext jssc = new
 JavaStreamingContext(conf, Durations.milliseconds(1000));

 Map<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put("zookeeper.connect", "ipadd:2181");
 kafkaParams.put("group.id", "testgroup");
 kafkaParams.put("zookeeper.session.timeout.ms", "1");
 Map<String, Integer> topicsMap = new HashMap<String, Integer>();
 topicsMap.put("testSparkPartitioned", 3);
 int numStreams = 2;
 List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
 ArrayList<JavaPairDStream<byte[], byte[]>>();
 for (int i = 0; i < numStreams; i++) {
  

  1   2   >