Spark streaming / confluent Kafka- messages are empty

2022-06-09 Thread KhajaAsmath Mohammed


Hi,

I am trying to read data from confluent Kafka using  avro schema registry. 
Messages are always empty and stream always shows empty records. Any suggestion 
on this please ??

Thanks,
Asmath
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming with Kafka

2020-11-03 Thread Kevin Pis
Hi,

this is my  Word Count demo.  https://github.com/kevincmchen/wordcount

MohitAbbi  于2020年11月4日周三 上午3:32写道:

> Hi,
>
> Can you please share the correct versions of JAR files which you used to
> resolve the issue. I'm also facing the same issue.
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best,

Kevin Pis


Re: Spark streaming with Kafka

2020-11-03 Thread MohitAbbi
Hi,

Can you please share the correct versions of JAR files which you used to
resolve the issue. I'm also facing the same issue.

Thanks




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread Sean Owen
What supports Python in (Kafka?) 0.8? I don't think Spark ever had a
specific Python-Kafka integration. But you have always been able to
use it to read DataFrames as in Structured Streaming.
Kafka 0.8 support is deprecated (gone in 3.0) but 0.10 means 0.10+ -
works with the latest 2.x.
What is the issue?

On Wed, Aug 12, 2020 at 7:53 AM German Schiavon
 wrote:
>
> Hey,
>
> Maybe I'm missing some restriction with EMR, but have you tried to use 
> Structured Streaming instead of Spark Streaming?
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
>
> Regards
>
> On Wed, 12 Aug 2020 at 14:12, Hamish Whittal  
> wrote:
>>
>> Hi folks,
>>
>> Thought I would ask here because it's somewhat confusing. I'm using Spark 
>> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>>
>> The version of Scala used is 2.11.12. I'm using this version of the 
>> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>>
>> Now I'm wanting to read from Kafka topics using Python (I need to stick to 
>> Python specifically).
>>
>> What seems confusing is that 0.8 has Python support, but 0.10 does not. Then 
>> 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using 2.4.5 
>> then clearly I'm going to hit a roadblock here.
>>
>> Can someone clarify these things for me? Have I got this right?
>>
>> Thanks in advance,
>> Hamish

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread German Schiavon
Hey,

Maybe I'm missing some restriction with EMR, but have you tried to use
Structured Streaming instead of Spark Streaming?

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

Regards

On Wed, 12 Aug 2020 at 14:12, Hamish Whittal 
wrote:

> Hi folks,
>
> Thought I would ask here because it's somewhat confusing. I'm using Spark
> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>
> The version of Scala used is 2.11.12. I'm using this version of the
> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>
> Now I'm wanting to read from Kafka topics using Python (I need to stick to
> Python specifically).
>
> What seems confusing is that 0.8 has Python support, but 0.10 does not.
> Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
> 2.4.5 then clearly I'm going to hit a roadblock here.
>
> Can someone clarify these things for me? Have I got this right?
>
> Thanks in advance,
> Hamish
>


Spark Streaming with Kafka and Python

2020-08-12 Thread Hamish Whittal
Hi folks,

Thought I would ask here because it's somewhat confusing. I'm using Spark
2.4.5 on EMR 5.30.1 with Amazon MSK.

The version of Scala used is 2.11.12. I'm using this version of the
libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar

Now I'm wanting to read from Kafka topics using Python (I need to stick to
Python specifically).

What seems confusing is that 0.8 has Python support, but 0.10 does not.
Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
2.4.5 then clearly I'm going to hit a roadblock here.

Can someone clarify these things for me? Have I got this right?

Thanks in advance,
Hamish


Re: Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi

I am able to correct the issue. The issue was due to wrong version of JAR
file I have used. I have removed the these JAR files and copied correct
version of JAR files and the error has gone away.

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming with Kafka

2020-07-02 Thread Jungtaek Lim
I can't reproduce. Could you please make sure you're running spark-shell
with official spark 3.0.0 distribution? Please try out changing the
directory and using relative path like "./spark-shell".

On Thu, Jul 2, 2020 at 9:59 PM dwgw  wrote:

> Hi
> I am trying to stream kafka topic from spark shell but i am getting the
> following error.
> I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
> *Java 1.8.0_212*)
>
> *[spark@hdp-dev ~]$ spark-shell --packages
> org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
> Ivy Default Cache set to: /home/spark/.ivy2/cache
> The jars for the packages stored in: /home/spark/.ivy2/jars
> :: loading settings :: url =
>
> jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
> org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
> :: resolving dependencies ::
>
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
> confs: [default]
> found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
> found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0
> in
> central
> found org.apache.kafka#kafka-clients;2.4.1 in central
> found com.github.luben#zstd-jni;1.4.4-3 in central
> found org.lz4#lz4-java;1.7.1 in central
> found org.xerial.snappy#snappy-java;1.1.7.5 in central
> found org.slf4j#slf4j-api;1.7.30 in central
> found org.spark-project.spark#unused;1.0.0 in central
> found org.apache.commons#commons-pool2;2.6.2 in central
> :: resolution report :: resolve 502ms :: artifacts dl 10ms
> :: modules in use:
> com.github.luben#zstd-jni;1.4.4-3 from central in [default]
> org.apache.commons#commons-pool2;2.6.2 from central in [default]
> org.apache.kafka#kafka-clients;2.4.1 from central in [default]
> org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
> [default]
> org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
> central in [default]
> org.lz4#lz4-java;1.7.1 from central in [default]
> org.slf4j#slf4j-api;1.7.30 from central in [default]
> org.spark-project.spark#unused;1.0.0 from central in [default]
> org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
>
> -
> |  |modules||   artifacts
> |
> |   conf   | number| search|dwnlded|evicted||
> number|dwnlded|
>
> -
> |  default |   9   |   0   |   0   |   0   ||   9   |   0
> |
>
> -
> :: retrieving ::
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
> confs: [default]
> 0 artifacts copied, 9 already retrieved (0kB/13ms)
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> Spark context Web UI available at http://hdp-dev.infodetics.com:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1593620640299_0015).
> Spark session available as 'spark'.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 3.0.0
>   /_/
>
> Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_212)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
>
> scala> val df = spark.
>  | readStream.
>  | format("kafka").
>  | option("kafka.bootstrap.servers", "XXX").
>  | option("subscribe", "XXX").
>  | option("kafka.sasl.mechanisms", "XXX").
>  | option("kafka.security.protocol", "XXX").
>  | option("kafka.sasl.username","XXX").
>  | option("kafka.sasl.password", "XXX").
>  | option("startingOffsets", "earliest").
>  | load
> java.lang.AbstractMethodError: Method
>
> org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
> is abstract
>   at
>
> org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
>   at
>
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
>   at
>
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
>   ... 57 elided
>
> Looking forward for a response.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi
I am trying to stream kafka topic from spark shell but i am getting the
following error. 
I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
*Java 1.8.0_212*)

*[spark@hdp-dev ~]$ spark-shell --packages
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
:: loading settings :: url =
jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in
central
found org.apache.kafka#kafka-clients;2.4.1 in central
found com.github.luben#zstd-jni;1.4.4-3 in central
found org.lz4#lz4-java;1.7.1 in central
found org.xerial.snappy#snappy-java;1.1.7.5 in central
found org.slf4j#slf4j-api;1.7.30 in central
found org.spark-project.spark#unused;1.0.0 in central
found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
:: modules in use:
com.github.luben#zstd-jni;1.4.4-3 from central in [default]
org.apache.commons#commons-pool2;2.6.2 from central in [default]
org.apache.kafka#kafka-clients;2.4.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
[default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
central in [default]
org.lz4#lz4-java;1.7.1 from central in [default]
org.slf4j#slf4j-api;1.7.30 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
   
-
|  |modules||   artifacts  
|
|   conf   | number| search|dwnlded|evicted||
number|dwnlded|
   
-
|  default |   9   |   0   |   0   |   0   ||   9   |   0  
|
   
-
:: retrieving ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/13ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http://hdp-dev.infodetics.com:4040
Spark context available as 'sc' (master = yarn, app id =
application_1593620640299_0015).
Spark session available as 'spark'.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
  /_/
 
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.


scala> val df = spark.
 | readStream.
 | format("kafka").
 | option("kafka.bootstrap.servers", "XXX").
 | option("subscribe", "XXX").
 | option("kafka.sasl.mechanisms", "XXX").
 | option("kafka.security.protocol", "XXX").
 | option("kafka.sasl.username","XXX").
 | option("kafka.sasl.password", "XXX").
 | option("startingOffsets", "earliest").
 | load
java.lang.AbstractMethodError: Method
org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
is abstract
  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
  at
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
  at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
  ... 57 elided

Looking forward for a response.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming with Kafka

2020-07-02 Thread dwgw
HiI am trying to stream kafka topic from spark shell but i am getting the
following error. I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM)
64-Bit Server VM, *Java 1.8.0_212*)*[spark@hdp-dev ~]$ spark-shell
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*Ivy Default
Cache set to: /home/spark/.ivy2/cacheThe jars for the packages stored in:
/home/spark/.ivy2/jars:: loading settings :: url =
jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xmlorg.apache.spark#spark-sql-kafka-0-10_2.12
added as a dependency:: resolving dependencies ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0   

