Re: Spark streaming with Kafka

2020-11-03 Thread Kevin Pis
Hi,

this is my Word Count demo: https://github.com/kevincmchen/wordcount

MohitAbbi wrote on Wed, Nov 4, 2020 at 3:32 AM:

> Hi,
>
> Can you please share the correct versions of the JAR files you used to
> resolve the issue? I'm also facing the same issue.
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best,

Kevin Pis


Re: Spark streaming with Kafka

2020-11-03 Thread MohitAbbi
Hi,

Can you please share the correct versions of the JAR files you used to
resolve the issue? I'm also facing the same issue.

Thanks




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread Sean Owen
What supports Python in (Kafka?) 0.8? I don't think Spark ever had a
specific Python-Kafka integration. But you have always been able to
use it to read DataFrames as in Structured Streaming.
Kafka 0.8 support is deprecated (and gone in 3.0), but 0.10 means 0.10+ -
it works with the latest 2.x.
What is the issue?

On Wed, Aug 12, 2020 at 7:53 AM German Schiavon
 wrote:
>
> Hey,
>
> Maybe I'm missing some restriction with EMR, but have you tried to use 
> Structured Streaming instead of Spark Streaming?
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
>
> Regards
>
> On Wed, 12 Aug 2020 at 14:12, Hamish Whittal  
> wrote:
>>
>> Hi folks,
>>
>> Thought I would ask here because it's somewhat confusing. I'm using Spark 
>> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>>
>> The version of Scala used is 2.11.12. I'm using this version of the 
>> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>>
>> Now I'm wanting to read from Kafka topics using Python (I need to stick to 
>> Python specifically).
>>
>> What seems confusing is that 0.8 has Python support, but 0.10 does not. Then 
>> 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using 2.4.5 
>> then clearly I'm going to hit a roadblock here.
>>
>> Can someone clarify these things for me? Have I got this right?
>>
>> Thanks in advance,
>> Hamish

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
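For anyone finding this thread later: a minimal Structured Streaming read from
Kafka on Spark 2.4.x might look like the sketch below. It is shown in Scala to
match the spark-shell snippets elsewhere in this archive, but the same
format/option calls are available from PySpark; the broker list and topic name
are placeholders, and the spark-shell `spark` session is assumed.

// Launch with a connector matching the Spark and Scala versions, e.g.
//   spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") // placeholder
  .option("subscribe", "my-topic")                                // placeholder
  .option("startingOffsets", "latest")
  .load()

// Kafka delivers key/value as binary; cast to strings before further processing.
val lines = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Write each micro-batch to the console just to see data flowing.
val query = lines.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()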



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread German Schiavon
Hey,

Maybe I'm missing some restriction with EMR, but have you tried to use
Structured Streaming instead of Spark Streaming?

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

Regards

On Wed, 12 Aug 2020 at 14:12, Hamish Whittal 
wrote:

> Hi folks,
>
> Thought I would ask here because it's somewhat confusing. I'm using Spark
> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>
> The version of Scala used is 2.11.12. I'm using this version of the
> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>
> Now I'm wanting to read from Kafka topics using Python (I need to stick to
> Python specifically).
>
> What seems confusing is that 0.8 has Python support, but 0.10 does not.
> Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
> 2.4.5 then clearly I'm going to hit a roadblock here.
>
> Can someone clarify these things for me? Have I got this right?
>
> Thanks in advance,
> Hamish
>


Spark Streaming with Kafka and Python

2020-08-12 Thread Hamish Whittal
Hi folks,

Thought I would ask here because it's somewhat confusing. I'm using Spark
2.4.5 on EMR 5.30.1 with Amazon MSK.

The version of Scala used is 2.11.12. I'm using this version of the
library: spark-streaming-kafka-0-8_2.11-2.4.5.jar

Now I want to read from Kafka topics using Python (I need to stick to
Python specifically).

What seems confusing is that 0.8 has Python support, but 0.10 does not.
Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
2.4.5 then clearly I'm going to hit a roadblock here.

Can someone clarify these things for me? Have I got this right?

Thanks in advance,
Hamish


Re: Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi

I was able to correct the issue. It was due to the wrong versions of the JAR
files I had used. I removed those JAR files, copied in the correct versions,
and the error has gone away.

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
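For anyone hitting the same AbstractMethodError: in this thread the fix was
replacing mismatched connector jars, so it is worth confirming that the
spark-sql-kafka artifact version and Scala suffix match the running Spark
build, and that no stale connector jars are sitting under $SPARK_HOME/jars. A
quick sanity check from spark-shell:

// Print the running Spark and Scala versions, then make sure the connector
// coordinates match both, e.g. spark-sql-kafka-0-10_2.12:3.0.0 for a
// Spark 3.0.0 build on Scala 2.12.
println(spark.version)                       // e.g. 3.0.0
println(scala.util.Properties.versionString) // e.g. version 2.12.10

// Matching launch command:
//   spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0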



Re: Spark streaming with Kafka

2020-07-02 Thread Jungtaek Lim
I can't reproduce. Could you please make sure you're running spark-shell
with official spark 3.0.0 distribution? Please try out changing the
directory and using relative path like "./spark-shell".

On Thu, Jul 2, 2020 at 9:59 PM dwgw  wrote:

> Hi
> I am trying to stream a Kafka topic from the Spark shell but I am getting the
> following error.
> I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
> *Java 1.8.0_212*)
>
> *[spark@hdp-dev ~]$ spark-shell --packages
> org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
> Ivy Default Cache set to: /home/spark/.ivy2/cache
> The jars for the packages stored in: /home/spark/.ivy2/jars
> :: loading settings :: url =
>
> jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
> org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
> :: resolving dependencies ::
>
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
> confs: [default]
> found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
> found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0
> in
> central
> found org.apache.kafka#kafka-clients;2.4.1 in central
> found com.github.luben#zstd-jni;1.4.4-3 in central
> found org.lz4#lz4-java;1.7.1 in central
> found org.xerial.snappy#snappy-java;1.1.7.5 in central
> found org.slf4j#slf4j-api;1.7.30 in central
> found org.spark-project.spark#unused;1.0.0 in central
> found org.apache.commons#commons-pool2;2.6.2 in central
> :: resolution report :: resolve 502ms :: artifacts dl 10ms
> :: modules in use:
> com.github.luben#zstd-jni;1.4.4-3 from central in [default]
> org.apache.commons#commons-pool2;2.6.2 from central in [default]
> org.apache.kafka#kafka-clients;2.4.1 from central in [default]
> org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
> [default]
> org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
> central in [default]
> org.lz4#lz4-java;1.7.1 from central in [default]
> org.slf4j#slf4j-api;1.7.30 from central in [default]
> org.spark-project.spark#unused;1.0.0 from central in [default]
> org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
>
> -
> |  |modules||   artifacts
> |
> |   conf   | number| search|dwnlded|evicted||
> number|dwnlded|
>
> -
> |  default |   9   |   0   |   0   |   0   ||   9   |   0
> |
>
> -
> :: retrieving ::
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
> confs: [default]
> 0 artifacts copied, 9 already retrieved (0kB/13ms)
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> Spark context Web UI available at http://hdp-dev.infodetics.com:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1593620640299_0015).
> Spark session available as 'spark'.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 3.0.0
>   /_/
>
> Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_212)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
>
> scala> val df = spark.
>  | readStream.
>  | format("kafka").
>  | option("kafka.bootstrap.servers", "XXX").
>  | option("subscribe", "XXX").
>  | option("kafka.sasl.mechanisms", "XXX").
>  | option("kafka.security.protocol", "XXX").
>  | option("kafka.sasl.username","XXX").
>  | option("kafka.sasl.password", "XXX").
>  | option("startingOffsets", "earliest").
>  | load
> java.lang.AbstractMethodError: Method
>
> org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
> is abstract
>   at
>
> org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
>   at
>
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
>   at
>
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
>   ... 57 elided
>
> Looking forward to a response.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi
I am trying to stream a Kafka topic from the Spark shell but I am getting the
following error.
I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
*Java 1.8.0_212*)

*[spark@hdp-dev ~]$ spark-shell --packages
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
:: loading settings :: url =
jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in
central
found org.apache.kafka#kafka-clients;2.4.1 in central
found com.github.luben#zstd-jni;1.4.4-3 in central
found org.lz4#lz4-java;1.7.1 in central
found org.xerial.snappy#snappy-java;1.1.7.5 in central
found org.slf4j#slf4j-api;1.7.30 in central
found org.spark-project.spark#unused;1.0.0 in central
found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
:: modules in use:
com.github.luben#zstd-jni;1.4.4-3 from central in [default]
org.apache.commons#commons-pool2;2.6.2 from central in [default]
org.apache.kafka#kafka-clients;2.4.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
[default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
central in [default]
org.lz4#lz4-java;1.7.1 from central in [default]
org.slf4j#slf4j-api;1.7.30 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
   
-
|  |modules||   artifacts  
|
|   conf   | number| search|dwnlded|evicted||
number|dwnlded|
   
-
|  default |   9   |   0   |   0   |   0   ||   9   |   0  
|
   
-
:: retrieving ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/13ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http://hdp-dev.infodetics.com:4040
Spark context available as 'sc' (master = yarn, app id =
application_1593620640299_0015).
Spark session available as 'spark'.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
  /_/
 
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.


scala> val df = spark.
 | readStream.
 | format("kafka").
 | option("kafka.bootstrap.servers", "XXX").
 | option("subscribe", "XXX").
 | option("kafka.sasl.mechanisms", "XXX").
 | option("kafka.security.protocol", "XXX").
 | option("kafka.sasl.username","XXX").
 | option("kafka.sasl.password", "XXX").
 | option("startingOffsets", "earliest").
 | load
java.lang.AbstractMethodError: Method
org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
is abstract
  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
  at
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
  at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
  ... 57 elided

Looking forward to a response.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming with kafka input stuck in (Re-)joing group because of group rebalancing

2018-05-15 Thread JF Chen
When I terminate a Spark streaming application and restart it, it always
gets stuck at this step:
>
> Revoking previously assigned partitions [] for group [mygroup]
> (Re-)joing group [mygroup]


If I use a new group id, it works fine, but I may lose the data from where
the previous group id left off.

How can I solve this?


Regards,
Junfeng Chen
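For the data-loss half of this question, the 0-10 direct stream lets you commit
offsets back to Kafka yourself, so a restart with the same group.id resumes
where the previous run stopped rather than needing a fresh group. A sketch
along the lines of the integration guide, with placeholder broker/topic names
and an existing StreamingContext `ssc` assumed:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",             // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "mygroup",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)  // commit manually below
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("mytopic"), kafkaParams))

stream.foreachRDD { rdd =>
  // Capture the offset ranges before any transformation.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process and write out rdd here ...
  // Commit only after the output has succeeded.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}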


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-21 Thread shyla deshpande
Thanks TD.

On Tue, Mar 14, 2017 at 4:37 PM, Tathagata Das  wrote:

> This setting allows multiple spark jobs generated through multiple
> foreachRDD to run concurrently, even if they are across batches. So output
> op2 from batch X, can run concurrently with op1 of batch X+1
> This is not safe because it breaks the checkpointing logic in subtle ways.
> Note that this was never documented in the spark online docs.
>
> On Tue, Mar 14, 2017 at 2:29 PM, shyla deshpande  > wrote:
>
>> Thanks TD for the response. Can you please provide more explanation. I am
>>  having multiple streams in the spark streaming application (Spark 2.0.2
>> using DStreams).  I know many people using this setting. So your
>> explanation will help a lot of people.
>>
>> Thanks
>>
>> On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das 
>> wrote:
>>
>>> That config is not safe. Please do not use it.
>>>
>>> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
>>> wrote:
>>>
 I have a spark streaming application which processes 3 kafka streams
 and has 5 output operations.

 Not sure what should be the setting for spark.streaming.concurrentJobs.

 1. If the concurrentJobs setting is 4 does that mean 2 output
 operations will be run sequentially?

 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
 this situation?

 I appreciate your input. Thanks

>>>
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread Tathagata Das
This setting allows multiple spark jobs generated through multiple
foreachRDD to run concurrently, even if they are across batches. So output
op2 from batch X, can run concurrently with op1 of batch X+1
This is not safe because it breaks the checkpointing logic in subtle ways.
Note that this was never documented in the spark online docs.

On Tue, Mar 14, 2017 at 2:29 PM, shyla deshpande 
wrote:

> Thanks TD for the response. Can you please provide more explanation. I am
>  having multiple streams in the spark streaming application (Spark 2.0.2
> using DStreams).  I know many people using this setting. So your
> explanation will help a lot of people.
>
> Thanks
>
> On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das 
> wrote:
>
>> That config is not safe. Please do not use it.
>>
>> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
>> wrote:
>>
>>> I have a spark streaming application which processes 3 kafka streams and
>>> has 5 output operations.
>>>
>>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>>
>>> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
>>> will be run sequentially?
>>>
>>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>>> this situation?
>>>
>>> I appreciate your input. Thanks
>>>
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread shyla deshpande
Thanks TD for the response. Can you please provide more explanation? I have
multiple streams in the Spark streaming application (Spark 2.0.2 using
DStreams). I know many people use this setting, so your explanation will help
a lot of people.

Thanks

On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das  wrote:

> That config is not safe. Please do not use it.
>
> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
> wrote:
>
>> I have a spark streaming application which processes 3 kafka streams and
>> has 5 output operations.
>>
>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>
>> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
>> will be run sequentially?
>>
>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>> this situation?
>>
>> I appreciate your input. Thanks
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread Tathagata Das
That config is not safe. Please do not use it.

On Mar 10, 2017 10:03 AM, "shyla deshpande" 
wrote:

> I have a spark streaming application which processes 3 kafka streams and
> has 5 output operations.
>
> Not sure what should be the setting for spark.streaming.concurrentJobs.
>
> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
> will be run sequentially?
>
> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
> this situation?
>
> I appreciate your input. Thanks
>


spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread shyla deshpande
I have a spark streaming application which processes 3 kafka streams and
has 5 output operations.

Not sure what should be the setting for spark.streaming.concurrentJobs.

1. If the concurrentJobs setting is 4, does that mean 2 output operations
will be run sequentially?

2. If I had 6 cores, what would be an ideal setting for concurrentJobs in
this situation?

I appreciate your input. Thanks
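For reference only, and with TD's warning above in mind:
spark.streaming.concurrentJobs is an ordinary Spark conf entry with a default
of 1. A sketch of where it would be set if you chose to experiment with it
(app name and batch interval below are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-app")
  // Default is 1: output operations run one at a time, in order.
  // Values > 1 allow jobs from different output ops/batches to overlap,
  // which is the unsafe, undocumented behaviour discussed above.
  .set("spark.streaming.concurrentJobs", "1")

val ssc = new StreamingContext(conf, Seconds(10))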


Re: Spark Streaming with Kafka

2016-12-12 Thread Anton Okolnychyi
thanks for all your replies, now I have a complete picture.



2016-12-12 16:49 GMT+01:00 Cody Koeninger :

> http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html#creating-a-direct-stream
>
> Use a separate group id for each stream, like the docs say.
>
> If you're doing multiple output operations, and aren't caching, spark
> is going to read from kafka again each time, and if some of those
> reads are happening for the same group and same topicpartition, it's
> not going to work.
>
> On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
>  wrote:
> > Hi Anton,
> >
> > What is the command you run your spark app with? Why not working with
> data
> > instead of stream on your second stage operation? Can you provide logs
> with
> > the issue?
> >
> > ConcurrentModificationException is not a spark issue, it means that you
> use
> > the same Kafka consumer instance from more than one thread.
> >
> > Additionally,
> >
> > 1) As I understand new kafka consumer is created every time when you call
> > KafkaUtils.createDirectStream.
> > 2) If you assign the same group id to several consumer instances then all
> > the consumers will get different set of messages on the same topic. This
> is
> > a kind of load balancing which kafka provides with its Consumer API.
> >
> > Oleksii
> >
> > On 11 December 2016 at 18:46, Anton Okolnychyi <
> anton.okolnyc...@gmail.com>
> > wrote:
> >>
> >> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> >> nothing custom.
> >>
> >>
> >> I will try restate the initial question. Let's consider an example.
> >>
> >> 1. I create a stream and subscribe to a certain topic.
> >>
> >> val stream = KafkaUtils.createDirectStream(...)
> >>
> >> 2. I extract the actual data from the stream. For instance, word counts.
> >>
> >> val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >> 3. Then I compute something and output the result to console.
> >>
> >> val firstResult = stream.reduceByWindow(...)
> >> firstResult.print()
> >>
> >> Once that is done, I would like to perform another computation on top of
> >> wordCounts and output that result again to console. In my current
> >> understanding, I cannot just reuse wordCounts from Step 2 and should
> create
> >> a new stream with another group id and then define the second
> computation.
> >> Am I correct that if add the next part, then I can get
> >> "ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access"?
> >>
> >> // another computation on wordCounts
> >> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> >> secondResult.output()
> >>
> >> Thanks,
> >> Anton
> >>
> >> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
> >>>
> >>> Hi,
> >>> Usual general questions are:
> >>> -- what is your Spark version?
> >>> -- what is your Kafka version?
> >>> -- do you use "standard" Kafka consumer or try to implement something
> >>> custom (your own multi-threaded consumer)?
> >>>
> >>> The freshest docs
> >>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html
> >>>
> >>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
> >>> !!!)
> 
>  kafkaParams.put("group.id", "use_a_separate_group_id_for_
> each_stream");
> >>>
> >>>
> >>>
> >>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
> >>>  wrote:
> 
>  Hi,
> 
>  I am experimenting with Spark Streaming and Kafka. I will appreciate
> if
>  someone can say whether the following assumption is correct.
> 
>  If I have multiple computations (each with its own output) on one
> stream
>  (created as KafkaUtils.createDirectStream), then there is a chance
> to have
>  ConcurrentModificationException: KafkaConsumer is not safe for
>  multi-threaded access.  To solve this problem, I should create a new
> stream
>  with different "group.id" for each computation.
> 
>  Am I right?
> 
>  Best regards,
>  Anton
> >>>
> >>>
> >>
> >
>


Re: Spark Streaming with Kafka

2016-12-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream

Use a separate group id for each stream, like the docs say.

If you're doing multiple output operations, and aren't caching, spark
is going to read from kafka again each time, and if some of those
reads are happening for the same group and same topicpartition, it's
not going to work.

On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
 wrote:
> Hi Anton,
>
> What is the command you run your spark app with? Why not working with data
> instead of stream on your second stage operation? Can you provide logs with
> the issue?
>
> ConcurrentModificationException is not a spark issue, it means that you use
> the same Kafka consumer instance from more than one thread.
>
> Additionally,
>
> 1) As I understand new kafka consumer is created every time when you call
> KafkaUtils.createDirectStream.
> 2) If you assign the same group id to several consumer instances then all
> the consumers will get different set of messages on the same topic. This is
> a kind of load balancing which kafka provides with its Consumer API.
>
> Oleksii
>
> On 11 December 2016 at 18:46, Anton Okolnychyi 
> wrote:
>>
>> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
>> nothing custom.
>>
>>
>> I will try restate the initial question. Let's consider an example.
>>
>> 1. I create a stream and subscribe to a certain topic.
>>
>> val stream = KafkaUtils.createDirectStream(...)
>>
>> 2. I extract the actual data from the stream. For instance, word counts.
>>
>> val wordCounts = stream.map(record => (record.value(), 1))
>>
>> 3. Then I compute something and output the result to console.
>>
>> val firstResult = stream.reduceByWindow(...)
>> firstResult.print()
>>
>> Once that is done, I would like to perform another computation on top of
>> wordCounts and output that result again to console. In my current
>> understanding, I cannot just reuse wordCounts from Step 2 and should create
>> a new stream with another group id and then define the second computation.
>> Am I correct that if add the next part, then I can get
>> "ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access"?
>>
>> // another computation on wordCounts
>> val secondResult = wordCounts.reduceByKeyAndWindow(...)
>> secondResult.output()
>>
>> Thanks,
>> Anton
>>
>> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
>>>
>>> Hi,
>>> Usual general questions are:
>>> -- what is your Spark version?
>>> -- what is your Kafka version?
>>> -- do you use "standard" Kafka consumer or try to implement something
>>> custom (your own multi-threaded consumer)?
>>>
>>> The freshest docs
>>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>>> !!!)

 kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>>
>>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
>>>  wrote:

 Hi,

 I am experimenting with Spark Streaming and Kafka. I will appreciate if
 someone can say whether the following assumption is correct.

 If I have multiple computations (each with its own output) on one stream
 (created as KafkaUtils.createDirectStream), then there is a chance to have
 ConcurrentModificationException: KafkaConsumer is not safe for
 multi-threaded access.  To solve this problem, I should create a new stream
 with different "group.id" for each computation.

 Am I right?

 Best regards,
 Anton
>>>
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
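A minimal sketch of what Cody describes, using Anton's word-count example: one
direct stream with its own group.id, and the derived DStream cached so the two
output operations do not each trigger a separate read from Kafka. Broker and
topic names are placeholders, an existing StreamingContext `ssc` is assumed,
and a second independent stream would simply repeat createDirectStream with a
different group.id.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",              // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("words"), kafkaParams))

// Per-batch word counts, cached so both outputs below reuse the same read.
val wordCounts = stream
  .flatMap(_.value.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.cache()

wordCounts.print()  // output operation 1

// Output operation 2: windowed totals (window must be a multiple of the batch interval).
wordCounts.reduceByKeyAndWindow(_ + _, Seconds(60)).print()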



Re: Spark Streaming with Kafka

2016-12-11 Thread Oleksii Dukhno
Hi Anton,

What is the command you run your Spark app with? Why not work with the data
instead of the stream for your second-stage operation? Can you provide logs
with the issue?

ConcurrentModificationException is not a Spark issue; it means that you use
the same Kafka consumer instance from more than one thread.

Additionally,

1) As I understand it, a new Kafka consumer is created every time you
call KafkaUtils.createDirectStream.
2) If you assign the same group id to several consumer instances, then the
consumers will each get a different set of messages on the same topic. This is
a kind of load balancing which Kafka provides with its Consumer API.

Oleksii

On 11 December 2016 at 18:46, Anton Okolnychyi 
wrote:

> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> nothing custom.
>
>
> I will try restate the initial question. Let's consider an example.
>
> 1. I create a stream and subscribe to a certain topic.
>
> val stream = KafkaUtils.createDirectStream(...)
>
> 2. I extract the actual data from the stream. For instance, word counts.
>
> val wordCounts = stream.map(record => (record.value(), 1))
>
> 3. Then I compute something and output the result to console.
>
> val firstResult = stream.reduceByWindow(...)
> firstResult.print()
>
> Once that is done, I would like to perform another computation on top of
> wordCounts and output that result again to console. In my current
> understanding, I cannot just reuse wordCounts from Step 2 and should create
> a new stream with another group id and then define the second computation.
> Am I correct that if add the next part, then I can get "
> ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access"?
>
> // another computation on wordCounts
> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> secondResult.output()
>
> Thanks,
> Anton
>
> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
>
>> Hi,
>> Usual general questions are:
>> -- what is your Spark version?
>> -- what is your Kafka version?
>> -- do you use "standard" Kafka consumer or try to implement something
>> custom (your own multi-threaded consumer)?
>>
>> The freshest docs https://spark.apache.org/docs/
>> latest/streaming-kafka-0-10-integration.html
>>
>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>> !!!)
>>
>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>
>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
>> anton.okolnyc...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>>> someone can say whether the following assumption is correct.
>>>
>>> If I have multiple computations (each with its own output) on one stream
>>> (created as KafkaUtils.createDirectStream), then there is a chance to
>>> have ConcurrentModificationException: KafkaConsumer is not safe for
>>> multi-threaded access.  To solve this problem, I should create a new stream
>>> with different "group.id" for each computation.
>>>
>>> Am I right?
>>>
>>> Best regards,
>>> Anton
>>>
>>
>>
>


Re: Spark Streaming with Kafka

2016-12-11 Thread Anton Okolnychyi
sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
nothing custom.


I will try to restate the initial question. Let's consider an example.

1. I create a stream and subscribe to a certain topic.

val stream = KafkaUtils.createDirectStream(...)

2. I extract the actual data from the stream. For instance, word counts.

val wordCounts = stream.map(record => (record.value(), 1))

3. Then I compute something and output the result to console.

val firstResult = stream.reduceByWindow(...)
firstResult.print()

Once that is done, I would like to perform another computation on top of
wordCounts and output that result again to console. In my current
understanding, I cannot just reuse wordCounts from Step 2 and should create
a new stream with another group id and then define the second computation.
Am I correct that if I add the next part, then I can get "
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access"?

// another computation on wordCounts
val secondResult = wordCounts.reduceByKeyAndWindow(...)
secondResult.output()

Thanks,
Anton

2016-12-11 17:11 GMT+01:00 Timur Shenkao :

> Hi,
> Usual general questions are:
> -- what is your Spark version?
> -- what is your Kafka version?
> -- do you use "standard" Kafka consumer or try to implement something
> custom (your own multi-threaded consumer)?
>
> The freshest docs https://spark.apache.org/docs/
> latest/streaming-kafka-0-10-integration.html
>
> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)
>
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>
>>
>
> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
> anton.okolnyc...@gmail.com> wrote:
>
>> Hi,
>>
>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>> someone can say whether the following assumption is correct.
>>
>> If I have multiple computations (each with its own output) on one stream
>> (created as KafkaUtils.createDirectStream), then there is a chance to
>> have ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access.  To solve this problem, I should create a new stream
>> with different "group.id" for each computation.
>>
>> Am I right?
>>
>> Best regards,
>> Anton
>>
>
>


Re: Spark Streaming with Kafka

2016-12-11 Thread Timur Shenkao
Hi,
Usual general questions are:
-- what is your Spark version?
-- what is your Kafka version?
-- do you use "standard" Kafka consumer or try to implement something
custom (your own multi-threaded consumer)?

The freshest docs
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

AFAIK, yes, you should use a unique group id for each stream (Kafka 0.10!)

> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>

On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
anton.okolnyc...@gmail.com> wrote:

> Hi,
>
> I am experimenting with Spark Streaming and Kafka. I will appreciate if
> someone can say whether the following assumption is correct.
>
> If I have multiple computations (each with its own output) on one stream
> (created as KafkaUtils.createDirectStream), then there is a chance to
> have ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access.  To solve this problem, I should create a new stream
> with different "group.id" for each computation.
>
> Am I right?
>
> Best regards,
> Anton
>


Spark Streaming with Kafka

2016-12-11 Thread Anton Okolnychyi
Hi,

I am experimenting with Spark Streaming and Kafka. I would appreciate it if
someone could say whether the following assumption is correct.

If I have multiple computations (each with its own output) on one stream
(created as KafkaUtils.createDirectStream), then there is a chance of getting
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access. To solve this problem, I should create a new stream
with a different "group.id" for each computation.

Am I right?

Best regards,
Anton


Re: Spark Streaming with Kafka

2016-05-24 Thread Rasika Pohankar
Hi firemonk9,

Sorry, it's been too long but I just saw this. I hope you were able to
resolve it. FWIW, we were able to solve this with the help of the Low Level
Kafka Consumer, instead of the inbuilt Kafka consumer in Spark, from here:
https://github.com/dibbhatt/kafka-spark-consumer/.

Regards,
Rasika.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p27014.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
If by smaller block interval you mean the value in seconds passed to the
streaming context constructor, no.  You'll still get everything from the
starting offset until now in the first batch.

On Thu, Feb 18, 2016 at 10:02 AM, praveen S  wrote:

> Sorry.. Rephrasing :
> Can this issue be resolved by having a smaller block interval?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:30, "praveen S"  wrote:
>
>> Can having a smaller block interval only resolve this?
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>>
>>> Backpressure won't help you with the first batch, you'd need 
>>> spark.streaming.kafka.maxRatePerPartition
>>> for that
>>>
>>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>>
 Have a look at

 spark.streaming.backpressure.enabled
 Property

 Regards,
 Praveen
 On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am
> trying to find a solution for a particular use case when my application 
> has
> a downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process 
> that.
>
> Is there any workaround so that when my streaming application starts
> it starts taking data for 1-2 hours, process it , then take the data for
> next 1 hour process it. Now when its done processing of previous 5 hours
> data which missed, normal streaming should start with the given slide
> interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>

>>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Sorry.. Rephrasing :
Can this issue be resolved by having a smaller block interval?

Regards,
Praveen
On 18 Feb 2016 21:30, "praveen S"  wrote:

> Can having a smaller block interval only resolve this?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>
>> Backpressure won't help you with the first batch, you'd need 
>> spark.streaming.kafka.maxRatePerPartition
>> for that
>>
>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>
>>> Have a look at
>>>
>>> spark.streaming.backpressure.enabled
>>> Property
>>>
>>> Regards,
>>> Praveen
>>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>>
 I have a spark streaming application running in production. I am trying
 to find a solution for a particular use case when my application has a
 downtime of say 5 hours and is restarted. Now, when I start my streaming
 application after 5 hours there would be considerable amount of data then
 in the Kafka and my cluster would be unable to repartition and process 
 that.

 Is there any workaround so that when my streaming application starts it
 starts taking data for 1-2 hours, process it , then take the data for next
 1 hour process it. Now when its done processing of previous 5 hours data
 which missed, normal streaming should start with the given slide interval.

 Please suggest any ideas and feasibility of this.


 Thanks !!
 Abhi

>>>
>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Can having a smaller block interval only resolve this?

Regards,
Praveen
On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:

> Backpressure won't help you with the first batch, you'd need 
> spark.streaming.kafka.maxRatePerPartition
> for that
>
> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>
>> Have a look at
>>
>> spark.streaming.backpressure.enabled
>> Property
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>
>>> I have a spark streaming application running in production. I am trying
>>> to find a solution for a particular use case when my application has a
>>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>>> application after 5 hours there would be considerable amount of data then
>>> in the Kafka and my cluster would be unable to repartition and process that.
>>>
>>> Is there any workaround so that when my streaming application starts it
>>> starts taking data for 1-2 hours, process it , then take the data for next
>>> 1 hour process it. Now when its done processing of previous 5 hours data
>>> which missed, normal streaming should start with the given slide interval.
>>>
>>> Please suggest any ideas and feasibility of this.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
Backpressure won't help you with the first batch, you'd need
spark.streaming.kafka.maxRatePerPartition
for that

On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:

> Have a look at
>
> spark.streaming.backpressure.enabled
> Property
>
> Regards,
> Praveen
> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>
>> I have a spark streaming application running in production. I am trying
>> to find a solution for a particular use case when my application has a
>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>> application after 5 hours there would be considerable amount of data then
>> in the Kafka and my cluster would be unable to repartition and process that.
>>
>> Is there any workaround so that when my streaming application starts it
>> starts taking data for 1-2 hours, process it , then take the data for next
>> 1 hour process it. Now when its done processing of previous 5 hours data
>> which missed, normal streaming should start with the given slide interval.
>>
>> Please suggest any ideas and feasibility of this.
>>
>>
>> Thanks !!
>> Abhi
>>
>
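A sketch of the two settings being discussed, in one place: backpressure tunes
the ingest rate from the second batch onwards, while maxRatePerPartition caps
what the very first batch can pull, which is what matters right after a long
downtime. The numeric value is an arbitrary placeholder.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")
  // Adjusts the rate dynamically after the first batch.
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard cap, in records per second per Kafka partition, that also limits
  // the first batch after a restart.
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")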


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Have a look at

spark.streaming.backpressure.enabled
Property

Regards,
Praveen
On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am trying to
> find a solution for a particular use case when my application has a
> downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process that.
>
> Is there any workaround so that when my streaming application starts it
> starts taking data for 1-2 hours, process it , then take the data for next
> 1 hour process it. Now when its done processing of previous 5 hours data
> which missed, normal streaming should start with the given slide interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>


Re: Spark Streaming with Kafka DirectStream

2016-02-17 Thread Cody Koeninger
You can print whatever you want wherever you want, it's just a question of
whether it's going to show up on the driver or the various executors logs

On Wed, Feb 17, 2016 at 5:50 AM, Cyril Scetbon 
wrote:

> I don't think we can print an integer value in a spark streaming process
> As opposed to a spark job. I think I can print the content of an rdd but
> not debug messages. Am I wrong ?
>
> Cyril Scetbon
>
> On Feb 17, 2016, at 12:51 AM, ayan guha  wrote:
>
> Hi
>
> You can always use RDD properties, which already has partition information.
>
>
> https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
>
>
> On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon 
> wrote:
>
>> Your understanding is the right one (having re-read the documentation).
>> Still wondering how I can verify that 5 partitions have been created. My
>> job is reading from a topic in Kafka that has 5 partitions and sends the
>> data to E/S. I can see that when there is one task to read from Kafka there
>> are 5 tasks writing to E/S. So I'm supposing that the task reading from
>> Kafka does it in // using 5 partitions and that's why there are then 5
>> tasks to write to E/S. But I'm supposing ...
>>
>> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>>
>> I have a slightly different understanding.
>>
>> Direct stream generates 1 RDD per batch, however, number of partitions in
>> that RDD = number of partitions in kafka topic.
>>
>> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
>> wrote:
>>
>>> Hi guys,
>>>
>>> I'm making some tests with Spark and Kafka using a Python script. I use
>>> the second method that doesn't need any receiver (Direct Approach). It
>>> should adapt the number of RDDs to the number of partitions in the topic.
>>> I'm trying to verify it. What's the easiest way to verify it ? I also tried
>>> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
>>> on the leaders of partitions in a topic, and they are not. Can you confirm
>>> that RDDs are not created depending on the location of partitions and that
>>> co-locating Kafka with Spark is not a must-have or that Spark does not take
>>> advantage of it ?
>>>
>>> As the parallelism is simplified (by creating as many RDDs as there are
>>> partitions) I suppose that the biggest part of the tuning is playing with
>>> KafKa partitions (not talking about network configuration or management of
>>> Spark resources) ?
>>>
>>> Thank you
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
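A sketch of one way to verify the partition count Cyril asks about: the body of
foreachRDD runs on the driver, so anything printed there shows up in the driver
log/console. Shown against the 0-10 integration (the 0-8 package exposes the
same HasOffsetRanges under org.apache.spark.streaming.kafka); `stream` is
assumed to be the DStream returned by createDirectStream.

import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  // For the direct stream there is one RDD partition per Kafka topic-partition.
  println(s"batch RDD has ${rdd.getNumPartitions} partitions")
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { range =>
    println(s"${range.topic} partition ${range.partition}: " +
      s"offsets ${range.fromOffset} -> ${range.untilOffset}")
  }
}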


Re: Spark Streaming with Kafka DirectStream

2016-02-17 Thread Cyril Scetbon
I don't think we can print an integer value in a Spark streaming process, as
opposed to a Spark job. I think I can print the contents of an RDD but not
debug messages. Am I wrong?

Cyril Scetbon

> On Feb 17, 2016, at 12:51 AM, ayan guha  wrote:
> 
> Hi
> 
> You can always use RDD properties, which already has partition information.
> 
> https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
>  
> 
>> On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon  wrote:
>> Your understanding is the right one (having re-read the documentation). 
>> Still wondering how I can verify that 5 partitions have been created. My job 
>> is reading from a topic in Kafka that has 5 partitions and sends the data to 
>> E/S. I can see that when there is one task to read from Kafka there are 5 
>> tasks writing to E/S. So I'm supposing that the task reading from Kafka does 
>> it in // using 5 partitions and that's why there are then 5 tasks to write 
>> to E/S. But I'm supposing ...
>> 
>>> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>>> 
>>> I have a slightly different understanding. 
>>> 
>>> Direct stream generates 1 RDD per batch, however, number of partitions in 
>>> that RDD = number of partitions in kafka topic.
>>> 
 On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon  
 wrote:
 Hi guys,
 
 I'm making some tests with Spark and Kafka using a Python script. I use 
 the second method that doesn't need any receiver (Direct Approach). It 
 should adapt the number of RDDs to the number of partitions in the topic. 
 I'm trying to verify it. What's the easiest way to verify it ? I also 
 tried to co-locate Yarn, Spark and Kafka to check if RDDs are created 
 depending on the leaders of partitions in a topic, and they are not. Can 
 you confirm that RDDs are not created depending on the location of 
 partitions and that co-locating Kafka with Spark is not a must-have or 
 that Spark does not take advantage of it ?
 
 As the parallelism is simplified (by creating as many RDDs as there are 
 partitions) I suppose that the biggest part of the tuning is playing with 
 KafKa partitions (not talking about network configuration or management of 
 Spark resources) ?
 
 Thank you
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>>> 
>>> -- 
>>> Best Regards,
>>> Ayan Guha
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha


Spark Streaming with Kafka Use Case

2016-02-17 Thread Abhishek Anand
I have a Spark streaming application running in production. I am trying to
find a solution for a particular use case where my application has a
downtime of, say, 5 hours and is then restarted. When I start my streaming
application after 5 hours, there would be a considerable amount of data in
Kafka by then, and my cluster would be unable to repartition and process it.

Is there any workaround so that when my streaming application starts, it
takes data for 1-2 hours and processes it, then takes the next 1 hour of data
and processes it? Once it has finished processing the 5 hours of data that
were missed, normal streaming should start with the given slide interval.

Please suggest any ideas and feasibility of this.


Thanks !!
Abhi


Re: Spark Streaming with Kafka Use Case

2016-02-17 Thread Cody Koeninger
Just use a kafka rdd in a batch job or two, then start your streaming job.

On Wed, Feb 17, 2016 at 12:57 AM, Abhishek Anand 
wrote:

> I have a spark streaming application running in production. I am trying to
> find a solution for a particular use case when my application has a
> downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process that.
>
> Is there any workaround so that when my streaming application starts it
> starts taking data for 1-2 hours, process it , then take the data for next
> 1 hour process it. Now when its done processing of previous 5 hours data
> which missed, normal streaming should start with the given slide interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>
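A sketch of Cody's suggestion using the 0-10 integration's batch API: drain the
backlog with KafkaUtils.createRDD over explicit offset ranges, then start the
streaming job from where the batch left off. All broker, topic, partition and
offset values below are placeholders, and the spark-shell `sc` is assumed.

import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// createRDD takes a java.util.Map of Kafka params.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",             // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "catchup-batch"
).asJava

// One OffsetRange per topic-partition: (topic, partition, fromOffset, untilOffset).
val offsetRanges = Array(
  OffsetRange("mytopic", 0, 100000L, 200000L),       // placeholder offsets
  OffsetRange("mytopic", 1, 100000L, 200000L)
)

val backlog = KafkaUtils.createRDD[String, String](
  sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)

// Process the missed hours as an ordinary batch, then start the streaming
// job with offsets picking up at each range's untilOffset.
println(backlog.count())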


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
Hi

You can always use RDD properties, which already has partition information.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html


On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon 
wrote:

> Your understanding is the right one (having re-read the documentation).
> Still wondering how I can verify that 5 partitions have been created. My
> job is reading from a topic in Kafka that has 5 partitions and sends the
> data to E/S. I can see that when there is one task to read from Kafka there
> are 5 tasks writing to E/S. So I'm supposing that the task reading from
> Kafka does it in // using 5 partitions and that's why there are then 5
> tasks to write to E/S. But I'm supposing ...
>
> On Feb 16, 2016, at 21:12, ayan guha  wrote:
>
> I have a slightly different understanding.
>
> Direct stream generates 1 RDD per batch, however, number of partitions in
> that RDD = number of partitions in kafka topic.
>
> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
> wrote:
>
>> Hi guys,
>>
>> I'm making some tests with Spark and Kafka using a Python script. I use
>> the second method that doesn't need any receiver (Direct Approach). It
>> should adapt the number of RDDs to the number of partitions in the topic.
>> I'm trying to verify it. What's the easiest way to verify it ? I also tried
>> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
>> on the leaders of partitions in a topic, and they are not. Can you confirm
>> that RDDs are not created depending on the location of partitions and that
>> co-locating Kafka with Spark is not a must-have or that Spark does not take
>> advantage of it ?
>>
>> As the parallelism is simplified (by creating as many RDDs as there are
>> partitions) I suppose that the biggest part of the tuning is playing with
>> KafKa partitions (not talking about network configuration or management of
>> Spark resources) ?
>>
>> Thank you
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread Cyril Scetbon
Your understanding is the right one (having re-read the documentation). I'm still
wondering how I can verify that 5 partitions have been created. My job reads
from a topic in Kafka that has 5 partitions and sends the data to E/S. I can
see that when there is one task reading from Kafka there are 5 tasks writing
to E/S. So I'm supposing that the task reading from Kafka does it in parallel
using 5 partitions, and that's why there are then 5 tasks writing to E/S. But
I'm only supposing ...

> On Feb 16, 2016, at 21:12, ayan guha  wrote:
> 
> I have a slightly different understanding. 
> 
> Direct stream generates 1 RDD per batch, however, number of partitions in 
> that RDD = number of partitions in kafka topic.
> 
> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon  > wrote:
> Hi guys,
> 
> I'm making some tests with Spark and Kafka using a Python script. I use the 
> second method that doesn't need any receiver (Direct Approach). It should 
> adapt the number of RDDs to the number of partitions in the topic. I'm trying 
> to verify it. What's the easiest way to verify it ? I also tried to co-locate 
> Yarn, Spark and Kafka to check if RDDs are created depending on the leaders 
> of partitions in a topic, and they are not. Can you confirm that RDDs are not 
> created depending on the location of partitions and that co-locating Kafka 
> with Spark is not a must-have or that Spark does not take advantage of it ?
> 
> As the parallelism is simplified (by creating as many RDDs as there are 
> partitions) I suppose that the biggest part of the tuning is playing with 
> KafKa partitions (not talking about network configuration or management of 
> Spark resources) ?
> 
> Thank you
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
I have a slightly different understanding.

Direct stream generates 1 RDD per batch, however, number of partitions in
that RDD = number of partitions in kafka topic.

On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon 
wrote:

> Hi guys,
>
> I'm making some tests with Spark and Kafka using a Python script. I use
> the second method that doesn't need any receiver (Direct Approach). It
> should adapt the number of RDDs to the number of partitions in the topic.
> I'm trying to verify it. What's the easiest way to verify it ? I also tried
> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
> on the leaders of partitions in a topic, and they are not. Can you confirm
> that RDDs are not created depending on the location of partitions and that
> co-locating Kafka with Spark is not a must-have or that Spark does not take
> advantage of it ?
>
> As the parallelism is simplified (by creating as many RDDs as there are
> partitions) I suppose that the biggest part of the tuning is playing with
> KafKa partitions (not talking about network configuration or management of
> Spark resources) ?
>
> Thank you
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Spark Streaming with Kafka DirectStream

2016-02-16 Thread Cyril Scetbon
Hi guys,

I'm running some tests with Spark and Kafka using a Python script. I use the
second method, which doesn't need any receiver (the Direct Approach). It should
adapt the number of RDDs to the number of partitions in the topic. I'm trying
to verify that. What's the easiest way to verify it? I also tried to co-locate
Yarn, Spark and Kafka to check whether RDDs are created depending on the leaders
of partitions in a topic, and they are not. Can you confirm that RDDs are not
created depending on the location of partitions, and that co-locating Kafka with
Spark is not a must-have, or that Spark does not take advantage of it?

As the parallelism is simplified (by creating as many RDDs as there are
partitions), I suppose that the biggest part of the tuning is playing with Kafka
partitions (not talking about network configuration or management of Spark
resources)?

Thank you
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-12 Thread p pathiyil
Thanks Sebastian.

I was indeed trying out FAIR scheduling with a high value for
concurrentJobs today.

It does improve the latency seen by the non-hot partitions, even if it does
not provide complete isolation. So it might be an acceptable middle ground.
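
For reference, here is a sketch of what that combination looks like in the
driver. The property names are standard Spark settings; the values, app name
and pool name are only illustrative.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("slow-partition-experiment")
  // undocumented knob: how many streaming jobs may run at the same time
  .set("spark.streaming.concurrentJobs", "4")
  // schedule those concurrent jobs with the FAIR scheduler instead of FIFO
  .set("spark.scheduler.mode", "FAIR")

val ssc = new StreamingContext(conf, Seconds(2))
// pools defined in fairscheduler.xml can then be assigned per output operation,
// e.g. by calling this inside the foreachRDD that handles the hot topic:
// ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "hot-topic-pool")
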
On 12 Feb 2016 12:18, "Sebastian Piu"  wrote:

> Have you tried using fair scheduler and queues
> On 12 Feb 2016 4:24 a.m., "p pathiyil"  wrote:
>
>> With this setting, I can see that the next job is being executed before
>> the previous one is finished. However, the processing of the 'hot'
>> partition eventually hogs all the concurrent jobs. If there was a way to
>> restrict jobs to be one per partition, then this setting would provide the
>> per-partition isolation.
>>
>> Is there anything in the framework which would give control over that
>> aspect ?
>>
>> Thanks.
>>
>>
>> On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger 
>> wrote:
>>
>>> spark.streaming.concurrentJobs
>>>
>>>
>>> see e.g. 
>>> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>>>
>>>
>>> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>>>
 Thanks for the response Cody.

 The producers are out of my control, so can't really balance the
 incoming content across the various topics and partitions. The number of
 topics and partitions are quite large and the volume across then not very
 well known ahead of time. So it is quite hard to segregate low and high
 volume topics in to separate driver programs.

 Will look at shuffle / repartition.

 Could you share the setting for starting another batch in parallel ? It
 might be ok to call the 'save' of the processed messages out of order if
 that is the only consequence of this setting.

 When separate DStreams are created per partition (and if union() is not
 called on them), what aspect of the framework still ties the scheduling of
 jobs across the partitions together ? Asking this to see if creating
 multiple threads in the driver and calling createDirectStream per partition
 in those threads can provide isolation.



 On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
 wrote:

> The real way to fix this is by changing partitioning, so you don't
> have a hot partition.  It would be better to do this at the time you're
> producing messages, but you can also do it with a shuffle / repartition
> during consuming.
>
> There is a setting to allow another batch to start in parallel, but
> that's likely to have unintended consequences.
>
> On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil 
> wrote:
>
>> Hi,
>>
>> I am looking at a way to isolate the processing of messages from each
>> Kafka partition within the same driver.
>>
>> Scenario: A DStream is created with the createDirectStream call by
>> passing in a few partitions. Let us say that the streaming context is
>> defined to have a time duration of 2 seconds. If the processing of 
>> messages
>> from a single partition takes more than 2 seconds (while all the others
>> finish much quicker), it seems that the next set of jobs get scheduled 
>> only
>> after the processing of that last partition. This means that the delay is
>> effective for all partitions and not just the partition that was truly 
>> the
>> cause of the delay. What I would like to do is to have the delay only
>> impact the 'slow' partition.
>>
>> Tried to create one DStream per partition and then do a union of all
>> partitions, (similar to the sample in
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
>> but that didn't seem to help.
>>
>> Please suggest the correct approach to solve this issue.
>>
>> Thanks,
>> Praveen.
>>
>
>

>>>
>>


Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread Sebastian Piu
Have you tried using fair scheduler and queues
On 12 Feb 2016 4:24 a.m., "p pathiyil"  wrote:

> With this setting, I can see that the next job is being executed before
> the previous one is finished. However, the processing of the 'hot'
> partition eventually hogs all the concurrent jobs. If there was a way to
> restrict jobs to be one per partition, then this setting would provide the
> per-partition isolation.
>
> Is there anything in the framework which would give control over that
> aspect ?
>
> Thanks.
>
>
> On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger 
> wrote:
>
>> spark.streaming.concurrentJobs
>>
>>
>> see e.g. 
>> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>>
>>
>> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>>
>>> Thanks for the response Cody.
>>>
>>> The producers are out of my control, so can't really balance the
>>> incoming content across the various topics and partitions. The number of
>>> topics and partitions are quite large and the volume across then not very
>>> well known ahead of time. So it is quite hard to segregate low and high
>>> volume topics in to separate driver programs.
>>>
>>> Will look at shuffle / repartition.
>>>
>>> Could you share the setting for starting another batch in parallel ? It
>>> might be ok to call the 'save' of the processed messages out of order if
>>> that is the only consequence of this setting.
>>>
>>> When separate DStreams are created per partition (and if union() is not
>>> called on them), what aspect of the framework still ties the scheduling of
>>> jobs across the partitions together ? Asking this to see if creating
>>> multiple threads in the driver and calling createDirectStream per partition
>>> in those threads can provide isolation.
>>>
>>>
>>>
>>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
>>> wrote:
>>>
 The real way to fix this is by changing partitioning, so you don't have
 a hot partition.  It would be better to do this at the time you're
 producing messages, but you can also do it with a shuffle / repartition
 during consuming.

 There is a setting to allow another batch to start in parallel, but
 that's likely to have unintended consequences.

 On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil  wrote:

> Hi,
>
> I am looking at a way to isolate the processing of messages from each
> Kafka partition within the same driver.
>
> Scenario: A DStream is created with the createDirectStream call by
> passing in a few partitions. Let us say that the streaming context is
> defined to have a time duration of 2 seconds. If the processing of 
> messages
> from a single partition takes more than 2 seconds (while all the others
> finish much quicker), it seems that the next set of jobs get scheduled 
> only
> after the processing of that last partition. This means that the delay is
> effective for all partitions and not just the partition that was truly the
> cause of the delay. What I would like to do is to have the delay only
> impact the 'slow' partition.
>
> Tried to create one DStream per partition and then do a union of all
> partitions, (similar to the sample in
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
> but that didn't seem to help.
>
> Please suggest the correct approach to solve this issue.
>
> Thanks,
> Praveen.
>


>>>
>>
>


Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread p pathiyil
With this setting, I can see that the next job is being executed before the
previous one is finished. However, the processing of the 'hot' partition
eventually hogs all the concurrent jobs. If there were a way to restrict
jobs to one per partition, then this setting would provide the
per-partition isolation.

Is there anything in the framework which would give control over that
aspect?

Thanks.


On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger  wrote:

> spark.streaming.concurrentJobs
>
>
> see e.g. 
> http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>
>
> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil  wrote:
>
>> Thanks for the response Cody.
>>
>> The producers are out of my control, so can't really balance the incoming
>> content across the various topics and partitions. The number of topics and
>> partitions are quite large and the volume across then not very well known
>> ahead of time. So it is quite hard to segregate low and high volume topics
>> in to separate driver programs.
>>
>> Will look at shuffle / repartition.
>>
>> Could you share the setting for starting another batch in parallel ? It
>> might be ok to call the 'save' of the processed messages out of order if
>> that is the only consequence of this setting.
>>
>> When separate DStreams are created per partition (and if union() is not
>> called on them), what aspect of the framework still ties the scheduling of
>> jobs across the partitions together ? Asking this to see if creating
>> multiple threads in the driver and calling createDirectStream per partition
>> in those threads can provide isolation.
>>
>>
>>
>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger 
>> wrote:
>>
>>> The real way to fix this is by changing partitioning, so you don't have
>>> a hot partition.  It would be better to do this at the time you're
>>> producing messages, but you can also do it with a shuffle / repartition
>>> during consuming.
>>>
>>> There is a setting to allow another batch to start in parallel, but
>>> that's likely to have unintended consequences.
>>>
>>> On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil  wrote:
>>>
 Hi,

 I am looking at a way to isolate the processing of messages from each
 Kafka partition within the same driver.

 Scenario: A DStream is created with the createDirectStream call by
 passing in a few partitions. Let us say that the streaming context is
 defined to have a time duration of 2 seconds. If the processing of messages
 from a single partition takes more than 2 seconds (while all the others
 finish much quicker), it seems that the next set of jobs get scheduled only
 after the processing of that last partition. This means that the delay is
 effective for all partitions and not just the partition that was truly the
 cause of the delay. What I would like to do is to have the delay only
 impact the 'slow' partition.

 Tried to create one DStream per partition and then do a union of all
 partitions, (similar to the sample in
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
 but that didn't seem to help.

 Please suggest the correct approach to solve this issue.

 Thanks,
 Praveen.

>>>
>>>
>>
>


RE: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread Diwakar Dhanuskodi
Hi,

Did you try the other implementation of DirectStream where you give only the 
topic? It would read all topic partitions in parallel within a batch interval. 
You need not create the union explicitly.
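
A sketch of that variant with the 0.8 direct API (broker list and topic name
are placeholders, and ssc is assumed to be an existing StreamingContext):
subscribing to the whole topic gives one RDD partition per Kafka partition in
every batch, with no explicit union needed.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)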


Sent from Samsung Mobile.

 Original message 
From: p pathiyil
Date: 11/02/2016 19:29 (GMT+05:30)
To: user@spark.apache.org
Subject: Spark Streaming with Kafka: Dealing with 'slow' partitions
Hi,

I am looking at a way to isolate the processing of messages from each Kafka 
partition within the same driver. 

Scenario: A DStream is created with the createDirectStream call by passing in a 
few partitions. Let us say that the streaming context is defined to have a time 
duration of 2 seconds. If the processing of messages from a single partition 
takes more than 2 seconds (while all the others finish much quicker), it seems 
that the next set of jobs get scheduled only after the processing of that last 
partition. This means that the delay is effective for all partitions and not 
just the partition that was truly the cause of the delay. What I would like to 
do is to have the delay only impact the 'slow' partition.

Tried to create one DStream per partition and then do a union of all 
partitions, (similar to the sample in 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
 but that didn't seem to help.

Please suggest the correct approach to solve this issue.

Thanks,
Praveen.

Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread Cody Koeninger
The real way to fix this is by changing partitioning, so you don't have a
hot partition.  It would be better to do this at the time you're producing
messages, but you can also do it with a shuffle / repartition during
consuming.

There is a setting to allow another batch to start in parallel, but that's
likely to have unintended consequences.
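
A rough sketch of the repartition-while-consuming option (directStream is
assumed to be the direct stream of (key, value) pairs, and 12 is an arbitrary
target): the extra shuffle spreads a hot Kafka partition's records across all
executors, at the cost of losing the one-to-one mapping to Kafka partitions.

val balanced = directStream.map(_._2).repartition(12)

balanced.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // downstream work now sees roughly even partitions
    records.foreach(record => println(record.length)) // placeholder work
  }
}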

On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil  wrote:

> Hi,
>
> I am looking at a way to isolate the processing of messages from each
> Kafka partition within the same driver.
>
> Scenario: A DStream is created with the createDirectStream call by passing
> in a few partitions. Let us say that the streaming context is defined to
> have a time duration of 2 seconds. If the processing of messages from a
> single partition takes more than 2 seconds (while all the others finish
> much quicker), it seems that the next set of jobs get scheduled only after
> the processing of that last partition. This means that the delay is
> effective for all partitions and not just the partition that was truly the
> cause of the delay. What I would like to do is to have the delay only
> impact the 'slow' partition.
>
> Tried to create one DStream per partition and then do a union of all
> partitions, (similar to the sample in
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
> but that didn't seem to help.
>
> Please suggest the correct approach to solve this issue.
>
> Thanks,
> Praveen.
>


Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread p pathiyil
Hi,

I am looking at a way to isolate the processing of messages from each Kafka
partition within the same driver.

Scenario: A DStream is created with the createDirectStream call by passing
in a few partitions. Let us say that the streaming context is defined to
have a time duration of 2 seconds. If the processing of messages from a
single partition takes more than 2 seconds (while all the others finish
much quicker), it seems that the next set of jobs get scheduled only after
the processing of that last partition. This means that the delay is
effective for all partitions and not just the partition that was truly the
cause of the delay. What I would like to do is to have the delay only
impact the 'slow' partition.

Tried to create one DStream per partition and then do a union of all
partitions, (similar to the sample in
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
but that didn't seem to help.

Please suggest the correct approach to solve this issue.

Thanks,
Praveen.


Re: Spark Streaming with Kafka - batch DStreams in memory

2016-02-02 Thread Cody Koeninger
It's possible you could (ab)use updateStateByKey or mapWithState for this.

But honestly it's probably a lot more straightforward to just choose a
reasonable batch size that gets you a reasonable file size for most of your
keys, then use filecrush or something similar to deal with the hdfs small
file problem.
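
A very rough sketch of the mapWithState idea (Spark 1.6+; keyedStream is
assumed to be a DStream[(String, String)] keyed by the attribute in question,
and the 64MB threshold is counted naively as total string length). It buffers
values per key in state and emits a key's buffer only once it is big enough;
a real version would also need checkpointing enabled and some way to flush
keys that stop receiving data.

import org.apache.spark.streaming.{State, StateSpec}

val threshold = 64L * 1024 * 1024

def accumulate(key: String, value: Option[String],
               state: State[Vector[String]]): Option[(String, Vector[String])] = {
  val buffered = state.getOption().getOrElse(Vector.empty) ++ value
  if (buffered.map(_.length.toLong).sum >= threshold) {
    state.remove()          // start a fresh buffer for this key
    Some(key -> buffered)   // emit: ready to be written out as one file
  } else {
    state.update(buffered)
    None
  }
}

val readyToWrite = keyedStream
  .mapWithState(StateSpec.function(accumulate _))
  .flatMap(opt => opt)      // keep only the emitted (key, buffer) pairs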

On Mon, Feb 1, 2016 at 10:11 PM, p pathiyil  wrote:

> Hi,
>
> Are there any ways to store DStreams / RDD read from Kafka in memory to be
> processed at a later time ? What we need to do is to read data from Kafka,
> process it to be keyed by some attribute that is present in the Kafka
> messages, and write out the data related to each key when we have
> accumulated enough data for that key to write out a file that is close to
> the HDFS block size, say 64MB. We are looking at ways to avoid writing out
> some file of the entire Kafka content periodically and then later run a
> second job to read those files and split them out to another set of files
> as necessary.
>
> Thanks.
>


Spark Streaming with Kafka - batch DStreams in memory

2016-02-01 Thread p pathiyil
Hi,

Are there any ways to store DStreams / RDD read from Kafka in memory to be
processed at a later time ? What we need to do is to read data from Kafka,
process it to be keyed by some attribute that is present in the Kafka
messages, and write out the data related to each key when we have
accumulated enough data for that key to write out a file that is close to
the HDFS block size, say 64MB. We are looking at ways to avoid writing out
some file of the entire Kafka content periodically and then later run a
second job to read those files and split them out to another set of files
as necessary.

Thanks.


Re: Higher Processing times in Spark Streaming with kafka Direct

2015-12-04 Thread u...@moosheimer.com
Hi,

Processing time depends on what you are doing with the events.
Increasing the number of partitions could help if you write more messages to 
the topic than you currently read via Spark.

Can you share more details?

Mit freundlichen Grüßen / best regards
Kay-Uwe Moosheimer

> On 04.12.2015 at 22:21, SRK  wrote:
> 
> Hi,
> 
> Our processing times in  Spark Streaming with kafka Direct approach seems to
> have increased considerably with increase in the Site traffic. Would
> increasing the number of kafka partitions decrease  the processing times?
> Any suggestions on tuning to reduce the processing times would be of great
> help.
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Higher-Processing-times-in-Spark-Streaming-with-kafka-Direct-tp25571.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Higher Processing times in Spark Streaming with kafka Direct

2015-12-04 Thread SRK
Hi,

Our processing times in Spark Streaming with the Kafka direct approach seem to
have increased considerably with the increase in site traffic. Would
increasing the number of Kafka partitions decrease the processing times?
Any suggestions on tuning to reduce the processing times would be of great
help.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Higher-Processing-times-in-Spark-Streaming-with-kafka-Direct-tp25571.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
Of course, exactly-once receiving is not the same as exactly-once. In the case
of the direct Kafka stream, the data may actually be pulled multiple times. But
even if the data of a batch is pulled twice because of some failure, the
final result (that is, the transformed data accessed through foreachRDD) will
always be the same even if recomputed. In other words, the data in
partition x of the RDD of time t will always be the same even if that
partition gets recomputed. Now, to get end-to-end exactly-once, you will also
have to push data out to external data stores in an exactly-once manner:
either the updates are idempotent, or you can use the unique id [(batch
time, partition ID)] to update the store transactionally (such that each
partition is inserted into the data store only once).

This is also explained in my talk. -
https://www.youtube.com/watch?v=d5UJonrruHk
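
As a sketch of the "(batch time, partition ID)" idea above (directKafkaStream
is the stream from createDirectStream, and saveBatchPartitionOnce is a
hypothetical write into your own store): because the id is deterministic,
re-running a batch after a failure updates the same rows instead of
duplicating them.

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { (rdd, batchTime) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { records =>
    val partitionId = TaskContext.get.partitionId()
    val range = offsetRanges(partitionId)   // exact Kafka offsets this task covers
    val uniqueId = s"${batchTime.milliseconds}-$partitionId"
    // saveBatchPartitionOnce(uniqueId, range, records)  // hypothetical idempotent / transactional write
  }
}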

On Tue, Jul 14, 2015 at 8:18 PM, Chen Song  wrote:

> Thanks TD.
>
> As for 1), if timing is not guaranteed, how does exactly once semantics
> supported? It feels like exactly once receiving is not necessarily exactly
> once processing.
>
> Chen
>
> On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das 
> wrote:
>
>>
>>
>> On Tue, Jul 14, 2015 at 6:42 PM, Chen Song 
>> wrote:
>>
>>> Thanks TD and Cody. I saw that.
>>>
>>> 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
>>> on HDFS at the end of each batch interval?
>>>
>>
>> The timing is not guaranteed.
>>
>>
>>> 2. In the code, if I first apply transformations and actions on the
>>> directKafkaStream and then use foreachRDD on the original KafkaDStream to
>>> commit offsets myself, will offsets commits always happen after
>>> transformation and action?
>>>
>>> What do you mean by "original KafkaDStream"? if you meant the
>> directKafkaStream? If yes, then yes, output operations like foreachRDD is
>> executed in each batch in the same order as they are defined.
>>
>> dstream1.foreachRDD { rdd => func1(rdd) }
>> dstream2.foreachRDD { rdd => func2(rdd) }
>>
>> In every batch interval, func1 will be executed before func2.
>>
>>
>>
>>
>>> Chen
>>>
>>> On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das 
>>> wrote:
>>>
 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

 directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
  }


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
 wrote:

> You have access to the offset ranges for a given rdd in the stream by
> typecasting to HasOffsetRanges.  You can then store the offsets wherever
> you need to.
>
> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
> wrote:
>
>> A follow up question.
>>
>> When using createDirectStream approach, the offsets are checkpointed
>> to HDFS and it is understandable by Spark Streaming job. Is there a way 
>> to
>> expose the offsets via a REST api to end users. Or alternatively, is 
>> there
>> a way to have offsets committed to Kafka Offset Manager so users can 
>> query
>> from a consumer programmatically?
>>
>> Essentially, all I need to do is monitor the progress of data
>> consumption of the Kafka topic.
>>
>>
>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
>> wrote:
>>
>>> You can't use different versions of spark in your application vs
>>> your cluster.
>>>
>>> For the direct stream, it's not 60 partitions per executor, it's 300
>>> partitions, and executors work on them as they are scheduled.  Yes, if 
>>> you
>>> have no messages you will get an empty partition.  It's up to you 
>>> whether
>>> it's worthwhile to call coalesce or not.
>>>
>>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Is this 3 is no of parallel consumer threads per receiver , means
 in total we have 2*3=6 consumer in same consumer group consuming from 
 all
 300 partitions.
 3 is just parallelism on same receiver and recommendation is to use
 1 per receiver since consuming from kafka is not cpu bound rather
 NIC(network bound)  increasing consumer thread on one receiver won't 
 make
 it parallel in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and
 kafka topic has 300 partions , does kafkaRDD created on 5 executors 
 will
 have 60 partitions per executor (total 300 one to one mapping) and if 
 some
 of kafka partitions are empty say offset of last checkpoint to current 
 is
 same for partitons P123, still it will create empty partition in 
 kafkaRDD ?
 So we should call coalesce on kafkaRDD ?


 And is 

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD.

As for 1), if the timing is not guaranteed, how are exactly-once semantics
supported? It feels like exactly-once receiving is not necessarily exactly-once
processing.

Chen

On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das  wrote:

>
>
> On Tue, Jul 14, 2015 at 6:42 PM, Chen Song  wrote:
>
>> Thanks TD and Cody. I saw that.
>>
>> 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
>> on HDFS at the end of each batch interval?
>>
>
> The timing is not guaranteed.
>
>
>> 2. In the code, if I first apply transformations and actions on the
>> directKafkaStream and then use foreachRDD on the original KafkaDStream to
>> commit offsets myself, will offsets commits always happen after
>> transformation and action?
>>
>> What do you mean by "original KafkaDStream"? if you meant the
> directKafkaStream? If yes, then yes, output operations like foreachRDD is
> executed in each batch in the same order as they are defined.
>
> dstream1.foreachRDD { rdd => func1(rdd) }
> dstream2.foreachRDD { rdd => func2(rdd) }
>
> In every batch interval, func1 will be executed before func2.
>
>
>
>
>> Chen
>>
>> On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das 
>> wrote:
>>
>>> Relevant documentation -
>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
>>> towards the end.
>>>
>>> directKafkaStream.foreachRDD { rdd =>
>>>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
>>>  // offsetRanges.length = # of Kafka partitions being consumed
>>>  ...
>>>  }
>>>
>>>
>>> On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
>>> wrote:
>>>
 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
 wrote:

> A follow up question.
>
> When using createDirectStream approach, the offsets are checkpointed
> to HDFS and it is understandable by Spark Streaming job. Is there a way to
> expose the offsets via a REST api to end users. Or alternatively, is there
> a way to have offsets committed to Kafka Offset Manager so users can query
> from a consumer programmatically?
>
> Essentially, all I need to do is monitor the progress of data
> consumption of the Kafka topic.
>
>
> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
> wrote:
>
>> You can't use different versions of spark in your application vs your
>> cluster.
>>
>> For the direct stream, it's not 60 partitions per executor, it's 300
>> partitions, and executors work on them as they are scheduled.  Yes, if 
>> you
>> have no messages you will get an empty partition.  It's up to you whether
>> it's worthwhile to call coalesce or not.
>>
>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is this 3 is no of parallel consumer threads per receiver , means in
>>> total we have 2*3=6 consumer in same consumer group consuming from all 
>>> 300
>>> partitions.
>>> 3 is just parallelism on same receiver and recommendation is to use
>>> 1 per receiver since consuming from kafka is not cpu bound rather
>>> NIC(network bound)  increasing consumer thread on one receiver won't 
>>> make
>>> it parallel in ideal sense ?
>>>
>>> In non receiver based consumer spark 1.3 If I use 5 execuots and
>>> kafka topic has 300 partions , does kafkaRDD created on 5 executors will
>>> have 60 partitions per executor (total 300 one to one mapping) and if 
>>> some
>>> of kafka partitions are empty say offset of last checkpoint to current 
>>> is
>>> same for partitons P123, still it will create empty partition in 
>>> kafkaRDD ?
>>> So we should call coalesce on kafkaRDD ?
>>>
>>>
>>> And is there any incompatibity issue when I include
>>> spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in 
>>> my
>>> application but my cluster has spark version 1.2 ?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in 
 kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then 
 is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole 
 code
 is written in one funct

Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
On Tue, Jul 14, 2015 at 6:42 PM, Chen Song  wrote:

> Thanks TD and Cody. I saw that.
>
> 1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets
> on HDFS at the end of each batch interval?
>

The timing is not guaranteed.


> 2. In the code, if I first apply transformations and actions on the
> directKafkaStream and then use foreachRDD on the original KafkaDStream to
> commit offsets myself, will offsets commits always happen after
> transformation and action?
>
> What do you mean by "original KafkaDStream"? if you meant the
directKafkaStream? If yes, then yes, output operations like foreachRDD is
executed in each batch in the same order as they are defined.

dstream1.foreachRDD { rdd => func1(rdd) }
dstream2.foreachRDD { rdd => func2(rdd) }

In every batch interval, func1 will be executed before func2.




> Chen
>
> On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das 
> wrote:
>
>> Relevant documentation -
>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
>> towards the end.
>>
>> directKafkaStream.foreachRDD { rdd =>
>>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
>>  // offsetRanges.length = # of Kafka partitions being consumed
>>  ...
>>  }
>>
>>
>> On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
>> wrote:
>>
>>> You have access to the offset ranges for a given rdd in the stream by
>>> typecasting to HasOffsetRanges.  You can then store the offsets wherever
>>> you need to.
>>>
>>> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
>>> wrote:
>>>
 A follow up question.

 When using createDirectStream approach, the offsets are checkpointed to
 HDFS and it is understandable by Spark Streaming job. Is there a way to
 expose the offsets via a REST api to end users. Or alternatively, is there
 a way to have offsets committed to Kafka Offset Manager so users can query
 from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
 wrote:

> You can't use different versions of spark in your application vs your
> cluster.
>
> For the direct stream, it's not 60 partitions per executor, it's 300
> partitions, and executors work on them as they are scheduled.  Yes, if you
> have no messages you will get an empty partition.  It's up to you whether
> it's worthwhile to call coalesce or not.
>
> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Is this 3 is no of parallel consumer threads per receiver , means in
>> total we have 2*3=6 consumer in same consumer group consuming from all 
>> 300
>> partitions.
>> 3 is just parallelism on same receiver and recommendation is to use 1
>> per receiver since consuming from kafka is not cpu bound rather 
>> NIC(network
>> bound)  increasing consumer thread on one receiver won't make it parallel
>> in ideal sense ?
>>
>> In non receiver based consumer spark 1.3 If I use 5 execuots and
>> kafka topic has 300 partions , does kafkaRDD created on 5 executors will
>> have 60 partitions per executor (total 300 one to one mapping) and if 
>> some
>> of kafka partitions are empty say offset of last checkpoint to current is
>> same for partitons P123, still it will create empty partition in 
>> kafkaRDD ?
>> So we should call coalesce on kafkaRDD ?
>>
>>
>> And is there any incompatibity issue when I include
>> spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
>> application but my cluster has spark version 1.2 ?
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> 1. Here you are basically creating 2 receivers and asking each of
>>> them to consume 3 kafka partitions each.
>>>
>>> - In 1.2 we have high level consumers so how can we restrict no of
>>> kafka partitions to consume from? Say I have 300 kafka partitions in 
>>> kafka
>>> topic and as in above I gave 2 receivers and 3 kafka partitions . Then 
>>> is
>>> it mean I will read from 6 out of 300 partitions only and for rest 294
>>> partitions data is lost?
>>>
>>>
>>> 2.One more doubt in spark streaming how is it decided which part of
>>> main function of driver will run at each batch interval ? Since whole 
>>> code
>>> is written in one function(main function in driver) so how it determined
>>> kafka streams receivers  not to be registered in each batch only 
>>> processing
>>> to be done .
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha 
>>> wrote:
>>>
 Hi

 Let me take ashot at your questions. (I am sure people like Cody
 and TD will co

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD and Cody. I saw that.

1. By doing that (foreachRDD), does KafkaDStream checkpoints its offsets on
HDFS at the end of each batch interval?
2. In the code, if I first apply transformations and actions on the
directKafkaStream and then use foreachRDD on the original KafkaDStream to
commit offsets myself, will offsets commits always happen after
transformation and action?

Chen

On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das  wrote:

> Relevant documentation -
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
> towards the end.
>
> directKafkaStream.foreachRDD { rdd =>
>  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
>  // offsetRanges.length = # of Kafka partitions being consumed
>  ...
>  }
>
>
> On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger 
> wrote:
>
>> You have access to the offset ranges for a given rdd in the stream by
>> typecasting to HasOffsetRanges.  You can then store the offsets wherever
>> you need to.
>>
>> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song 
>> wrote:
>>
>>> A follow up question.
>>>
>>> When using createDirectStream approach, the offsets are checkpointed to
>>> HDFS and it is understandable by Spark Streaming job. Is there a way to
>>> expose the offsets via a REST api to end users. Or alternatively, is there
>>> a way to have offsets committed to Kafka Offset Manager so users can query
>>> from a consumer programmatically?
>>>
>>> Essentially, all I need to do is monitor the progress of data
>>> consumption of the Kafka topic.
>>>
>>>
>>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
>>> wrote:
>>>
 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Is this 3 is no of parallel consumer threads per receiver , means in
> total we have 2*3=6 consumer in same consumer group consuming from all 300
> partitions.
> 3 is just parallelism on same receiver and recommendation is to use 1
> per receiver since consuming from kafka is not cpu bound rather 
> NIC(network
> bound)  increasing consumer thread on one receiver won't make it parallel
> in ideal sense ?
>
> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
> partitions per executor (total 300 one to one mapping) and if some of 
> kafka
> partitions are empty say offset of last checkpoint to current is same for
> partitons P123, still it will create empty partition in kafkaRDD ? So we
> should call coalesce on kafkaRDD ?
>
>
> And is there any incompatibity issue when I include
> spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
> application but my cluster has spark version 1.2 ?
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> 1. Here you are basically creating 2 receivers and asking each of
>> them to consume 3 kafka partitions each.
>>
>> - In 1.2 we have high level consumers so how can we restrict no of
>> kafka partitions to consume from? Say I have 300 kafka partitions in 
>> kafka
>> topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
>> it mean I will read from 6 out of 300 partitions only and for rest 294
>> partitions data is lost?
>>
>>
>> 2.One more doubt in spark streaming how is it decided which part of
>> main function of driver will run at each batch interval ? Since whole 
>> code
>> is written in one function(main function in driver) so how it determined
>> kafka streams receivers  not to be registered in each batch only 
>> processing
>> to be done .
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha 
>> wrote:
>>
>>> Hi
>>>
>>> Let me take ashot at your questions. (I am sure people like Cody and
>>> TD will correct if I am wrong)
>>>
>>> 0. This is exact copy from the similar question in mail thread from
>>> Akhil D:
>>> Since you set local[4] you will have 4 threads for your computation,
>>> and since you are having 2 receivers, you are left with 2 threads
>>> to process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>>> means you are having 2 tasks in that stage (with id 0).
>>>
>>> 1. Here you are basically creating 2 receivers and asking each of
>>> them to consume 3 kafka partitions each.
>>> 2. How does that matter? It depends on how m

Re: spark streaming with kafka reset offset

2015-07-14 Thread Tathagata Das
Relevant documentation -
https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
towards the end.

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
}


On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger  wrote:

> You have access to the offset ranges for a given rdd in the stream by
> typecasting to HasOffsetRanges.  You can then store the offsets wherever
> you need to.
>
> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song  wrote:
>
>> A follow up question.
>>
>> When using createDirectStream approach, the offsets are checkpointed to
>> HDFS and it is understandable by Spark Streaming job. Is there a way to
>> expose the offsets via a REST api to end users. Or alternatively, is there
>> a way to have offsets committed to Kafka Offset Manager so users can query
>> from a consumer programmatically?
>>
>> Essentially, all I need to do is monitor the progress of data consumption
>> of the Kafka topic.
>>
>>
>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
>> wrote:
>>
>>> You can't use different versions of spark in your application vs your
>>> cluster.
>>>
>>> For the direct stream, it's not 60 partitions per executor, it's 300
>>> partitions, and executors work on them as they are scheduled.  Yes, if you
>>> have no messages you will get an empty partition.  It's up to you whether
>>> it's worthwhile to call coalesce or not.
>>>
>>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Is this 3 is no of parallel consumer threads per receiver , means in
 total we have 2*3=6 consumer in same consumer group consuming from all 300
 partitions.
 3 is just parallelism on same receiver and recommendation is to use 1
 per receiver since consuming from kafka is not cpu bound rather NIC(network
 bound)  increasing consumer thread on one receiver won't make it parallel
 in ideal sense ?

 In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
 topic has 300 partions , does kafkaRDD created on 5 executors will have 60
 partitions per executor (total 300 one to one mapping) and if some of kafka
 partitions are empty say offset of last checkpoint to current is same for
 partitons P123, still it will create empty partition in kafkaRDD ? So we
 should call coalesce on kafkaRDD ?


 And is there any incompatibity issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
 application but my cluster has spark version 1.2 ?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> 1. Here you are basically creating 2 receivers and asking each of them
> to consume 3 kafka partitions each.
>
> - In 1.2 we have high level consumers so how can we restrict no of
> kafka partitions to consume from? Say I have 300 kafka partitions in kafka
> topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
> it mean I will read from 6 out of 300 partitions only and for rest 294
> partitions data is lost?
>
>
> 2.One more doubt in spark streaming how is it decided which part of
> main function of driver will run at each batch interval ? Since whole code
> is written in one function(main function in driver) so how it determined
> kafka streams receivers  not to be registered in each batch only 
> processing
> to be done .
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha 
> wrote:
>
>> Hi
>>
>> Let me take ashot at your questions. (I am sure people like Cody and
>> TD will correct if I am wrong)
>>
>> 0. This is exact copy from the similar question in mail thread from
>> Akhil D:
>> Since you set local[4] you will have 4 threads for your computation,
>> and since you are having 2 receivers, you are left with 2 threads to
>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>> means you are having 2 tasks in that stage (with id 0).
>>
>> 1. Here you are basically creating 2 receivers and asking each of
>> them to consume 3 kafka partitions each.
>> 2. How does that matter? It depends on how many receivers you have
>> created to consume that data and if you have repartitioned it. Remember,
>> spark is lazy and executors are relted to the context
>> 3. I think in java, factory method is fixed. You just pass around the
>> contextFactory object. (I love python :) see the signature isso much
>> cleaner :) )
>> 4. Yes, if you use spark checkpointing. You can use yourcustom check
>> pointing too.
>>
>> Best
>> Ayan
>>
>>
>>
>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Few

Re: spark streaming with kafka reset offset

2015-07-14 Thread Cody Koeninger
You have access to the offset ranges for a given rdd in the stream by
typecasting to HasOffsetRanges.  You can then store the offsets wherever
you need to.
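
A sketch of the store-them-yourself pattern (saveOffsets is a hypothetical
function writing to ZooKeeper, a database, etc.; directStream, ssc, kafkaParams
and the topic/offset values are placeholders): persist the ranges as each batch
is processed, and on restart rebuild the stream with the fromOffsets variant of
createDirectStream.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

directStream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // saveOffsets(ranges)   // hypothetical: e.g. one row per (topic, partition)
}

// on restart, load the stored offsets and resume exactly where you left off
val fromOffsets: Map[TopicAndPartition, Long] =
  Map(TopicAndPartition("my-topic", 0) -> 12345L)   // loaded from your store in practice
val resumed = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))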

On Tue, Jul 14, 2015 at 5:00 PM, Chen Song  wrote:

> A follow up question.
>
> When using createDirectStream approach, the offsets are checkpointed to
> HDFS and it is understandable by Spark Streaming job. Is there a way to
> expose the offsets via a REST api to end users. Or alternatively, is there
> a way to have offsets committed to Kafka Offset Manager so users can query
> from a consumer programmatically?
>
> Essentially, all I need to do is monitor the progress of data consumption
> of the Kafka topic.
>
>
> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger 
> wrote:
>
>> You can't use different versions of spark in your application vs your
>> cluster.
>>
>> For the direct stream, it's not 60 partitions per executor, it's 300
>> partitions, and executors work on them as they are scheduled.  Yes, if you
>> have no messages you will get an empty partition.  It's up to you whether
>> it's worthwhile to call coalesce or not.
>>
>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is this 3 is no of parallel consumer threads per receiver , means in
>>> total we have 2*3=6 consumer in same consumer group consuming from all 300
>>> partitions.
>>> 3 is just parallelism on same receiver and recommendation is to use 1
>>> per receiver since consuming from kafka is not cpu bound rather NIC(network
>>> bound)  increasing consumer thread on one receiver won't make it parallel
>>> in ideal sense ?
>>>
>>> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
>>> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
>>> partitions per executor (total 300 one to one mapping) and if some of kafka
>>> partitions are empty say offset of last checkpoint to current is same for
>>> partitons P123, still it will create empty partition in kafkaRDD ? So we
>>> should call coalesce on kafkaRDD ?
>>>
>>>
>>> And is there any incompatibity issue when I include spark-streaming_2.10
>>> (version 1.3) and spark-core_2.10(version 1.3) in my application but my
>>> cluster has spark version 1.2 ?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.

 - In 1.2 we have high level consumers so how can we restrict no of
 kafka partitions to consume from? Say I have 300 kafka partitions in kafka
 topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
 it mean I will read from 6 out of 300 partitions only and for rest 294
 partitions data is lost?


 2.One more doubt in spark streaming how is it decided which part of
 main function of driver will run at each batch interval ? Since whole code
 is written in one function(main function in driver) so how it determined
 kafka streams receivers  not to be registered in each batch only processing
 to be done .






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:

> Hi
>
> Let me take ashot at your questions. (I am sure people like Cody and
> TD will correct if I am wrong)
>
> 0. This is exact copy from the similar question in mail thread from
> Akhil D:
> Since you set local[4] you will have 4 threads for your computation,
> and since you are having 2 receivers, you are left with 2 threads to
> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
> means you are having 2 tasks in that stage (with id 0).
>
> 1. Here you are basically creating 2 receivers and asking each of them
> to consume 3 kafka partitions each.
> 2. How does that matter? It depends on how many receivers you have
> created to consume that data and if you have repartitioned it. Remember,
> spark is lazy and executors are relted to the context
> 3. I think in java, factory method is fixed. You just pass around the
> contextFactory object. (I love python :) see the signature isso much
> cleaner :) )
> 4. Yes, if you use spark checkpointing. You can use yourcustom check
> pointing too.
>
> Best
> Ayan
>
>
>
> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Few doubts :
>>
>> In 1.2 streaming when I use union of streams , my streaming
>> application getting hanged sometimes and nothing gets printed on driver.
>>
>>
>> [Stage 2:>
>>
>> (0 + 2) / 2]
>>  Whats is 0+2/2 here signifies.
>>
>>
>>
>> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
>> same as numstreams=2 ? in unioned stream ?
>>
>> 2. I launched app

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
A follow-up question.

When using the createDirectStream approach, the offsets are checkpointed to
HDFS and can be read back by the Spark Streaming job. Is there a way to
expose the offsets via a REST API to end users? Alternatively, is there
a way to have the offsets committed to the Kafka offset manager so users can
query them programmatically from a consumer?

Essentially, all I need to do is monitor the progress of data consumption
of the Kafka topic.


On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger  wrote:

> You can't use different versions of spark in your application vs your
> cluster.
>
> For the direct stream, it's not 60 partitions per executor, it's 300
> partitions, and executors work on them as they are scheduled.  Yes, if you
> have no messages you will get an empty partition.  It's up to you whether
> it's worthwhile to call coalesce or not.
>
> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora  > wrote:
>
>> Is this 3 is no of parallel consumer threads per receiver , means in
>> total we have 2*3=6 consumer in same consumer group consuming from all 300
>> partitions.
>> 3 is just parallelism on same receiver and recommendation is to use 1 per
>> receiver since consuming from kafka is not cpu bound rather NIC(network
>> bound)  increasing consumer thread on one receiver won't make it parallel
>> in ideal sense ?
>>
>> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
>> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
>> partitions per executor (total 300 one to one mapping) and if some of kafka
>> partitions are empty say offset of last checkpoint to current is same for
>> partitons P123, still it will create empty partition in kafkaRDD ? So we
>> should call coalesce on kafkaRDD ?
>>
>>
>> And is there any incompatibity issue when I include spark-streaming_2.10
>> (version 1.3) and spark-core_2.10(version 1.3) in my application but my
>> cluster has spark version 1.2 ?
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> 1. Here you are basically creating 2 receivers and asking each of them
>>> to consume 3 kafka partitions each.
>>>
>>> - In 1.2 we have high level consumers so how can we restrict no of kafka
>>> partitions to consume from? Say I have 300 kafka partitions in kafka topic
>>> and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
>>> I will read from 6 out of 300 partitions only and for rest 294 partitions
>>> data is lost?
>>>
>>>
>>> 2.One more doubt in spark streaming how is it decided which part of main
>>> function of driver will run at each batch interval ? Since whole code is
>>> written in one function(main function in driver) so how it determined kafka
>>> streams receivers  not to be registered in each batch only processing to be
>>> done .
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:
>>>
 Hi

 Let me take ashot at your questions. (I am sure people like Cody and TD
 will correct if I am wrong)

 0. This is exact copy from the similar question in mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation,
 and since you are having 2 receivers, you are left with 2 threads to
 process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2 means
 you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of them
 to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and if you have repartitioned it. Remember,
 spark is lazy and executors are relted to the context
 3. I think in java, factory method is fixed. You just pass around the
 contextFactory object. (I love python :) see the signature isso much
 cleaner :) )
 4. Yes, if you use spark checkpointing. You can use yourcustom check
 pointing too.

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Few doubts :
>
> In 1.2 streaming when I use union of streams , my streaming
> application getting hanged sometimes and nothing gets printed on driver.
>
>
> [Stage 2:>
>
>   (0 + 2) / 2]
>  Whats is 0+2/2 here signifies.
>
>
>
> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
> same as numstreams=2 ? in unioned stream ?
>
> 2. I launched app on yarnRM with num-executors as 5 . It created 2
> receivers and 5 execuots . As in stream receivers nodes get fixed at start
> of app throughout its lifetime . Does executors gets allicated at start of
> each job on 1s batch interval? If yes, how does its fast to allocate
> resources. I mean if i increase num-executors to 50 , it will negotiate 50
> 

Re: spark streaming with kafka reset offset

2015-06-30 Thread Cody Koeninger
You can't use different versions of spark in your application vs your
cluster.

For the direct stream, it's not 60 partitions per executor, it's 300
partitions, and executors work on them as they are scheduled.  Yes, if you
have no messages you will get an empty partition.  It's up to you whether
it's worthwhile to call coalesce or not.
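
If coalescing does turn out to be worthwhile, a small sketch (directStream and
the target of 20 partitions are placeholders): grab the offset ranges from the
original KafkaRDD first, since the HasOffsetRanges cast only works before any
coalesce or shuffle, then shrink the mostly-empty partitions for the real work.

import org.apache.spark.streaming.kafka.HasOffsetRanges

directStream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // one per Kafka partition
  val compact = rdd.coalesce(20)   // fewer, fuller partitions, no full shuffle
  compact.foreachPartition { records =>
    records.foreach(record => ())  // placeholder work
  }
}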

On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
wrote:

> Is this 3 is no of parallel consumer threads per receiver , means in total
> we have 2*3=6 consumer in same consumer group consuming from all 300
> partitions.
> 3 is just parallelism on same receiver and recommendation is to use 1 per
> receiver since consuming from kafka is not cpu bound rather NIC(network
> bound)  increasing consumer thread on one receiver won't make it parallel
> in ideal sense ?
>
> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
> partitions per executor (total 300 one to one mapping) and if some of kafka
> partitions are empty say offset of last checkpoint to current is same for
> partitons P123, still it will create empty partition in kafkaRDD ? So we
> should call coalesce on kafkaRDD ?
>
>
> And is there any incompatibity issue when I include spark-streaming_2.10
> (version 1.3) and spark-core_2.10(version 1.3) in my application but my
> cluster has spark version 1.2 ?
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora  > wrote:
>
>> 1. Here you are basically creating 2 receivers and asking each of them to
>> consume 3 kafka partitions each.
>>
>> - In 1.2 we have high level consumers so how can we restrict no of kafka
>> partitions to consume from? Say I have 300 kafka partitions in kafka topic
>> and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
>> I will read from 6 out of 300 partitions only and for rest 294 partitions
>> data is lost?
>>
>>
>> 2.One more doubt in spark streaming how is it decided which part of main
>> function of driver will run at each batch interval ? Since whole code is
>> written in one function(main function in driver) so how it determined kafka
>> streams receivers  not to be registered in each batch only processing to be
>> done .
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> Let me take ashot at your questions. (I am sure people like Cody and TD
>>> will correct if I am wrong)
>>>
>>> 0. This is exact copy from the similar question in mail thread from
>>> Akhil D:
>>> Since you set local[4] you will have 4 threads for your computation, and
>>> since you are having 2 receivers, you are left with 2 threads to
>>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2 means
>>> you are having 2 tasks in that stage (with id 0).
>>>
>>> 1. Here you are basically creating 2 receivers and asking each of them
>>> to consume 3 kafka partitions each.
>>> 2. How does that matter? It depends on how many receivers you have
>>> created to consume that data and if you have repartitioned it. Remember,
>>> spark is lazy and executors are relted to the context
>>> 3. I think in java, factory method is fixed. You just pass around the
>>> contextFactory object. (I love python :) see the signature isso much
>>> cleaner :) )
>>> 4. Yes, if you use spark checkpointing. You can use yourcustom check
>>> pointing too.
>>>
>>> Best
>>> Ayan
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Few doubts :

 In 1.2 streaming when I use union of streams , my streaming application
 getting hanged sometimes and nothing gets printed on driver.


 [Stage 2:>

   (0 + 2) / 2]
  Whats is 0+2/2 here signifies.



 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
 same as numstreams=2 ? in unioned stream ?

 2. I launched app on yarnRM with num-executors as 5 . It created 2
 receivers and 5 execuots . As in stream receivers nodes get fixed at start
 of app throughout its lifetime . Does executors gets allicated at start of
 each job on 1s batch interval? If yes, how does its fast to allocate
 resources. I mean if i increase num-executors to 50 , it will negotiate 50
 executors from yarnRM at start of each job so does it takes more time in
 allocating executors than batch interval(here 1s , say if 500ms).? Can i
 fixed processing executors also throughout the app?




 SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
 JavaStreamingContext jssc = new
 JavaStreamingContext(conf,Durations.milliseconds(1000));

 Map<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put("zookeeper.connect","ipadd:2181");
 kafkaParams.put("group.id", "testgroup");
 kafkaParams.put("zookeeper.session.timeout.ms", "1");
  Map<String, Integer> topicsMap = new HashMap<String, Integer>();
 topicsMap.put("testSparkPartitioned", 3);

Re: spark streaming with kafka reset offset

2015-06-30 Thread Shushant Arora
Is this 3 the number of parallel consumer threads per receiver, meaning in total
we have 2*3 = 6 consumers in the same consumer group consuming from all 300
partitions?
Or is 3 just parallelism on the same receiver, and the recommendation to use 1 per
receiver because consuming from Kafka is not CPU bound but NIC (network)
bound, so increasing consumer threads on one receiver won't make it parallel
in the ideal sense?

In the non-receiver-based consumer in Spark 1.3, if I use 5 executors and the Kafka
topic has 300 partitions, will the KafkaRDD created on the 5 executors have 60
partitions per executor (300 in total, a one-to-one mapping)? And if some Kafka
partitions are empty, say the offset from the last checkpoint to current is the same
for partition P123, will it still create an empty partition in the KafkaRDD? So
should we call coalesce on the KafkaRDD?


And is there any incompatibility issue when I include spark-streaming_2.10
(version 1.3) and spark-core_2.10 (version 1.3) in my application but my
cluster has Spark version 1.2?






On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
wrote:

> 1. Here you are basically creating 2 receivers and asking each of them to
> consume 3 kafka partitions each.
>
> - In 1.2 we have high level consumers so how can we restrict no of kafka
> partitions to consume from? Say I have 300 kafka partitions in kafka topic
> and as in above I gave 2 receivers and 3 kafka partitions . Then is it mean
> I will read from 6 out of 300 partitions only and for rest 294 partitions
> data is lost?
>
>
> 2.One more doubt in spark streaming how is it decided which part of main
> function of driver will run at each batch interval ? Since whole code is
> written in one function(main function in driver) so how it determined kafka
> streams receivers  not to be registered in each batch only processing to be
> done .
>
>
>
>
>
>
> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:
>
>> Hi
>>
>> Let me take ashot at your questions. (I am sure people like Cody and TD
>> will correct if I am wrong)
>>
>> 0. This is exact copy from the similar question in mail thread from Akhil
>> D:
>> Since you set local[4] you will have 4 threads for your computation, and
>> since you are having 2 receivers, you are left with 2 threads to process
>> ((0 + 2) <-- This 2 is your 2 threads.) And the other /2 means you are
>> having 2 tasks in that stage (with id 0).
>>
>> 1. Here you are basically creating 2 receivers and asking each of them to
>> consume 3 kafka partitions each.
>> 2. How does that matter? It depends on how many receivers you have
>> created to consume that data and if you have repartitioned it. Remember,
>> spark is lazy and executors are relted to the context
>> 3. I think in java, factory method is fixed. You just pass around the
>> contextFactory object. (I love python :) see the signature isso much
>> cleaner :) )
>> 4. Yes, if you use spark checkpointing. You can use yourcustom check
>> pointing too.
>>
>> Best
>> Ayan
>>
>>
>>
>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Few doubts :
>>>
>>> In 1.2 streaming when I use union of streams , my streaming application
>>> getting hanged sometimes and nothing gets printed on driver.
>>>
>>>
>>> [Stage 2:>
>>>
>>> (0 + 2) / 2]
>>>  Whats is 0+2/2 here signifies.
>>>
>>>
>>>
>>> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
>>> same as numstreams=2 ? in unioned stream ?
>>>
>>> 2. I launched app on yarnRM with num-executors as 5 . It created 2
>>> receivers and 5 execuots . As in stream receivers nodes get fixed at start
>>> of app throughout its lifetime . Does executors gets allicated at start of
>>> each job on 1s batch interval? If yes, how does its fast to allocate
>>> resources. I mean if i increase num-executors to 50 , it will negotiate 50
>>> executors from yarnRM at start of each job so does it takes more time in
>>> allocating executors than batch interval(here 1s , say if 500ms).? Can i
>>> fixed processing executors also throughout the app?
>>>
>>>
>>>
>>>
>>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>>> JavaStreamingContext jssc = new
>>> JavaStreamingContext(conf,Durations.milliseconds(1000));
>>>
>>> Map<String, String> kafkaParams = new HashMap<String, String>();
>>> kafkaParams.put("zookeeper.connect","ipadd:2181");
>>> kafkaParams.put("group.id", "testgroup");
>>> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>>>  Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>>> topicsMap.put("testSparkPartitioned", 3);
>>> int numStreams = 2;
>>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
>>> ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>   for(int i=0;i<numStreams;i++){
>>>  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>>> byte[].class, kafka.serializer.DefaultDecoder.class,
>>> kafka.serializer.DefaultDecoder.class,
>>> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>> }
>>>  JavaPairDStream<byte[], byte[]> directKafkaStream =
>>> jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
>>> kafkaStreams.size()));
>>>  JavaDStream<String> lines = directKafkaStream.map(new

Re: spark streaming with kafka reset offset

2015-06-29 Thread Shushant Arora
1. Here you are basically creating 2 receivers and asking each of them to
consume 3 kafka partitions each.

- In 1.2 we have high-level consumers, so how can we restrict the number of Kafka
partitions to consume from? Say I have 300 Kafka partitions in the topic,
and as above I gave 2 receivers and 3 Kafka partitions. Does that mean
I will read from only 6 out of 300 partitions, and the data for the remaining
294 partitions is lost?


2. One more doubt: in Spark Streaming, how is it decided which part of the main
function of the driver will run at each batch interval? Since the whole code is
written in one function (the main function in the driver), how is it determined that
the Kafka stream receivers are not to be registered in each batch, and only the
processing is to be done?






On Mon, Jun 29, 2015 at 7:35 PM, ayan guha  wrote:

> Hi
>
> Let me take ashot at your questions. (I am sure people like Cody and TD
> will correct if I am wrong)
>
> 0. This is exact copy from the similar question in mail thread from Akhil
> D:
> Since you set local[4] you will have 4 threads for your computation, and
> since you are having 2 receivers, you are left with 2 threads to process (
> (0 + 2) <-- This 2 is your 2 threads.) And the other /2 means you are
> having 2 tasks in that stage (with id 0).
>
> 1. Here you are basically creating 2 receivers and asking each of them to
> consume 3 kafka partitions each.
> 2. How does that matter? It depends on how many receivers you have created
> to consume that data and if you have repartitioned it. Remember, spark is
> lazy and executors are relted to the context
> 3. I think in java, factory method is fixed. You just pass around the
> contextFactory object. (I love python :) see the signature isso much
> cleaner :) )
> 4. Yes, if you use spark checkpointing. You can use yourcustom check
> pointing too.
>
> Best
> Ayan
>
>
>
> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora  > wrote:
>
>> Few doubts :
>>
>> In 1.2 streaming when I use union of streams , my streaming application
>> getting hanged sometimes and nothing gets printed on driver.
>>
>>
>> [Stage 2:>
>>
>> (0 + 2) / 2]
>>  Whats is 0+2/2 here signifies.
>>
>>
>>
>> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be
>> same as numstreams=2 ? in unioned stream ?
>>
>> 2. I launched app on yarnRM with num-executors as 5 . It created 2
>> receivers and 5 execuots . As in stream receivers nodes get fixed at start
>> of app throughout its lifetime . Does executors gets allicated at start of
>> each job on 1s batch interval? If yes, how does its fast to allocate
>> resources. I mean if i increase num-executors to 50 , it will negotiate 50
>> executors from yarnRM at start of each job so does it takes more time in
>> allocating executors than batch interval(here 1s , say if 500ms).? Can i
>> fixed processing executors also throughout the app?
>>
>>
>>
>>
>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>> JavaStreamingContext jssc = new
>> JavaStreamingContext(conf,Durations.milliseconds(1000));
>>
>> Map<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("zookeeper.connect","ipadd:2181");
>> kafkaParams.put("group.id", "testgroup");
>> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>>  Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>> topicsMap.put("testSparkPartitioned", 3);
>> int numStreams = 2;
>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
>> ArrayList<JavaPairDStream<byte[], byte[]>>();
>>   for(int i=0;i<numStreams;i++){
>>  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>> byte[].class, kafka.serializer.DefaultDecoder.class,
>> kafka.serializer.DefaultDecoder.class,
>> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>> }
>>  JavaPairDStream<byte[], byte[]> directKafkaStream =
>> jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
>> kafkaStreams.size()));
>>  JavaDStream<String> lines = directKafkaStream.map(new
>> Function<Tuple2<byte[], byte[]>, String>() {
>>
>> public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>> ...processing
>> ..return msg;
>> }
>> });
>> lines.print();
>> jssc.start();
>> jssc.awaitTermination();
>>
>>
>>
>>
>> ---
>> 3.For avoiding dataloss when we use checkpointing, and factory method to
>> create sparkConytext, is method name fixed
>> or we can use any name and how to set in app the method name to be used ?
>>
>> 4.In 1.3 non receiver based streaming, kafka offset is not stored in
>> zookeeper, is it because of zookeeper is not efficient for high writes and
>> read is not strictly consistent? So
>>
>>  we use simple Kafka API that does not use Zookeeper and offsets tracked
>> only by Spark Streaming within its checkpoints. This eliminates
>> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
>> record is received by Spark Streaming effectively exactly once despite
>> failures.
>>
>> So we have to call context.checkpoint(hdfsdir)? Or is it implicit
>> checkoint location ? Means does hdfs be used for sm

Re: spark streaming with kafka reset offset

2015-06-29 Thread ayan guha
Hi

Let me take a shot at your questions. (I am sure people like Cody and TD
will correct me if I am wrong.)

0. This is an exact copy from the similar question in the mail thread from Akhil D:
Since you set local[4] you will have 4 threads for your computation, and
since you are having 2 receivers, you are left with 2 threads to process ((0
+ 2) <-- This 2 is your 2 threads.) And the other /2 means you are having 2
tasks in that stage (with id 0). (See the sketch below.)

1. Here you are basically creating 2 receivers and asking each of them to
consume 3 kafka partitions each.
2. How does that matter? It depends on how many receivers you have created
to consume that data and whether you have repartitioned it. Remember, Spark is
lazy and executors are tied to the context.
3. I think in Java the factory method is fixed. You just pass around the
contextFactory object. (I love Python :) see, the signature is so much
cleaner :) )
4. Yes, if you use Spark checkpointing. You can use your own custom
checkpointing too.
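
To make the core accounting in (0) concrete, here is a rough, self-contained Java
sketch. The socket source, port and app name are placeholders standing in for the
Kafka receivers, not something from this thread:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CoreBudgetSketch {
    public static void main(String[] args) {
        // One receiver below, so local[2] is the minimum: 1 thread is pinned by the
        // receiver, the remaining thread runs the batch tasks. With 2 receivers
        // (as in this thread) you would need local[3] or more.
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("CoreBudgetSketch"); // placeholder app name
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.milliseconds(1000));

        // Placeholder receiver-based source (a socket instead of Kafka), just to
        // show the core accounting; swap in KafkaUtils.createStream(...) as above.
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.count().print(); // an output operation so the context has work to do

        jssc.start();
        jssc.awaitTermination();
    }
}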

Best
Ayan



On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora 
wrote:

> Few doubts :
>
> In 1.2 streaming when I use union of streams , my streaming application
> getting hanged sometimes and nothing gets printed on driver.
>
>
> [Stage 2:>
>
>   (0 + 2) / 2]
>  Whats is 0+2/2 here signifies.
>
>
>
> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be same
> as numstreams=2 ? in unioned stream ?
>
> 2. I launched app on yarnRM with num-executors as 5 . It created 2
> receivers and 5 execuots . As in stream receivers nodes get fixed at start
> of app throughout its lifetime . Does executors gets allicated at start of
> each job on 1s batch interval? If yes, how does its fast to allocate
> resources. I mean if i increase num-executors to 50 , it will negotiate 50
> executors from yarnRM at start of each job so does it takes more time in
> allocating executors than batch interval(here 1s , say if 500ms).? Can i
> fixed processing executors also throughout the app?
>
>
>
>
> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
> JavaStreamingContext jssc = new
> JavaStreamingContext(conf,Durations.milliseconds(1000));
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect","ipadd:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>  Map<String, Integer> topicsMap = new HashMap<String, Integer>();
> topicsMap.put("testSparkPartitioned", 3);
> int numStreams = 2;
> List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
> ArrayList<JavaPairDStream<byte[], byte[]>>();
>   for(int i=0;i<numStreams;i++){
>  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
> byte[].class, kafka.serializer.DefaultDecoder.class,
> kafka.serializer.DefaultDecoder.class,
> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
> }
>  JavaPairDStream<byte[], byte[]> directKafkaStream =
> jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
>  JavaDStream<String> lines = directKafkaStream.map(new
> Function<Tuple2<byte[], byte[]>, String>() {
>
> public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
> ...processing
> ..return msg;
> }
> });
> lines.print();
> jssc.start();
> jssc.awaitTermination();
>
>
>
>
> ---
> 3.For avoiding dataloss when we use checkpointing, and factory method to
> create sparkConytext, is method name fixed
> or we can use any name and how to set in app the method name to be used ?
>
> 4.In 1.3 non receiver based streaming, kafka offset is not stored in
> zookeeper, is it because of zookeeper is not efficient for high writes and
> read is not strictly consistent? So
>
>  we use simple Kafka API that does not use Zookeeper and offsets tracked
> only by Spark Streaming within its checkpoints. This eliminates
> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
> record is received by Spark Streaming effectively exactly once despite
> failures.
>
> So we have to call context.checkpoint(hdfsdir)? Or is it implicit
> checkoint location ? Means does hdfs be used for small data(just offset?)
>
>
>
>
>
>
>
>
>
>
> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> There is another option to try for Receiver Based Low Level Kafka
>> Consumer which is part of Spark-Packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This
>> can be used with WAL as well for end to end zero data loss.
>>
>> This is also Reliable Receiver and Commit offset to ZK.  Given the number
>> of Kafka Partitions you have ( > 100) , using High Level Kafka API for
>> Receiver based approach may leads to issues related Consumer Re-balancing
>>  which is a major issue of Kafka High Level API.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das 
>> wrote:
>>
>>> In the receiver based approach, If the receiver crashes for any reason
>>> (receiver crashed or executor crashed) the receiver sh

Re: spark streaming with kafka reset offset

2015-06-29 Thread Cody Koeninger
3. You need to use your own method, because you need to set up your job.
Read the checkpointing documentation.

4. Yes, if you want to checkpoint, you need to specify a URL to store the
checkpoint at (S3 or HDFS). Yes, for the direct stream the checkpoint is
just the offsets, not all the messages.
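
For reference, a minimal sketch of the factory pattern with
JavaStreamingContext.getOrCreate. The checkpoint directory and app name are
placeholders; the factory class and variable names can be anything, only the
create() signature is fixed:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class CheckpointedAppSketch {
    // Placeholder checkpoint directory; any HDFS/S3 URL the cluster can reach will do.
    private static final String CHECKPOINT_DIR = "hdfs:///tmp/streaming-checkpoint";

    public static void main(String[] args) {
        // The factory (and its variable name) can be called anything; only the
        // create() signature is fixed.
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                SparkConf conf = new SparkConf().setAppName("CheckpointedAppSketch");
                JavaStreamingContext jssc =
                        new JavaStreamingContext(conf, Durations.milliseconds(1000));
                jssc.checkpoint(CHECKPOINT_DIR); // metadata (and direct-stream offsets) go here
                // Placeholder source and output op so the sketch runs; replace with
                // the Kafka stream plus all transformations and output operations.
                jssc.socketTextStream("localhost", 9999).print();
                return jssc;
            }
        };

        // First run: create() is called. On restart, the context (including the
        // direct-stream offsets) is rebuilt from the checkpoint directory.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, factory);
        jssc.start();
        jssc.awaitTermination();
    }
}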

On Sun, Jun 28, 2015 at 1:02 PM, Shushant Arora 
wrote:

> Few doubts :
>
> In 1.2 streaming when I use union of streams , my streaming application
> getting hanged sometimes and nothing gets printed on driver.
>
>
> [Stage 2:>
>
>   (0 + 2) / 2]
>  Whats is 0+2/2 here signifies.
>
>
>
> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3); be same
> as numstreams=2 ? in unioned stream ?
>
> 2. I launched app on yarnRM with num-executors as 5 . It created 2
> receivers and 5 execuots . As in stream receivers nodes get fixed at start
> of app throughout its lifetime . Does executors gets allicated at start of
> each job on 1s batch interval? If yes, how does its fast to allocate
> resources. I mean if i increase num-executors to 50 , it will negotiate 50
> executors from yarnRM at start of each job so does it takes more time in
> allocating executors than batch interval(here 1s , say if 500ms).? Can i
> fixed processing executors also throughout the app?
>
>
>
>
> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
> JavaStreamingContext jssc = new
> JavaStreamingContext(conf,Durations.milliseconds(1000));
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect","ipadd:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "1");
>  Map<String, Integer> topicsMap = new HashMap<String, Integer>();
> topicsMap.put("testSparkPartitioned", 3);
> int numStreams = 2;
> List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
> ArrayList<JavaPairDStream<byte[], byte[]>>();
>   for(int i=0;i<numStreams;i++){
>  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
> byte[].class, kafka.serializer.DefaultDecoder.class,
> kafka.serializer.DefaultDecoder.class,
> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
> }
>  JavaPairDStream<byte[], byte[]> directKafkaStream =
> jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
>  JavaDStream<String> lines = directKafkaStream.map(new
> Function<Tuple2<byte[], byte[]>, String>() {
>
> public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
> ...processing
> ..return msg;
> }
> });
> lines.print();
> jssc.start();
> jssc.awaitTermination();
>
>
>
>
> ---
> 3.For avoiding dataloss when we use checkpointing, and factory method to
> create sparkConytext, is method name fixed
> or we can use any name and how to set in app the method name to be used ?
>
> 4.In 1.3 non receiver based streaming, kafka offset is not stored in
> zookeeper, is it because of zookeeper is not efficient for high writes and
> read is not strictly consistent? So
>
>  we use simple Kafka API that does not use Zookeeper and offsets tracked
> only by Spark Streaming within its checkpoints. This eliminates
> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
> record is received by Spark Streaming effectively exactly once despite
> failures.
>
> So we have to call context.checkpoint(hdfsdir)? Or is it implicit
> checkoint location ? Means does hdfs be used for small data(just offset?)
>
>
>
>
>
>
>
>
>
>
> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> There is another option to try for Receiver Based Low Level Kafka
>> Consumer which is part of Spark-Packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This
>> can be used with WAL as well for end to end zero data loss.
>>
>> This is also Reliable Receiver and Commit offset to ZK.  Given the number
>> of Kafka Partitions you have ( > 100) , using High Level Kafka API for
>> Receiver based approach may leads to issues related Consumer Re-balancing
>>  which is a major issue of Kafka High Level API.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das 
>> wrote:
>>
>>> In the receiver based approach, If the receiver crashes for any reason
>>> (receiver crashed or executor crashed) the receiver should get restarted on
>>> another executor and should start reading data from the offset present in
>>> the zookeeper. There is some chance of data loss which can alleviated using
>>> Write Ahead Logs (see streaming programming guide for more details, or see
>>> my talk [Slides PDF
>>> 
>>> , Video
>>> 
>>> ] from last Spark Summit 2015). But that approach can give duplicate
>>> records. The direct approach gives exactly-once guarantees, so you should
>>> try it out.
>>>
>>> TD
>>

Re: spark streaming with kafka reset offset

2015-06-28 Thread Shushant Arora
Few doubts :

In 1.2 streaming, when I use a union of streams, my streaming application
sometimes gets hung and nothing gets printed on the driver.


[Stage 2:>                                                       (0 + 2) / 2]
 What does the (0 + 2) / 2 here signify?



1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3);
have to be the same as numStreams = 2 in the unioned stream?

2. I launched the app on the YARN RM with num-executors as 5. It created 2
receivers and 5 executors. Since the stream receiver nodes get fixed at the
start of the app for its whole lifetime, do executors get allocated at the
start of each job on the 1s batch interval? If yes, how is it fast enough to
allocate resources? I mean, if I increase num-executors to 50, will it
negotiate 50 executors from the YARN RM at the start of each job, and so take
more time allocating executors than the batch interval (here 1s, or say
500ms)? Can I also fix the processing executors throughout the app?




SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
JavaStreamingContext jssc = new
JavaStreamingContext(conf,Durations.milliseconds(1000));

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect","ipadd:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
 Map<String, Integer> topicsMap = new HashMap<String, Integer>();
topicsMap.put("testSparkPartitioned", 3);
int numStreams = 2;
List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
ArrayList<JavaPairDStream<byte[], byte[]>>();
  for(int i=0;i<numStreams;i++){
 kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
byte[].class, kafka.serializer.DefaultDecoder.class,
kafka.serializer.DefaultDecoder.class,
kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
}
 JavaPairDStream<byte[], byte[]> directKafkaStream =
jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));
 JavaDStream<String> lines = directKafkaStream.map(new
Function<Tuple2<byte[], byte[]>, String>() {

public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
...processing
..return msg;
}
});
lines.print();
jssc.start();
jssc.awaitTermination();



---
3. For avoiding data loss when we use checkpointing, and a factory method to
create the SparkContext, is the method name fixed,
or can we use any name, and how do we set in the app the method name to be used?

4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored in
ZooKeeper. Is that because ZooKeeper is not efficient for high write rates and
its reads are not strictly consistent? So

 we use the simple Kafka API that does not use Zookeeper, and offsets are tracked
only by Spark Streaming within its checkpoints. This eliminates
inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
record is received by Spark Streaming effectively exactly once despite
failures.

So we have to call context.checkpoint(hdfsdir)? Or is there an implicit
checkpoint location? That is, is HDFS used for small data (just the offsets)?










On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi,
>
> There is another option to try for Receiver Based Low Level Kafka Consumer
> which is part of Spark-Packages (
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) . This
> can be used with WAL as well for end to end zero data loss.
>
> This is also Reliable Receiver and Commit offset to ZK.  Given the number
> of Kafka Partitions you have ( > 100) , using High Level Kafka API for
> Receiver based approach may leads to issues related Consumer Re-balancing
>  which is a major issue of Kafka High Level API.
>
> Regards,
> Dibyendu
>
>
>
> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das 
> wrote:
>
>> In the receiver based approach, If the receiver crashes for any reason
>> (receiver crashed or executor crashed) the receiver should get restarted on
>> another executor and should start reading data from the offset present in
>> the zookeeper. There is some chance of data loss which can alleviated using
>> Write Ahead Logs (see streaming programming guide for more details, or see
>> my talk [Slides PDF
>> 
>> , Video
>> 
>> ] from last Spark Summit 2015). But that approach can give duplicate
>> records. The direct approach gives exactly-once guarantees, so you should
>> try it out.
>>
>> TD
>>
>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger 
>> wrote:
>>
>>> Read the spark streaming guide ad the kafka integration guide for a
>>> better understanding of how the receiver based stream works.
>>>
>>> Capacity planning is specific to your environment and what the job is
>>> actually doing, youll need to determine it empirically.
>>>
>>>
>>> On Friday, June 26, 2015, Shushant Arora 
>>> wrote:
>>>
 In 1.2 how to handle offset management after stream application starts
 in each job . I should commit offset after job completion manually?

 And what is recommended no of consumer threads. Say I have 300
 partitions in kafka cluster . Load is ~ 1 million events per second.Each
 event is of ~500bytes. Having 5 receivers with 60 par

Re: spark streaming with kafka reset offset

2015-06-27 Thread Dibyendu Bhattacharya
Hi,

There is another option to try: a receiver-based low-level Kafka consumer
which is part of Spark Packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). This can
be used with the WAL as well for end-to-end zero data loss.

This is also a reliable receiver and it commits offsets to ZK. Given the number
of Kafka partitions you have (> 100), using the high-level Kafka API for the
receiver-based approach may lead to issues with consumer re-balancing,
which is a major issue of the Kafka high-level API.

Regards,
Dibyendu



On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das  wrote:

> In the receiver based approach, If the receiver crashes for any reason
> (receiver crashed or executor crashed) the receiver should get restarted on
> another executor and should start reading data from the offset present in
> the zookeeper. There is some chance of data loss which can alleviated using
> Write Ahead Logs (see streaming programming guide for more details, or see
> my talk [Slides PDF
> 
> , Video
> 
> ] from last Spark Summit 2015). But that approach can give duplicate
> records. The direct approach gives exactly-once guarantees, so you should
> try it out.
>
> TD
>
> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger 
> wrote:
>
>> Read the spark streaming guide ad the kafka integration guide for a
>> better understanding of how the receiver based stream works.
>>
>> Capacity planning is specific to your environment and what the job is
>> actually doing, youll need to determine it empirically.
>>
>>
>> On Friday, June 26, 2015, Shushant Arora 
>> wrote:
>>
>>> In 1.2 how to handle offset management after stream application starts
>>> in each job . I should commit offset after job completion manually?
>>>
>>> And what is recommended no of consumer threads. Say I have 300
>>> partitions in kafka cluster . Load is ~ 1 million events per second.Each
>>> event is of ~500bytes. Having 5 receivers with 60 partitions each receiver
>>> is sufficient for spark streaming to consume ?
>>>
>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger 
>>> wrote:
>>>
 The receiver-based kafka createStream in spark 1.2 uses zookeeper to
 store offsets.  If you want finer-grained control over offsets, you can
 update the values in zookeeper yourself before starting the job.

 createDirectStream in spark 1.3 is still marked as experimental, and
 subject to change.  That being said, it works better for me in production
 than the receiver based api.

 On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> I am using spark streaming 1.2.
>
> If processing executors get crashed will receiver rest the offset back
> to last processed offset?
>
> If receiver itself got crashed is there a way to reset the offset
> without restarting streaming application other than smallest or largest.
>
>
> Is spark streaming 1.3  which uses low level consumer api, stabe? And
> which is recommended for handling data  loss 1.2 or 1.3 .
>
>
>
>
>
>
>

>>>
>


Re: spark streaming with kafka reset offset

2015-06-27 Thread Tathagata Das
In the receiver-based approach, if the receiver crashes for any reason
(receiver crashed or executor crashed), the receiver should get restarted on
another executor and should start reading data from the offset present in
ZooKeeper. There is some chance of data loss, which can be alleviated using
Write Ahead Logs (see the streaming programming guide for more details, or see
my talk [Slides PDF, Video] from the last Spark Summit 2015). But that approach
can give duplicate records. The direct approach gives exactly-once guarantees,
so you should try it out.
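
For reference, a rough sketch of how the write ahead log is usually switched on
for the receiver-based approach. The checkpoint path, host and port are
placeholders, and the config key is quoted from memory of the 1.2/1.3 docs, so
double-check it against your Spark version:

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WalEnabledReceiverSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WalEnabledReceiverSketch")
                // Config key as documented for the 1.2/1.3 line; verify for your version.
                .set("spark.streaming.receiver.writeAheadLog.enable", "true");

        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.milliseconds(1000));
        // The log itself is written under the checkpoint directory (placeholder path).
        jssc.checkpoint("hdfs:///tmp/streaming-checkpoint");

        // Placeholder receiver source; with the WAL on, received data is already on
        // HDFS, so a non-replicated, serialized storage level is enough.
        JavaDStream<String> lines = jssc.socketTextStream(
                "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER());
        lines.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}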

TD

On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger  wrote:

> Read the spark streaming guide ad the kafka integration guide for a better
> understanding of how the receiver based stream works.
>
> Capacity planning is specific to your environment and what the job is
> actually doing, youll need to determine it empirically.
>
>
> On Friday, June 26, 2015, Shushant Arora 
> wrote:
>
>> In 1.2 how to handle offset management after stream application starts in
>> each job . I should commit offset after job completion manually?
>>
>> And what is recommended no of consumer threads. Say I have 300 partitions
>> in kafka cluster . Load is ~ 1 million events per second.Each event is of
>> ~500bytes. Having 5 receivers with 60 partitions each receiver is
>> sufficient for spark streaming to consume ?
>>
>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger 
>> wrote:
>>
>>> The receiver-based kafka createStream in spark 1.2 uses zookeeper to
>>> store offsets.  If you want finer-grained control over offsets, you can
>>> update the values in zookeeper yourself before starting the job.
>>>
>>> createDirectStream in spark 1.3 is still marked as experimental, and
>>> subject to change.  That being said, it works better for me in production
>>> than the receiver based api.
>>>
>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 I am using spark streaming 1.2.

 If processing executors get crashed will receiver rest the offset back
 to last processed offset?

 If receiver itself got crashed is there a way to reset the offset
 without restarting streaming application other than smallest or largest.


 Is spark streaming 1.3  which uses low level consumer api, stabe? And
 which is recommended for handling data  loss 1.2 or 1.3 .







>>>
>>


Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
Read the Spark Streaming guide and the Kafka integration guide for a better
understanding of how the receiver-based stream works.

Capacity planning is specific to your environment and what the job is
actually doing; you'll need to determine it empirically.


On Friday, June 26, 2015, Shushant Arora  wrote:

> In 1.2 how to handle offset management after stream application starts in
> each job . I should commit offset after job completion manually?
>
> And what is recommended no of consumer threads. Say I have 300 partitions
> in kafka cluster . Load is ~ 1 million events per second.Each event is of
> ~500bytes. Having 5 receivers with 60 partitions each receiver is
> sufficient for spark streaming to consume ?
>
> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger  > wrote:
>
>> The receiver-based kafka createStream in spark 1.2 uses zookeeper to
>> store offsets.  If you want finer-grained control over offsets, you can
>> update the values in zookeeper yourself before starting the job.
>>
>> createDirectStream in spark 1.3 is still marked as experimental, and
>> subject to change.  That being said, it works better for me in production
>> than the receiver based api.
>>
>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>> shushantaror...@gmail.com
>> > wrote:
>>
>>> I am using spark streaming 1.2.
>>>
>>> If processing executors get crashed will receiver rest the offset back
>>> to last processed offset?
>>>
>>> If receiver itself got crashed is there a way to reset the offset
>>> without restarting streaming application other than smallest or largest.
>>>
>>>
>>> Is spark streaming 1.3  which uses low level consumer api, stabe? And
>>> which is recommended for handling data  loss 1.2 or 1.3 .
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: spark streaming with kafka reset offset

2015-06-26 Thread Shushant Arora
In 1.2, how do I handle offset management after the stream application starts,
in each job? Should I commit the offset manually after job completion?

And what is the recommended number of consumer threads? Say I have 300 partitions
in the Kafka cluster. The load is ~1 million events per second, and each event is
~500 bytes. Are 5 receivers with 60 partitions per receiver sufficient for Spark
Streaming to consume?

On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger  wrote:

> The receiver-based kafka createStream in spark 1.2 uses zookeeper to store
> offsets.  If you want finer-grained control over offsets, you can update
> the values in zookeeper yourself before starting the job.
>
> createDirectStream in spark 1.3 is still marked as experimental, and
> subject to change.  That being said, it works better for me in production
> than the receiver based api.
>
> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora  > wrote:
>
>> I am using spark streaming 1.2.
>>
>> If processing executors get crashed will receiver rest the offset back to
>> last processed offset?
>>
>> If receiver itself got crashed is there a way to reset the offset without
>> restarting streaming application other than smallest or largest.
>>
>>
>> Is spark streaming 1.3  which uses low level consumer api, stabe? And
>> which is recommended for handling data  loss 1.2 or 1.3 .
>>
>>
>>
>>
>>
>>
>>
>


Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
The receiver-based kafka createStream in spark 1.2 uses zookeeper to store
offsets.  If you want finer-grained control over offsets, you can update
the values in zookeeper yourself before starting the job.

createDirectStream in spark 1.3 is still marked as experimental, and
subject to change.  That being said, it works better for me in production
than the receiver based api.
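
For reference, a rough sketch of the direct approach against the 1.3 Java API.
The broker list, topic name and offset reset policy below are placeholders:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class DirectStreamSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DirectStreamSketch");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.milliseconds(1000));

        // The direct stream talks to the brokers, not ZooKeeper; broker list and
        // topic name are placeholders.
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
        kafkaParams.put("auto.offset.reset", "smallest"); // or "largest"

        Set<String> topics = new HashSet<String>();
        topics.add("testSparkPartitioned");

        // One RDD partition per Kafka partition, offsets tracked by Spark itself.
        JavaPairInputDStream<String, String> directStream = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        directStream.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}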

On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora 
wrote:

> I am using spark streaming 1.2.
>
> If processing executors get crashed will receiver rest the offset back to
> last processed offset?
>
> If receiver itself got crashed is there a way to reset the offset without
> restarting streaming application other than smallest or largest.
>
>
> Is spark streaming 1.3  which uses low level consumer api, stabe? And
> which is recommended for handling data  loss 1.2 or 1.3 .
>
>
>
>
>
>
>


spark streaming with kafka reset offset

2015-06-26 Thread Shushant Arora
I am using Spark Streaming 1.2.

If the processing executors crash, will the receiver reset the offset back to
the last processed offset?

If the receiver itself crashes, is there a way to reset the offset, without
restarting the streaming application, other than to smallest or largest?


Is Spark Streaming 1.3, which uses the low-level consumer API, stable? And which
is recommended for handling data loss, 1.2 or 1.3?


Re: spark streaming with kafka jar missing

2015-06-23 Thread Shushant Arora
Thanks a lot. It worked after keeping all versions the same: 1.2.0.

On Wed, Jun 24, 2015 at 2:24 AM, Tathagata Das  wrote:

> Why are you mixing spark versions between streaming and core??
> Your core is 1.2.0 and streaming is 1.4.0.
>
> On Tue, Jun 23, 2015 at 1:32 PM, Shushant Arora  > wrote:
>
>> It throws exception for WriteAheadLogUtils after excluding core and
>> streaming jar.
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/util/WriteAheadLogUtils$
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
>> at
>> com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> pom.xml is :
>>
>> http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
>> http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="
>> http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>>   4.0.0
>>   
>>   SampleSparkStreamApp
>>   1.0
>>
>>
>> 
>> 
>> org.apache.spark
>> spark-core_2.10
>> 1.2.0
>> provided
>> 
>>  
>>  org.apache.spark
>> spark-streaming-kafka_2.10
>> 1.4.0
>> 
>>  
>> org.apache.spark
>> spark-streaming_2.10
>> provided
>> 1.4.0
>> 
>>  
>>   
>> 
>>   
>>   
>> maven-assembly-plugin
>> 
>>   
>> package
>> 
>>   single
>> 
>>   
>> 
>> 
>>   
>> jar-with-dependencies
>>   
>> 
>>   
>> 
>>   
>>
>> 
>>
>> And when I pass streaming jar using --jar option , it threw
>> same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$.
>>
>> Thanks
>>
>> On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das 
>> wrote:
>>
>>> You must not include spark-core and spark-streaming in the assembly.
>>> They are already present in the installation and the presence of multiple
>>> versions of spark may throw off the classloaders in weird ways. So make the
>>> assembly marking the those dependencies as scope=provided.
>>>
>>>
>>>
>>> On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 hi

 While using spark streaming (1.2) with kafka . I am getting below error
 and receivers are getting killed but jobs get scheduled at each stream
 interval.

 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
 82, ip(XX)): java.io.IOException: Failed to connect to ip()
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)


 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for
 stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
 org/apache/spark/util/ThreadUtils$
 at
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
 at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
 at
 org.apache.spark.streaming.receiver.Rec

Re: spark streaming with kafka jar missing

2015-06-23 Thread Tathagata Das
Why are you mixing spark versions between streaming and core??
Your core is 1.2.0 and streaming is 1.4.0.

On Tue, Jun 23, 2015 at 1:32 PM, Shushant Arora 
wrote:

> It throws exception for WriteAheadLogUtils after excluding core and
> streaming jar.
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/util/WriteAheadLogUtils$
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103)
> at
> org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
> at
> com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> pom.xml is :
>
> http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="
> http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>   4.0.0
>   
>   SampleSparkStreamApp
>   1.0
>
>
> 
> 
> org.apache.spark
> spark-core_2.10
> 1.2.0
> provided
> 
>  
>  org.apache.spark
> spark-streaming-kafka_2.10
> 1.4.0
> 
>  
> org.apache.spark
> spark-streaming_2.10
> provided
> 1.4.0
> 
>  
>   
> 
>   
>   
> maven-assembly-plugin
> 
>   
> package
> 
>   single
> 
>   
> 
> 
>   
> jar-with-dependencies
>   
> 
>   
> 
>   
>
> 
>
> And when I pass streaming jar using --jar option , it threw
> same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$.
>
> Thanks
>
> On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das 
> wrote:
>
>> You must not include spark-core and spark-streaming in the assembly. They
>> are already present in the installation and the presence of multiple
>> versions of spark may throw off the classloaders in weird ways. So make the
>> assembly marking the those dependencies as scope=provided.
>>
>>
>>
>> On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> hi
>>>
>>> While using spark streaming (1.2) with kafka . I am getting below error
>>> and receivers are getting killed but jobs get scheduled at each stream
>>> interval.
>>>
>>> 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
>>> 82, ip(XX)): java.io.IOException: Failed to connect to ip()
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>>
>>>
>>> 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for
>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>> org/apache/spark/util/ThreadUtils$
>>> at
>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
>>> at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>> at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>> at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>>> at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLaunch

Re: spark streaming with kafka jar missing

2015-06-23 Thread Shushant Arora
It throws an exception for WriteAheadLogUtils after excluding the core and
streaming jars.

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/util/WriteAheadLogUtils$
at
org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103)
at
org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
at
com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


pom.xml is :

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <artifactId>SampleSparkStreamApp</artifactId>
  <version>1.0</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <scope>provided</scope>
      <version>1.4.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>


And when I pass the streaming jar using the --jars option, it threw the
same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$.

Thanks

On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das  wrote:

> You must not include spark-core and spark-streaming in the assembly. They
> are already present in the installation and the presence of multiple
> versions of spark may throw off the classloaders in weird ways. So make the
> assembly marking the those dependencies as scope=provided.
>
>
>
> On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> hi
>>
>> While using spark streaming (1.2) with kafka . I am getting below error
>> and receivers are getting killed but jobs get scheduled at each stream
>> interval.
>>
>> 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
>> 82, ip(XX)): java.io.IOException: Failed to connect to ip()
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>>
>>
>> 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream
>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>> org/apache/spark/util/ThreadUtils$
>> at
>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
>> at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>> at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>> at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>> at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56

Re: spark streaming with kafka jar missing

2015-06-23 Thread Tathagata Das
You must not include spark-core and spark-streaming in the assembly. They
are already present in the installation, and the presence of multiple
versions of Spark may throw off the classloaders in weird ways. So build the
assembly marking those dependencies as scope=provided.



On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora 
wrote:

> hi
>
> While using spark streaming (1.2) with kafka . I am getting below error
> and receivers are getting killed but jobs get scheduled at each stream
> interval.
>
> 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
> 82, ip(XX)): java.io.IOException: Failed to connect to ip()
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
> 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
> org/apache/spark/util/ThreadUtils$
> at
> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
> I created jar with include all dependencies. Which jar is missing here ?
>
>
>
>
>


spark streaming with kafka jar missing

2015-06-23 Thread Shushant Arora
hi

While using spark streaming (1.2) with kafka, I am getting the below error;
the receivers are getting killed, but jobs still get scheduled at each stream
interval.

15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID 82,
ip(XX)): java.io.IOException: Failed to connect to ip()
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream
0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
org/apache/spark/util/ThreadUtils$
at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


I created the jar including all dependencies. Which jar is missing here?


Re: Spark streaming with kafka

2015-05-29 Thread Akhil Das
Just after receiving the data from Kafka, you can do a
dstream.count().print() to verify that Spark and Kafka are not the problem.
After that, the next step would be to identify where the problem is: you can
do the same count-and-print on each of the DStreams that you are creating (by
transforming), and finally on the Elasticsearch indexing piece, where you need
to make sure the connection is not the problem. Maybe you can create a simple
RDD of Map("id" -> "1", "Something" -> "Something") and call saveToEs on it to
make sure it is getting written into ES.
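
For reference, a rough sketch of that last connectivity check as a standalone
batch job. The es.nodes/es.port values and the index name are placeholders, and
it assumes the elasticsearch-hadoop / elasticsearch-spark connector is on the
classpath; verify the JavaEsSpark class against the connector version you use:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsConnectivityCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("EsConnectivityCheck")
                // Placeholder ES coordinates; point these at nodes reachable from the container.
                .set("es.nodes", "elasticsearch-host")
                .set("es.port", "9200");

        JavaSparkContext sc = new JavaSparkContext(conf);

        Map<String, String> doc = new HashMap<String, String>();
        doc.put("id", "1");
        doc.put("Something", "Something");

        // One tiny RDD written to a throwaway index: if this fails, the problem is
        // the ES connection, not the Kafka/streaming side.
        JavaRDD<Map<String, String>> rdd = sc.parallelize(Collections.singletonList(doc));
        JavaEsSpark.saveToEs(rdd, "connectivity-check/doc");

        sc.stop();
    }
}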

Thanks
Best Regards

On Thu, May 28, 2015 at 7:03 PM, boci  wrote:

> Hi guys,
>
> I using spark streaming with kafka... In local machine (start as java
> application without using spark-submit) it's work, connect to kafka and do
> the job (*). I tried to put into spark docker container (hadoop 2.6, spark
> 1.3.1, try spark submit wil local[5] and yarn-client too ) but I'm out of
> success...
>
> No error on the console (the application started), I see something
> received from kafka but the result is not written out to elasticsearch...
>
> Where can I start the debug? I see in the spark console two job, both
> 0/1...
>
> Thanks
>
> --
> Skype: boci13, Hangout: boci.b...@gmail.com
>


Spark streaming with kafka

2015-05-28 Thread boci
Hi guys,

I'm using Spark Streaming with Kafka... On my local machine (started as a plain
Java application without using spark-submit) it works: it connects to Kafka and
does the job (*). I tried to put it into a Spark Docker container (Hadoop 2.6,
Spark 1.3.1, tried spark-submit with local[5] and yarn-client too) but with no
success...

No error on the console (the application started). I see something received
from Kafka, but the result is not written out to Elasticsearch...

Where can I start debugging? I see two jobs in the Spark console, both 0/1...

Thanks
--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: Re: spark streaming with kafka

2015-04-15 Thread Akhil Das
@Shushant: In my case, the receivers will be fixed till the end of the
application. This one is for the Kafka case only; if you have a fileStream
application, you will not have any receivers. Also, for Kafka, the next time
you run the application it is not guaranteed that the receivers will get
launched on the same machines.

@bit1129: If the receiver crashes, Spark will try to restart the receiver;
since I have only 2 machines in that cluster, it will either restart on the
same node or on the other node.

Thanks
Best Regards

On Wed, Apr 15, 2015 at 4:30 PM, bit1...@163.com  wrote:

> Hi, Akhil,
>
> I would like to ask a question here: assume Receiver-0 crashes. Will it be
> restarted on another worker node (in your picture, there would then be 2
> receivers on the same node), or will it restart on the same node?
>
> --
> bit1...@163.com
>
>
> *From:* Akhil Das 
> *Date:* 2015-04-15 19:12
> *To:* Shushant Arora 
> *CC:* user 
> *Subject:* Re: spark streaming with kafka
> Once you start your streaming application to read from Kafka, it will
> launch receivers on the executor nodes. And you can see them on the
> streaming tab of your driver ui (runs on 4040).
>
> [image: Inline image 1]
>
> These receivers will be fixed till the end of your pipeline (unless one
> crashes, etc.). You can say that each receiver will run on a single core.
>
> Thanks
> Best Regards
>
> On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora  > wrote:
>
>> Hi
>>
>> I want to understand the flow of spark streaming with kafka.
>>
>> In Spark Streaming, are the executor nodes the same at each run of the
>> streaming interval, or does the cluster manager assign new executor nodes at
>> each interval to process that batch's input? If the latter, do new executors
>> register themselves as Kafka consumers at each batch interval?
>>
>> Even without Kafka, are the executor nodes the same at each batch interval,
>> or does the driver get new executor nodes from the cluster manager?
>>
>> Thanks
>> Shushant
>>
>
>
>


Re: spark streaming with kafka

2015-04-15 Thread Shushant Arora
So the receivers will be fixed for every run of the streaming-interval job?
Say I have set the stream duration to 10 minutes; then after every 10 minutes
a job will be created, and the same executor nodes, in your case
spark-akhil-slave2.c.neat-axis-616.internal
and spark-akhil-slave1.c.neat-axis-616.internal, will be used?

Is it true for all streaming applications that the executor nodes will be fixed
and won't change depending on the load of the current batch, or is it only for
the Kafka case?









On Wed, Apr 15, 2015 at 4:12 PM, Akhil Das 
wrote:

> Once you start your streaming application to read from Kafka, it will
> launch receivers on the executor nodes. And you can see them on the
> streaming tab of your driver ui (runs on 4040).
>
> [image: Inline image 1]
>
> These receivers will be fixed till the end of your pipeline (unless one
> crashes, etc.). You can say that each receiver will run on a single core.
>
> Thanks
> Best Regards
>
> On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora  > wrote:
>
>> Hi
>>
>> I want to understand the flow of spark streaming with kafka.
>>
>> In Spark Streaming, are the executor nodes the same at each run of the
>> streaming interval, or does the cluster manager assign new executor nodes at
>> each interval to process that batch's input? If the latter, do new executors
>> register themselves as Kafka consumers at each batch interval?
>>
>> Even without Kafka, are the executor nodes the same at each batch interval,
>> or does the driver get new executor nodes from the cluster manager?
>>
>> Thanks
>> Shushant
>>
>
>


Re: spark streaming with kafka

2015-04-15 Thread Akhil Das
Once you start your streaming application to read from Kafka, it will
launch receivers on the executor nodes. And you can see them on the
streaming tab of your driver ui (runs on 4040).

[image: Inline image 1]

These receivers will be fixed till the end of your pipeline (unless one
crashes, etc.). You can say that each receiver will run on a single core.
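
A small sketch of that with the receiver-based API (ZooKeeper quorum, group and topic
are placeholders): every createStream call creates one receiver that occupies one core
for the lifetime of the application, and several streams can be unioned for more receive
parallelism, so the application needs more cores than receivers.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-receivers"), Seconds(10))

val numReceivers = 2   // each one pins one core on some executor until the app stops

// One receiver per createStream call; union them to consume in parallel.
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("my-topic" -> 1)).map(_._2)
}
val lines = ssc.union(streams)

lines.count().print()

ssc.start()
ssc.awaitTermination()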

Thanks
Best Regards

On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora 
wrote:

> Hi
>
> I want to understand the flow of spark streaming with kafka.
>
> In Spark Streaming, are the executor nodes the same at each run of the
> streaming interval, or does the cluster manager assign new executor nodes at
> each interval to process that batch's input? If the latter, do new executors
> register themselves as Kafka consumers at each batch interval?
>
> Even without Kafka, are the executor nodes the same at each batch interval,
> or does the driver get new executor nodes from the cluster manager?
>
> Thanks
> Shushant
>


spark streaming with kafka

2015-04-15 Thread Shushant Arora
Hi

I want to understand the flow of spark streaming with kafka.

In Spark Streaming, are the executor nodes the same at each run of the
streaming interval, or does the cluster manager assign new executor nodes at
each interval to process that batch's input? If the latter, do new executors
register themselves as Kafka consumers at each batch interval?

Even without Kafka, are the executor nodes the same at each batch interval,
or does the driver get new executor nodes from the cluster manager?

Thanks
Shushant


Re: Spark streaming with Kafka- couldnt find KafkaUtils

2015-04-07 Thread Felix C
Or you could build an uber jar ( you could google that )

https://eradiating.wordpress.com/2015/02/15/getting-spark-streaming-on-kafka-to-work/
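
The sbt side of that is small; a sketch (the plugin version is an assumption, use
whatever matches your sbt version):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

With the plugin in place, 'sbt assembly' produces a single fat jar containing
spark-streaming-kafka and its transitive dependencies, so nothing has to be listed by
hand; keeping spark-core and spark-streaming as "provided" avoids bundling them twice.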

--- Original Message ---

From: "Akhil Das" 
Sent: April 4, 2015 11:52 PM
To: "Priya Ch" 
Cc: user@spark.apache.org, "dev" 
Subject: Re: Spark streaming with Kafka- couldnt find KafkaUtils

How are you submitting the application? Use a standard build tool like
Maven or sbt to build your project; it will download all the dependency
jars. When you submit your application with spark-submit, use the --jars
option to add the jars that are causing the ClassNotFoundException. If you
are running as a standalone application without using spark-submit, then
while creating the SparkContext, use sc.addJar() to add those dependency
jars.

For Kafka streaming, when you use sbt, these are the jars that are required:


sc.addJar("/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
   
sc.addJar("/root/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
   
sc.addJar("/root/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")
   sc.addJar("/root/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")




Thanks
Best Regards

On Sun, Apr 5, 2015 at 12:00 PM, Priya Ch 
wrote:

> Hi All,
>
>   I configured a Kafka cluster on a single node, and I have a streaming
> application which reads data from a Kafka topic using KafkaUtils. When I
> execute the code in local mode from the IDE, the application runs fine.
>
> But when I submit the same application to the Spark cluster in standalone
> mode, I end up with the following exception:
> java.lang.ClassNotFoundException:
> org/apache/spark/streaming/kafka/KafkaUtils.
>
> I am using the spark-1.2.1 version. When I checked the source files of
> streaming, the source files related to Kafka are missing. Are these not
> included in the spark-1.3.0 and spark-1.2.1 versions?
>
> Do I have to include these manually?
>
> Regards,
> Padma Ch
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming with Kafka- couldnt find KafkaUtils

2015-04-04 Thread Akhil Das
How are you submitting the application? Use a standard build tool like
Maven or sbt to build your project; it will download all the dependency
jars. When you submit your application with spark-submit, use the --jars
option to add the jars that are causing the ClassNotFoundException. If you
are running as a standalone application without using spark-submit, then
while creating the SparkContext, use sc.addJar() to add those dependency
jars.

For Kafka streaming, when you use sbt, these are the jars that are required:


sc.addJar("/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
   
sc.addJar("/root/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
   
sc.addJar("/root/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")
   sc.addJar("/root/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")




Thanks
Best Regards

On Sun, Apr 5, 2015 at 12:00 PM, Priya Ch 
wrote:

> Hi All,
>
>   I configured a Kafka cluster on a single node, and I have a streaming
> application which reads data from a Kafka topic using KafkaUtils. When I
> execute the code in local mode from the IDE, the application runs fine.
>
> But when I submit the same application to the Spark cluster in standalone
> mode, I end up with the following exception:
> java.lang.ClassNotFoundException:
> org/apache/spark/streaming/kafka/KafkaUtils.
>
> I am using the spark-1.2.1 version. When I checked the source files of
> streaming, the source files related to Kafka are missing. Are these not
> included in the spark-1.3.0 and spark-1.2.1 versions?
>
> Do I have to include these manually?
>
> Regards,
> Padma Ch
>


Spark streaming with Kafka- couldnt find KafkaUtils

2015-04-04 Thread Priya Ch
Hi All,

  I configured a Kafka cluster on a single node, and I have a streaming
application which reads data from a Kafka topic using KafkaUtils. When I
execute the code in local mode from the IDE, the application runs fine.

But when I submit the same application to the Spark cluster in standalone
mode, I end up with the following exception:
java.lang.ClassNotFoundException:
org/apache/spark/streaming/kafka/KafkaUtils.

I am using the spark-1.2.1 version. When I checked the source files of
streaming, the source files related to Kafka are missing. Are these not
included in the spark-1.3.0 and spark-1.2.1 versions?

Do I have to include these manually?

Regards,
Padma Ch


Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-31 Thread Ted Yu
Can you show us the output of DStream#print() if you have it ?

Thanks

On Tue, Mar 31, 2015 at 2:55 AM, Nicolas Phung 
wrote:

> Hello,
>
> @Akhil Das I'm trying to use the experimental API
> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
> .
> I'm reusing the same code snippet to initialize my topicSet.
>
> @Cody Koeninger I don't see any previous error messages (see the full log
> at the end). To create the topic, I'm doing the following :
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
> --partitions 10 --topic toto"
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
> --partitions 1 --topic toto-single"
>
> I'm launching my Spark Streaming in local mode.
>
> @Ted Yu There's no log "Couldn't connect to leader for topic", here's the
> full version :
>
> "spark-submit --conf config.resource=application-integration.conf --class
> nextgen.Main assembly-0.1-SNAPSHOT.jar
>
> 15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung
> 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung
> 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(nphung); users 
> with modify permissions: Set(nphung)
> 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started
> 15/03/31 10:47:13 INFO Remoting: Starting remoting
> 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkDriver@int.local:44180]
> 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: 
> [akka.tcp://sparkDriver@int.local:44180]
> 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on 
> port 44180.
> 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker
> 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at 
> /tmp/spark-local-20150331104713-2238
> 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
> 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is 
> /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53
> 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server
> 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' 
> on port 50204.
> 15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040
> 15/03/31 10:47:16 INFO SparkContext: Added JAR 
> file:/home/nphung/assembly-0.1-SNAPSHOT.jar at 
> http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 
> 1427791636151
> 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: 
> akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver
> 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630
> 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager
> 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager 
> localhost:40630 with 265.1 MB RAM, BlockManagerId(, localhost, 40630)
> 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager
> 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to 
> hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195
> 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties
> 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden 
> to
> 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is 
> overridden to
> 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = 
> StorageLevel(false, false, false, false, 1)
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated 
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
> 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 IN

Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-31 Thread Nicolas Phung
Hello,

@Akhil Das I'm trying to use the experimental API
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
.
I'm reusing the same code snippet to initialize my topicSet.

@Cody Koeninger I don't see any previous error messages (see the full log
at the end). To create the topic, I'm doing the following :

"kafka-topics --create --zookeeper localhost:2181 --replication-factor
1 --partitions 10 --topic toto"

"kafka-topics --create --zookeeper localhost:2181 --replication-factor
1 --partitions 1 --topic toto-single"

I'm launching my Spark Streaming in local mode.
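
Condensed, the initialization reused from that example looks roughly like this (the
broker address is a placeholder for this sketch):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("direct-kafka"), Seconds(2))

val topicsSet = "toto".split(",").toSet   // or "toto-single"
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092")   // assumption: local broker

val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicsSet)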

@Ted Yu There's no log "Couldn't connect to leader for topic", here's the
full version :

"spark-submit --conf config.resource=application-integration.conf --class
nextgen.Main assembly-0.1-SNAPSHOT.jar

15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung
15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung
15/03/31 10:47:12 INFO SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(nphung); users with modify permissions: Set(nphung)
15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started
15/03/31 10:47:13 INFO Remoting: Starting remoting
15/03/31 10:47:13 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@int.local:44180]
15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@int.local:44180]
15/03/31 10:47:13 INFO Utils: Successfully started service
'sparkDriver' on port 44180.
15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker
15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster
15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20150331104713-2238
15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53
15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server
15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file
server' on port 50204.
15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI'
on port 4040.
15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040
15/03/31 10:47:16 INFO SparkContext: Added JAR
file:/home/nphung/assembly-0.1-SNAPSHOT.jar at
http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with
timestamp 1427791636151
15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver
15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630
15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager
15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block
manager localhost:40630 with 265.1 MB RAM, BlockManagerId(,
localhost, 40630)
15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager
15/03/31 10:47:17 INFO EventLoggingListener: Logging events to
hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195
15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties
15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden to
15/03/31 10:47:17 INFO VerifiableProperties: Property
zookeeper.connect is overridden to
15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and
validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Initialized and validated
org.apache.spark.streaming.dstream.MappedDStream@7fd8c559
15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO MappedDStream: Checkpoint inte

Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-30 Thread Ted Yu
Nicolas:
See if there was occurrence of the following exception in the log:
  errs => throw new SparkException(
s"Couldn't connect to leader for topic ${part.topic}
${part.partition}: " +
  errs.mkString("\n")),

Cheers

On Mon, Mar 30, 2015 at 9:40 AM, Cody Koeninger  wrote:

> This line
>
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(
> KafkaRDD.scala:158)
>
> is the attempt to close the underlying kafka simple consumer.
>
> We can add a null pointer check, but the underlying issue of the consumer
> being null probably indicates a problem earlier.  Do you see any previous
> error messages?
>
> Also, can you clarify for the successful and failed cases which topics you
> are attempting this on, how many partitions there are, and whether there
> are messages in the partitions?  There's an existing jira regarding empty
> partitions.
>
>
>
>
> On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung 
> wrote:
>
>> Hello,
>>
>> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
>> Direct Approach (No Receivers)" (
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>> I'm using the following code snippets :
>>
>> // Create direct kafka stream with brokers and topics
>> val messages = KafkaUtils.createDirectStream[String, Array[Byte],
>> StringDecoder, DefaultDecoder](
>> ssc, kafkaParams, topicsSet)
>>
>> // Get the stuff from Kafka and print them
>> val raw = messages.map(_._2)
>> val dStream: DStream[RawScala] = raw.map(
>> byte => {
>> // Avro Decoder
>> println("Byte length: " + byte.length)
>> val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>> RawScala.toScala(rawDecoder.fromBytes(byte))
>> }
>> )
>> // Useful for debug
>> dStream.print()
>>
>> I launch my Spark Streaming application and everything is fine if there are
>> no incoming logs from Kafka. When I send a log, I get the following error:
>>
>> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>> at
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>> at
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>> at
>> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>> at
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>> at
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0
>> (TID 94) in 12 ms on localhost (2/4)
>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0
>> (TID 93) in 13 ms on localhost (3/4)
>> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0
>> (TID 91)
>> org.apache.spark.util.TaskCompletionListenerException
>> at
>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID
>> 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>> at
>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1
>> times; aborting job
>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose
>> tasks have all completed, from pool
>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 

Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-30 Thread Cody Koeninger
This line

at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(
KafkaRDD.scala:158)

is the attempt to close the underlying kafka simple consumer.

We can add a null pointer check, but the underlying issue of the consumer
being null probably indicates a problem earlier.  Do you see any previous
error messages?

Also, can you clarify for the successful and failed cases which topics you
are attempting this on, how many partitions there are, and whether there
are messages in the partitions?  There's an existing jira regarding empty
partitions.
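
One way to answer the partition/offset question from inside the job is to log the offset
ranges the direct stream reads per batch; a sketch against the 1.3 direct API:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Call this on the DStream returned directly by KafkaUtils.createDirectStream
// (before any map/transform), since only that RDD carries HasOffsetRanges.
def logOffsetRanges[K, V](stream: DStream[(K, V)]): Unit =
  stream.foreachRDD { rdd =>
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach { r =>
      println(s"topic=${r.topic} partition=${r.partition} " +
        s"from=${r.fromOffset} until=${r.untilOffset} empty=${r.fromOffset == r.untilOffset}")
    }
  }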




On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung 
wrote:

> Hello,
>
> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
> Direct Approach (No Receivers)" (
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
> I'm using the following code snippets :
>
> // Create direct kafka stream with brokers and topics
> val messages = KafkaUtils.createDirectStream[String, Array[Byte],
> StringDecoder, DefaultDecoder](
> ssc, kafkaParams, topicsSet)
>
> // Get the stuff from Kafka and print them
> val raw = messages.map(_._2)
> val dStream: DStream[RawScala] = raw.map(
> byte => {
> // Avro Decoder
> println("Byte length: " + byte.length)
> val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
> RawScala.toScala(rawDecoder.fromBytes(byte))
> }
> )
> // Useful for debug
> dStream.print()
>
> I launch my Spark Streaming application and everything is fine if there are
> no incoming logs from Kafka. When I send a log, I get the following error:
>
> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at
> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
> at
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
> at
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0
> (TID 94) in 12 ms on localhost (2/4)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0
> (TID 93) in 13 ms on localhost (3/4)
> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID
> 91)
> org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID
> 91, localhost): org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1
> times; aborting job
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose
> tasks have all completed, from pool
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at
> HotFCANextGen.scala:63, took 0,041068 s
> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job
> 142772968 ms.1 from job set of time 142772968 ms
> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job
> 142772968 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 28.0 failed 1 times,

Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-30 Thread Akhil Das
Did you try this example?

https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

I think you need to create a topic set with # partitions to consume.

Thanks
Best Regards

On Mon, Mar 30, 2015 at 9:35 PM, Nicolas Phung 
wrote:

> Hello,
>
> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
> Direct Approach (No Receivers)" (
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
> I'm using the following code snippets :
>
> // Create direct kafka stream with brokers and topics
> val messages = KafkaUtils.createDirectStream[String, Array[Byte],
> StringDecoder, DefaultDecoder](
> ssc, kafkaParams, topicsSet)
>
> // Get the stuff from Kafka and print them
> val raw = messages.map(_._2)
> val dStream: DStream[RawScala] = raw.map(
> byte => {
> // Avro Decoder
> println("Byte length: " + byte.length)
> val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
> RawScala.toScala(rawDecoder.fromBytes(byte))
> }
> )
> // Useful for debug
> dStream.print()
>
> I launch my Spark Streaming application and everything is fine if there are
> no incoming logs from Kafka. When I send a log, I get the following error:
>
> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at
> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
> at
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
> at
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0
> (TID 94) in 12 ms on localhost (2/4)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0
> (TID 93) in 13 ms on localhost (3/4)
> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID
> 91)
> org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID
> 91, localhost): org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1
> times; aborting job
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose
> tasks have all completed, from pool
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at
> HotFCANextGen.scala:63, took 0,041068 s
> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job
> 142772968 ms.1 from job set of time 142772968 ms
> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job
> 142772968 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 28.0 (TID 91, localhost):
> org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$Task

Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-30 Thread Nicolas Phung
Hello,

I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
Direct Approach (No Receivers)" (
http://spark.apache.org/docs/latest/streaming-kafka-integration.html). I'm
using the following code snippets :

// Create direct kafka stream with brokers and topics
val messages = KafkaUtils.createDirectStream[String, Array[Byte],
StringDecoder, DefaultDecoder](
ssc, kafkaParams, topicsSet)

// Get the stuff from Kafka and print them
val raw = messages.map(_._2)
val dStream: DStream[RawScala] = raw.map(
byte => {
// Avro Decoder
println("Byte length: " + byte.length)
val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
RawScala.toScala(rawDecoder.fromBytes(byte))
}
)
// Useful for debug
dStream.print()

I launch my Spark Streaming application and everything is fine if there are no
incoming logs from Kafka. When I send a log, I get the following error:

15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at
org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
at
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
at
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID
94) in 12 ms on localhost (2/4)
15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID
93) in 13 ms on localhost (3/4)
15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID
91)
org.apache.spark.util.TaskCompletionListenerException
at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91,
localhost): org.apache.spark.util.TaskCompletionListenerException
at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1
times; aborting job
15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks
have all completed, from pool
15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at
HotFCANextGen.scala:63, took 0,041068 s
15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job
142772968 ms.1 from job set of time 142772968 ms
15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job
142772968 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage
28.0 (TID 91, localhost):
org.apache.spark.util.TaskCompletionListenerException
at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spa

Re: Spark Streaming with Kafka

2015-01-21 Thread Dibyendu Bhattacharya
You can probably try the Low Level Consumer option with Spark 1.2

https://github.com/dibbhatt/kafka-spark-consumer

This consumer can recover from any underlying failure of the Spark platform or
Kafka and either retry or restart the receiver. This has been working
nicely for us.

Regards,
Dibyendu


On Wed, Jan 21, 2015 at 7:46 AM, firemonk9 
wrote:

> Hi,
>
>I am having similar issues. Have you found any resolution ?
>
> Thank you
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming with Kafka

2015-01-20 Thread firemonk9
Hi,

   I am having similar issues. Have you found any resolution ?

Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark Streaming with Kafka

2015-01-19 Thread Shao, Saisai
Hi,

Could you please describe the symptom you observed a little more specifically? For 
example, were there any exceptions from Kafka or ZooKeeper, and what other exceptions 
did you see on the Spark side? With only this short description it is hard to find 
any clue.
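
One way to surface that information is to register a StreamingListener so receiver
errors and stops get logged as they happen; a minimal sketch against the 1.x listener
API:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler._

// Sketch: print receiver lifecycle events so "stopped receiving" comes with a reason.
// `ssc` is the application's StreamingContext.
def addReceiverDiagnostics(ssc: StreamingContext): Unit =
  ssc.addStreamingListener(new StreamingListener {
    override def onReceiverError(event: StreamingListenerReceiverError): Unit =
      println(s"Receiver error: $event")
    override def onReceiverStopped(event: StreamingListenerReceiverStopped): Unit =
      println(s"Receiver stopped: $event")
  })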

Thanks
Jerry

From: Eduardo Alfaia [mailto:e.costaalf...@unibs.it]
Sent: Monday, January 19, 2015 1:58 AM
To: Rasika Pohankar; user@spark.apache.org
Subject: R: Spark Streaming with Kafka

I have the same issue.

From: Rasika Pohankar<mailto:rasikapohan...@gmail.com>
Sent: 18/01/2015 18:48
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Spark Streaming with Kafka
I am using Spark Streaming to process data received through Kafka. The Spark 
version is 1.2.0. I have written the code in Java and am compiling it using 
sbt. The program runs and receives data from Kafka and processes it as well. 
But it stops receiving data suddenly after some time( it has run for an hour up 
till now while receiving data from Kafka and then always stopped receiving). 
The program continues to run, it only stops receiving data. After a while, 
sometimes it starts and sometimes doesn't. So I stop the program and start 
again.
Earlier I was using Spark 1.0.0. Upgraded to check if the problem was in that 
version. But after upgrading also, it is happening.
Is this a known issue? Can someone please help.
Thanking you.



View this message in context: Spark Streaming with 
Kafka<http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222.html>
Sent from the Apache Spark User List mailing list 
archive<http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.

Privacy notice: http://www.unibs.it/node/8155


R: Spark Streaming with Kafka

2015-01-18 Thread Eduardo Alfaia
I have the same issue.

- Original Message -
From: "Rasika Pohankar" 
Sent: 18/01/2015 18:48
To: "user@spark.apache.org" 
Subject: Spark Streaming with Kafka

I am using Spark Streaming to process data received through Kafka. The Spark 
version is 1.2.0. I have written the code in Java and am compiling it using 
sbt. The program runs and receives data from Kafka and processes it as well. 
But it stops receiving data suddenly after some time( it has run for an hour up 
till now while receiving data from Kafka and then always stopped receiving). 
The program continues to run, it only stops receiving data. After a while, 
sometimes it starts and sometimes doesn't. So I stop the program and start 
again.

Earlier I was using Spark 1.0.0. Upgraded to check if the problem was in that 
version. But after upgrading also, it is happening.


Is this a known issue? Can someone please help.


Thanking you.







View this message in context: Spark Streaming with Kafka
Sent from the Apache Spark User List mailing list archive at Nabble.com.
-- 
Privacy notice: http://www.unibs.it/node/8155


Spark Streaming with Kafka

2015-01-18 Thread Rasika Pohankar
I am using Spark Streaming to process data received through Kafka. The
Spark version is 1.2.0. I have written the code in Java and am compiling it
using sbt. The program runs and receives data from Kafka and processes it
as well. But it stops receiving data suddenly after some time( it has run
for an hour up till now while receiving data from Kafka and then always
stopped receiving). The program continues to run, it only stops receiving
data. After a while, sometimes it starts and sometimes doesn't. So I stop
the program and start again.
Earlier I was using Spark 1.0.0. Upgraded to check if the problem was in
that version. But after upgrading also, it is happening.

Is this a known issue? Can someone please help.

Thanking you.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark Streaming with Kafka is failing with Error

2014-11-18 Thread Tobias Pfeiffer
Hi,

do you have some logging backend (log4j, logback) on your classpath? This
seems a bit like there is no particular implementation of the abstract
`log()` method available.
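
If a backend really is missing, adding an slf4j binding to the build would be the quick
test; a hedged build.sbt fragment (the versions here are assumptions):

libraryDependencies ++= Seq(
  "org.slf4j" % "slf4j-log4j12" % "1.7.5",
  "log4j"     % "log4j"         % "1.2.17"
)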

Tobias


Spark Streaming with Kafka is failing with Error

2014-11-18 Thread Sourav Chandra
Hi,

While running my spark streaming application built on spark 1.1.0 I am
getting below error.

*14/11/18 15:35:30 ERROR ReceiverTracker: Deregistered receiver for stream
0: Error starting receiver 0 - java.lang.AbstractMethodError*
* at org.apache.spark.Logging$class.log(Logging.scala:52)*
* at
org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:66)*
* at org.apache.spark.Logging$class.logInfo(Logging.scala:59)*
* at
org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:66)*
* at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:86)*
* at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)*
* at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)*
* at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)*
* at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)*
* at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)*
* at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)*
* at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)*
* at org.apache.spark.scheduler.Task.run(Task.scala:54)*
* at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)*
* at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)*
* at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)*
* at java.lang.Thread.run(Thread.java:722)*



Can you guys please help me out here?
-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow

2014-09-08 Thread Matt Narrell
I came across this:  https://github.com/xerial/sbt-pack

Until I found this, I was simply using the sbt-assembly plugin (sbt clean assembly).
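
Wiring sbt-pack in is just a plugin line (the version is an assumption; check the
project page for the current one, and per its README you may also need the plugin's
pack settings in build.sbt):

// project/plugins.sbt
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.8.0")

'sbt pack' then lays out the dependency jars and launcher scripts under target/pack
instead of merging everything into one assembly jar, which tends to be much faster
than assembly.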

mn

On Sep 4, 2014, at 2:46 PM, Aris  wrote:

> Thanks for answering Daniil - 
> 
> I have SBT version 0.13.5, is that an old version? Seems pretty up-to-date.
> 
> It turns out I figured out a way around this entire problem: just use 'sbt 
> package', and when using bin/spark-submit, pass it the "--jars" option and 
> GIVE IT ALL THE JARS from the local ivy2 cache. Pretty inelegant, but at least 
> I am able to develop, and when I want to make a super JAR with sbt assembly I 
> can use the stupidly slow method.
> 
> Here is the important snippet for grabbing all the JARs for the local cache 
> of ivy2 :
> 
>  --jars $(find ~/.ivy2/cache/ -iname *.jar | tr '\n' ,) 
> 
> Here's the entire running command  - 
> 
> bin/spark-submit --master local[*] --jars $(find /home/data/.ivy2/cache/ 
> -iname *.jar | tr '\n' ,) --class KafkaStreamConsumer 
> ~/code_host/data/scala/streamingKafka/target/scala-2.10/streamingkafka_2.10-1.0.jar
>  node1:2181 my-consumer-group aris-topic 1
> 
> This is fairly bad, but it works around sbt assembly being incredibly slow
> 
> 
> On Tue, Sep 2, 2014 at 2:13 PM, Daniil Osipov  
> wrote:
> What version of sbt are you using? There is a bug in early version of 0.13 
> that causes assembly to be extremely slow - make sure you're using the latest 
> one.
> 
> 
> On Fri, Aug 29, 2014 at 1:30 PM, Aris <> wrote:
> Hi folks,
> 
> I am trying to use Kafka with Spark Streaming, and it appears I cannot do the 
> normal 'sbt package' as I do with other Spark applications, such as Spark 
> alone or Spark with MLlib. I learned I have to build with the sbt-assembly 
> plugin.
> 
> OK, so here is my build.sbt file for my extremely simple test Kafka/Spark 
> Streaming project. It takes almost 30 minutes to build! This is a CentOS
> Linux machine on SSDs with 4GB of RAM, it's never been slow for me. To 
> compare, sbt assembly for the entire Spark project itself takes less than 10 
> minutes.
> 
> At the bottom of this file I am trying to play with 'cacheOutput' options, 
> because I read online that maybe I am calculating SHA-1 for all the *.class 
> files in this super JAR. 
> 
> I also copied the mergeStrategy from Spark contributor TD Spark Streaming 
> tutorial from Spark Summit 2014.
> 
> Again, is there some better way to build this JAR file, just using sbt 
> package? This process is working, but very slow.
> 
> Any help with speeding up this compilation is really appreciated!!
> 
> Aris
> 
> -
> 
> import AssemblyKeys._ // put this at the top of the file
> 
> name := "streamingKafka"
> 
> version := "1.0"
> 
> scalaVersion := "2.10.4"
> 
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
> )
> 
> assemblySettings
> 
> jarName in assembly := "streamingkafka-assembly.jar"
> 
> mergeStrategy in assembly := {
>   case m if m.toLowerCase.endsWith("manifest.mf")  => 
> MergeStrategy.discard
>   case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => 
> MergeStrategy.discard
>   case "log4j.properties"  => 
> MergeStrategy.discard
>   case m if m.toLowerCase.startsWith("meta-inf/services/") => 
> MergeStrategy.filterDistinctLines
>   case "reference.conf"=> 
> MergeStrategy.concat
>   case _   => 
> MergeStrategy.first
> }
> 
> assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
> 
> 
> 



Re: Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow

2014-09-04 Thread Aris
Thanks for answering Daniil -

I have SBT version 0.13.5, is that an old version? Seems pretty up-to-date.

It turns out I figured out a way around this entire problem: just use 'sbt
package', and when using bin/spark-submit, pass it the "--jars" option and
GIVE IT ALL THE JARS from the local ivy2 cache. Pretty inelegant, but at
least I am able to develop, and when I want to make a super JAR with sbt
assembly I can use the stupidly slow method.

Here is the important snippet for grabbing all the JARs for the local cache
of ivy2 :

 --jars $(find ~/.ivy2/cache/ -iname *.jar | tr '\n' ,)

Here's the entire running command  -

bin/spark-submit --master local[*] --jars $(find /home/data/.ivy2/cache/
-iname *.jar | tr '\n' ,) --class KafkaStreamConsumer
~/code_host/data/scala/streamingKafka/target/scala-2.10/streamingkafka_2.10-1.0.jar
node1:2181 my-consumer-group aris-topic 1

This is fairly bad, but it works around sbt assembly being incredibly slow


On Tue, Sep 2, 2014 at 2:13 PM, Daniil Osipov 
wrote:

> What version of sbt are you using? There is a bug in early version of 0.13
> that causes assembly to be extremely slow - make sure you're using the
> latest one.
>
>
> On Fri, Aug 29, 2014 at 1:30 PM, Aris <> wrote:
>
>> Hi folks,
>>
>> I am trying to use Kafka with Spark Streaming, and it appears I cannot do
>> the normal 'sbt package' as I do with other Spark applications, such as
>> Spark alone or Spark with MLlib. I learned I have to build with the
>> sbt-assembly plugin.
>>
>> OK, so here is my build.sbt file for my extremely simple test Kafka/Spark
>> Streaming project. It takes almost 30 minutes to build! This is a CentOS
>> Linux machine on SSDs with 4GB of RAM, it's never been slow for me. To
>> compare, sbt assembly for the entire Spark project itself takes less than
>> 10 minutes.
>>
>> At the bottom of this file I am trying to play with 'cacheOutput'
>> options, because I read online that maybe I am calculating SHA-1 for all
>> the *.class files in this super JAR.
>>
>> I also copied the mergeStrategy from Spark contributor TD Spark Streaming
>> tutorial from Spark Summit 2014.
>>
>> Again, is there some better way to build this JAR file, just using sbt
>> package? This process is working, but very slow.
>>
>> Any help with speeding up this compilation is really appreciated!!
>>
>> Aris
>>
>> -
>>
>> import AssemblyKeys._ // put this at the top of the file
>>
>> name := "streamingKafka"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
>>   "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
>>   "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
>> )
>>
>> assemblySettings
>>
>> jarName in assembly := "streamingkafka-assembly.jar"
>>
>> mergeStrategy in assembly := {
>>   case m if m.toLowerCase.endsWith("manifest.mf")  =>
>> MergeStrategy.discard
>>   case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
>> MergeStrategy.discard
>>   case "log4j.properties"  =>
>> MergeStrategy.discard
>>   case m if m.toLowerCase.startsWith("meta-inf/services/") =>
>> MergeStrategy.filterDistinctLines
>>   case "reference.conf"=>
>> MergeStrategy.concat
>>   case _   =>
>> MergeStrategy.first
>> }
>>
>> assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
>>
>>
>


Re: Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow

2014-09-02 Thread Daniil Osipov
What version of sbt are you using? There is a bug in early version of 0.13
that causes assembly to be extremely slow - make sure you're using the
latest one.
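
The launcher version used by a build is pinned in project/build.properties, so bumping
it is a one-line change, e.g. (0.13.5 was current around that time; pick the newest
0.13.x release):

# project/build.properties
sbt.version=0.13.5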


On Fri, Aug 29, 2014 at 1:30 PM, Aris  wrote:

> Hi folks,
>
> I am trying to use Kafka with Spark Streaming, and it appears I cannot do
> the normal 'sbt package' as I do with other Spark applications, such as
> Spark alone or Spark with MLlib. I learned I have to build with the
> sbt-assembly plugin.
>
> OK, so here is my build.sbt file for my extremely simple test Kafka/Spark
> Streaming project. It Takes almost 30 minutes to build! This is a Centos
> Linux machine on SSDs with 4GB of RAM, it's never been slow for me. To
> compare, sbt assembly for the entire Spark project itself takes less than
> 10 minutes.
>
> At the bottom of this file I am trying to play with 'cacheOutput' options,
> because I read online that maybe I am calculating SHA-1 for all the *.class
> files in this super JAR.
>
> I also copied the mergeStrategy from Spark contributor TD Spark Streaming
> tutorial from Spark Summit 2014.
>
> Again, is there some better way to build this JAR file, just using sbt
> package? This process is working, but very slow.
>
> Any help with speeding up this compilation is really appreciated!!
>
> Aris
>
> -
>
> import AssemblyKeys._ // put this at the top of the file
>
> name := "streamingKafka"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
> )
>
> assemblySettings
>
> jarName in assembly := "streamingkafka-assembly.jar"
>
> mergeStrategy in assembly := {
>   case m if m.toLowerCase.endsWith("manifest.mf")  =>
> MergeStrategy.discard
>   case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
> MergeStrategy.discard
>   case "log4j.properties"  =>
> MergeStrategy.discard
>   case m if m.toLowerCase.startsWith("meta-inf/services/") =>
> MergeStrategy.filterDistinctLines
>   case "reference.conf"=>
> MergeStrategy.concat
>   case _   =>
> MergeStrategy.first
> }
>
> assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
>
>


Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow

2014-08-29 Thread Aris
Hi folks,

I am trying to use Kafka with Spark Streaming, and it appears I cannot do
the normal 'sbt package' as I do with other Spark applications, such as
Spark alone or Spark with MLlib. I learned I have to build with the
sbt-assembly plugin.

OK, so here is my build.sbt file for my extremely simple test Kafka/Spark
Streaming project. It takes almost 30 minutes to build! This is a CentOS
Linux machine on SSDs with 4GB of RAM, it's never been slow for me. To
compare, sbt assembly for the entire Spark project itself takes less than
10 minutes.

At the bottom of this file I am trying to play with 'cacheOutput' options,
because I read online that maybe I am calculating SHA-1 for all the *.class
files in this super JAR.

I also copied the mergeStrategy from Spark contributor TD's Spark Streaming
tutorial from Spark Summit 2014.

Again, is there some better way to build this JAR file, just using sbt
package? This process is working, but very slow.

Any help with speeding up this compilation is really appreciated!!

Aris

-

import AssemblyKeys._ // put this at the top of the file

name := "streamingKafka"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)

assemblySettings

jarName in assembly := "streamingkafka-assembly.jar"

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")  =>
MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
MergeStrategy.discard
  case "log4j.properties"  =>
MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") =>
MergeStrategy.filterDistinctLines
  case "reference.conf"=>
MergeStrategy.concat
  case _   =>
MergeStrategy.first
}

assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
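
For context, the sort of application a build like this is meant to produce, as
a minimal sketch (the ZooKeeper quorum, consumer group and topic name are just
placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> numThreads))
    val messages = KafkaUtils.createStream(
      ssc, "localhost:2181", "wordcount-group", Map("test-topic" -> 1))

    // The stream carries (key, message) pairs; count words in the message body.
    val counts = messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Once assembled, streamingkafka-assembly.jar can be submitted with spark-submit,
so the bundled spark-streaming-kafka classes end up on the application classpath.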


Re: Using Spark Streaming with Kafka 0.7.2

2014-07-29 Thread Andre Schumacher

Hi,

For testing you could also just use the Kafka 0.7.2 console consumer and
pipe its output to netcat (nc) and process that as in the example

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

That worked for me. Backporting to the older Kafka version seems tricky
due to all the protocol changes.
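
Concretely, the wiring can look something like the sketch below; the topic,
port and the exact 0.7.2 console-consumer flags are assumptions, and the word
counting itself is done exactly as in the linked NetworkWordCount example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Kafka072PipeDemo {
  def main(args: Array[String]): Unit = {
    // In another terminal, pipe the Kafka 0.7.2 console consumer into netcat, e.g.:
    //   bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic | nc -lk 9999
    val conf = new SparkConf().setAppName("Kafka072PipeDemo")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Each line the console consumer prints becomes one record here.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}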

Andre

On 07/26/2014 12:56 AM, Tathagata Das wrote:
> Spark Streaming is built as part of the whole Spark repository. Hence
> follow Spark's building instructions
> <http://spark.apache.org/docs/latest/building-with-maven.html> to build
> Spark Streaming along with Spark.
> Spark Streaming 0.8.1 was built with kafka 0.7.2. You can take a look. If
> necessary, I recommend modifying the current Kafka Receiver based on the
> 0.8.1 Kafka Receiver
> <https://github.com/apache/spark/blob/v0.8.1-incubating/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala>
> 
> TD
> 
> 
> On Fri, Jul 25, 2014 at 10:16 AM, maddenpj  wrote:
> 
>> Hi all,
>>
>> Currently we have Kafka 0.7.2 running in production and can't upgrade for
>> external reasons; however, Spark Streaming (1.0.1) was built with Kafka
>> 0.8.0.
>> What is the best way to use Spark Streaming with older versions of Kafka?
>> I'm currently investigating building Spark Streaming myself, but I
>> can't find any documentation specifically for building Spark Streaming.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-Streaming-with-Kafka-0-7-2-tp10674.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
> 



Re: Using Spark Streaming with Kafka 0.7.2

2014-07-25 Thread Tathagata Das
Spark Streaming is built as part of the whole Spark repository. Hence
follow Spark's building instructions
<http://spark.apache.org/docs/latest/building-with-maven.html> to build
Spark Streaming along with Spark.
Spark Streaming 0.8.1 was built with kafka 0.7.2. You can take a look. If
necessary, I recommend modifying the current Kafka Receiver based on the
0.8.1 Kafka Receiver
<https://github.com/apache/spark/blob/v0.8.1-incubating/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala>
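
If you go the custom receiver route, one possible shape is sketched below. It
uses the generic custom-receiver API from Spark 1.x rather than the 0.8.1
internals, and the actual Kafka 0.7.2 consumer calls are abstracted behind the
nextMessage parameter (not shown):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LegacyKafkaReceiver(nextMessage: () => Option[String])
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // Consume on a separate thread so that onStart() returns quickly.
    new Thread("legacy-kafka-consumer") {
      override def run(): Unit = {
        while (!isStopped()) {
          // Hand each message pulled from the 0.7.2 consumer over to Spark.
          nextMessage().foreach(msg => store(msg))
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Shutting down the 0.7.2 consumer connector would go here.
  }
}

Such a receiver is then plugged in with ssc.receiverStream(new LegacyKafkaReceiver(...)).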

TD


On Fri, Jul 25, 2014 at 10:16 AM, maddenpj  wrote:

> Hi all,
>
> Currently we have Kafka 0.7.2 running in production and can't upgrade for
> external reasons; however, Spark Streaming (1.0.1) was built with Kafka
> 0.8.0.
> What is the best way to use Spark Streaming with older versions of Kafka?
> I'm currently investigating building Spark Streaming myself, but I
> can't find any documentation specifically for building Spark Streaming.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-Streaming-with-Kafka-0-7-2-tp10674.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Using Spark Streaming with Kafka 0.7.2

2014-07-25 Thread maddenpj
Hi all,

Currently we have Kafka 0.7.2 running in production and can't upgrade for
external reasons; however, Spark Streaming (1.0.1) was built with Kafka 0.8.0.
What is the best way to use Spark Streaming with older versions of Kafka?
I'm currently investigating building Spark Streaming myself, but I
can't find any documentation specifically for building Spark Streaming.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-Streaming-with-Kafka-0-7-2-tp10674.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-13 Thread Tathagata Das
Also, the reason spark-streaming-kafka is not included in the Spark
assembly is that we do not want dependencies of external systems like Kafka
(which itself probably has a complex dependency tree) to cause conflicts
with core Spark's functionality and stability.
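
In practice that means the application bundles the Kafka integration itself,
along these lines (a sketch only, with the version numbers as placeholders):

libraryDependencies ++= Seq(
  // provided by the Spark assembly on the cluster, so kept out of the uber jar
  "org.apache.spark" %% "spark-core"      % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
  // not part of the Spark assembly, so it has to go into the application jar
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)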

TD


On Sun, Jul 13, 2014 at 5:48 PM, Tathagata Das 
wrote:

> In case you still have issues with duplicate files in the uber jar, here is a
> reference sbt file with an assembly plugin setup that deals with duplicates:
>
>
> https://github.com/databricks/training/blob/sparkSummit2014/streaming/scala/build.sbt
>
>
> On Fri, Jul 11, 2014 at 10:06 AM, Bill Jay 
> wrote:
>
>> You may try to use this one:
>>
>> https://github.com/sbt/sbt-assembly
>>
>> I had an issue of duplicate files in the uber jar file. But I think this
>> library will assemble dependencies into a single jar file.
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 1:34 AM, Dilip  wrote:
>>
>>>  A simple
>>> sbt assembly
>>> is not working. Is there any other way to include particular jars with
>>> assembly command?
>>>
>>> Regards,
>>> Dilip
>>>
>>> On Friday 11 July 2014 12:45 PM, Bill Jay wrote:
>>>
>>> I have met similar issues. The reason is probably that spark-streaming-kafka
>>> is not included in the Spark assembly. Currently, I am using
>>> Maven to generate a shaded package with all the dependencies. You may try
>>> to use sbt assembly to include the dependencies in your jar file.
>>>
>>>  Bill
>>>
>>>
>>> On Thu, Jul 10, 2014 at 11:48 PM, Dilip 
>>> wrote:
>>>
  Hi Akhil,

 Can you please guide me through this? Because the code I am running
 already has this in it:
 [java]

 SparkContext sc = new SparkContext();

 sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


 Is there something I am missing?

 Thanks,
 Dilip


 On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

  Easiest fix would be adding the kafka jars to the SparkContext while
 creating it.

  Thanks
 Best Regards


 On Fri, Jul 11, 2014 at 4:39 AM, Dilip 
 wrote:

> Hi,
>
> I am trying to run a program with spark streaming using Kafka on a
> standalone system. These are my details:
>
> Spark 1.0.0 hadoop2
> Scala 2.10.3
>
> I am trying a simple program using my custom sbt project but this is
> the error I am getting:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> kafka/serializer/StringDecoder
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
> at
> org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
> at SimpleJavaApp.main(SimpleJavaApp.java:40)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> kafka.serializer.StringDecoder
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 11 more
>
>
> here is my .sbt file:
>
> name := "Simple Project"
>
> version := "1.0"
>
> scalaVersion := "2.10.3"
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
>
> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
> "1.0.0"
>
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"
>
> libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"
>
> libraryDependencies += "org.apache.spark" %
> "spark-streaming-kafka_2.10" % "1.0.0"
>
> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"
>
> resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
>
> resolvers += "Maven Repository" at "http://central.maven.org/maven2/";
>
>
> sbt package was successful. I also tried sbt "++2.10.3 package" to
> build it for my scala version. Problem remains the same.
> Can anyone

Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-13 Thread Tathagata Das
In case you still have issues with duplicate files in the uber jar, here is a
reference sbt file with an assembly plugin setup that deals with duplicates:

https://github.com/databricks/training/blob/sparkSummit2014/streaming/scala/build.sbt


On Fri, Jul 11, 2014 at 10:06 AM, Bill Jay 
wrote:

> You may try to use this one:
>
> https://github.com/sbt/sbt-assembly
>
> I had an issue of duplicate files in the uber jar file. But I think this
> library will assemble dependencies into a single jar file.
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 1:34 AM, Dilip  wrote:
>
>>  A simple
>> sbt assembly
>> is not working. Is there any other way to include particular jars with
>> assembly command?
>>
>> Regards,
>> Dilip
>>
>> On Friday 11 July 2014 12:45 PM, Bill Jay wrote:
>>
>> I have met similar issues. The reason is probably that spark-streaming-kafka
>> is not included in the Spark assembly. Currently, I am using
>> Maven to generate a shaded package with all the dependencies. You may try
>> to use sbt assembly to include the dependencies in your jar file.
>>
>>  Bill
>>
>>
>> On Thu, Jul 10, 2014 at 11:48 PM, Dilip  wrote:
>>
>>>  Hi Akhil,
>>>
>>> Can you please guide me through this? Because the code I am running
>>> already has this in it:
>>> [java]
>>>
>>> SparkContext sc = new SparkContext();
>>>
>>> sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");
>>>
>>>
>>> Is there something I am missing?
>>>
>>> Thanks,
>>> Dilip
>>>
>>>
>>> On Friday 11 July 2014 12:02 PM, Akhil Das wrote:
>>>
>>>  Easiest fix would be adding the kafka jars to the SparkContext while
>>> creating it.
>>>
>>>  Thanks
>>> Best Regards
>>>
>>>
>>> On Fri, Jul 11, 2014 at 4:39 AM, Dilip  wrote:
>>>
 Hi,

 I am trying to run a program with spark streaming using Kafka on a
 standalone system. These are my details:

 Spark 1.0.0 hadoop2
 Scala 2.10.3

 I am trying a simple program using my custom sbt project but this is
 the error I am getting:

 Exception in thread "main" java.lang.NoClassDefFoundError:
 kafka/serializer/StringDecoder
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
 at
 org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
 at SimpleJavaApp.main(SimpleJavaApp.java:40)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 kafka.serializer.StringDecoder
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 11 more


 here is my .sbt file:

 name := "Simple Project"

 version := "1.0"

 scalaVersion := "2.10.3"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

 libraryDependencies += "org.apache.spark" %
 "spark-streaming-kafka_2.10" % "1.0.0"

 libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/";

 resolvers += "Maven Repository" at "http://central.maven.org/maven2/";


 sbt package was successful. I also tried sbt "++2.10.3 package" to
 build it for my scala version. Problem remains the same.
 Can anyone help me out here? I've been stuck on this for quite some time
 now.

 Thank You,
 Dilip

>>>
>>>
>>>
>>
>>
>


  1   2   >