confs: [default]found
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in centralfound
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central   
found org.apache.kafka#kafka-clients;2.4.1 in centralfound
com.github.luben#zstd-jni;1.4.4-3 in centralfound
org.lz4#lz4-java;1.7.1 in centralfound
org.xerial.snappy#snappy-java;1.1.7.5 in centralfound
org.slf4j#slf4j-api;1.7.30 in centralfound
org.spark-project.spark#unused;1.0.0 in centralfound
org.apache.commons#commons-pool2;2.6.2 in central:: resolution report ::
resolve 502ms :: artifacts dl 10ms:: modules in use:   
com.github.luben#zstd-jni;1.4.4-3 from central in [default]   
org.apache.commons#commons-pool2;2.6.2 from central in [default]   
org.apache.kafka#kafka-clients;2.4.1 from central in [default]   
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in [default]  
 
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from central in
[default]org.lz4#lz4-java;1.7.1 from central in [default]   
org.slf4j#slf4j-api;1.7.30 from central in [default]   
org.spark-project.spark#unused;1.0.0 from central in [default]   
org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]   
-   
|  |modules||   artifacts   |   
|   conf   | number| search|dwnlded|evicted|| number|dwnlded|   
-   
|  default |   9   |   0   |   0   |   0   ||   9   |   0   |   
-::
retrieving ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226   
confs: [default]0 artifacts copied, 9 already retrieved
(0kB/13ms)Setting default log level to "WARN".To adjust logging level use
sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).Spark
context Web UI available at http://hdp-dev.infodetics.com:4040Spark context
available as 'sc' (master = yarn, app id =
application_1593620640299_0015).Spark session available as 'spark'.Welcome
to    __ / __/__  ___ _/ /___\ \/ _ \/ _ `/
__/  '_/   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0  /_/ Using
Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_212)Type in expressions to have them evaluated.Type :help for more
information.scala> val df = spark. | readStream. | format("kafka").
| option("kafka.bootstrap.servers", "XXX"). | option("subscribe",
"XXX"). | option("kafka.sasl.mechanisms", "XXX"). |
option("kafka.security.protocol", "XXX"). |
option("kafka.sasl.username","XXX"). | option("kafka.sasl.password",
"XXX"). | option("startingOffsets", "earliest"). |
loadjava.lang.AbstractMethodError: Method
org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
is abstract  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
 
at
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
 
at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
 
... 57 elidedLooking forward for a response.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

Spark Streaming loading kafka source value column type

2019-03-01 Thread oskarryn

Hi,

Why is `value` column in streamed dataframe obtained from kafka topic 
natively of binary type (look at the table 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) 
if in fact it holds a string with the message's data and we CAST it as 
string anyways?


Oskar


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming with kafka input stuck in (Re-)joing group because of group rebalancing

2018-05-15 Thread JF Chen
When I terminate a spark streaming application and restart it, it always
stuck in this step:
>
> Revoking previously assigned partitions [] for group [mygroup]
> (Re-)joing group [mygroup]


If I use a new group id, even though it works fine, I may lose the data
from the last time I read the previous group id.

So how to solve it?


Regard,
Junfeng Chen


Re: Checkpoints not cleaned using Spark streaming + watermarking + kafka

2017-09-22 Thread MathieuP
The expected setting to clean these files is :
- spark.sql.streaming.minBatchesToRetain

More info on structured streaming settings :
https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-properties.adoc





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Checkpoints not cleaned using Spark streaming + watermarking + kafka

2017-09-21 Thread MathieuP
Hi Spark Users ! :)

I come to you with a question about checkpoints. 
I have a streaming application that consumes and produces to Kafka.
The computation requires a window and watermarking.
Since this is a streaming application with a Kafka output, a checkpoint is
expected.

The application runs using spark-submit on a single master and writes on the
local hard drive. 
It runs fine until the number of checkpoints files in "state" directory
totally fills the disk.
It is  due to the fact that there is no more inode available (not a space
issue ; but tens of thousands inodes are consumed).

I searched in the docs and SO.

I've found the settings :
- spark.cleaner.referenceTracking.cleanCheckpoints
- spark.cleaner.periodicGC.interval
I set them from the app and from the command line, without any success.
Do I misuse them ? Is there another setting ?

I can also see this kind of logs :
...
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned accumulator 25
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned accumulator 11
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned shuffle 0
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned accumulator 7
...

A sample that reproduces the issue:
The window, watermarking and output trigger durations are set to 10 seconds.
The kafka topic is quite small (2 messages per seconds are added).

https://gist.github.com/anonymous/2e83db84d5190ed1ad7a7d2d5cd632f0

Regards,




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming handling Kafka exceptions

2017-07-17 Thread Jean-Francois Gosselin
How can I handle an error with Kafka with my DirectStream (network issue,
zookeeper or broker going down) ? For example when the consumer fails to
connect with Kafka (at startup) I only get a DEBUG log (not even an ERROR)
and no exception are thrown ...

I'm using Spark 2.1.1 and spark-streaming-kafka-0-10.

16:50:23.149 [ForkJoinPool-1-worker-5] DEBUG
o.a.kafka.common.network.Selector - Connection with localhost/127.0.0.1
disconnected
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
at
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)


Thanks


Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Cody Koeninger
If you're looking for some kind of instrumentation finer than at batch
boundaries, you'd have to do something with the individual messages
yourself.  You have full access to the individual messages including
offset.

On Thu, Apr 27, 2017 at 1:27 PM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> Of course I am not asking to commit for every message. But instead of, 
> seeking to commit the last consumed offset at a given interval. For example, 
> from the 1st until the 5th second, messages until offset 100.000 of the 
> partition 10 were consumed, then from the 6th until the 10th second of 
> executing the last consumed offset of the same partition was 200.000 - and so 
> forth. This is the information I seek to get.
>
>> On 27 Apr 2017, at 20:11, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Are you asking for commits for every message?  Because that will kill
>> performance.
>>
>> On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
>> <dominiksafa...@gmail.com> wrote:
>>> Indeed I have. But, even when storing the offsets in Spark and committing 
>>> offsets upon completion of an output operation within the foreachRDD call 
>>> (as pointed in the example), the only offset that Spark’s Kafka 
>>> implementation commits to Kafka is the offset of the last message. For 
>>> example, if I have 100 million messages, then Spark will commit only the 
>>> 100 millionth offset, and the offsets of the intermediate batches - and 
>>> hence the questions.
>>>
>>>> On 26 Apr 2017, at 21:42, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> have you read
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>>>>
>>>> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>>>> <dominiksafa...@gmail.com> wrote:
>>>>> The reason why I want to obtain this information, i.e. <partition, 
>>>>> offset, timestamp> tuples is to relate the consumption with the 
>>>>> production rates using the __consumer_offsets Kafka internal topic. 
>>>>> Interestedly, the Spark’s KafkaConsumer implementation does not auto 
>>>>> commit the offsets upon offset commit expiration, because as seen in the 
>>>>> logs, Spark overrides the enable.auto.commit property to false.
>>>>>
>>>>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>>>>> mind that I do not care about exactly-once, hence having messages 
>>>>> replayed is perfectly fine.
>>>>>
>>>>>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>
>>>>>> What is it you're actually trying to accomplish?
>>>>>>
>>>>>> You can get topic, partition, and offset bounds from an offset range like
>>>>>>
>>>>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>>>>>
>>>>>> Timestamp isn't really a meaningful idea for a range of offsets.
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>>>>> <dominiksafa...@gmail.com> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Because the Spark Streaming direct Kafka consumer maps offsets for a 
>>>>>>> given
>>>>>>> Kafka topic and a partition internally while having enable.auto.commit 
>>>>>>> set
>>>>>>> to false, how can I retrieve the offset of each made consumer’s poll 
>>>>>>> call
>>>>>>> using the offset ranges of an RDD? More precisely, the information I 
>>>>>>> seek to
>>>>>>> get after each poll call is the following: <timestamp, offset, 
>>>>>>> partition>.
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Dominik
>>>>>>>
>>>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Dominik Safaric
Of course I am not asking to commit for every message. But instead of, seeking 
to commit the last consumed offset at a given interval. For example, from the 
1st until the 5th second, messages until offset 100.000 of the partition 10 
were consumed, then from the 6th until the 10th second of executing the last 
consumed offset of the same partition was 200.000 - and so forth. This is the 
information I seek to get. 

> On 27 Apr 2017, at 20:11, Cody Koeninger <c...@koeninger.org> wrote:
> 
> Are you asking for commits for every message?  Because that will kill
> performance.
> 
> On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
> <dominiksafa...@gmail.com> wrote:
>> Indeed I have. But, even when storing the offsets in Spark and committing 
>> offsets upon completion of an output operation within the foreachRDD call 
>> (as pointed in the example), the only offset that Spark’s Kafka 
>> implementation commits to Kafka is the offset of the last message. For 
>> example, if I have 100 million messages, then Spark will commit only the 100 
>> millionth offset, and the offsets of the intermediate batches - and hence 
>> the questions.
>> 
>>> On 26 Apr 2017, at 21:42, Cody Koeninger <c...@koeninger.org> wrote:
>>> 
>>> have you read
>>> 
>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>>> 
>>> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>>> <dominiksafa...@gmail.com> wrote:
>>>> The reason why I want to obtain this information, i.e. <partition, offset, 
>>>> timestamp> tuples is to relate the consumption with the production rates 
>>>> using the __consumer_offsets Kafka internal topic. Interestedly, the 
>>>> Spark’s KafkaConsumer implementation does not auto commit the offsets upon 
>>>> offset commit expiration, because as seen in the logs, Spark overrides the 
>>>> enable.auto.commit property to false.
>>>> 
>>>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>>>> mind that I do not care about exactly-once, hence having messages replayed 
>>>> is perfectly fine.
>>>> 
>>>>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>>>> 
>>>>> What is it you're actually trying to accomplish?
>>>>> 
>>>>> You can get topic, partition, and offset bounds from an offset range like
>>>>> 
>>>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>>>> 
>>>>> Timestamp isn't really a meaningful idea for a range of offsets.
>>>>> 
>>>>> 
>>>>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>>>> <dominiksafa...@gmail.com> wrote:
>>>>>> Hi all,
>>>>>> 
>>>>>> Because the Spark Streaming direct Kafka consumer maps offsets for a 
>>>>>> given
>>>>>> Kafka topic and a partition internally while having enable.auto.commit 
>>>>>> set
>>>>>> to false, how can I retrieve the offset of each made consumer’s poll call
>>>>>> using the offset ranges of an RDD? More precisely, the information I 
>>>>>> seek to
>>>>>> get after each poll call is the following: <timestamp, offset, 
>>>>>> partition>.
>>>>>> 
>>>>>> Thanks in advance,
>>>>>> Dominik
>>>>>> 
>>>> 
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Cody Koeninger
Are you asking for commits for every message?  Because that will kill
performance.

On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> Indeed I have. But, even when storing the offsets in Spark and committing 
> offsets upon completion of an output operation within the foreachRDD call (as 
> pointed in the example), the only offset that Spark’s Kafka implementation 
> commits to Kafka is the offset of the last message. For example, if I have 
> 100 million messages, then Spark will commit only the 100 millionth offset, 
> and the offsets of the intermediate batches - and hence the questions.
>
>> On 26 Apr 2017, at 21:42, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> have you read
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>>
>> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>> <dominiksafa...@gmail.com> wrote:
>>> The reason why I want to obtain this information, i.e. <partition, offset, 
>>> timestamp> tuples is to relate the consumption with the production rates 
>>> using the __consumer_offsets Kafka internal topic. Interestedly, the 
>>> Spark’s KafkaConsumer implementation does not auto commit the offsets upon 
>>> offset commit expiration, because as seen in the logs, Spark overrides the 
>>> enable.auto.commit property to false.
>>>
>>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>>> mind that I do not care about exactly-once, hence having messages replayed 
>>> is perfectly fine.
>>>
>>>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> What is it you're actually trying to accomplish?
>>>>
>>>> You can get topic, partition, and offset bounds from an offset range like
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>>>
>>>> Timestamp isn't really a meaningful idea for a range of offsets.
>>>>
>>>>
>>>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>>> <dominiksafa...@gmail.com> wrote:
>>>>> Hi all,
>>>>>
>>>>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>>>>> Kafka topic and a partition internally while having enable.auto.commit set
>>>>> to false, how can I retrieve the offset of each made consumer’s poll call
>>>>> using the offset ranges of an RDD? More precisely, the information I seek 
>>>>> to
>>>>> get after each poll call is the following: <timestamp, offset, partition>.
>>>>>
>>>>> Thanks in advance,
>>>>> Dominik
>>>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Dominik Safaric
Indeed I have. But, even when storing the offsets in Spark and committing 
offsets upon completion of an output operation within the foreachRDD call (as 
pointed in the example), the only offset that Spark’s Kafka implementation 
commits to Kafka is the offset of the last message. For example, if I have 100 
million messages, then Spark will commit only the 100 millionth offset, and the 
offsets of the intermediate batches - and hence the questions. 

> On 26 Apr 2017, at 21:42, Cody Koeninger <c...@koeninger.org> wrote:
> 
> have you read
> 
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
> 
> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
> <dominiksafa...@gmail.com> wrote:
>> The reason why I want to obtain this information, i.e. <partition, offset, 
>> timestamp> tuples is to relate the consumption with the production rates 
>> using the __consumer_offsets Kafka internal topic. Interestedly, the Spark’s 
>> KafkaConsumer implementation does not auto commit the offsets upon offset 
>> commit expiration, because as seen in the logs, Spark overrides the 
>> enable.auto.commit property to false.
>> 
>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>> mind that I do not care about exactly-once, hence having messages replayed 
>> is perfectly fine.
>> 
>>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>> 
>>> What is it you're actually trying to accomplish?
>>> 
>>> You can get topic, partition, and offset bounds from an offset range like
>>> 
>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>> 
>>> Timestamp isn't really a meaningful idea for a range of offsets.
>>> 
>>> 
>>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>> <dominiksafa...@gmail.com> wrote:
>>>> Hi all,
>>>> 
>>>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>>>> Kafka topic and a partition internally while having enable.auto.commit set
>>>> to false, how can I retrieve the offset of each made consumer’s poll call
>>>> using the offset ranges of an RDD? More precisely, the information I seek 
>>>> to
>>>> get after each poll call is the following: <timestamp, offset, partition>.
>>>> 
>>>> Thanks in advance,
>>>> Dominik
>>>> 
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Cody Koeninger
have you read

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself

On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> The reason why I want to obtain this information, i.e. <partition, offset, 
> timestamp> tuples is to relate the consumption with the production rates 
> using the __consumer_offsets Kafka internal topic. Interestedly, the Spark’s 
> KafkaConsumer implementation does not auto commit the offsets upon offset 
> commit expiration, because as seen in the logs, Spark overrides the 
> enable.auto.commit property to false.
>
> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
> mind that I do not care about exactly-once, hence having messages replayed is 
> perfectly fine.
>
>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> What is it you're actually trying to accomplish?
>>
>> You can get topic, partition, and offset bounds from an offset range like
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>
>> Timestamp isn't really a meaningful idea for a range of offsets.
>>
>>
>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>> <dominiksafa...@gmail.com> wrote:
>>> Hi all,
>>>
>>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>>> Kafka topic and a partition internally while having enable.auto.commit set
>>> to false, how can I retrieve the offset of each made consumer’s poll call
>>> using the offset ranges of an RDD? More precisely, the information I seek to
>>> get after each poll call is the following: <timestamp, offset, partition>.
>>>
>>> Thanks in advance,
>>> Dominik
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Dominik Safaric
The reason why I want to obtain this information, i.e. <partition, offset, 
timestamp> tuples is to relate the consumption with the production rates using 
the __consumer_offsets Kafka internal topic. Interestedly, the Spark’s 
KafkaConsumer implementation does not auto commit the offsets upon offset 
commit expiration, because as seen in the logs, Spark overrides the 
enable.auto.commit property to false. 

Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in mind 
that I do not care about exactly-once, hence having messages replayed is 
perfectly fine.   

> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
> 
> What is it you're actually trying to accomplish?
> 
> You can get topic, partition, and offset bounds from an offset range like
> 
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
> 
> Timestamp isn't really a meaningful idea for a range of offsets.
> 
> 
> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
> <dominiksafa...@gmail.com> wrote:
>> Hi all,
>> 
>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>> Kafka topic and a partition internally while having enable.auto.commit set
>> to false, how can I retrieve the offset of each made consumer’s poll call
>> using the offset ranges of an RDD? More precisely, the information I seek to
>> get after each poll call is the following: <timestamp, offset, partition>.
>> 
>> Thanks in advance,
>> Dominik
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Cody Koeninger
What is it you're actually trying to accomplish?

You can get topic, partition, and offset bounds from an offset range like

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets

Timestamp isn't really a meaningful idea for a range of offsets.


On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> Hi all,
>
> Because the Spark Streaming direct Kafka consumer maps offsets for a given
> Kafka topic and a partition internally while having enable.auto.commit set
> to false, how can I retrieve the offset of each made consumer’s poll call
> using the offset ranges of an RDD? More precisely, the information I seek to
> get after each poll call is the following: <timestamp, offset, partition>.
>
> Thanks in advance,
> Dominik
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-25 Thread Dominik Safaric
Hi all,

Because the Spark Streaming direct Kafka consumer maps offsets for a given 
Kafka topic and a partition internally while having enable.auto.commit set to 
false, how can I retrieve the offset of each made consumer’s poll call using 
the offset ranges of an RDD? More precisely, the information I seek to get 
after each poll call is the following: <timestamp, offset, partition>. 

Thanks in advance,
Dominik



Re: Spark streaming to kafka exactly once

2017-03-23 Thread Maurin Lenglart
Ok,
Thanks for your answers

On 3/22/17, 1:34 PM, "Cody Koeninger"  wrote:

If you're talking about reading the same message multiple times in a
failure situation, see

https://github.com/koeninger/kafka-exactly-once

If you're talking about producing the same message multiple times in a
failure situation, keep an eye on


https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

If you're talking about producers just misbehaving and sending
different copies of what is essentially the same message from a domain
perspective, you have to dedupe that with your own logic.

On Wed, Mar 22, 2017 at 2:52 PM, Matt Deaver  wrote:
> You have to handle de-duplication upstream or downstream. It might
> technically be possible to handle this in Spark but you'll probably have a
> better time handling duplicates in the service that reads from Kafka.
>
> On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
> wrote:
>>
>> Hi,
>> we are trying to build a spark streaming solution that subscribe and push
>> to kafka.
>>
>> But we are running into the problem of duplicates events.
>>
>> Right now, I am doing a “forEachRdd” and loop over the message of each
>> partition and send those message to kafka.
>>
>>
>>
>> Is there any good way of solving that issue?
>>
>>
>>
>> thanks
>
>
>
>
> --
> Regards,
>
> Matt
> Data Engineer
> https://www.linkedin.com/in/mdeaver
> http://mattdeav.pythonanywhere.com/




Re: Spark streaming to kafka exactly once

2017-03-22 Thread Cody Koeninger
If you're talking about reading the same message multiple times in a
failure situation, see

https://github.com/koeninger/kafka-exactly-once

If you're talking about producing the same message multiple times in a
failure situation, keep an eye on

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

If you're talking about producers just misbehaving and sending
different copies of what is essentially the same message from a domain
perspective, you have to dedupe that with your own logic.

On Wed, Mar 22, 2017 at 2:52 PM, Matt Deaver  wrote:
> You have to handle de-duplication upstream or downstream. It might
> technically be possible to handle this in Spark but you'll probably have a
> better time handling duplicates in the service that reads from Kafka.
>
> On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
> wrote:
>>
>> Hi,
>> we are trying to build a spark streaming solution that subscribe and push
>> to kafka.
>>
>> But we are running into the problem of duplicates events.
>>
>> Right now, I am doing a “forEachRdd” and loop over the message of each
>> partition and send those message to kafka.
>>
>>
>>
>> Is there any good way of solving that issue?
>>
>>
>>
>> thanks
>
>
>
>
> --
> Regards,
>
> Matt
> Data Engineer
> https://www.linkedin.com/in/mdeaver
> http://mattdeav.pythonanywhere.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming to kafka exactly once

2017-03-22 Thread Matt Deaver
You have to handle de-duplication upstream or downstream. It might
technically be possible to handle this in Spark but you'll probably have a
better time handling duplicates in the service that reads from Kafka.

On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
wrote:

> Hi,
> we are trying to build a spark streaming solution that subscribe and push
> to kafka.
>
> But we are running into the problem of duplicates events.
>
> Right now, I am doing a “forEachRdd” and loop over the message of each
> partition and send those message to kafka.
>
>
>
> Is there any good way of solving that issue?
>
>
>
> thanks
>



-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Spark streaming to kafka exactly once

2017-03-22 Thread Maurin Lenglart
Hi,
we are trying to build a spark streaming solution that subscribe and push to 
kafka.
But we are running into the problem of duplicates events.
Right now, I am doing a “forEachRdd” and loop over the message of each 
partition and send those message to kafka.

Is there any good way of solving that issue?

thanks


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-21 Thread shyla deshpande
Thanks TD.

On Tue, Mar 14, 2017 at 4:37 PM, Tathagata Das  wrote:

> This setting allows multiple spark jobs generated through multiple
> foreachRDD to run concurrently, even if they are across batches. So output
> op2 from batch X, can run concurrently with op1 of batch X+1
> This is not safe because it breaks the checkpointing logic in subtle ways.
> Note that this was never documented in the spark online docs.
>
> On Tue, Mar 14, 2017 at 2:29 PM, shyla deshpande  > wrote:
>
>> Thanks TD for the response. Can you please provide more explanation. I am
>>  having multiple streams in the spark streaming application (Spark 2.0.2
>> using DStreams).  I know many people using this setting. So your
>> explanation will help a lot of people.
>>
>> Thanks
>>
>> On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das 
>> wrote:
>>
>>> That config I not safe. Please do not use it.
>>>
>>> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
>>> wrote:
>>>
 I have a spark streaming application which processes 3 kafka streams
 and has 5 output operations.

 Not sure what should be the setting for spark.streaming.concurrentJobs.

 1. If the concurrentJobs setting is 4 does that mean 2 output
 operations will be run sequentially?

 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
 this situation?

 I appreciate your input. Thanks

>>>
>>
>


Re: Spark Streaming from Kafka, deal with initial heavy load.

2017-03-20 Thread Cody Koeninger
You want spark.streaming.kafka.maxRatePerPartition for the direct stream.

On Sat, Mar 18, 2017 at 3:37 PM, Mal Edwin <mal.ed...@vinadionline.com> wrote:
>
> Hi,
> You can enable backpressure to handle this.
>
> spark.streaming.backpressure.enabled
> spark.streaming.receiver.maxRate
>
> Thanks,
> Edwin
>
> On Mar 18, 2017, 12:53 AM -0400, sagarcasual . <sagarcas...@gmail.com>,
> wrote:
>
> Hi, we have spark 1.6.1 streaming from Kafka (0.10.1) topic using direct
> approach. The streaming part works fine but when we initially start the job,
> we have to deal with really huge Kafka message backlog, millions of
> messages, and that first batch runs for over 40 hours,  and after 12 hours
> or so it becomes very very slow, it keeps crunching messages, but at a very
> low speed. Any idea how to overcome this issue? Once the job is all caught
> up, subsequent batches are quick and fast since the load is really tiny to
> process. So any idea how to avoid this problem?
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming from Kafka, deal with initial heavy load.

2017-03-18 Thread Mal Edwin

Hi,
You can enable backpressure to handle this.

spark.streaming.backpressure.enabled
spark.streaming.receiver.maxRate

Thanks,
Edwin

On Mar 18, 2017, 12:53 AM -0400, sagarcasual . <sagarcas...@gmail.com>, wrote:
> Hi, we have spark 1.6.1 streaming from Kafka (0.10.1) topic using direct 
> approach. The streaming part works fine but when we initially start the job, 
> we have to deal with really huge Kafka message backlog, millions of messages, 
> and that first batch runs for over 40 hours,  and after 12 hours or so it 
> becomes very very slow, it keeps crunching messages, but at a very low speed. 
> Any idea how to overcome this issue? Once the job is all caught up, 
> subsequent batches are quick and fast since the load is really tiny to 
> process. So any idea how to avoid this problem?




Spark Streaming from Kafka, deal with initial heavy load.

2017-03-17 Thread sagarcasual .
Hi, we have spark 1.6.1 streaming from Kafka (0.10.1) topic using direct
approach. The streaming part works fine but when we initially start the
job, we have to deal with really huge Kafka message backlog, millions of
messages, and that first batch runs for over 40 hours,  and after 12 hours
or so it becomes very very slow, it keeps crunching messages, but at a very
low speed. Any idea how to overcome this issue? Once the job is all caught
up, subsequent batches are quick and fast since the load is really tiny to
process. So any idea how to avoid this problem?


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread Tathagata Das
This setting allows multiple spark jobs generated through multiple
foreachRDD to run concurrently, even if they are across batches. So output
op2 from batch X, can run concurrently with op1 of batch X+1
This is not safe because it breaks the checkpointing logic in subtle ways.
Note that this was never documented in the spark online docs.

On Tue, Mar 14, 2017 at 2:29 PM, shyla deshpande 
wrote:

> Thanks TD for the response. Can you please provide more explanation. I am
>  having multiple streams in the spark streaming application (Spark 2.0.2
> using DStreams).  I know many people using this setting. So your
> explanation will help a lot of people.
>
> Thanks
>
> On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das 
> wrote:
>
>> That config I not safe. Please do not use it.
>>
>> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
>> wrote:
>>
>>> I have a spark streaming application which processes 3 kafka streams and
>>> has 5 output operations.
>>>
>>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>>
>>> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
>>> will be run sequentially?
>>>
>>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>>> this situation?
>>>
>>> I appreciate your input. Thanks
>>>
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread shyla deshpande
Thanks TD for the response. Can you please provide more explanation. I am
 having multiple streams in the spark streaming application (Spark 2.0.2
using DStreams).  I know many people using this setting. So your
explanation will help a lot of people.

Thanks

On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das  wrote:

> That config I not safe. Please do not use it.
>
> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
> wrote:
>
>> I have a spark streaming application which processes 3 kafka streams and
>> has 5 output operations.
>>
>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>
>> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
>> will be run sequentially?
>>
>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>> this situation?
>>
>> I appreciate your input. Thanks
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread Tathagata Das
That config I not safe. Please do not use it.

On Mar 10, 2017 10:03 AM, "shyla deshpande" 
wrote:

> I have a spark streaming application which processes 3 kafka streams and
> has 5 output operations.
>
> Not sure what should be the setting for spark.streaming.concurrentJobs.
>
> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
> will be run sequentially?
>
> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
> this situation?
>
> I appreciate your input. Thanks
>


spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread shyla deshpande
I have a spark streaming application which processes 3 kafka streams and
has 5 output operations.

Not sure what should be the setting for spark.streaming.concurrentJobs.

1. If the concurrentJobs setting is 4 does that mean 2 output operations
will be run sequentially?

2. If I had 6 cores what would be a ideal setting for concurrentJobs in
this situation?

I appreciate your input. Thanks


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
If you haven't looked at the offset ranges in the logs for the time period
in question, I'd start there.

On Jan 24, 2017 2:51 PM, "Hakan İlter"  wrote:

Sorry for misunderstanding. When I said that, I meant there are no lag in
consumer. Kafka Manager shows each consumer's coverage and lag status.

On Tue, Jan 24, 2017 at 10:45 PM, Cody Koeninger  wrote:

> When you said " I check the offset ranges from Kafka Manager and don't
> see any significant deltas.", what were you comparing it against?  The
> offset ranges printed in spark logs?
>
> On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter  wrote:
> > First of all, I can both see the "Input Rate" from Spark job's statistics
> > page and Kafka producer message/sec from Kafka manager. The numbers are
> > different when I have the problem. Normally these numbers are very near.
> >
> > Besides, the job is an ETL job, it writes the results to Elastic Search.
> An
> > another legacy app also writes the same results to a database. There are
> > huge difference between DB and ES. I know how many records we process
> daily.
> >
> > Everything works fine if I run a job instance for each topic.
> >
> > On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger 
> wrote:
> >>
> >> I'm confused, if you don't see any difference between the offsets the
> >> job is processing and the offsets available in kafka, then how do you
> >> know it's processing less than all of the data?
> >>
> >> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter 
> >> wrote:
> >> > I'm using DirectStream as one stream for all topics. I check the
> offset
> >> > ranges from Kafka Manager and don't see any significant deltas.
> >> >
> >> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> Are you using receiver-based or direct stream?
> >> >>
> >> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >> >>
> >> >> If you're using the direct stream, the actual topics and offset
> ranges
> >> >> should be visible in the logs, so you should be able to see more
> >> >> detail about what's happening (e.g. all topics are still being
> >> >> processed but offsets are significantly behind, vs only certain
> topics
> >> >> being processed but keeping up with latest offsets)
> >> >>
> >> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter 
> >> >> wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
> >> >> > from
> >> >> > multiple kafka topics. After starting the job, everything works
> fine
> >> >> > first
> >> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> >> > starts
> >> >> > processing only some part of the data (like 350 req/sec). When I
> >> >> > check
> >> >> > the
> >> >> > kafka topics, I can see that there are still 700 req/sec coming to
> >> >> > the
> >> >> > topics. I don't see any errors, exceptions or any other problem.
> The
> >> >> > job
> >> >> > works fine when I start the same code with just single kafka topic.
> >> >> >
> >> >> > Do you have any idea or a clue to understand the problem?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > View this message in context:
> >> >> >
> >> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-st
> reaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> >> > Sent from the Apache Spark User List mailing list archive at
> >> >> > Nabble.com.
> >> >> >
> >> >> > 
> -
> >> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >
> >> >
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Hakan İlter
Sorry for misunderstanding. When I said that, I meant there are no lag in
consumer. Kafka Manager shows each consumer's coverage and lag status.

On Tue, Jan 24, 2017 at 10:45 PM, Cody Koeninger <c...@koeninger.org> wrote:

> When you said " I check the offset ranges from Kafka Manager and don't
> see any significant deltas.", what were you comparing it against?  The
> offset ranges printed in spark logs?
>
> On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter <hakanil...@gmail.com> wrote:
> > First of all, I can both see the "Input Rate" from Spark job's statistics
> > page and Kafka producer message/sec from Kafka manager. The numbers are
> > different when I have the problem. Normally these numbers are very near.
> >
> > Besides, the job is an ETL job, it writes the results to Elastic Search.
> An
> > another legacy app also writes the same results to a database. There are
> > huge difference between DB and ES. I know how many records we process
> daily.
> >
> > Everything works fine if I run a job instance for each topic.
> >
> > On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> I'm confused, if you don't see any difference between the offsets the
> >> job is processing and the offsets available in kafka, then how do you
> >> know it's processing less than all of the data?
> >>
> >> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com>
> >> wrote:
> >> > I'm using DirectStream as one stream for all topics. I check the
> offset
> >> > ranges from Kafka Manager and don't see any significant deltas.
> >> >
> >> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> Are you using receiver-based or direct stream?
> >> >>
> >> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >> >>
> >> >> If you're using the direct stream, the actual topics and offset
> ranges
> >> >> should be visible in the logs, so you should be able to see more
> >> >> detail about what's happening (e.g. all topics are still being
> >> >> processed but offsets are significantly behind, vs only certain
> topics
> >> >> being processed but keeping up with latest offsets)
> >> >>
> >> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com>
> >> >> wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
> >> >> > from
> >> >> > multiple kafka topics. After starting the job, everything works
> fine
> >> >> > first
> >> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> >> > starts
> >> >> > processing only some part of the data (like 350 req/sec). When I
> >> >> > check
> >> >> > the
> >> >> > kafka topics, I can see that there are still 700 req/sec coming to
> >> >> > the
> >> >> > topics. I don't see any errors, exceptions or any other problem.
> The
> >> >> > job
> >> >> > works fine when I start the same code with just single kafka topic.
> >> >> >
> >> >> > Do you have any idea or a clue to understand the problem?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > View this message in context:
> >> >> >
> >> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> >> > Sent from the Apache Spark User List mailing list archive at
> >> >> > Nabble.com.
> >> >> >
> >> >> > 
> -
> >> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >
> >> >
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
When you said " I check the offset ranges from Kafka Manager and don't
see any significant deltas.", what were you comparing it against?  The
offset ranges printed in spark logs?

On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter <hakanil...@gmail.com> wrote:
> First of all, I can both see the "Input Rate" from Spark job's statistics
> page and Kafka producer message/sec from Kafka manager. The numbers are
> different when I have the problem. Normally these numbers are very near.
>
> Besides, the job is an ETL job, it writes the results to Elastic Search. An
> another legacy app also writes the same results to a database. There are
> huge difference between DB and ES. I know how many records we process daily.
>
> Everything works fine if I run a job instance for each topic.
>
> On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I'm confused, if you don't see any difference between the offsets the
>> job is processing and the offsets available in kafka, then how do you
>> know it's processing less than all of the data?
>>
>> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com>
>> wrote:
>> > I'm using DirectStream as one stream for all topics. I check the offset
>> > ranges from Kafka Manager and don't see any significant deltas.
>> >
>> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Are you using receiver-based or direct stream?
>> >>
>> >> Are you doing 1 stream per topic, or 1 stream for all topics?
>> >>
>> >> If you're using the direct stream, the actual topics and offset ranges
>> >> should be visible in the logs, so you should be able to see more
>> >> detail about what's happening (e.g. all topics are still being
>> >> processed but offsets are significantly behind, vs only certain topics
>> >> being processed but keeping up with latest offsets)
>> >>
>> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com>
>> >> wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
>> >> > from
>> >> > multiple kafka topics. After starting the job, everything works fine
>> >> > first
>> >> > (like 700 req/sec) but after a while (couples of days or a week) it
>> >> > starts
>> >> > processing only some part of the data (like 350 req/sec). When I
>> >> > check
>> >> > the
>> >> > kafka topics, I can see that there are still 700 req/sec coming to
>> >> > the
>> >> > topics. I don't see any errors, exceptions or any other problem. The
>> >> > job
>> >> > works fine when I start the same code with just single kafka topic.
>> >> >
>> >> > Do you have any idea or a clue to understand the problem?
>> >> >
>> >> > Thanks.
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> >
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> >> > Nabble.com.
>> >> >
>> >> > -
>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Hakan İlter
First of all, I can both see the "Input Rate" from Spark job's statistics
page and Kafka producer message/sec from Kafka manager. The numbers are
different when I have the problem. Normally these numbers are very near.

Besides, the job is an ETL job, it writes the results to Elastic Search. An
another legacy app also writes the same results to a database. There are
huge difference between DB and ES. I know how many records we process daily.

Everything works fine if I run a job instance for each topic.

On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I'm confused, if you don't see any difference between the offsets the
> job is processing and the offsets available in kafka, then how do you
> know it's processing less than all of the data?
>
> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com>
> wrote:
> > I'm using DirectStream as one stream for all topics. I check the offset
> > ranges from Kafka Manager and don't see any significant deltas.
> >
> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Are you using receiver-based or direct stream?
> >>
> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >>
> >> If you're using the direct stream, the actual topics and offset ranges
> >> should be visible in the logs, so you should be able to see more
> >> detail about what's happening (e.g. all topics are still being
> >> processed but offsets are significantly behind, vs only certain topics
> >> being processed but keeping up with latest offsets)
> >>
> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com>
> wrote:
> >> > Hi everyone,
> >> >
> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> >> > multiple kafka topics. After starting the job, everything works fine
> >> > first
> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> > starts
> >> > processing only some part of the data (like 350 req/sec). When I check
> >> > the
> >> > kafka topics, I can see that there are still 700 req/sec coming to the
> >> > topics. I don't see any errors, exceptions or any other problem. The
> job
> >> > works fine when I start the same code with just single kafka topic.
> >> >
> >> > Do you have any idea or a clue to understand the problem?
> >> >
> >> > Thanks.
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
I'm confused, if you don't see any difference between the offsets the
job is processing and the offsets available in kafka, then how do you
know it's processing less than all of the data?

On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com> wrote:
> I'm using DirectStream as one stream for all topics. I check the offset
> ranges from Kafka Manager and don't see any significant deltas.
>
> On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Are you using receiver-based or direct stream?
>>
>> Are you doing 1 stream per topic, or 1 stream for all topics?
>>
>> If you're using the direct stream, the actual topics and offset ranges
>> should be visible in the logs, so you should be able to see more
>> detail about what's happening (e.g. all topics are still being
>> processed but offsets are significantly behind, vs only certain topics
>> being processed but keeping up with latest offsets)
>>
>> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com> wrote:
>> > Hi everyone,
>> >
>> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
>> > multiple kafka topics. After starting the job, everything works fine
>> > first
>> > (like 700 req/sec) but after a while (couples of days or a week) it
>> > starts
>> > processing only some part of the data (like 350 req/sec). When I check
>> > the
>> > kafka topics, I can see that there are still 700 req/sec coming to the
>> > topics. I don't see any errors, exceptions or any other problem. The job
>> > works fine when I start the same code with just single kafka topic.
>> >
>> > Do you have any idea or a clue to understand the problem?
>> >
>> > Thanks.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Hakan İlter
I'm using DirectStream as one stream for all topics. I check the offset
ranges from Kafka Manager and don't see any significant deltas.

On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Are you using receiver-based or direct stream?
>
> Are you doing 1 stream per topic, or 1 stream for all topics?
>
> If you're using the direct stream, the actual topics and offset ranges
> should be visible in the logs, so you should be able to see more
> detail about what's happening (e.g. all topics are still being
> processed but offsets are significantly behind, vs only certain topics
> being processed but keeping up with latest offsets)
>
> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com> wrote:
> > Hi everyone,
> >
> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> > multiple kafka topics. After starting the job, everything works fine
> first
> > (like 700 req/sec) but after a while (couples of days or a week) it
> starts
> > processing only some part of the data (like 350 req/sec). When I check
> the
> > kafka topics, I can see that there are still 700 req/sec coming to the
> > topics. I don't see any errors, exceptions or any other problem. The job
> > works fine when I start the same code with just single kafka topic.
> >
> > Do you have any idea or a clue to understand the problem?
> >
> > Thanks.
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-streaming-multiple-kafka-
> topic-doesn-t-work-at-least-once-tp28334.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Cody Koeninger
Are you using receiver-based or direct stream?

Are you doing 1 stream per topic, or 1 stream for all topics?

If you're using the direct stream, the actual topics and offset ranges
should be visible in the logs, so you should be able to see more
detail about what's happening (e.g. all topics are still being
processed but offsets are significantly behind, vs only certain topics
being processed but keeping up with latest offsets)

On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com> wrote:
> Hi everyone,
>
> I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> multiple kafka topics. After starting the job, everything works fine first
> (like 700 req/sec) but after a while (couples of days or a week) it starts
> processing only some part of the data (like 350 req/sec). When I check the
> kafka topics, I can see that there are still 700 req/sec coming to the
> topics. I don't see any errors, exceptions or any other problem. The job
> works fine when I start the same code with just single kafka topic.
>
> Do you have any idea or a clue to understand the problem?
>
> Thanks.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread hakanilter
Hi everyone,

I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
multiple kafka topics. After starting the job, everything works fine first
(like 700 req/sec) but after a while (couples of days or a week) it starts
processing only some part of the data (like 350 req/sec). When I check the
kafka topics, I can see that there are still 700 req/sec coming to the
topics. I don't see any errors, exceptions or any other problem. The job
works fine when I start the same code with just single kafka topic. 

Do you have any idea or a clue to understand the problem? 

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka

2016-12-12 Thread Anton Okolnychyi
thanks for all your replies, now I have a complete picture.



2016-12-12 16:49 GMT+01:00 Cody Koeninger <c...@koeninger.org>:

> http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html#creating-a-direct-stream
>
> Use a separate group id for each stream, like the docs say.
>
> If you're doing multiple output operations, and aren't caching, spark
> is going to read from kafka again each time, and if some of those
> reads are happening for the same group and same topicpartition, it's
> not going to work.
>
> On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
> <oleksii.duk...@gmail.com> wrote:
> > Hi Anton,
> >
> > What is the command you run your spark app with? Why not working with
> data
> > instead of stream on your second stage operation? Can you provide logs
> with
> > the issue?
> >
> > ConcurrentModificationException is not a spark issue, it means that you
> use
> > the same Kafka consumer instance from more than one thread.
> >
> > Additionally,
> >
> > 1) As I understand new kafka consumer is created every time when you call
> > KafkaUtils.createDirectStream.
> > 2) If you assign the same group id to several consumer instances then all
> > the consumers will get different set of messages on the same topic. This
> is
> > a kind of load balancing which kafka provides with its Consumer API.
> >
> > Oleksii
> >
> > On 11 December 2016 at 18:46, Anton Okolnychyi <
> anton.okolnyc...@gmail.com>
> > wrote:
> >>
> >> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> >> nothing custom.
> >>
> >>
> >> I will try restate the initial question. Let's consider an example.
> >>
> >> 1. I create a stream and subscribe to a certain topic.
> >>
> >> val stream = KafkaUtils.createDirectStream(...)
> >>
> >> 2. I extract the actual data from the stream. For instance, word counts.
> >>
> >> val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >> 3. Then I compute something and output the result to console.
> >>
> >> val firstResult = stream.reduceByWindow(...)
> >> firstResult.print()
> >>
> >> Once that is done, I would like to perform another computation on top of
> >> wordCounts and output that result again to console. In my current
> >> understanding, I cannot just reuse wordCounts from Step 2 and should
> create
> >> a new stream with another group id and then define the second
> computation.
> >> Am I correct that if add the next part, then I can get
> >> "ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access"?
> >>
> >> // another computation on wordCounts
> >> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> >> secondResult.output()
> >>
> >> Thanks,
> >> Anton
> >>
> >> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
> >>>
> >>> Hi,
> >>> Usual general questions are:
> >>> -- what is your Spark version?
> >>> -- what is your Kafka version?
> >>> -- do you use "standard" Kafka consumer or try to implement something
> >>> custom (your own multi-threaded consumer)?
> >>>
> >>> The freshest docs
> >>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html
> >>>
> >>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
> >>> !!!)
> >>>>
> >>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_
> each_stream");
> >>>
> >>>
> >>>
> >>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
> >>> <anton.okolnyc...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I am experimenting with Spark Streaming and Kafka. I will appreciate
> if
> >>>> someone can say whether the following assumption is correct.
> >>>>
> >>>> If I have multiple computations (each with its own output) on one
> stream
> >>>> (created as KafkaUtils.createDirectStream), then there is a chance
> to have
> >>>> ConcurrentModificationException: KafkaConsumer is not safe for
> >>>> multi-threaded access.  To solve this problem, I should create a new
> stream
> >>>> with different "group.id" for each computation.
> >>>>
> >>>> Am I right?
> >>>>
> >>>> Best regards,
> >>>> Anton
> >>>
> >>>
> >>
> >
>


Re: Spark Streaming with Kafka

2016-12-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream

Use a separate group id for each stream, like the docs say.

If you're doing multiple output operations, and aren't caching, spark
is going to read from kafka again each time, and if some of those
reads are happening for the same group and same topicpartition, it's
not going to work.

On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
<oleksii.duk...@gmail.com> wrote:
> Hi Anton,
>
> What is the command you run your spark app with? Why not working with data
> instead of stream on your second stage operation? Can you provide logs with
> the issue?
>
> ConcurrentModificationException is not a spark issue, it means that you use
> the same Kafka consumer instance from more than one thread.
>
> Additionally,
>
> 1) As I understand new kafka consumer is created every time when you call
> KafkaUtils.createDirectStream.
> 2) If you assign the same group id to several consumer instances then all
> the consumers will get different set of messages on the same topic. This is
> a kind of load balancing which kafka provides with its Consumer API.
>
> Oleksii
>
> On 11 December 2016 at 18:46, Anton Okolnychyi <anton.okolnyc...@gmail.com>
> wrote:
>>
>> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
>> nothing custom.
>>
>>
>> I will try restate the initial question. Let's consider an example.
>>
>> 1. I create a stream and subscribe to a certain topic.
>>
>> val stream = KafkaUtils.createDirectStream(...)
>>
>> 2. I extract the actual data from the stream. For instance, word counts.
>>
>> val wordCounts = stream.map(record => (record.value(), 1))
>>
>> 3. Then I compute something and output the result to console.
>>
>> val firstResult = stream.reduceByWindow(...)
>> firstResult.print()
>>
>> Once that is done, I would like to perform another computation on top of
>> wordCounts and output that result again to console. In my current
>> understanding, I cannot just reuse wordCounts from Step 2 and should create
>> a new stream with another group id and then define the second computation.
>> Am I correct that if add the next part, then I can get
>> "ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access"?
>>
>> // another computation on wordCounts
>> val secondResult = wordCounts.reduceByKeyAndWindow(...)
>> secondResult.output()
>>
>> Thanks,
>> Anton
>>
>> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
>>>
>>> Hi,
>>> Usual general questions are:
>>> -- what is your Spark version?
>>> -- what is your Kafka version?
>>> -- do you use "standard" Kafka consumer or try to implement something
>>> custom (your own multi-threaded consumer)?
>>>
>>> The freshest docs
>>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>>> !!!)
>>>>
>>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>>
>>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
>>> <anton.okolnyc...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>>>> someone can say whether the following assumption is correct.
>>>>
>>>> If I have multiple computations (each with its own output) on one stream
>>>> (created as KafkaUtils.createDirectStream), then there is a chance to have
>>>> ConcurrentModificationException: KafkaConsumer is not safe for
>>>> multi-threaded access.  To solve this problem, I should create a new stream
>>>> with different "group.id" for each computation.
>>>>
>>>> Am I right?
>>>>
>>>> Best regards,
>>>> Anton
>>>
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka

2016-12-11 Thread Oleksii Dukhno
Hi Anton,

What is the command you run your spark app with? Why not working with data
instead of stream on your second stage operation? Can you provide logs with
the issue?

ConcurrentModificationException is not a spark issue, it means that you use
the same Kafka consumer instance from more than one thread.

Additionally,

1) As I understand new kafka consumer is created every time when you
call KafkaUtils.createDirectStream.
2) If you assign the same group id to several consumer instances then all
the consumers will get different set of messages on the same topic. This is
a kind of load balancing which kafka provides with its Consumer API.

Oleksii

On 11 December 2016 at 18:46, Anton Okolnychyi <anton.okolnyc...@gmail.com>
wrote:

> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> nothing custom.
>
>
> I will try restate the initial question. Let's consider an example.
>
> 1. I create a stream and subscribe to a certain topic.
>
> val stream = KafkaUtils.createDirectStream(...)
>
> 2. I extract the actual data from the stream. For instance, word counts.
>
> val wordCounts = stream.map(record => (record.value(), 1))
>
> 3. Then I compute something and output the result to console.
>
> val firstResult = stream.reduceByWindow(...)
> firstResult.print()
>
> Once that is done, I would like to perform another computation on top of
> wordCounts and output that result again to console. In my current
> understanding, I cannot just reuse wordCounts from Step 2 and should create
> a new stream with another group id and then define the second computation.
> Am I correct that if add the next part, then I can get "
> ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access"?
>
> // another computation on wordCounts
> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> secondResult.output()
>
> Thanks,
> Anton
>
> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
>
>> Hi,
>> Usual general questions are:
>> -- what is your Spark version?
>> -- what is your Kafka version?
>> -- do you use "standard" Kafka consumer or try to implement something
>> custom (your own multi-threaded consumer)?
>>
>> The freshest docs https://spark.apache.org/docs/
>> latest/streaming-kafka-0-10-integration.html
>>
>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>> !!!)
>>
>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>
>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
>> anton.okolnyc...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>>> someone can say whether the following assumption is correct.
>>>
>>> If I have multiple computations (each with its own output) on one stream
>>> (created as KafkaUtils.createDirectStream), then there is a chance to
>>> have ConcurrentModificationException: KafkaConsumer is not safe for
>>> multi-threaded access.  To solve this problem, I should create a new stream
>>> with different "group.id" for each computation.
>>>
>>> Am I right?
>>>
>>> Best regards,
>>> Anton
>>>
>>
>>
>


Re: Spark Streaming with Kafka

2016-12-11 Thread Anton Okolnychyi
sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
nothing custom.


I will try restate the initial question. Let's consider an example.

1. I create a stream and subscribe to a certain topic.

val stream = KafkaUtils.createDirectStream(...)

2. I extract the actual data from the stream. For instance, word counts.

val wordCounts = stream.map(record => (record.value(), 1))

3. Then I compute something and output the result to console.

val firstResult = stream.reduceByWindow(...)
firstResult.print()

Once that is done, I would like to perform another computation on top of
wordCounts and output that result again to console. In my current
understanding, I cannot just reuse wordCounts from Step 2 and should create
a new stream with another group id and then define the second computation.
Am I correct that if add the next part, then I can get "
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access"?

// another computation on wordCounts
val secondResult = wordCounts.reduceByKeyAndWindow(...)
secondResult.output()

Thanks,
Anton

2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:

> Hi,
> Usual general questions are:
> -- what is your Spark version?
> -- what is your Kafka version?
> -- do you use "standard" Kafka consumer or try to implement something
> custom (your own multi-threaded consumer)?
>
> The freshest docs https://spark.apache.org/docs/
> latest/streaming-kafka-0-10-integration.html
>
> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)
>
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>
>>
>
> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
> anton.okolnyc...@gmail.com> wrote:
>
>> Hi,
>>
>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>> someone can say whether the following assumption is correct.
>>
>> If I have multiple computations (each with its own output) on one stream
>> (created as KafkaUtils.createDirectStream), then there is a chance to
>> have ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access.  To solve this problem, I should create a new stream
>> with different "group.id" for each computation.
>>
>> Am I right?
>>
>> Best regards,
>> Anton
>>
>
>


Re: Spark Streaming with Kafka

2016-12-11 Thread Timur Shenkao
Hi,
Usual general questions are:
-- what is your Spark version?
-- what is your Kafka version?
-- do you use "standard" Kafka consumer or try to implement something
custom (your own multi-threaded consumer)?

The freshest docs
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)

> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>

On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
anton.okolnyc...@gmail.com> wrote:

> Hi,
>
> I am experimenting with Spark Streaming and Kafka. I will appreciate if
> someone can say whether the following assumption is correct.
>
> If I have multiple computations (each with its own output) on one stream
> (created as KafkaUtils.createDirectStream), then there is a chance to
> have ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access.  To solve this problem, I should create a new stream
> with different "group.id" for each computation.
>
> Am I right?
>
> Best regards,
> Anton
>


Spark Streaming with Kafka

2016-12-11 Thread Anton Okolnychyi
Hi,

I am experimenting with Spark Streaming and Kafka. I will appreciate if
someone can say whether the following assumption is correct.

If I have multiple computations (each with its own output) on one stream
(created as KafkaUtils.createDirectStream), then there is a chance to have
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access.  To solve this problem, I should create a new stream
with different "group.id" for each computation.

Am I right?

Best regards,
Anton


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Karim, Md. Rezaul
Hi Tariq and Jon,

At first thanks for quick response. I really appreciate that.

Well, I would like to start from the very begging of using Kafka with
Spark. For example, in the Spark distribution, I found an example using
Kafka with Spark streaming that demonstrates a Direct Kafka Word Count
example. In that example, I found the main class
*JavaDirectKafkaWordCount.java* under the
spark-2.0.0-bin-hadoop2.7\examples\src\main\java\org\apache\spark\examples\streaming
directory) that contains a code segment as follows:


---*-
String brokers = args[0];
String topics = args[1];

// Create context with a 2 seconds batch interval
SparkConf sparkConf = new
SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(20));

Set topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
---*-

In this code block, the confusing part is setting the values of two command
line arguments (i.e., *brokers *and *topics*). I tried to set them as
follows:

String brokers = "localhost:8890,localhost:8892";
String topics = " topic1,topic2";

However, I know this is not the right way to do so. But there has to have
the correct ways of setting the value of the brokers and topics.

Now, the thing is that I need help how to set/configure these two
parameters so that I can run this hello world like example successfully.
Any kind of help would be highly appreciated.




Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>

On 17 November 2016 at 03:08, Jon Gregg <jonrgr...@gmail.com> wrote:

> Since you're completely new to Kafka, I would start with the Kafka docs (
> https://kafka.apache.org/documentation).  You should be able to get
> through the Getting Started part easily and there are some examples for
> setting up a basic Kafka server.
>
> You don't need Kafka to start working with Spark Streaming (there are
> examples online to pull directly from Twitter, for example).  But at a high
> level if you're sending data from one server to another, it can be
> beneficial to send the messages to a distributed queue first for durable
> storage (so data doesn't get lost in transmission) and other benefits.
>
> On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq <donta...@gmail.com>
> wrote:
>
>> Hi Karim,
>>
>> Are you looking for something specific? Some information about your
>> usecase would be really  helpful in order to answer your question.
>>
>>
>> On Wednesday, November 16, 2016, Karim, Md. Rezaul <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Hi All,
>>>
>>> I am completely new with Kafka. I was wondering if somebody could
>>> provide me some guidelines on how to develop real-time streaming
>>> applications using Spark Streaming API with Kafka.
>>>
>>> I am aware the Spark Streaming  and Kafka integration [1]. However, a
>>> real life example should be better to start?
>>>
>>>
>>>
>>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim* BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> <http://139.59.184.114/index.html>
>>>
>>
>>
>> --
>>
>>
>> [image: http://]
>>
>> Tariq, Mohammad
>> about.me/mti
>> [image: http://]
>> <http://about.me/mti>
>>
>>
>>
>


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Jon Gregg
Since you're completely new to Kafka, I would start with the Kafka docs (
https://kafka.apache.org/documentation).  You should be able to get through
the Getting Started part easily and there are some examples for setting up
a basic Kafka server.

You don't need Kafka to start working with Spark Streaming (there are
examples online to pull directly from Twitter, for example).  But at a high
level if you're sending data from one server to another, it can be
beneficial to send the messages to a distributed queue first for durable
storage (so data doesn't get lost in transmission) and other benefits.

On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq <donta...@gmail.com> wrote:

> Hi Karim,
>
> Are you looking for something specific? Some information about your
> usecase would be really  helpful in order to answer your question.
>
>
> On Wednesday, November 16, 2016, Karim, Md. Rezaul <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi All,
>>
>> I am completely new with Kafka. I was wondering if somebody could provide
>> me some guidelines on how to develop real-time streaming applications using
>> Spark Streaming API with Kafka.
>>
>> I am aware the Spark Streaming  and Kafka integration [1]. However, a
>> real life example should be better to start?
>>
>>
>>
>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>
>>
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim* BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> <http://139.59.184.114/index.html>
>>
>
>
> --
>
>
> [image: http://]
>
> Tariq, Mohammad
> about.me/mti
> [image: http://]
> <http://about.me/mti>
>
>
>


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Mohammad Tariq
Hi Karim,

Are you looking for something specific? Some information about your usecase
would be really  helpful in order to answer your question.

On Wednesday, November 16, 2016, Karim, Md. Rezaul <
rezaul.ka...@insight-centre.org> wrote:

> Hi All,
>
> I am completely new with Kafka. I was wondering if somebody could provide
> me some guidelines on how to develop real-time streaming applications using
> Spark Streaming API with Kafka.
>
> I am aware the Spark Streaming  and Kafka integration [1]. However, a real
> life example should be better to start?
>
>
>
> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim* BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>


-- 


[image: http://]

Tariq, Mohammad
about.me/mti
[image: http://]
<http://about.me/mti>


Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Karim, Md. Rezaul
Hi All,

I am completely new with Kafka. I was wondering if somebody could provide
me some guidelines on how to develop real-time streaming applications using
Spark Streaming API with Kafka.

I am aware the Spark Streaming  and Kafka integration [1]. However, a real
life example should be better to start?



1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html





Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>


Re: Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data

2016-10-18 Thread Sean Owen
Try adding the spark-streaming_2.11 artifact as a dependency too. You will
be directly depending on it.

On Tue, Oct 18, 2016 at 2:16 PM Furkan KAMACI <furkankam...@gmail.com>
wrote:

> Hi,
>
> I have a search application and want to monitor queries per second for it.
> I have Kafka at my backend which acts like a bus for messages. Whenever a
> search request is done I publish the nano time of the current system. I
> want to use Spark Streaming to aggregate such data but I am so new to it.
>
> I wanted to follow that example:
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> I've added that dependencies:
>
> 
>     org.apache.spark
> spark-streaming-kafka-0-10_2.11
> 2.0.1
> 
> 
> org.apache.spark
> spark-core_2.10
> 2.0.1
> 
>
> However I cannot see even Duration class at my dependencies. On the other
> hand given documentation is missing and when you click Java there is no
> code at tabs.
>
> Could you guide me how can I implement monitoring such a metric?
>
> Kind Regards,
> Furkan KAMACI
>


Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data

2016-10-18 Thread Furkan KAMACI
Hi,

I have a search application and want to monitor queries per second for it.
I have Kafka at my backend which acts like a bus for messages. Whenever a
search request is done I publish the nano time of the current system. I
want to use Spark Streaming to aggregate such data but I am so new to it.

I wanted to follow that example:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

I've added that dependencies:


org.apache.spark
spark-streaming-kafka-0-10_2.11
2.0.1


org.apache.spark
spark-core_2.10
2.0.1


However I cannot see even Duration class at my dependencies. On the other
hand given documentation is missing and when you click Java there is no
code at tabs.

Could you guide me how can I implement monitoring such a metric?

Kind Regards,
Furkan KAMACI


Re: Spark Streaming with Kafka

2016-05-24 Thread Rasika Pohankar
Hi firemonk9,

Sorry, its been too long but I just saw this. I hope you were able to
resolve it. FWIW, we were able to solve this with the help of the Low Level
Kafka Consumer, instead of the inbuilt Kafka consumer in Spark, from here:
https://github.com/dibbhatt/kafka-spark-consumer/.

Regards,
Rasika.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p27014.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming from Kafka best fit

2016-03-07 Thread pratik khadloya
Would using mapPartitions instead of map help here?

~Pratik

On Tue, Mar 1, 2016 at 10:07 AM Cody Koeninger <c...@koeninger.org> wrote:

> You don't need an equal number of executor cores to partitions.  An
> executor can and will work on multiple partitions within a batch, one after
> the other.  The real issue is whether you are able to keep your processing
> time under your batch time, so that delay doesn't increase.
>
> On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar <jku...@rocketfuelinc.com>
> wrote:
>
>> Thanks Cody!
>>
>> I understand what you said and if I am correct it will be using 224
>> executor cores just for fetching + stage-1 processing of 224 partitions. I
>> will obviously need more cores for processing further stages and fetching
>> next batch.
>>
>> I will start with higher number of executor cores and see how it goes.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> > "How do I keep a balance of executors which receive data from Kafka
>>> and which process data"
>>>
>>> I think you're misunderstanding how the direct stream works.  The
>>> executor which receives data is also the executor which processes data,
>>> there aren't separate receivers.  If it's a single stage worth of work
>>> (e.g. straight map / filter), the processing of a given partition is going
>>> to be done by the executor that read it from kafka.  If you do something
>>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>>> processing.  The question of which executor works on which tasks is up to
>>> the scheduler (and getPreferredLocations, which only matters if you're
>>> running spark on the same nodes as kafka)
>>>
>>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I see that there are as of today 3 ways one can read from Kafka in
>>>> spark streaming:
>>>> 1. KafkaUtils.createStream() (here
>>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>>> 2. KafkaUtils.createDirectStream() (here
>>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>>> 3. Kafka-spark-consumer (here
>>>> <https://github.com/dibbhatt/kafka-spark-consumer>)
>>>>
>>>> My spark streaming application has to read from 1 kafka topic with
>>>> around 224 partitions, consuming data at around 150MB/s (~90,000
>>>> messages/sec) which reduces to around 3MB/s (~1400 messages/sec) after
>>>> filtering. After filtering I need to maintain top 1 URL counts. I don't
>>>> really care about exactly once semantics as I am interested in rough
>>>> estimate.
>>>>
>>>> Code:
>>>>
>>>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>>>> sparkConf.setAppName("KafkaReader")
>>>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
>>>> createStreamingContext)
>>>>
>>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>> val kafkaParams = Map[String, String](
>>>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>>>   "group.id" -> consumer_group
>>>> )
>>>>
>>>> val lineStreams = (1 to N).map{ _ =>
>>>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>>> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>>>> }
>>>>
>>>> ssc.union(
>>>>   lineStreams.map(stream => {
>>>>   stream.map(ParseStringToLogRecord)
>>>> .filter(record => isGoodRecord(record))
>>>> .map(record => record.url)
>>>>   })
>>>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
>>>> moving window, 28 will probably help in parallelism
>>>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>>>   .mapPartitions(iter => {
>>>> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
>>>> 1000).iterator
>>>> 

Re: Spark streaming from Kafka best fit

2016-03-01 Thread Cody Koeninger
You don't need an equal number of executor cores to partitions.  An
executor can and will work on multiple partitions within a batch, one after
the other.  The real issue is whether you are able to keep your processing
time under your batch time, so that delay doesn't increase.

On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar <jku...@rocketfuelinc.com>
wrote:

> Thanks Cody!
>
> I understand what you said and if I am correct it will be using 224
> executor cores just for fetching + stage-1 processing of 224 partitions. I
> will obviously need more cores for processing further stages and fetching
> next batch.
>
> I will start with higher number of executor cores and see how it goes.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> > "How do I keep a balance of executors which receive data from Kafka
>> and which process data"
>>
>> I think you're misunderstanding how the direct stream works.  The
>> executor which receives data is also the executor which processes data,
>> there aren't separate receivers.  If it's a single stage worth of work
>> (e.g. straight map / filter), the processing of a given partition is going
>> to be done by the executor that read it from kafka.  If you do something
>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>> processing.  The question of which executor works on which tasks is up to
>> the scheduler (and getPreferredLocations, which only matters if you're
>> running spark on the same nodes as kafka)
>>
>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> Hello all,
>>>
>>> I see that there are as of today 3 ways one can read from Kafka in spark
>>> streaming:
>>> 1. KafkaUtils.createStream() (here
>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>> 2. KafkaUtils.createDirectStream() (here
>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>> 3. Kafka-spark-consumer (here
>>> <https://github.com/dibbhatt/kafka-spark-consumer>)
>>>
>>> My spark streaming application has to read from 1 kafka topic with
>>> around 224 partitions, consuming data at around 150MB/s (~90,000
>>> messages/sec) which reduces to around 3MB/s (~1400 messages/sec) after
>>> filtering. After filtering I need to maintain top 1 URL counts. I don't
>>> really care about exactly once semantics as I am interested in rough
>>> estimate.
>>>
>>> Code:
>>>
>>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>>> sparkConf.setAppName("KafkaReader")
>>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
>>> createStreamingContext)
>>>
>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>> val kafkaParams = Map[String, String](
>>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>>   "group.id" -> consumer_group
>>> )
>>>
>>> val lineStreams = (1 to N).map{ _ =>
>>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>>> }
>>>
>>> ssc.union(
>>>   lineStreams.map(stream => {
>>>   stream.map(ParseStringToLogRecord)
>>> .filter(record => isGoodRecord(record))
>>> .map(record => record.url)
>>>   })
>>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
>>> moving window, 28 will probably help in parallelism
>>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>>   .mapPartitions(iter => {
>>> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
>>> 1000).iterator
>>>   }, true)
>>>   .foreachRDD((latestRDD, rddTime) => {
>>>   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
>>> record._1)).sortByKey(false).take(1000))
>>>   })
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> Questions:
>>>
>>> a) I used #2 but I found that I couldn't control how many executors will
>>> be actually fetching from Kafka. How do I keep a balance of executors which
>>> receive data from Kafka and which process data? Do they keep changing for
>>> every batch?
>>>
>>> b) Now I am trying to use #1 creating multiple DStreams, filtering them
>>> and then doing a union. I don't understand why would the number of events
>>> processed per 120 seconds batch will change drastically. PFA the events/sec
>>> graph while running with 1 receiver. How to debug this?
>>>
>>> c) What will be the most suitable method to integrate with Kafka from
>>> above 3? Any recommendations for getting maximum performance, running the
>>> streaming application reliably in production environment?
>>>
>>> --
>>> Thanks
>>> Jatin Kumar
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>
>


Re: Spark streaming from Kafka best fit

2016-03-01 Thread Jatin Kumar
Thanks Cody!

I understand what you said and if I am correct it will be using 224
executor cores just for fetching + stage-1 processing of 224 partitions. I
will obviously need more cores for processing further stages and fetching
next batch.

I will start with higher number of executor cores and see how it goes.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> > "How do I keep a balance of executors which receive data from Kafka and
> which process data"
>
> I think you're misunderstanding how the direct stream works.  The executor
> which receives data is also the executor which processes data, there aren't
> separate receivers.  If it's a single stage worth of work (e.g. straight
> map / filter), the processing of a given partition is going to be done by
> the executor that read it from kafka.  If you do something involving a
> shuffle (e.g. reduceByKey), other executors will do additional processing.
> The question of which executor works on which tasks is up to the scheduler
> (and getPreferredLocations, which only matters if you're running spark on
> the same nodes as kafka)
>
> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> Hello all,
>>
>> I see that there are as of today 3 ways one can read from Kafka in spark
>> streaming:
>> 1. KafkaUtils.createStream() (here
>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>> 2. KafkaUtils.createDirectStream() (here
>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>> 3. Kafka-spark-consumer (here
>> <https://github.com/dibbhatt/kafka-spark-consumer>)
>>
>> My spark streaming application has to read from 1 kafka topic with around
>> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec)
>> which reduces to around 3MB/s (~1400 messages/sec) after filtering. After
>> filtering I need to maintain top 1 URL counts. I don't really care
>> about exactly once semantics as I am interested in rough estimate.
>>
>> Code:
>>
>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>> sparkConf.setAppName("KafkaReader")
>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
>> createStreamingContext)
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>> val kafkaParams = Map[String, String](
>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>   "group.id" -> consumer_group
>> )
>>
>> val lineStreams = (1 to N).map{ _ =>
>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>> }
>>
>> ssc.union(
>>   lineStreams.map(stream => {
>>   stream.map(ParseStringToLogRecord)
>> .filter(record => isGoodRecord(record))
>> .map(record => record.url)
>>   })
>> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
>> moving window, 28 will probably help in parallelism
>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>   .mapPartitions(iter => {
>> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
>> 1000).iterator
>>   }, true)
>>   .foreachRDD((latestRDD, rddTime) => {
>>   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
>> record._1)).sortByKey(false).take(1000))
>>   })
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> Questions:
>>
>> a) I used #2 but I found that I couldn't control how many executors will
>> be actually fetching from Kafka. How do I keep a balance of executors which
>> receive data from Kafka and which process data? Do they keep changing for
>> every batch?
>>
>> b) Now I am trying to use #1 creating multiple DStreams, filtering them
>> and then doing a union. I don't understand why would the number of events
>> processed per 120 seconds batch will change drastically. PFA the events/sec
>> graph while running with 1 receiver. How to debug this?
>>
>> c) What will be the most suitable method to integrate with Kafka from
>> above 3? Any recommendations for getting maximum performance, running the
>> streaming application reliably in production environment?
>>
>> --
>> Thanks
>> Jatin Kumar
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


Re: Spark streaming from Kafka best fit

2016-03-01 Thread Cody Koeninger
> "How do I keep a balance of executors which receive data from Kafka and
which process data"

I think you're misunderstanding how the direct stream works.  The executor
which receives data is also the executor which processes data, there aren't
separate receivers.  If it's a single stage worth of work (e.g. straight
map / filter), the processing of a given partition is going to be done by
the executor that read it from kafka.  If you do something involving a
shuffle (e.g. reduceByKey), other executors will do additional processing.
The question of which executor works on which tasks is up to the scheduler
(and getPreferredLocations, which only matters if you're running spark on
the same nodes as kafka)

On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello all,
>
> I see that there are as of today 3 ways one can read from Kafka in spark
> streaming:
> 1. KafkaUtils.createStream() (here
> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
> 2. KafkaUtils.createDirectStream() (here
> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
> 3. Kafka-spark-consumer (here
> <https://github.com/dibbhatt/kafka-spark-consumer>)
>
> My spark streaming application has to read from 1 kafka topic with around
> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec)
> which reduces to around 3MB/s (~1400 messages/sec) after filtering. After
> filtering I need to maintain top 1 URL counts. I don't really care
> about exactly once semantics as I am interested in rough estimate.
>
> Code:
>
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
> sparkConf.setAppName("KafkaReader")
> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
> val kafkaParams = Map[String, String](
>   "metadata.broker.list" -> "kafka.server.ip:9092",
>   "group.id" -> consumer_group
> )
>
> val lineStreams = (1 to N).map{ _ =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
> }
>
> ssc.union(
>   lineStreams.map(stream => {
>   stream.map(ParseStringToLogRecord)
> .filter(record => isGoodRecord(record))
> .map(record => record.url)
>   })
> ).window(Seconds(120), Seconds(120))  // 2 Minute window
>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute moving 
> window, 28 will probably help in parallelism
>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>   .mapPartitions(iter => {
> iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
> 1000).iterator
>   }, true)
>   .foreachRDD((latestRDD, rddTime) => {
>   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
> record._1)).sortByKey(false).take(1000))
>   })
>
> ssc.start()
> ssc.awaitTermination()
>
> Questions:
>
> a) I used #2 but I found that I couldn't control how many executors will
> be actually fetching from Kafka. How do I keep a balance of executors which
> receive data from Kafka and which process data? Do they keep changing for
> every batch?
>
> b) Now I am trying to use #1 creating multiple DStreams, filtering them
> and then doing a union. I don't understand why would the number of events
> processed per 120 seconds batch will change drastically. PFA the events/sec
> graph while running with 1 receiver. How to debug this?
>
> c) What will be the most suitable method to integrate with Kafka from
> above 3? Any recommendations for getting maximum performance, running the
> streaming application reliably in production environment?
>
> --
> Thanks
> Jatin Kumar
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
If by smaller block interval you mean the value in seconds passed to the
streaming context constructor, no.  You'll still get everything from the
starting offset until now in the first batch.

On Thu, Feb 18, 2016 at 10:02 AM, praveen S  wrote:

> Sorry.. Rephrasing :
> Can this issue be resolved by having a smaller block interval?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:30, "praveen S"  wrote:
>
>> Can having a smaller block interval only resolve this?
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>>
>>> Backpressure won't help you with the first batch, you'd need 
>>> spark.streaming.kafka.maxRatePerPartition
>>> for that
>>>
>>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>>
 Have a look at

 spark.streaming.backpressure.enabled
 Property

 Regards,
 Praveen
 On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am
> trying to find a solution for a particular use case when my application 
> has
> a downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process 
> that.
>
> Is there any workaround so that when my streaming application starts
> it starts taking data for 1-2 hours, process it , then take the data for
> next 1 hour process it. Now when its done processing of previous 5 hours
> data which missed, normal streaming should start with the given slide
> interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>

>>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Sorry.. Rephrasing :
Can this issue be resolved by having a smaller block interval?

Regards,
Praveen
On 18 Feb 2016 21:30, "praveen S"  wrote:

> Can having a smaller block interval only resolve this?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>
>> Backpressure won't help you with the first batch, you'd need 
>> spark.streaming.kafka.maxRatePerPartition
>> for that
>>
>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>
>>> Have a look at
>>>
>>> spark.streaming.backpressure.enabled
>>> Property
>>>
>>> Regards,
>>> Praveen
>>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>>
 I have a spark streaming application running in production. I am trying
 to find a solution for a particular use case when my application has a
 downtime of say 5 hours and is restarted. Now, when I start my streaming
 application after 5 hours there would be considerable amount of data then
 in the Kafka and my cluster would be unable to repartition and process 
 that.

 Is there any workaround so that when my streaming application starts it
 starts taking data for 1-2 hours, process it , then take the data for next
 1 hour process it. Now when its done processing of previous 5 hours data
 which missed, normal streaming should start with the given slide interval.

 Please suggest any ideas and feasibility of this.


 Thanks !!
 Abhi

>>>
>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Can having a smaller block interval only resolve this?

Regards,
Praveen
On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:

> Backpressure won't help you with the first batch, you'd need 
> spark.streaming.kafka.maxRatePerPartition
> for that
>
> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>
>> Have a look at
>>
>> spark.streaming.backpressure.enabled
>> Property
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>
>>> I have a spark streaming application running in production. I am trying
>>> to find a solution for a particular use case when my application has a
>>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>>> application after 5 hours there would be considerable amount of data then
>>> in the Kafka and my cluster would be unable to repartition and process that.
>>>
>>> Is there any workaround so that when my streaming application starts it
>>> starts taking data for 1-2 hours, process it , then take the data for next
>>> 1 hour process it. Now when its done processing of previous 5 hours data
>>> which missed, normal streaming should start with the given slide interval.
>>>
>>> Please suggest any ideas and feasibility of this.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
Backpressure won't help you with the first batch, you'd need
spark.streaming.kafka.maxRatePerPartition
for that

On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:

> Have a look at
>
> spark.streaming.backpressure.enabled
> Property
>
> Regards,
> Praveen
> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>
>> I have a spark streaming application running in production. I am trying
>> to find a solution for a particular use case when my application has a
>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>> application after 5 hours there would be considerable amount of data then
>> in the Kafka and my cluster would be unable to repartition and process that.
>>
>> Is there any workaround so that when my streaming application starts it
>> starts taking data for 1-2 hours, process it , then take the data for next
>> 1 hour process it. Now when its done processing of previous 5 hours data
>> which missed, normal streaming should start with the given slide interval.
>>
>> Please suggest any ideas and feasibility of this.
>>
>>
>> Thanks !!
>> Abhi
>>
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Have a look at

spark.streaming.backpressure.enabled
Property

Regards,
Praveen
On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am trying to
> find a solution for a particular use case when my application has a
> downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process that.
>
> Is there any workaround so that when my streaming application starts it
> starts taking data for 1-2 hours, process it , then take the data for next
> 1 hour process it. Now when its done processing of previous 5 hours data
> which missed, normal streaming should start with the given slide interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>


Re: Spark Streaming with Kafka DirectStream

2016-02-17 Thread Cody Koeninger
You can print whatever you want wherever you want, it's just a question of
whether it's going to show up on the driver or the various executors logs

On Wed, Feb 17, 2016 at 5:50 AM, Cyril Scetbon 
wrote:

> I don't think we can print an integer value in a spark streaming process
> As opposed to a spark job. I think I can print the content of an rdd but
> not debug messages. Am I wrong ?
>
> Cyril Scetbon
>
> On Feb 17, 2016, at 12:51 AM, ayan guha  wrote:
>
> Hi
>
> You can always use RDD properties, which already has partition information.
>
>
> https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
>
>
> On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon 
> wrote:
>
>> Your understanding is the right one (having re-read the documentation).
>> Still wondering how I can verify that 5 partitions have been created. My
>> job is reading from a topic in Kafka that has 5 partitions and sends the
>> data to E/S. I can see that when there is one task to read from Kafka there
>> are 5 tasks writing to E/S. So I'm supposing that the task reading from
>> Kafka does it in // using 5 partitions and that's why there are then 5
>> tasks to write to E/S. But I'm supposing ...
>>
>> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>>
>> I have a slightly different understanding.
>>
>> Direct stream generates 1 RDD per batch, however, number of partitions in
>> that RDD = number of partitions in kafka topic.
>>
>> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
>> wrote:
>>
>>> Hi guys,
>>>
>>> I'm making some tests with Spark and Kafka using a Python script. I use
>>> the second method that doesn't need any receiver (Direct Approach). It
>>> should adapt the number of RDDs to the number of partitions in the topic.
>>> I'm trying to verify it. What's the easiest way to verify it ? I also tried
>>> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
>>> on the leaders of partitions in a topic, and they are not. Can you confirm
>>> that RDDs are not created depending on the location of partitions and that
>>> co-locating Kafka with Spark is not a must-have or that Spark does not take
>>> advantage of it ?
>>>
>>> As the parallelism is simplified (by creating as many RDDs as there are
>>> partitions) I suppose that the biggest part of the tuning is playing with
>>> KafKa partitions (not talking about network configuration or management of
>>> Spark resources) ?
>>>
>>> Thank you
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


Re: Spark Streaming with Kafka DirectStream

2016-02-17 Thread Cyril Scetbon
I don't think we can print an integer value in a spark streaming process As 
opposed to a spark job. I think I can print the content of an rdd but not debug 
messages. Am I wrong ? 

Cyril Scetbon

> On Feb 17, 2016, at 12:51 AM, ayan guha  wrote:
> 
> Hi
> 
> You can always use RDD properties, which already has partition information.
> 
> https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
>  
> 
>> On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon  wrote:
>> Your understanding is the right one (having re-read the documentation). 
>> Still wondering how I can verify that 5 partitions have been created. My job 
>> is reading from a topic in Kafka that has 5 partitions and sends the data to 
>> E/S. I can see that when there is one task to read from Kafka there are 5 
>> tasks writing to E/S. So I'm supposing that the task reading from Kafka does 
>> it in // using 5 partitions and that's why there are then 5 tasks to write 
>> to E/S. But I'm supposing ...
>> 
>>> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>>> 
>>> I have a slightly different understanding. 
>>> 
>>> Direct stream generates 1 RDD per batch, however, number of partitions in 
>>> that RDD = number of partitions in kafka topic.
>>> 
 On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon  
 wrote:
 Hi guys,
 
 I'm making some tests with Spark and Kafka using a Python script. I use 
 the second method that doesn't need any receiver (Direct Approach). It 
 should adapt the number of RDDs to the number of partitions in the topic. 
 I'm trying to verify it. What's the easiest way to verify it ? I also 
 tried to co-locate Yarn, Spark and Kafka to check if RDDs are created 
 depending on the leaders of partitions in a topic, and they are not. Can 
 you confirm that RDDs are not created depending on the location of 
 partitions and that co-locating Kafka with Spark is not a must-have or 
 that Spark does not take advantage of it ?
 
 As the parallelism is simplified (by creating as many RDDs as there are 
 partitions) I suppose that the biggest part of the tuning is playing with 
 KafKa partitions (not talking about network configuration or management of 
 Spark resources) ?
 
 Thank you
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>>> 
>>> -- 
>>> Best Regards,
>>> Ayan Guha
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha


Re: Spark Streaming with Kafka Use Case

2016-02-17 Thread Cody Koeninger
Just use a kafka rdd in a batch job or two, then start your streaming job.

On Wed, Feb 17, 2016 at 12:57 AM, Abhishek Anand 
wrote:

> I have a spark streaming application running in production. I am trying to
> find a solution for a particular use case when my application has a
> downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process that.
>
> Is there any workaround so that when my streaming application starts it
> starts taking data for 1-2 hours, process it , then take the data for next
> 1 hour process it. Now when its done processing of previous 5 hours data
> which missed, normal streaming should start with the given slide interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
Hi

You can always use RDD properties, which already has partition information.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html


On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon 
wrote:

> Your understanding is the right one (having re-read the documentation).
> Still wondering how I can verify that 5 partitions have been created. My
> job is reading from a topic in Kafka that has 5 partitions and sends the
> data to E/S. I can see that when there is one task to read from Kafka there
> are 5 tasks writing to E/S. So I'm supposing that the task reading from
> Kafka does it in // using 5 partitions and that's why there are then 5
> tasks to write to E/S. But I'm supposing ...
>
> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>
> I have a slightly different understanding.
>
> Direct stream generates 1 RDD per batch, however, number of partitions in
> that RDD = number of partitions in kafka topic.
>
> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
> wrote:
>
>> Hi guys,
>>
>> I'm making some tests with Spark and Kafka using a Python script. I use
>> the second method that doesn't need any receiver (Direct Approach). It
>> should adapt the number of RDDs to the number of partitions in the topic.
>> I'm trying to verify it. What's the easiest way to verify it ? I also tried
>> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
>> on the leaders of partitions in a topic, and they are not. Can you confirm
>> that RDDs are not created depending on the location of partitions and that
>> co-locating Kafka with Spark is not a must-have or that Spark does not take
>> advantage of it ?
>>
>> As the parallelism is simplified (by creating as many RDDs as there are
>> partitions) I suppose that the biggest part of the tuning is playing with
>> KafKa partitions (not talking about network configuration or management of
>> Spark resources) ?
>>
>> Thank you
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread Cyril Scetbon
Your understanding is the right one (having re-read the documentation). Still 
wondering how I can verify that 5 partitions have been created. My job is 
reading from a topic in Kafka that has 5 partitions and sends the data to E/S. 
I can see that when there is one task to read from Kafka there are 5 tasks 
writing to E/S. So I'm supposing that the task reading from Kafka does it in // 
using 5 partitions and that's why there are then 5 tasks to write to E/S. But 
I'm supposing ...

> On Feb 16, 2016, at 21:12, ayan guha  wrote:
> 
> I have a slightly different understanding. 
> 
> Direct stream generates 1 RDD per batch, however, number of partitions in 
> that RDD = number of partitions in kafka topic.
> 
> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon  > wrote:
> Hi guys,
> 
> I'm making some tests with Spark and Kafka using a Python script. I use the 
> second method that doesn't need any receiver (Direct Approach). It should 
> adapt the number of RDDs to the number of partitions in the topic. I'm trying 
> to verify it. What's the easiest way to verify it ? I also tried to co-locate 
> Yarn, Spark and Kafka to check if RDDs are created depending on the leaders 
> of partitions in a topic, and they are not. Can you confirm that RDDs are not 
> created depending on the location of partitions and that co-locating Kafka 
> with Spark is not a must-have or that Spark does not take advantage of it ?
> 
> As the parallelism is simplified (by creating as many RDDs as there are 
> partitions) I suppose that the biggest part of the tuning is playing with 
> KafKa partitions (not talking about network configuration or management of 
> Spark resources) ?
> 
> Thank you
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
I have a slightly different understanding.

Direct stream generates 1 RDD per batch, however, number of partitions in
that RDD = number of partitions in kafka topic.

On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
wrote:

> Hi guys,
>
> I'm making some tests with Spark and Kafka using a Python script. I use
> the second method that doesn't need any receiver (Direct Approach). It
> should adapt the number of RDDs to the number of partitions in the topic.
> I'm trying to verify it. What's the easiest way to verify it ? I also tried
> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
> on the leaders of partitions in a topic, and they are not. Can you confirm
> that RDDs are not created depending on the location of partitions and that
> co-locating Kafka with Spark is not a must-have or that Spark does not take
> advantage of it ?
>
> As the parallelism is simplified (by creating as many RDDs as there are
> partitions) I suppose that the biggest part of the tuning is playing with
> KafKa partitions (not talking about network configuration or management of
> Spark resources) ?
>
> Thank you
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Spark Streaming with Kafka DirectStream

2016-02-16 Thread Cyril Scetbon
Hi guys,

I'm making some tests with Spark and Kafka using a Python script. I use the 
second method that doesn't need any receiver (Direct Approach). It should adapt 
the number of RDDs to the number of partitions in the topic. I'm trying to 
verify it. What's the easiest way to verify it ? I also tried to co-locate 
Yarn, Spark and Kafka to check if RDDs are created depending on the leaders of 
partitions in a topic, and they are not. Can you confirm that RDDs are not 
created depending on the location of partitions and that co-locating Kafka with 
Spark is not a must-have or that Spark does not take advantage of it ?

As the parallelism is simplified (by creating as many RDDs as there are 
partitions) I suppose that the biggest part of the tuning is playing with KafKa 
partitions (not talking about network configuration or management of Spark 
resources) ?

Thank you
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-12 Thread p pathiyil
Thanks Sebastian.

I was indeed trying out FAIR scheduling with a high value for
concurrentJobs today.

It does improve the latency seen by the non-hot partitions, even if it does
not provide complete isolation. So it might be an acceptable middle ground.
On 12 Feb 2016 12:18, "Sebastian Piu"  wrote:

> Have you tried using fair scheduler and queues
> On 12 Feb 2016 4:24 a.m., "p pathiyil"  wrote:
>
>> With this setting, I can see that the next job is being executed before
>> the previous one is finished. However, the processing of the 'hot'
>> partition eventually hogs all the concurrent jobs. If there was a way to
>> restrict jobs to be one per partition, then this setting would provide the
>> per-partition isolation.
>>
>> Is there anything in the framework which would give control over that
>> aspect ?
>>
>> Thanks.
>>
>>
>> On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger 
>> wrote:
>>
>>> spark.streaming.concurrentJobs
>>>
>>>
>>> see e.g. 
>>> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>>>
>>>
>>> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>>>
 Thanks for the response Cody.

 The producers are out of my control, so can't really balance the
 incoming content across the various topics and partitions. The number of
 topics and partitions are quite large and the volume across then not very
 well known ahead of time. So it is quite hard to segregate low and high
 volume topics in to separate driver programs.

 Will look at shuffle / repartition.

 Could you share the setting for starting another batch in parallel ? It
 might be ok to call the 'save' of the processed messages out of order if
 that is the only consequence of this setting.

 When separate DStreams are created per partition (and if union() is not
 called on them), what aspect of the framework still ties the scheduling of
 jobs across the partitions together ? Asking this to see if creating
 multiple threads in the driver and calling createDirectStream per partition
 in those threads can provide isolation.



 On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
 wrote:

> The real way to fix this is by changing partitioning, so you don't
> have a hot partition.  It would be better to do this at the time you're
> producing messages, but you can also do it with a shuffle / repartition
> during consuming.
>
> There is a setting to allow another batch to start in parallel, but
> that's likely to have unintended consequences.
>
> On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil 
> wrote:
>
>> Hi,
>>
>> I am looking at a way to isolate the processing of messages from each
>> Kafka partition within the same driver.
>>
>> Scenario: A DStream is created with the createDirectStream call by
>> passing in a few partitions. Let us say that the streaming context is
>> defined to have a time duration of 2 seconds. If the processing of 
>> messages
>> from a single partition takes more than 2 seconds (while all the others
>> finish much quicker), it seems that the next set of jobs get scheduled 
>> only
>> after the processing of that last partition. This means that the delay is
>> effective for all partitions and not just the partition that was truly 
>> the
>> cause of the delay. What I would like to do is to have the delay only
>> impact the 'slow' partition.
>>
>> Tried to create one DStream per partition and then do a union of all
>> partitions, (similar to the sample in
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
>> but that didn't seem to help.
>>
>> Please suggest the correct approach to solve this issue.
>>
>> Thanks,
>> Praveen.
>>
>
>

>>>
>>


Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread Sebastian Piu
Have you tried using fair scheduler and queues
On 12 Feb 2016 4:24 a.m., "p pathiyil"  wrote:

> With this setting, I can see that the next job is being executed before
> the previous one is finished. However, the processing of the 'hot'
> partition eventually hogs all the concurrent jobs. If there was a way to
> restrict jobs to be one per partition, then this setting would provide the
> per-partition isolation.
>
> Is there anything in the framework which would give control over that
> aspect ?
>
> Thanks.
>
>
> On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger 
> wrote:
>
>> spark.streaming.concurrentJobs
>>
>>
>> see e.g. 
>> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>>
>>
>> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>>
>>> Thanks for the response Cody.
>>>
>>> The producers are out of my control, so can't really balance the
>>> incoming content across the various topics and partitions. The number of
>>> topics and partitions are quite large and the volume across then not very
>>> well known ahead of time. So it is quite hard to segregate low and high
>>> volume topics in to separate driver programs.
>>>
>>> Will look at shuffle / repartition.
>>>
>>> Could you share the setting for starting another batch in parallel ? It
>>> might be ok to call the 'save' of the processed messages out of order if
>>> that is the only consequence of this setting.
>>>
>>> When separate DStreams are created per partition (and if union() is not
>>> called on them), what aspect of the framework still ties the scheduling of
>>> jobs across the partitions together ? Asking this to see if creating
>>> multiple threads in the driver and calling createDirectStream per partition
>>> in those threads can provide isolation.
>>>
>>>
>>>
>>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
>>> wrote:
>>>
 The real way to fix this is by changing partitioning, so you don't have
 a hot partition.  It would be better to do this at the time you're
 producing messages, but you can also do it with a shuffle / repartition
 during consuming.

 There is a setting to allow another batch to start in parallel, but
 that's likely to have unintended consequences.

 On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil  wrote:

> Hi,
>
> I am looking at a way to isolate the processing of messages from each
> Kafka partition within the same driver.
>
> Scenario: A DStream is created with the createDirectStream call by
> passing in a few partitions. Let us say that the streaming context is
> defined to have a time duration of 2 seconds. If the processing of 
> messages
> from a single partition takes more than 2 seconds (while all the others
> finish much quicker), it seems that the next set of jobs get scheduled 
> only
> after the processing of that last partition. This means that the delay is
> effective for all partitions and not just the partition that was truly the
> cause of the delay. What I would like to do is to have the delay only
> impact the 'slow' partition.
>
> Tried to create one DStream per partition and then do a union of all
> partitions, (similar to the sample in
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
> but that didn't seem to help.
>
> Please suggest the correct approach to solve this issue.
>
> Thanks,
> Praveen.
>


>>>
>>
>


Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread p pathiyil
Hi,

I am looking at a way to isolate the processing of messages from each Kafka
partition within the same driver.

Scenario: A DStream is created with the createDirectStream call by passing
in a few partitions. Let us say that the streaming context is defined to
have a time duration of 2 seconds. If the processing of messages from a
single partition takes more than 2 seconds (while all the others finish
much quicker), it seems that the next set of jobs get scheduled only after
the processing of that last partition. This means that the delay is
effective for all partitions and not just the partition that was truly the
cause of the delay. What I would like to do is to have the delay only
impact the 'slow' partition.

Tried to create one DStream per partition and then do a union of all
partitions, (similar to the sample in
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
but that didn't seem to help.

Please suggest the correct approach to solve this issue.

Thanks,
Praveen.


Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread p pathiyil
With this setting, I can see that the next job is being executed before the
previous one is finished. However, the processing of the 'hot' partition
eventually hogs all the concurrent jobs. If there was a way to restrict
jobs to be one per partition, then this setting would provide the
per-partition isolation.

Is there anything in the framework which would give control over that
aspect ?

Thanks.


On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger  wrote:

> spark.streaming.concurrentJobs
>
>
> see e.g. 
> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>
>
> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>
>> Thanks for the response Cody.
>>
>> The producers are out of my control, so can't really balance the incoming
>> content across the various topics and partitions. The number of topics and
>> partitions are quite large and the volume across then not very well known
>> ahead of time. So it is quite hard to segregate low and high volume topics
>> in to separate driver programs.
>>
>> Will look at shuffle / repartition.
>>
>> Could you share the setting for starting another batch in parallel ? It
>> might be ok to call the 'save' of the processed messages out of order if
>> that is the only consequence of this setting.
>>
>> When separate DStreams are created per partition (and if union() is not
>> called on them), what aspect of the framework still ties the scheduling of
>> jobs across the partitions together ? Asking this to see if creating
>> multiple threads in the driver and calling createDirectStream per partition
>> in those threads can provide isolation.
>>
>>
>>
>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
>> wrote:
>>
>>> The real way to fix this is by changing partitioning, so you don't have
>>> a hot partition.  It would be better to do this at the time you're
>>> producing messages, but you can also do it with a shuffle / repartition
>>> during consuming.
>>>
>>> There is a setting to allow another batch to start in parallel, but
>>> that's likely to have unintended consequences.
>>>
>>> On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil  wrote:
>>>
 Hi,

 I am looking at a way to isolate the processing of messages from each
 Kafka partition within the same driver.

 Scenario: A DStream is created with the createDirectStream call by
 passing in a few partitions. Let us say that the streaming context is
 defined to have a time duration of 2 seconds. If the processing of messages
 from a single partition takes more than 2 seconds (while all the others
 finish much quicker), it seems that the next set of jobs get scheduled only
 after the processing of that last partition. This means that the delay is
 effective for all partitions and not just the partition that was truly the
 cause of the delay. What I would like to do is to have the delay only
 impact the 'slow' partition.

 Tried to create one DStream per partition and then do a union of all
 partitions, (similar to the sample in
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
 but that didn't seem to help.

 Please suggest the correct approach to solve this issue.

 Thanks,
 Praveen.

>>>
>>>
>>
>


[Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread bernhard

All,

I'm new to Spark and I'm having a hard time doing a simple join of two DFs

Intent:
-  I'm receiving data from Kafka via direct stream and would like to  
enrich the messages with data from Cassandra. The Kafka messages  
(Protobufs) are decoded into DataFrames and then joined with a  
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)  
streaming batch size to raw C* data is [several streaming messages to  
millions of C* rows], BUT the join always yields exactly ONE result  
[1:1] per message. After the join the resulting DF is eventually  
stored to another C* table.


Problem:
- Even though I'm joining the two DFs on the full Cassandra primary  
key and pushing the corresponding filter to C*, it seems that Spark is  
loading the whole C* data-set into memory before actually joining  
(which I'd like to prevent by using the filter/predicate pushdown).  
This leads to a lot of shuffling and tasks being spawned, hence the  
"simple" join takes forever...


Could anyone shed some light on this? In my perception this should be  
a prime-example for DFs and Spark Streaming.


Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2

Code:

def main(args: Array[String]) {
val conf = new SparkConf()
  .setAppName("test")
  .set("spark.cassandra.connection.host", "xxx")
  .set("spark.cassandra.connection.keep_alive_ms", "3")
  .setMaster("local[*]")

val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("INFO")

// Initialise Kafka
val kafkaTopics = Set[String]("xxx")
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
  "auto.offset.reset" -> "smallest")

// Kafka stream
val messages = KafkaUtils.createDirectStream[String, MyMsg,  
StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)


// Executed on the driver
messages.foreachRDD { rdd =>

  // Create an instance of SQLContext
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  // Map MyMsg RDD
  val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}

  // Convert RDD[MyMsg] to DataFrame
  val MyMsgDf = MyMsgRdd.toDF()
.select(
$"prim1Id" as 'prim1_id,
$"prim2Id" as 'prim2_id,
$...
  )

  // Load DataFrame from C* data-source
  val base_data = base_data_df.getInstance(sqlContext)

  // Inner join on prim1Id and prim2Id
  val joinedDf = MyMsgDf.join(base_data,
MyMsgDf("prim1_id") === base_data("prim1_id") &&
MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
.filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
&& base_data("prim2_id").isin(MyMsgDf("prim2_id")))

  joinedDf.show()
  joinedDf.printSchema()

  // Select relevant fields

  // Persist

}

// Start the computation
ssc.start()
ssc.awaitTermination()
}

SO:  
http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread Mohammed Guller
You may have better luck with this question on the Spark Cassandra Connector 
mailing list.



One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>



-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of two DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to enrich the 
messages with data from Cassandra. The Kafka messages

(Protobufs) are decoded into DataFrames and then joined with a (supposedly 
pre-filtered) DF from Cassandra. The relation of (Kafka) streaming batch size 
to raw C* data is [several streaming messages to millions of C* rows], BUT the 
join always yields exactly ONE result [1:1] per message. After the join the 
resulting DF is eventually stored to another C* table.



Problem:

- Even though I'm joining the two DFs on the full Cassandra primary key and 
pushing the corresponding filter to C*, it seems that Spark is loading the 
whole C* data-set into memory before actually joining (which I'd like to 
prevent by using the filter/predicate pushdown).

This leads to a lot of shuffling and tasks being spawned, hence the "simple" 
join takes forever...



Could anyone shed some light on this? In my perception this should be a 
prime-example for DFs and Spark Streaming.



Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")



 // Initialise Kafka

 val kafkaTopics = Set[String]("xxx")

 val kafkaParams = Map[String, String](

   "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",

   "auto.offset.reset" -> "smallest")



 // Kafka stream

 val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, 
MyMsgDecoder](ssc, kafkaParams, kafkaTopics)



 // Executed on the driver

 messages.foreachRDD { rdd =>



   // Create an instance of SQLContext

   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)

   import sqlContext.implicits._



   // Map MyMsg RDD

   val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}



   // Convert RDD[MyMsg] to DataFrame

   val MyMsgDf = MyMsgRdd.toDF()

.select(

 $"prim1Id" as 'prim1_id,

 $"prim2Id" as 'prim2_id,

 $...

   )



   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



   // Inner join on prim1Id and prim2Id

   val joinedDf = MyMsgDf.join(base_data,

 MyMsgDf("prim1_id") === base_data("prim1_id") &&

 MyMsgDf("prim2_id") === base_data("prim2_id"), "left")

 .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))

 && base_data("prim2_id").isin(MyMsgDf("prim2_id")))



   joinedDf.show()

   joinedDf.printSchema()



   // Select relevant fields



   // Persist



 }



 // Start the computation

 ssc.start()

 ssc.awaitTermination()

}



SO:

http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p







-

To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> For 
additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>




Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread bernhard

Hi Mohammed

I'm aware of that documentation, what are you hinting at specifically?  
I'm pushing all elements of the partition key, so that should work. As  
user zero323 on SO pointed out it the problem is most probably related  
to the dynamic nature of the predicate elements (two distributed  
collections per filter per join).


The statement "To push down partition keys, all of them must be  
included, but not more than one predicate per partition key, otherwise  
nothing is pushed down."


Does not apply IMO?

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to  
Cassandra" sections on this page:


https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md


Mohammed
Author: Big Data Analytics with Spark

-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
 if (instance == null) {
   // Load DataFrame with C* data-source
   instance = sqlContext.read
 .format("org.apache.spark.sql.cassandra")
 .options(Map("table" -> "cf", "keyspace" -> "ks"))
 .load()
 }
 instance
   }
}

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


You may have better luck with this question on the Spark Cassandra
Connector mailing list.



One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/
1484209656/>



-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of two
DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to
enrich the messages with data from Cassandra. The Kafka messages

(Protobufs) are decoded into DataFrames and then joined with a
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)
streaming batch size to raw C* data is [several streaming messages to
millions of C* rows], BUT the join always yields exactly ONE result
[1:1] per message. After the join the resulting DF is eventually
stored to another C* table.



Problem:

- Even though I'm joining the two DFs on the full Cassandra primary
key and pushing the corresponding filter to C*, it seems that Spark is
loading the whole C* data-set into memory before actually joining
(which I'd like to prevent by using the filter/predicate pushdown).

This leads to a lot of shuffling and tasks being spawned, hence the
"simple" join takes forever...



Could anyone shed some light on this? In my perception this should be
a prime-example for DFs and Spark Streaming.



Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")



 // Initialise Kafka

 val kafkaTopics = Set[String]("xxx")

 val kafkaParams = Map[String, String](

   "metadata.broker.list" ->
"xxx:32000,xxx:32000,xxx:32000,xxx:32000",

   "auto.offset.reset" -> "smallest")



 // Kafka stream

 val messages = KafkaUtils.createDirectStream[String, MyMsg,
StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)



 // Executed on the driver

 messages.foreachRDD { rdd =>



   // Create an instance of SQLContext

   val sqlContext =
SQLContextSingleton.getInstance(rdd.sparkContext)

   import sqlContext.implicits._



   // Map MyMsg RDD

   val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}



   // Convert RDD[MyMsg] to DataFrame

   val MyMsgDf = MyMsgRdd.toDF()

.select(

 $"prim1

Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread bernhard
The filter in the join is re-arranged in the DAG (from what I can tell  
--> explain/UI) and should therefore be pushed accordingly. I also  
made experiments applying the filter to base_data before the join  
explicitly, effectively creating a new DF, but no luck either.



Quoting Mohammed Guller <moham...@glassbeam.com>:

Moving the spark mailing list to BCC since this is not really  
related to Spark.


May be I am missing something, but where are you calling the filter  
method on the base_data DF to push down the predicates to Cassandra  
before calling the join method?


Mohammed
Author: Big Data Analytics with Spark


-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:47 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

I'm aware of that documentation, what are you hinting at specifically?
I'm pushing all elements of the partition key, so that should work.  
As user zero323 on SO pointed out it the problem is most probably  
related to the dynamic nature of the predicate elements (two  
distributed collections per filter per join).


The statement "To push down partition keys, all of them must be  
included, but not more than one predicate per partition key,  
otherwise nothing is pushed down."


Does not apply IMO?

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to
Cassandra" sections on this page:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/
14_data_frames.md


Mohammed
Author: Big Data Analytics with Spark

-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
 if (instance == null) {
   // Load DataFrame with C* data-source
   instance = sqlContext.read
 .format("org.apache.spark.sql.cassandra")
 .options(Map("table" -> "cf", "keyspace" -> "ks"))
 .load()
 }
 instance
   }
}

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:


You may have better luck with this question on the Spark Cassandra
Connector mailing list.



One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp
/
1484209656/>



-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of
two DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to
enrich the messages with data from Cassandra. The Kafka messages

(Protobufs) are decoded into DataFrames and then joined with a
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)
streaming batch size to raw C* data is [several streaming messages to
millions of C* rows], BUT the join always yields exactly ONE result
[1:1] per message. After the join the resulting DF is eventually
stored to another C* table.



Problem:

- Even though I'm joining the two DFs on the full Cassandra primary
key and pushing the corresponding filter to C*, it seems that Spark
is loading the whole C* data-set into memory before actually joining
(which I'd like to prevent by using the filter/predicate pushdown).

This leads to a lot of shuffling and tasks being spawned, hence the
"simple" join takes forever...



Could anyone shed some light on this? In my perception this should be
a prime-example for DFs and Spark Streaming.



Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")




RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread Mohammed Guller
Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to Cassandra" 
sections on this page:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md


Mohammed
Author: Big Data Analytics with Spark

-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch] 
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
  * Lazily instantiated singleton instance of base_data DataFrame
  */
object base_data_df {

   @transient private var instance: DataFrame = _

   def getInstance(sqlContext: SQLContext): DataFrame = {
 if (instance == null) {
   // Load DataFrame with C* data-source
   instance = sqlContext.read
 .format("org.apache.spark.sql.cassandra")
 .options(Map("table" -> "cf", "keyspace" -> "ks"))
 .load()
 }
 instance
   }
}

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

> You may have better luck with this question on the Spark Cassandra 
> Connector mailing list.
>
>
>
> One quick question about this code from your email:
>
>// Load DataFrame from C* data-source
>
>val base_data = base_data_df.getInstance(sqlContext)
>
>
>
> What exactly is base_data_df and how are you creating it?
>
> Mohammed
> Author: Big Data Analytics with
> Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/
> 1484209656/>
>
>
>
> -Original Message-
> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
> Sent: Tuesday, February 9, 2016 6:58 AM
> To: user@spark.apache.org
> Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>
>
>
> All,
>
>
>
> I'm new to Spark and I'm having a hard time doing a simple join of two 
> DFs
>
>
>
> Intent:
>
> -  I'm receiving data from Kafka via direct stream and would like to 
> enrich the messages with data from Cassandra. The Kafka messages
>
> (Protobufs) are decoded into DataFrames and then joined with a 
> (supposedly pre-filtered) DF from Cassandra. The relation of (Kafka) 
> streaming batch size to raw C* data is [several streaming messages to 
> millions of C* rows], BUT the join always yields exactly ONE result 
> [1:1] per message. After the join the resulting DF is eventually 
> stored to another C* table.
>
>
>
> Problem:
>
> - Even though I'm joining the two DFs on the full Cassandra primary 
> key and pushing the corresponding filter to C*, it seems that Spark is 
> loading the whole C* data-set into memory before actually joining 
> (which I'd like to prevent by using the filter/predicate pushdown).
>
> This leads to a lot of shuffling and tasks being spawned, hence the 
> "simple" join takes forever...
>
>
>
> Could anyone shed some light on this? In my perception this should be 
> a prime-example for DFs and Spark Streaming.
>
>
>
> Environment:
>
> - Spark 1.6
>
> - Cassandra 2.1.12
>
> - Cassandra-Spark-Connector 1.5-RC1
>
> - Kafka 0.8.2.2
>
>
>
> Code:
>
>
>
> def main(args: Array[String]) {
>
>  val conf = new SparkConf()
>
>.setAppName("test")
>
>.set("spark.cassandra.connection.host", "xxx")
>
>.set("spark.cassandra.connection.keep_alive_ms", "3")
>
>.setMaster("local[*]")
>
>
>
>  val ssc = new StreamingContext(conf, Seconds(10))
>
>  ssc.sparkContext.setLogLevel("INFO")
>
>
>
>  // Initialise Kafka
>
>  val kafkaTopics = Set[String]("xxx")
>
>  val kafkaParams = Map[String, String](
>
>"metadata.broker.list" -> 
> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
>
>"auto.offset.reset" -> "smallest")
>
>
>
>  // Kafka stream
>
>  val messages = KafkaUtils.createDirectStream[String, MyMsg, 
> StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)
>
>
>
>  // Executed on the driver
>
>  messages.foreachRDD { rdd =>
>
>
>
>// Create an instance of SQLContext
>
>val sqlContext = 
> SQLContextSingleton.getInstance(rdd.sparkContext)
>
>import sqlContext.implicits._
>
>
>
>// Map MyMsg RDD
>
>val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}
>
>
>
>// Convert RDD[MyMsg] to DataFrame
>
>  

RE: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread Mohammed Guller
Moving the spark mailing list to BCC since this is not really related to Spark.

May be I am missing something, but where are you calling the filter method on 
the base_data DF to push down the predicates to Cassandra before calling the 
join method? 

Mohammed
Author: Big Data Analytics with Spark


-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch] 
Sent: Tuesday, February 9, 2016 10:47 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

I'm aware of that documentation, what are you hinting at specifically?  
I'm pushing all elements of the partition key, so that should work. As user 
zero323 on SO pointed out it the problem is most probably related to the 
dynamic nature of the predicate elements (two distributed collections per 
filter per join).

The statement "To push down partition keys, all of them must be included, but 
not more than one predicate per partition key, otherwise nothing is pushed 
down."

Does not apply IMO?

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

> Hi Bernhard,
>
> Take a look at the examples shown under the "Pushing down clauses to 
> Cassandra" sections on this page:
>
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/
> 14_data_frames.md
>
>
> Mohammed
> Author: Big Data Analytics with Spark
>
> -Original Message-
> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
> Sent: Tuesday, February 9, 2016 10:05 PM
> To: Mohammed Guller
> Cc: user@spark.apache.org
> Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>
> Hi Mohammed
>
> Thanks for hint, I should probably do that :)
>
> As for the DF singleton:
>
> /**
>   * Lazily instantiated singleton instance of base_data DataFrame
>   */
> object base_data_df {
>
>@transient private var instance: DataFrame = _
>
>def getInstance(sqlContext: SQLContext): DataFrame = {
>  if (instance == null) {
>// Load DataFrame with C* data-source
>instance = sqlContext.read
>  .format("org.apache.spark.sql.cassandra")
>  .options(Map("table" -> "cf", "keyspace" -> "ks"))
>  .load()
>  }
>  instance
>}
> }
>
> Bernhard
>
> Quoting Mohammed Guller <moham...@glassbeam.com>:
>
>> You may have better luck with this question on the Spark Cassandra 
>> Connector mailing list.
>>
>>
>>
>> One quick question about this code from your email:
>>
>>// Load DataFrame from C* data-source
>>
>>val base_data = base_data_df.getInstance(sqlContext)
>>
>>
>>
>> What exactly is base_data_df and how are you creating it?
>>
>> Mohammed
>> Author: Big Data Analytics with
>> Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp
>> /
>> 1484209656/>
>>
>>
>>
>> -Original Message-
>> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
>> Sent: Tuesday, February 9, 2016 6:58 AM
>> To: user@spark.apache.org
>> Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>>
>>
>>
>> All,
>>
>>
>>
>> I'm new to Spark and I'm having a hard time doing a simple join of 
>> two DFs
>>
>>
>>
>> Intent:
>>
>> -  I'm receiving data from Kafka via direct stream and would like to 
>> enrich the messages with data from Cassandra. The Kafka messages
>>
>> (Protobufs) are decoded into DataFrames and then joined with a 
>> (supposedly pre-filtered) DF from Cassandra. The relation of (Kafka) 
>> streaming batch size to raw C* data is [several streaming messages to 
>> millions of C* rows], BUT the join always yields exactly ONE result 
>> [1:1] per message. After the join the resulting DF is eventually 
>> stored to another C* table.
>>
>>
>>
>> Problem:
>>
>> - Even though I'm joining the two DFs on the full Cassandra primary 
>> key and pushing the corresponding filter to C*, it seems that Spark 
>> is loading the whole C* data-set into memory before actually joining 
>> (which I'd like to prevent by using the filter/predicate pushdown).
>>
>> This leads to a lot of shuffling and tasks being spawned, hence the 
>> "simple" join takes forever...
>>
>>
>>
>> Could anyone shed some light on this? In my perception this should be 
>> a prime-example for DFs and Spark Streaming.
>>
>>
>>
>> Environment:
>>
>> - Spa

Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

2016-02-09 Thread bernhard

Hi Mohammed

Thanks for hint, I should probably do that :)

As for the DF singleton:

/**
 * Lazily instantiated singleton instance of base_data DataFrame
 */
object base_data_df {

  @transient private var instance: DataFrame = _

  def getInstance(sqlContext: SQLContext): DataFrame = {
if (instance == null) {
  // Load DataFrame with C* data-source
  instance = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "cf", "keyspace" -> "ks"))
.load()
}
instance
  }
}

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

You may have better luck with this question on the Spark Cassandra  
Connector mailing list.




One quick question about this code from your email:

   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with  
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>




-Original Message-
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of two DFs



Intent:

-  I'm receiving data from Kafka via direct stream and would like to  
enrich the messages with data from Cassandra. The Kafka messages


(Protobufs) are decoded into DataFrames and then joined with a  
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)  
streaming batch size to raw C* data is [several streaming messages  
to millions of C* rows], BUT the join always yields exactly ONE  
result [1:1] per message. After the join the resulting DF is  
eventually stored to another C* table.




Problem:

- Even though I'm joining the two DFs on the full Cassandra primary  
key and pushing the corresponding filter to C*, it seems that Spark  
is loading the whole C* data-set into memory before actually joining  
(which I'd like to prevent by using the filter/predicate pushdown).


This leads to a lot of shuffling and tasks being spawned, hence the  
"simple" join takes forever...




Could anyone shed some light on this? In my perception this should  
be a prime-example for DFs and Spark Streaming.




Environment:

- Spark 1.6

- Cassandra 2.1.12

- Cassandra-Spark-Connector 1.5-RC1

- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {

 val conf = new SparkConf()

   .setAppName("test")

   .set("spark.cassandra.connection.host", "xxx")

   .set("spark.cassandra.connection.keep_alive_ms", "3")

   .setMaster("local[*]")



 val ssc = new StreamingContext(conf, Seconds(10))

 ssc.sparkContext.setLogLevel("INFO")



 // Initialise Kafka

 val kafkaTopics = Set[String]("xxx")

 val kafkaParams = Map[String, String](

   "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",

   "auto.offset.reset" -> "smallest")



 // Kafka stream

 val messages = KafkaUtils.createDirectStream[String, MyMsg,  
StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)




 // Executed on the driver

 messages.foreachRDD { rdd =>



   // Create an instance of SQLContext

   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)

   import sqlContext.implicits._



   // Map MyMsg RDD

   val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}



   // Convert RDD[MyMsg] to DataFrame

   val MyMsgDf = MyMsgRdd.toDF()

.select(

 $"prim1Id" as 'prim1_id,

 $"prim2Id" as 'prim2_id,

 $...

   )



   // Load DataFrame from C* data-source

   val base_data = base_data_df.getInstance(sqlContext)



   // Inner join on prim1Id and prim2Id

   val joinedDf = MyMsgDf.join(base_data,

 MyMsgDf("prim1_id") === base_data("prim1_id") &&

 MyMsgDf("prim2_id") === base_data("prim2_id"), "left")

 .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))

 && base_data("prim2_id").isin(MyMsgDf("prim2_id")))



   joinedDf.show()

   joinedDf.printSchema()



   // Select relevant fields



   // Persist



 }



 // Start the computation

 ssc.start()

 ssc.awaitTermination()

}



SO:

http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p








Spark Streaming: My kafka receivers are not consuming in parallel

2016-02-03 Thread Jorge Rodriguez
Hello Spark users,

We are setting up our fist bach of spark streaming pipelines.  And I am
running into an issue which I am not sure how to resolve, but seems like
should be fairly trivial.

I am using receiver-mode Kafka consumer that comes with Spark, and running
in standalone mode.  I've setup two receivers, which are consuming from a 4
broker, 4 partition kafka topic.

If you will look at the image below, you will see that* even though I have
two receivers, only one of them ever consumes data at a time*.  I believe
this to be my current bottleneck for scaling.

What am I missing?

To me, order of events consumed is not important.  I just want to optimize
for maximum throughput.


[image: Inline image 1]

Thanks in advance for any help or tips!

Jorge


Re: Spark Streaming: My kafka receivers are not consuming in parallel

2016-02-03 Thread Jorge Rodriguez
Please ignore this question, as i've figured out what my problem was.

In the case that anyone else runs into something similar, the problem was
on the kafka side.  I was using the console producer to generate the
messages going into the kafka logs.  This producer will send all of the
messages to the same partition, unless you specify the "--new-producer"
parameter.

Thanks,
Jorge

On Wed, Feb 3, 2016 at 12:44 PM, Jorge Rodriguez 
wrote:

> Hello Spark users,
>
> We are setting up our fist bach of spark streaming pipelines.  And I am
> running into an issue which I am not sure how to resolve, but seems like
> should be fairly trivial.
>
> I am using receiver-mode Kafka consumer that comes with Spark, and running
> in standalone mode.  I've setup two receivers, which are consuming from a 4
> broker, 4 partition kafka topic.
>
> If you will look at the image below, you will see that* even though I
> have two receivers, only one of them ever consumes data at a time*.  I
> believe this to be my current bottleneck for scaling.
>
> What am I missing?
>
> To me, order of events consumed is not important.  I just want to optimize
> for maximum throughput.
>
>
> [image: Inline image 1]
>
> Thanks in advance for any help or tips!
>
> Jorge
>


Re: Spark Streaming with Kafka - batch DStreams in memory

2016-02-02 Thread Cody Koeninger
It's possible you could (ab)use updateStateByKey or mapWithState for this.

But honestly it's probably a lot more straightforward to just choose a
reasonable batch size that gets you a reasonable file size for most of your
keys, then use filecrush or something similar to deal with the hdfs small
file problem.

On Mon, Feb 1, 2016 at 10:11 PM, p pathiyil  wrote:

> Hi,
>
> Are there any ways to store DStreams / RDD read from Kafka in memory to be
> processed at a later time ? What we need to do is to read data from Kafka,
> process it to be keyed by some attribute that is present in the Kafka
> messages, and write out the data related to each key when we have
> accumulated enough data for that key to write out a file that is close to
> the HDFS block size, say 64MB. We are looking at ways to avoid writing out
> some file of the entire Kafka content periodically and then later run a
> second job to read those files and split them out to another set of files
> as necessary.
>
> Thanks.
>


Spark Streaming with Kafka - batch DStreams in memory

2016-02-01 Thread p pathiyil
Hi,

Are there any ways to store DStreams / RDD read from Kafka in memory to be
processed at a later time ? What we need to do is to read data from Kafka,
process it to be keyed by some attribute that is present in the Kafka
messages, and write out the data related to each key when we have
accumulated enough data for that key to write out a file that is close to
the HDFS block size, say 64MB. We are looking at ways to avoid writing out
some file of the entire Kafka content periodically and then later run a
second job to read those files and split them out to another set of files
as necessary.

Thanks.


Re: Spark Streaming 1.5.2+Kafka+Python. Strange reading

2015-12-24 Thread Akhil Das
Would you mind posting the relevant code snippet?

Thanks
Best Regards

On Wed, Dec 23, 2015 at 7:33 PM, Vyacheslav Yanuk 
wrote:

> Hi.
> I have very strange situation with direct reading from Kafka.
> For example.
> I have 1000 messages in Kafka.
> After submitting my application I read this data and process it.
> As I process the data I have accumulated 10 new entries.
> In next reading from Kafka I read only 3 records, but not 10!!!
> Why???
> I don't understand...
> Explain to me please!
>
> --
> WBR, Vyacheslav Yanuk
> Codeminders.com
>


Spark Streaming 1.5.2+Kafka+Python. Strange reading

2015-12-23 Thread Vyacheslav Yanuk
Hi.
I have very strange situation with direct reading from Kafka.
For example.
I have 1000 messages in Kafka.
After submitting my application I read this data and process it.
As I process the data I have accumulated 10 new entries.
In next reading from Kafka I read only 3 records, but not 10!!!
Why???
I don't understand...
Explain to me please!

-- 
WBR, Vyacheslav Yanuk
Codeminders.com


Spark Streaming 1.5.2+Kafka+Python (docs)

2015-12-23 Thread Vyacheslav Yanuk
Colleagues
Documents written about  createDirectStream that

"This does not use Zookeeper to store offsets. The consumed offsets are
tracked by the stream itself. For interoperability with Kafka monitoring
tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself
from the streaming application. You can access the offsets used in each
batch from the generated RDDs (see   "

My question is.
How I can access the offsets used in each batch ???
What I should SEE???

-- 
WBR, Vyacheslav Yanuk
Codeminders.com


Re: Spark Streaming 1.5.2+Kafka+Python (docs)

2015-12-23 Thread Cody Koeninger
Read the documentation
spark.apache.org/docs/latest/streaming-kafka-integration.html
If you still have questions, read the resources linked from
https://github.com/koeninger/kafka-exactly-once

On Wed, Dec 23, 2015 at 7:24 AM, Vyacheslav Yanuk 
wrote:

> Colleagues
> Documents written about  createDirectStream that
>
> "This does not use Zookeeper to store offsets. The consumed offsets are
> tracked by the stream itself. For interoperability with Kafka monitoring
> tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself
> from the streaming application. You can access the offsets used in each
> batch from the generated RDDs (see   "
>
> My question is.
> How I can access the offsets used in each batch ???
> What I should SEE???
>
> --
> WBR, Vyacheslav Yanuk
> Codeminders.com
>


Re: Spark Streaming Specify Kafka Partition

2015-12-04 Thread Cody Koeninger
So createDirectStream will give you a JavaInputDStream of R, where R is the
return type you chose for your message handler.

If you want a JavaPairInputDStream, you may have to call .mapToPair in
order to convert the stream, even if the type you chose for R was already
Tuple2

(note that I try to stay as far away from Java as possible, so this answer
is untested, possibly inaccurate, may throw checked exceptions etc etc)

On Thu, Dec 3, 2015 at 5:21 PM, Alan Braithwaite <a...@cloudflare.com>
wrote:

> One quick newbie question since I got another chance to look at this
> today.  We're using java for our spark applications.  The
> createDirectStream we were using previously [1] returns a
> JavaPairInputDStream, but the createDirectStream with fromOffsets expects
> an argument recordClass to pass into the generic constructor for
> createDirectStream.
>
> In the code for the first function signature (without fromOffsets) it's
> being constructed in Scala as just a tuple (K, V).   How do I pass this
> same class/type information from java as the record class to get a 
> JavaPairInputDStream<K,
> V>?
>
> I understand this might be a question more fit for a scala mailing list
> but google is failing me at the moment for hints on the interoperability of
> scala and java generics.
>
> [1] The original createDirectStream:
> https://github.com/apache/spark/blob/branch-1.5/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L395-L423
>
> Thanks,
> - Alan
>
> On Tue, Dec 1, 2015 at 8:12 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I actually haven't tried that, since I tend to do the offset lookups if
>> necessary.
>>
>> It's possible that it will work, try it and let me know.
>>
>> Be aware that if you're doing a count() or take() operation directly on
>> the rdd it'll definitely give you the wrong result if you're using -1 for
>> one of the offsets.
>>
>>
>>
>> On Tue, Dec 1, 2015 at 9:58 AM, Alan Braithwaite <a...@cloudflare.com>
>> wrote:
>>
>>> Neat, thanks.  If I specify something like -1 as the offset, will it
>>> consume from the latest offset or do I have to instrument that manually?
>>>
>>> - Alan
>>>
>>> On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Yes, there is a version of createDirectStream that lets you specify
>>>> fromOffsets: Map[TopicAndPartition, Long]
>>>>
>>>> On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite <a...@cloudflare.com>
>>>> wrote:
>>>>
>>>>> Is there any mechanism in the kafka streaming source to specify the
>>>>> exact partition id that we want a streaming job to consume from?
>>>>>
>>>>> If not, is there a workaround besides writing our a custom receiver?
>>>>>
>>>>> Thanks,
>>>>> - Alan
>>>>>
>>>>
>>>>
>>>
>>
>


Higher Processing times in Spark Streaming with kafka Direct

2015-12-04 Thread SRK
Hi,

Our processing times in  Spark Streaming with kafka Direct approach seems to
have increased considerably with increase in the Site traffic. Would
increasing the number of kafka partitions decrease  the processing times?
Any suggestions on tuning to reduce the processing times would be of great
help.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Higher-Processing-times-in-Spark-Streaming-with-kafka-Direct-tp25571.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Higher Processing times in Spark Streaming with kafka Direct

2015-12-04 Thread u...@moosheimer.com
Hi,

processing time depends on what you are doing with the events.
Increasing the number of partitions could be an idea if you write more messages 
to the topic than you read currently via Spark.

Can you write more details?

Mit freundlichen Grüßen / best regards
Kay-Uwe Moosheimer

> Am 04.12.2015 um 22:21 schrieb SRK <swethakasire...@gmail.com>:
> 
> Hi,
> 
> Our processing times in  Spark Streaming with kafka Direct approach seems to
> have increased considerably with increase in the Site traffic. Would
> increasing the number of kafka partitions decrease  the processing times?
> Any suggestions on tuning to reduce the processing times would be of great
> help.
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Higher-Processing-times-in-Spark-Streaming-with-kafka-Direct-tp25571.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming Specify Kafka Partition

2015-12-03 Thread Alan Braithwaite
One quick newbie question since I got another chance to look at this
today.  We're using java for our spark applications.  The createDirectStream
we were using previously [1] returns a JavaPairInputDStream, but the
createDirectStream with fromOffsets expects an argument recordClass to pass
into the generic constructor for createDirectStream.

In the code for the first function signature (without fromOffsets) it's
being constructed in Scala as just a tuple (K, V).   How do I pass this
same class/type information from java as the record class to get a
JavaPairInputDStream<K,
V>?

I understand this might be a question more fit for a scala mailing list but
google is failing me at the moment for hints on the interoperability of
scala and java generics.

[1] The original createDirectStream:
https://github.com/apache/spark/blob/branch-1.5/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L395-L423

Thanks,
- Alan

On Tue, Dec 1, 2015 at 8:12 AM, Cody Koeninger <c...@koeninger.org> wrote:

> I actually haven't tried that, since I tend to do the offset lookups if
> necessary.
>
> It's possible that it will work, try it and let me know.
>
> Be aware that if you're doing a count() or take() operation directly on
> the rdd it'll definitely give you the wrong result if you're using -1 for
> one of the offsets.
>
>
>
> On Tue, Dec 1, 2015 at 9:58 AM, Alan Braithwaite <a...@cloudflare.com>
> wrote:
>
>> Neat, thanks.  If I specify something like -1 as the offset, will it
>> consume from the latest offset or do I have to instrument that manually?
>>
>> - Alan
>>
>> On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Yes, there is a version of createDirectStream that lets you specify
>>> fromOffsets: Map[TopicAndPartition, Long]
>>>
>>> On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite <a...@cloudflare.com>
>>> wrote:
>>>
>>>> Is there any mechanism in the kafka streaming source to specify the
>>>> exact partition id that we want a streaming job to consume from?
>>>>
>>>> If not, is there a workaround besides writing our a custom receiver?
>>>>
>>>> Thanks,
>>>> - Alan
>>>>
>>>
>>>
>>
>


Re: Spark Streaming Specify Kafka Partition

2015-12-01 Thread Alan Braithwaite
Neat, thanks.  If I specify something like -1 as the offset, will it
consume from the latest offset or do I have to instrument that manually?

- Alan

On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger  wrote:

> Yes, there is a version of createDirectStream that lets you specify
> fromOffsets: Map[TopicAndPartition, Long]
>
> On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
> wrote:
>
>> Is there any mechanism in the kafka streaming source to specify the exact
>> partition id that we want a streaming job to consume from?
>>
>> If not, is there a workaround besides writing our a custom receiver?
>>
>> Thanks,
>> - Alan
>>
>
>


Re: Spark Streaming Specify Kafka Partition

2015-12-01 Thread Cody Koeninger
Yes, there is a version of createDirectStream that lets you specify
fromOffsets: Map[TopicAndPartition, Long]

On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
wrote:

> Is there any mechanism in the kafka streaming source to specify the exact
> partition id that we want a streaming job to consume from?
>
> If not, is there a workaround besides writing our a custom receiver?
>
> Thanks,
> - Alan
>


Re: Spark Streaming Specify Kafka Partition

2015-12-01 Thread Cody Koeninger
I actually haven't tried that, since I tend to do the offset lookups if
necessary.

It's possible that it will work, try it and let me know.

Be aware that if you're doing a count() or take() operation directly on the
rdd it'll definitely give you the wrong result if you're using -1 for one
of the offsets.



On Tue, Dec 1, 2015 at 9:58 AM, Alan Braithwaite 
wrote:

> Neat, thanks.  If I specify something like -1 as the offset, will it
> consume from the latest offset or do I have to instrument that manually?
>
> - Alan
>
> On Tue, Dec 1, 2015 at 6:43 AM, Cody Koeninger  wrote:
>
>> Yes, there is a version of createDirectStream that lets you specify
>> fromOffsets: Map[TopicAndPartition, Long]
>>
>> On Mon, Nov 30, 2015 at 7:43 PM, Alan Braithwaite 
>> wrote:
>>
>>> Is there any mechanism in the kafka streaming source to specify the
>>> exact partition id that we want a streaming job to consume from?
>>>
>>> If not, is there a workaround besides writing our a custom receiver?
>>>
>>> Thanks,
>>> - Alan
>>>
>>
>>
>


Spark Streaming Specify Kafka Partition

2015-11-30 Thread Alan Braithwaite
Is there any mechanism in the kafka streaming source to specify the exact
partition id that we want a streaming job to consume from?

If not, is there a workaround besides writing our a custom receiver?

Thanks,
- Alan


Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Ashish Soni
Hi All ,

Just wanted to find out if there is an benefits to installing  kafka
brokers and spark nodes on the same machine ?

is it possible that spark can pull data from kafka if it is local to the
node i.e. the broker or partition is on the same machine.

Thanks,
Ashish


  1   2   3   >