Re: Re: spark+kafka+dynamic resource allocation

2023-01-30 Thread Mich Talebzadeh
Sure, I suggest that you add a note to that Jira and express your interest.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jan 2023 at 01:56, Lingzhe Sun  wrote:

> Hi Mich,
>
> Thanks for the information. I myself think this open issue should have
> higher priority as more streaming applications are being built using Spark.
> Hope this becomes a feature in the near future.
>
> --
> BR
> Lingzhe Sun
>
>
> *From:* Mich Talebzadeh 
> *Date:* 2023-01-30 02:14
> *To:* Lingzhe Sun 
> *CC:* ashok34...@yahoo.com; User 
> *Subject:* Re: Re: spark+kafka+dynamic resource allocation
> Hi,
>
> Spark Structured Streaming currently does not support dynamic allocation
> (see SPARK-24815: Structured Streaming should support dynamic allocation
> <https://issues.apache.org/jira/browse/SPARK-24815>, which is still open).
>
>
> Autoscaling in Cloud offerings
> <https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#autoscaling_and_spark_streaming>
> like Google Dataproc does not support Spark Structured Streaming either.
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 29 Jan 2023 at 01:49, Lingzhe Sun  wrote:
>
>> Thank you for the response. But the reference does not seem to be
>> answering any of those questions.
>>
>> BR
>> Lingzhe Sun
>>
>>
>> *From:* ashok34...@yahoo.com
>> *Date:* 2023-01-29 04:01
>> *To:* User ; Lingzhe Sun 
>> *Subject:* Re: spark+kafka+dynamic resource allocation
>> Hi,
>>
>> Worth checking this link
>>
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>> On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun <
>> lingzhe@hirain.com> wrote:
>>
>>
>> Hi all,
>>
>> I'm wondering if dynamic resource allocation works in spark+kafka
>> streaming applications. Here're some questions:
>>
>>- Will structured streaming be supported?
>>- Is the number of consumers always equal to the number of
>>partitions of the subscribed topic (let's say there's only one topic)?
>>- If consumers are evenly distributed across executors, will a newly
>>added executor (through dynamic resource allocation) trigger a consumer
>>reassignment?
>>- Would it be simply a bad idea to use dynamic resource allocation in a
>>streaming app, because there's no way to scale down the number of
>>executors unless no data is coming in?
>>
>> Any thoughts are welcome.
>>
>> Lingzhe Sun
>> Hirain Technology
>>
>>


Re: Re: spark+kafka+dynamic resource allocation

2023-01-29 Thread Lingzhe Sun
Hi Mich,

Thanks for the information. I myself think this open issue should have higher 
priority as more streaming applications are being built using Spark. Hope this 
becomes a feature in the near future.



BR
Lingzhe Sun
 
From: Mich Talebzadeh
Date: 2023-01-30 02:14
To: Lingzhe Sun
CC: ashok34...@yahoo.com; User
Subject: Re: Re: spark+kafka+dynamic resource allocation
Hi,

Spark Structured Streaming currently does not support dynamic allocation (see 
SPARK-24815: Structured Streaming should support dynamic allocation, which is 
still open).

Autoscaling in Cloud offerings like Google Dataproc does not support Spark 
Structured Streaming either.

HTH

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Sun, 29 Jan 2023 at 01:49, Lingzhe Sun  wrote:
Thank you for the response. But the reference does not seem to be answering any 
of those questions.

BR
Lingzhe Sun
 
From: ashok34...@yahoo.com
Date: 2023-01-29 04:01
To: User; Lingzhe Sun
Subject: Re: spark+kafka+dynamic resource allocation
Hi,

Worth checking this link

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun 
 wrote: 


Hi all,

I'm wondering if dynamic resource allocation works in spark+kafka streaming 
applications. Here're some questions:
- Will structured streaming be supported?
- Is the number of consumers always equal to the number of partitions of the 
subscribed topic (let's say there's only one topic)?
- If consumers are evenly distributed across executors, will a newly added 
executor (through dynamic resource allocation) trigger a consumer reassignment?
- Would it be simply a bad idea to use dynamic resource allocation in a 
streaming app, because there's no way to scale down the number of executors 
unless no data is coming in?
Any thoughts are welcome.

Lingzhe Sun 
Hirain Technology


Re: Re: spark+kafka+dynamic resource allocation

2023-01-29 Thread Mich Talebzadeh
Hi,

Spark Structured Streaming currently does not support dynamic allocation
(see SPARK-24815: Structured Streaming should support dynamic allocation
<https://issues.apache.org/jira/browse/SPARK-24815>, which is still open).


Autoscaling in Cloud offerings
<https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#autoscaling_and_spark_streaming>
like Google Dataproc does not support Spark Structured Streaming either.
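
For reference, a minimal PySpark sketch of the dynamic allocation settings the
Jira is about; the config keys are standard Spark ones, but the executor bounds
are illustrative placeholders. With a Structured Streaming query these settings
may still add executors under load, yet there is no streaming-aware logic to
scale them back down.

from pyspark.sql import SparkSession

# Minimal sketch; min/max executor values are illustrative only.
spark = (
    SparkSession.builder
    .appName("dra-sketch")
    .config("spark.dynamicAllocation.enabled", "true")
    # Tracks shuffle files on executors so dynamic allocation can work
    # without an external shuffle service (available since Spark 3.0).
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .getOrCreate()
)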


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 29 Jan 2023 at 01:49, Lingzhe Sun  wrote:

> Thank you for the response. But the reference does not seem to be
> answering any of those questions.
>
> BR
> Lingzhe Sun
>
>
> *From:* ashok34...@yahoo.com
> *Date:* 2023-01-29 04:01
> *To:* User ; Lingzhe Sun 
> *Subject:* Re: spark+kafka+dynamic resource allocation
> Hi,
>
> Worth checking this link
>
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun <
> lingzhe@hirain.com> wrote:
>
>
> Hi all,
>
> I'm wondering if dynamic resource allocation works in spark+kafka
> streaming applications. Here're some questions:
>
>- Will structured streaming be supported?
>- Is the number of consumers always equal to the number of
>partitions of the subscribed topic (let's say there's only one topic)?
>- If consumers are evenly distributed across executors, will a newly
>added executor (through dynamic resource allocation) trigger a consumer
>reassignment?
>- Would it be simply a bad idea to use dynamic resource allocation in a
>streaming app, because there's no way to scale down the number of
>executors unless no data is coming in?
>
> Any thoughts are welcome.
>
> Lingzhe Sun
> Hirain Technology
>
>


Re: Re: spark+kafka+dynamic resource allocation

2023-01-28 Thread Lingzhe Sun
Thank you for the response. But the reference does not seem to be answering any 
of those questions.

BR
Lingzhe Sun
 
From: ashok34...@yahoo.com
Date: 2023-01-29 04:01
To: User; Lingzhe Sun
Subject: Re: spark+kafka+dynamic resource allocation
Hi,

Worth checking this link

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun 
 wrote: 


Hi all,

I'm wondering if dynamic resource allocation works in spark+kafka streaming 
applications. Here're some questions:
- Will structured streaming be supported?
- Is the number of consumers always equal to the number of partitions of the 
subscribed topic (let's say there's only one topic)?
- If consumers are evenly distributed across executors, will a newly added 
executor (through dynamic resource allocation) trigger a consumer reassignment?
- Would it be simply a bad idea to use dynamic resource allocation in a 
streaming app, because there's no way to scale down the number of executors 
unless no data is coming in?
Any thoughts are welcome.

Lingzhe Sun 
Hirain Technology


Re: spark+kafka+dynamic resource allocation

2023-01-28 Thread ashok34...@yahoo.com.INVALID
 Hi,
Worth checking this link
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun 
 wrote:  
 
Hi all,
I'm wondering if dynamic resource allocation works in spark+kafka streaming 
applications. Here're some questions:
   - Will structured streaming be supported?
   - Is the number of consumers always equal to the number of partitions of 
the subscribed topic (let's say there's only one topic)?
   - If consumers are evenly distributed across executors, will a newly added 
executor (through dynamic resource allocation) trigger a consumer reassignment?
   - Would it be simply a bad idea to use dynamic resource allocation in a 
streaming app, because there's no way to scale down the number of executors 
unless no data is coming in?
Any thoughts are welcome.
Lingzhe Sun
Hirain Technology

RE: Spark kafka structured streaming - how to prevent dataloss

2022-03-22 Thread Gnanasoundari Soundarajan
Hi all,

Any suggestion?

Regards,
Gnana

From: Gnanasoundari Soundarajan 
Sent: Tuesday, March 8, 2022 10:02 PM
To: user@spark.apache.org
Subject: Spark kafka structured streaming - how to prevent dataloss

Hi,

In Spark, checkpoints are used to keep track of Kafka offsets. If there is 
any data loss, can we edit the checkpoint files to reduce it? Please suggest 
best practices for reducing data loss under exceptional scenarios.
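
For context, a minimal sketch (broker, topic, and paths are hypothetical) of
where those offsets live: the Kafka source records each micro-batch's offset
range in the checkpoint directory before processing it, and the failOnDataLoss
option controls whether the query fails or skips ahead when the requested
offsets have already been deleted from Kafka.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-checkpoint-sketch").getOrCreate()

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  # hypothetical broker
    .option("subscribe", "sensor-events")               # hypothetical topic
    # Fail the query rather than silently skipping ahead when offsets
    # recorded in the checkpoint have aged out of Kafka's retention.
    .option("failOnDataLoss", "true")
    .load()
)

query = (
    df.writeStream.format("parquet")
    .option("path", "/data/out")                        # hypothetical path
    # The offset range of every micro-batch is recorded here before it is
    # processed; editing these files by hand is a last resort.
    .option("checkpointLocation", "/chk/sensor-query")  # hypothetical path
    .start()
)
query.awaitTermination()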

Regards,
Gnana


RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
Ahh, ok.  So, Kafka 3.1 is supported for Spark 3.2.1.  Thank you very much.

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:50 PM
To: Michael Williams (SSI) 
Cc: user@spark.apache.org
Subject: Re: Spark Kafka Integration

these are the old and new ones

For spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar  --> kafka-clients-3.1.0.jar
<https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar>
commons-pool2-2.9.0.jar  --> commons-pool2-2.11.1.jar
<https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar>
spark-streaming_2.12-3.1.1.jar  --> spark-streaming_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.12/3.2.1/spark-streaming_2.12-3.2.1.jar>
spark-sql-kafka-0-10_2.12-3.1.0.jar -> spark-sql-kafka-0-10_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.1/spark-sql-kafka-0-10_2.12-3.2.1.jar>



HTH


 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:40, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
I believe it is 3.1, but if there is a different version that “works better” 
with spark, any advice would be appreciated.  Our entire team is totally new to 
spark and kafka (this is a poc trial).

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, February 25, 2022 2:30 PM
To: Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Kafka Integration

and what version of Kafka do you have, 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar



HTH


 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other propert

RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
Thank you, that is good to know.

From: Sean Owen [mailto:sro...@gmail.com]
Sent: Friday, February 25, 2022 2:46 PM
To: Michael Williams (SSI) 
Cc: Mich Talebzadeh ; user@spark.apache.org
Subject: Re: Spark Kafka Integration

Spark 3.2.1 is compiled vs Kafka 2.8.0; the forthcoming Spark 3.3 against Kafka 
3.1.0.
It may well be mutually compatible though.

On Fri, Feb 25, 2022 at 2:40 PM Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
I believe it is 3.1, but if there is a different version that “works better” 
with spark, any advice would be appreciated.  Our entire team is totally new to 
spark and kafka (this is a poc trial).

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, February 25, 2022 2:30 PM
To: Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Kafka Integration

and what version of Kafka do you have, 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar



HTH


 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.



Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
these are the old and new ones

For spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar  --> kafka-clients-3.1.0.jar
<https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar>
commons-pool2-2.9.0.jar  --> commons-pool2-2.11.1.jar
<https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar>
spark-streaming_2.12-3.1.1.jar  --> spark-streaming_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.12/3.2.1/spark-streaming_2.12-3.2.1.jar>
spark-sql-kafka-0-10_2.12-3.1.0.jar -> spark-sql-kafka-0-10_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.1/spark-sql-kafka-0-10_2.12-3.2.1.jar>
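
An alternative to collecting the jars by hand is to let Spark resolve them
from Maven. A minimal PySpark sketch (the coordinate matches the 3.2.1
connector above; adjust it to your Spark and Scala versions):

from pyspark.sql import SparkSession

# Spark resolves the connector and its transitive dependencies
# (kafka-clients, commons-pool2, ...) from Maven Central at startup,
# so the individual jars never have to be downloaded by hand.
spark = (
    SparkSession.builder
    .appName("kafka-packages-sketch")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
    .getOrCreate()
)

The same coordinate can also be passed on the command line via
spark-submit --packages.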


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 20:40, Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> I believe it is 3.1, but if there is a different version that “works
> better” with spark, any advice would be appreciated.  Our entire team is
> totally new to spark and kafka (this is a poc trial).
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, February 25, 2022 2:30 PM
> *To:* Michael Williams (SSI) 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Kafka Integration
>
>
>
> and what version of Kafka do you have, 2.7?
>
>
>
> for spark 3.1.1 I needed these jar files to make it work
>
>
>
> kafka-clients-2.7.0.jar
> commons-pool2-2.9.0.jar
> spark-streaming_2.12-3.1.1.jar
> spark-sql-kafka-0-10_2.12-3.1.0.jar
>
>
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
> wrote:
>
> What is the use case? Is this for spark structured streaming?
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deli

Re: Spark Kafka Integration

2022-02-25 Thread Sean Owen
Spark 3.2.1 is compiled vs Kafka 2.8.0; the forthcoming Spark 3.3 against
Kafka 3.1.0.
It may well be mutually compatible though.

On Fri, Feb 25, 2022 at 2:40 PM Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> I believe it is 3.1, but if there is a different version that “works
> better” with spark, any advice would be appreciated.  Our entire team is
> totally new to spark and kafka (this is a poc trial).
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, February 25, 2022 2:30 PM
> *To:* Michael Williams (SSI) 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Kafka Integration
>
>
>
> and what version of Kafka do you have, 2.7?
>
>
>
> for spark 3.1.1 I needed these jar files to make it work
>
>
>
> kafka-clients-2.7.0.jar
> commons-pool2-2.9.0.jar
> spark-streaming_2.12-3.1.1.jar
> spark-sql-kafka-0-10_2.12-3.1.0.jar
>
>
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
> wrote:
>
> What is the use case? Is this for spark structured streaming?
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>
>
>


RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
Thank you

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:35 PM
To: Michael Williams (SSI) 
Cc: Sean Owen ; user@spark.apache.org
Subject: Re: Spark Kafka Integration

please see my earlier reply; the 3.1.1 jars were tested and worked in a Google 
Dataproc environment.

Also this article of mine may be useful

Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>

HTH




 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:30, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
The use case is for spark structured streaming (a spark app will be launched by 
a worker service that monitors the kafka topic for new messages, once the 
messages are consumed, the spark app will terminate), but if there is a hitch 
here, it is that the Spark environment includes the MS dotnet for Spark 
wrapper, which means each spark app will consume from one kafka topic and 
will be written in C#.  If possible, I’d really like to be able to manually 
download the necessary jars and do the kafka client installation as part of the 
docker image build, so that the dependencies already exist on disk.  If that 
makes any sense.

Thank you

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, February 25, 2022 2:16 PM
To: Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Kafka Integration

What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.



RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
I believe it is 3.1, but if there is a different version that “works better” 
with spark, any advice would be appreciated.  Our entire team is totally new to 
spark and kafka (this is a poc trial).

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:30 PM
To: Michael Williams (SSI) 
Cc: user@spark.apache.org
Subject: Re: Spark Kafka Integration

and what version of Kafka do you have, 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar



HTH


 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.





Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
please see my earlier reply; the 3.1.1 jars were tested and worked in a Google
Dataproc environment.

Also this article of mine may be useful

Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 20:30, Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> The use case is for spark structured streaming (a spark app will be
> launched by a worker service that monitors the kafka topic for new
> messages, once the messages are consumed, the spark app will terminate),
> but if there is a hitch here, it is that the Spark environment includes the
> MS dotnet for Spark wrapper, which means each spark app will consume
> from one kafka topic and will be written in C#.  If possible, I’d really
> like to be able to manually download the necessary jars and do the kafka
> client installation as part of the docker image build, so that the
> dependencies already exist on disk.  If that makes any sense.
>
>
>
> Thank you
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, February 25, 2022 2:16 PM
> *To:* Michael Williams (SSI) 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Kafka Integration
>
>
>
> What is the use case? Is this for spark structured streaming?
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>
>
>


Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
and what version of Kafka do you have, 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar


HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
wrote:

> What is the use case? Is this for spark structured streaming?
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
>> After reviewing Spark's Kafka Integration guide, it indicates that
>> spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for
>> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
>> cleanest, most repeatable (reliable) way to acquire these jars for
>> including in a Spark Docker image without introducing version compatibility
>> issues?
>>
>>
>>
>> Thank you,
>>
>> Mike
>>
>>
>> This electronic message may contain information that is Proprietary,
>> Confidential, or legally privileged or protected. It is intended only for
>> the use of the individual(s) and entity named in the message. If you are
>> not an intended recipient of this message, please notify the sender
>> immediately and delete the material from your computer. Do not deliver,
>> distribute or copy this message and do not disclose its contents or take
>> any action in reliance on the information it contains. Thank You.
>>
>


RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
The use case is for spark structured streaming (a spark app will be launched by 
a worker service that monitors the kafka topic for new messages, once the 
messages are consumed, the spark app will terminate), but if there is a hitch 
here, it is that the Spark environment includes the MS dotnet for Spark 
wrapper, which means each spark app will consume from one kafka topic and 
will be written in C#.  If possible, I’d really like to be able to manually 
download the necessary jars and do the kafka client installation as part of the 
docker image build, so that the dependencies already exist on disk.  If that 
makes any sense.
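
For what it's worth, the consume-then-terminate part of that pattern maps onto
a one-shot trigger. A minimal sketch in PySpark (the .NET for Spark wrapper
should expose an equivalent; broker, topic, and paths are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("run-once-sketch").getOrCreate()

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  # hypothetical broker
    .option("subscribe", "jobs-topic")                  # hypothetical topic
    .load()
)

query = (
    df.writeStream.format("parquet")
    .option("path", "/data/out")                        # hypothetical path
    .option("checkpointLocation", "/chk/jobs")          # hypothetical path
    # Process whatever is currently available in the topic, then stop,
    # so the driver process terminates on its own.
    .trigger(once=True)
    .start()
)
query.awaitTermination()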

Thank you

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:16 PM
To: Michael Williams (SSI) 
Cc: user@spark.apache.org
Subject: Re: Spark Kafka Integration

What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.





Re: Spark Kafka Integration

2022-02-25 Thread Sean Owen
That .jar is available on Maven, though typically you depend on it in your
app, and compile an uber JAR which will contain it and all its dependencies.
You can, I suppose, manage to compile an uber JAR from that dependency itself
with tools if needed.

On Fri, Feb 25, 2022 at 1:37 PM Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>


Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
What is the use case? Is this for spark structured streaming?

HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12-3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>


Re: Spark Kafka Streaming With Transactional Messages

2020-09-16 Thread jianyangusa
I have the same issue. Do you have a solution? Maybe Spark streaming does not
support transactional messages. I use Kafka Streams to retrieve the
transactional messages. Maybe we can ask Spark to support this feature.
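
One thing that may be worth trying first: options prefixed with "kafka." are
passed through to the underlying consumer, so the consumer's isolation.level
can be set from Structured Streaming. A sketch, assuming a hypothetical broker
and topic; whether this fully covers a given transactional workload is worth
verifying on your Spark version.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("txn-read-sketch").getOrCreate()

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  # hypothetical broker
    .option("subscribe", "txn-topic")                   # hypothetical topic
    # "kafka."-prefixed options are forwarded to the underlying consumer;
    # read_committed skips records from aborted transactions.
    .option("kafka.isolation.level", "read_committed")
    .load()
)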



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Amit Joshi
Hi Jungtaek,

Thanks for the input. I did try it and it worked.
I got confused earlier after reading some blogs.

Regards
Amit

On Friday, August 28, 2020, Jungtaek Lim 
wrote:

> Hi Amit,
>
> if I remember correctly, you don't need to restart the query to reflect
> the newly added topic and partition, if your subscription covers the topic
> (like subscribe pattern). Please try it out.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
> wrote:
>
>> Any pointers will be appreciated.
>>
>> On Thursday, August 27, 2020, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am trying to understand the effect of adding topics and partitions to
>>> a topic in kafka, which is being consumed by spark structured streaming
>>> applications.
>>>
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added topic?
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added partition to a topic?
>>>
>>> Kafka consumers have a metadata refresh property that works without
>>> restarting.
>>>
>>> Thanks in advance.
>>>
>>> Regards
>>> Amit Joshi
>>>
>>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Gabor Somogyi
Hi Amit,

The answer is no.

G


On Fri, Aug 28, 2020 at 9:16 AM Jungtaek Lim 
wrote:

> Hi Amit,
>
> if I remember correctly, you don't need to restart the query to reflect
> the newly added topic and partition, if your subscription covers the topic
> (like subscribe pattern). Please try it out.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
> wrote:
>
>> Any pointers will be appreciated.
>>
>> On Thursday, August 27, 2020, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am trying to understand the effect of adding topics and partitions to
>>> a topic in kafka, which is being consumed by spark structured streaming
>>> applications.
>>>
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added topic?
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added partition to a topic?
>>>
>>> Kafka consumers have a metadata refresh property that works without
>>> restarting.
>>>
>>> Thanks in advance.
>>>
>>> Regards
>>> Amit Joshi
>>>
>>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Jungtaek Lim
Hi Amit,

if I remember correctly, you don't need to restart the query to reflect the
newly added topic and partition, if your subscription covers the topic
(like subscribe pattern). Please try it out.
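
A minimal sketch of that kind of subscription (broker and pattern are
hypothetical): with subscribePattern the source matches topics by regex, so
later-created matching topics, and new partitions of matched topics, are
picked up by the running query, subject to the consumer's metadata refresh.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pattern-sketch").getOrCreate()

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  # hypothetical broker
    # Regex subscription: topics created later that match "events-.*"
    # are consumed without restarting the query.
    .option("subscribePattern", "events-.*")            # hypothetical pattern
    .load()
)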

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
wrote:

> Any pointers will be appreciated.
>
> On Thursday, August 27, 2020, Amit Joshi 
> wrote:
>
>> Hi All,
>>
>> I am trying to understand the effect of adding topics and partitions to a
>> topic in kafka, which is being consumed by spark structured streaming
>> applications.
>>
>> Do we have to restart the spark structured streaming application to read
>> from the newly added topic?
>> Do we have to restart the spark structured streaming application to read
>> from the newly added partition to a topic?
>>
>> Kafka consumers have a metadata refresh property that works without
>> restarting.
>>
>> Thanks in advance.
>>
>> Regards
>> Amit Joshi
>>
>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-27 Thread Amit Joshi
Any pointers will be appreciated.

On Thursday, August 27, 2020, Amit Joshi  wrote:

> Hi All,
>
> I am trying to understand the effect of adding topics and partitions to a
> topic in kafka, which is being consumed by spark structured streaming
> applications.
>
> Do we have to restart the spark structured streaming application to read
> from the newly added topic?
> Do we have to restart the spark structured streaming application to read
> from the newly added partition to a topic?
>
> Kafka consumers have a metadata refresh property that works without
> restarting.
>
> Thanks in advance.
>
> Regards
> Amit Joshi
>


Re: Spark Kafka Streaming with Offset Gaps

2020-05-21 Thread nimmi.cv
Did you resolve this issue ?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Thanks Dhaval, that fixed the issue. The constant resetting of Kafka
offsets misled me about the issue. Please feel free to answer the SO
question here

if you would like to.





On Wed, Sep 11, 2019 at 9:03 PM Dhaval Patel 
wrote:

> Hi Charles,
>
> Can you check if any of the cases related to the output directory and
> checkpoint location mentioned in the below link are applicable in your case?
>
> https://kb.databricks.com/streaming/file-sink-streaming.html
>
> Regards
> Dhaval
>
> On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz  wrote:
>
>> Hey Charles,
>> If you are using maxOffsetsPerTrigger, you will likely reset the offsets
>> every micro-batch, because:
>>  1. Spark will figure out a range of offsets to process (let's call them
>> x and y)
>>  2. If these offsets have fallen out of the retention period, Spark will
>> try to set the offset to x, but the earliest offset still available in
>> Kafka is some z with z > y > x.
>>  3. Since z > y, Spark will not process any of the data
>>  4. Goto 1
>>
>> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh 
>> wrote:
>>
>>> Hi Sandish,
>>>
>>> As I have said, if the offset reset happened only once that would make
>>> sense. But I am not sure how to explain why the offset reset is happening
>>> for every micro-batch...
>>> Ideally, once the offset reset happens the app should move to a valid
>>> offset and start consuming data, but in my case for every batch the offset
>>> is getting reset and no data is ever getting generated.
>>>
>>> Thanks,
>>> Charles
>>>
>>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
>>> wrote:
>>>
 You can see this kind of error if the consumer lag is more than the
 Kafka retention period.
 You will not see any failures if the below option is not set.

 Set the failOnDataLoss=true option to see failures.

 On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
 wrote:

> The only form of rate limiting I have set is maxOffsetsPerTrigger
> and fetch.message.max.bytes.
>
> "...may be that you are trying to process records that have passed the
> retention period within Kafka."
> If the above is true then I should have my offsets reset only once,
> ideally when my application starts. But my offsets are resetting for every
> batch. If my application is using offsets that are no longer available in
> Kafka, it will reset to the earliest or latest offset available in Kafka,
> and the next request made to Kafka should provide proper data. But in this
> case, for all micro-batches the offsets are getting reset and each batch
> is producing no data.
>
>
>
> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>
>> Do you have rate limiting set on your stream? It may be that you are
>> trying to process records that have passed the retention period within
>> Kafka.
>>
>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to run a spark application ingesting data from Kafka
>>> using Spark Structured Streaming and the library
>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very
>>> weird issue where, during execution of all my micro-batches, the Kafka
>>> consumer is not able to fetch the offsets and is having its offsets
>>> reset, as shown below in this log
>>>
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>
>>>
>>> It is reasonable if this resetting happens once in the application, due
>>> to the fact that the offsets stored in my checkpoint are no longer valid
>>> and we will have to reset our offsets to a new value. But I am seeing this
>>> reset happening for every micro-batch execution in my streaming job. In
>>> the end the streaming query progress emits the following
>>>
>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made 
>>> progress: {
>>>   "id" : 

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Dhaval Patel
Hi Charles,

Can you check if any of the cases related to the output directory and
checkpoint location mentioned in the below link are applicable in your case?

https://kb.databricks.com/streaming/file-sink-streaming.html
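
For reference, a minimal file-sink sketch of the kind of setup being checked
(paths are placeholders, and this is only a sketch, not the poster's exact
job): the output path and the checkpoint location should be distinct,
dedicated directories, and neither should be reused by another query.

val query = inputDf.writeStream                       // inputDf: a streaming DataFrame
  .format("parquet")
  .option("path", "s3://my-bucket/output")            // data files + _spark_metadata live here
  .option("checkpointLocation", "s3://my-bucket/chk") // offsets/commits live here, separate from the output
  .start()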

Regards
Dhaval

On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz  wrote:

> Hey Charles,
> If you are using maxOffsetsPerTrigger, you will likely reset the offsets
> every micro-batch, because:
>  1. Spark will figure out a range of offsets to process (let's call them x
> and y)
>  2. If these offsets have fallen out of the retention period, Spark will
> try to set the offset to x, but the earliest offset still available in
> Kafka is some z with z > y > x.
>  3. Since z > y, Spark will not process any of the data
>  4. Goto 1
>
> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh 
> wrote:
>
>> Hi Sandish,
>>
>> As I have said, if the offset reset happened only once that would make
>> sense. But I am not sure how to explain why the offset reset is happening
>> for every micro-batch...
>> Ideally, once the offset reset happens the app should move to a valid
>> offset and start consuming data, but in my case for every batch the offset
>> is getting reset and no data is ever getting generated.
>>
>> Thanks,
>> Charles
>>
>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
>> wrote:
>>
>>> You can see this kind of error if the consumer lag is more than the
>>> Kafka retention period.
>>> You will not see any failures if the below option is not set.
>>>
>>> Set the failOnDataLoss=true option to see failures.
>>>
>>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
>>> wrote:
>>>
 The only form of rate limiting I have set is maxOffsetsPerTrigger
 and fetch.message.max.bytes.

 "...may be that you are trying to process records that have passed the
 retention period within Kafka."
 If the above is true then I should have my offsets reset only once,
 ideally when my application starts. But my offsets are resetting for every
 batch. If my application is using offsets that are no longer available in
 Kafka, it will reset to the earliest or latest offset available in Kafka,
 and the next request made to Kafka should provide proper data. But in this
 case, for all micro-batches the offsets are getting reset and each batch
 is producing no data.



 On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:

> Do you have rate limiting set on your stream? It may be that you are
> trying to process records that have passed the retention period within
> Kafka.
>
> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
> wrote:
>
>>
>> Hi,
>>
>> I am trying to run a spark application ingesting data from Kafka
>> using Spark Structured Streaming and the library
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very
>> weird issue where, during execution of all my micro-batches, the Kafka
>> consumer is not able to fetch the offsets and is having its offsets
>> reset, as shown below in this log
>>
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>
>>
>> It is reasonable if this resetting happens once in the application, due
>> to the fact that the offsets stored in my checkpoint are no longer valid
>> and we will have to reset our offsets to a new value. But I am seeing this
>> reset happening for every micro-batch execution in my streaming job. In
>> the end the streaming query progress emits the following
>>
>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made 
>> progress: {
>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>   "name" : null,
>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>   "batchId" : 189,
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "addBatch" : 127,
>> "getBatch" : 0,
>> "getEndOffset" : 0,
>> "queryPlanning" : 24,
>> "setOffsetRange" : 36,
>> 

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Hey Charles,
If you are using maxOffsetsPerTrigger, you will likely reset the offsets
every micro-batch, because:
 1. Spark will figure out a range of offsets to process (let's call them x
and y)
 2. If these offsets have fallen out of the retention period, Spark will
try to set the offset to x, but the earliest offset still available in
Kafka is some z with z > y > x.
 3. Since z > y, Spark will not process any of the data
 4. Goto 1
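
In code terms, the knobs involved look roughly like this (broker and topic
values are placeholders; a sketch, not the poster's exact config):

val df = spark.readStream                    // spark: an existing SparkSession
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "my_kafka_topic")
  .option("maxOffsetsPerTrigger", "100000")  // caps each micro-batch's offset range [x, y]
  .option("failOnDataLoss", "true")          // fail loudly instead of silently looping when offsets have aged out
  .load()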

On Wed, Sep 11, 2019, 6:09 PM Charles vinodh  wrote:

> Hi Sandish,
>
> As I have said, if the offset reset happened only once that would make
> sense. But I am not sure how to explain why the offset reset is happening
> for every micro-batch...
> Ideally, once the offset reset happens the app should move to a valid
> offset and start consuming data, but in my case for every batch the offset
> is getting reset and no data is ever getting generated.
>
> Thanks,
> Charles
>
> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
> wrote:
>
>> You can see this kind of error if the consumer lag is more than the
>> Kafka retention period.
>> You will not see any failures if the below option is not set.
>>
>> Set the failOnDataLoss=true option to see failures.
>>
>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
>> wrote:
>>
>>> The only form of rate limiting I have set is maxOffsetsPerTrigger and
>>> fetch.message.max.bytes.
>>>
>>> "...may be that you are trying to process records that have passed the
>>> retention period within Kafka."
>>> If the above is true then I should have my offsets reset only once,
>>> ideally when my application starts. But my offsets are resetting for every
>>> batch. If my application is using offsets that are no longer available in
>>> Kafka, it will reset to the earliest or latest offset available in Kafka,
>>> and the next request made to Kafka should provide proper data. But in this
>>> case, for all micro-batches the offsets are getting reset and each batch
>>> is producing no data.
>>>
>>>
>>>
>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>>>
 Do you have rate limiting set on your stream? It may be that you are
 trying to process records that have passed the retention period within
 Kafka.

 On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
 wrote:

>
> Hi,
>
> I am trying to run a spark application ingesting data from Kafka using
> Spark Structured Streaming and the library
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
> issue where, during execution of all my micro-batches, the Kafka consumer
> is not able to fetch the offsets and is having its offsets reset, as shown
> below in this log
>
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-5 to offset 1168959116.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-1 to offset 1218619371.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-8 to offset 1157205346.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-21 to offset 1255403059.
>
>
> It is reasonable if this resetting happens once in the application, due
> to the fact that the offsets stored in my checkpoint are no longer valid
> and we will have to reset our offsets to a new value. But I am seeing this
> reset happening for every micro-batch execution in my streaming job. In
> the end the streaming query progress emits the following
>
> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made 
> progress: {
>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>   "name" : null,
>   "timestamp" : "2019-09-10T15:55:00.000Z",
>   "batchId" : 189,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "addBatch" : 127,
> "getBatch" : 0,
> "getEndOffset" : 0,
> "queryPlanning" : 24,
> "setOffsetRange" : 36,
> "triggerExecution" : 1859,
> "walCommit" : 1032
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
> "startOffset" : {
>   "my_kafka_topic" : {
> "23" : 1206926686,
> "8" : 1158514946,
> "17" : 1258387219,
> "11" : 1263091642,
> "2" : 1226741128,
> "20" : 

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Hi Sandish,

As I have said, if the offset reset happened only once that would make sense.
But I am not sure how to explain why the offset reset is happening for
every micro-batch...
Ideally, once the offset reset happens the app should move to a valid offset
and start consuming data, but in my case for every batch the offset is
getting reset and no data is ever getting generated.

Thanks,
Charles

On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
wrote:

> You can see this kind of error if the consumer lag is more than the
> Kafka retention period.
> You will not see any failures if the below option is not set.
>
> Set the failOnDataLoss=true option to see failures.
>
> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
> wrote:
>
>> The only form of rate limiting I have set is maxOffsetsPerTrigger and
>> fetch.message.max.bytes.
>>
>> "...may be that you are trying to process records that have passed the
>> retention period within Kafka."
>> If the above is true then I should have my offsets reset only once,
>> ideally when my application starts. But my offsets are resetting for every
>> batch. If my application is using offsets that are no longer available in
>> Kafka, it will reset to the earliest or latest offset available in Kafka,
>> and the next request made to Kafka should provide proper data. But in this
>> case, for all micro-batches the offsets are getting reset and each batch
>> is producing no data.
>>
>>
>>
>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>>
>>> Do you have rate limiting set on your stream? It may be that you are
>>> trying to process records that have passed the retention period within
>>> Kafka.
>>>
>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>>> wrote:
>>>

 Hi,

 I am trying to run a spark application ingesting data from Kafka using
 Spark Structured Streaming and the library
 org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
 issue where, during execution of all my micro-batches, the Kafka consumer
 is not able to fetch the offsets and is having its offsets reset, as shown
 below in this log

 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
 groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
  Resetting offset for partition my-topic-5 to offset 1168959116.
 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
 groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
  Resetting offset for partition my-topic-1 to offset 1218619371.
 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
 groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
  Resetting offset for partition my-topic-8 to offset 1157205346.
 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
 groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
  Resetting offset for partition my-topic-21 to offset 1255403059.


 It is reasonable if this resetting happens once in the application, due
 to the fact that the offsets stored in my checkpoint are no longer valid
 and we will have to reset our offsets to a new value. But I am seeing this
 reset happening for every micro-batch execution in my streaming job. In
 the end the streaming query progress emits the following

 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: 
 {
   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
   "name" : null,
   "timestamp" : "2019-09-10T15:55:00.000Z",
   "batchId" : 189,
   "numInputRows" : 0,
   "inputRowsPerSecond" : 0.0,
   "processedRowsPerSecond" : 0.0,
   "durationMs" : {
 "addBatch" : 127,
 "getBatch" : 0,
 "getEndOffset" : 0,
 "queryPlanning" : 24,
 "setOffsetRange" : 36,
 "triggerExecution" : 1859,
 "walCommit" : 1032
   },
   "stateOperators" : [ ],
   "sources" : [ {
 "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
 "startOffset" : {
   "my_kafka_topic" : {
 "23" : 1206926686,
 "8" : 1158514946,
 "17" : 1258387219,
 "11" : 1263091642,
 "2" : 1226741128,
 "20" : 1229560889,
 "5" : 1170304913,
 "14" : 1207333901,
 "4" : 1274242728,
 "13" : 1336386658,
 "22" : 1260210993,
 "7" : 1288639296,
 "16" : 1247462229,
 "10" : 1093157103,
 "1" : 1219904858,
 "19" : 1116269615,
 "9" : 1238935018,
 "18" : 1069224544,
 "12" : 1256018541,
 "3" : 1251150202,
 "21" : 1256774117,
 "15" : 1170591375,
 "6" : 1185108169,
 "24" : 

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Sandish Kumar HN
You can see this kind of error if the consumer lag is more than the Kafka
retention period.
You will not see any failures if the below option is not set.

Set the failOnDataLoss=true option to see failures.

On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
wrote:

> The only form of rate limiting I have set is maxOffsetsPerTrigger and
> fetch.message.max.bytes.
>
> "...may be that you are trying to process records that have passed the
> retention period within Kafka."
> If the above is true then I should have my offsets reset only once,
> ideally when my application starts. But my offsets are resetting for every
> batch. If my application is using offsets that are no longer available in
> Kafka, it will reset to the earliest or latest offset available in Kafka,
> and the next request made to Kafka should provide proper data. But in this
> case, for all micro-batches the offsets are getting reset and each batch
> is producing no data.
>
>
>
> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>
>> Do you have rate limiting set on your stream? It may be that you are
>> trying to process records that have passed the retention period within
>> Kafka.
>>
>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to run a spark application ingesting data from Kafka using
>>> Spark Structured Streaming and the library
>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>>> issue where, during execution of all my micro-batches, the Kafka consumer
>>> is not able to fetch the offsets and is having its offsets reset, as shown
>>> below in this log
>>>
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>
>>>
>>> It is reasonable if this resetting happens once in the application, due
>>> to the fact that the offsets stored in my checkpoint are no longer valid
>>> and we will have to reset our offsets to a new value. But I am seeing this
>>> reset happening for every micro-batch execution in my streaming job. In
>>> the end the streaming query progress emits the following
>>>
>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>   "name" : null,
>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>   "batchId" : 189,
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>> "addBatch" : 127,
>>> "getBatch" : 0,
>>> "getEndOffset" : 0,
>>> "queryPlanning" : 24,
>>> "setOffsetRange" : 36,
>>> "triggerExecution" : 1859,
>>> "walCommit" : 1032
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>> "startOffset" : {
>>>   "my_kafka_topic" : {
>>> "23" : 1206926686,
>>> "8" : 1158514946,
>>> "17" : 1258387219,
>>> "11" : 1263091642,
>>> "2" : 1226741128,
>>> "20" : 1229560889,
>>> "5" : 1170304913,
>>> "14" : 1207333901,
>>> "4" : 1274242728,
>>> "13" : 1336386658,
>>> "22" : 1260210993,
>>> "7" : 1288639296,
>>> "16" : 1247462229,
>>> "10" : 1093157103,
>>> "1" : 1219904858,
>>> "19" : 1116269615,
>>> "9" : 1238935018,
>>> "18" : 1069224544,
>>> "12" : 1256018541,
>>> "3" : 1251150202,
>>> "21" : 1256774117,
>>> "15" : 1170591375,
>>> "6" : 1185108169,
>>> "24" : 1202342095,
>>> "0" : 1165356330
>>>   }
>>> },
>>> "endOffset" : {
>>>   "my_kafka_topic" : {
>>> "23" : 1206928043,
>>> "8" : 1158516721,
>>> "17" : 1258389219,
>>> "11" : 1263093490,
>>> "2" : 1226743225,
>>> "20" : 1229562962,
>>> "5" : 1170307882,
>>> "14" : 1207335736,
>>> "4" : 1274245585,
>>> "13" : 1336388570,
>>> "22" : 1260213582,
>>> "7" : 1288641384,
>>> "16" : 1247464311,
>>> "10" : 1093159186,
>>> "1" : 1219906407,

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
The only form of rate limiting I have set is maxOffsetsPerTrigger and
fetch.message.max.bytes.

"...may be that you are trying to process records that have passed the
retention period within Kafka."
If the above is true then I should have my offsets reset only once,
ideally when my application starts. But my offsets are resetting for every
batch. If my application is using offsets that are no longer available in
Kafka, it will reset to the earliest or latest offset available in Kafka,
and the next request made to Kafka should provide proper data. But in this
case, for all micro-batches the offsets are getting reset and each batch
is producing no data.



On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:

> Do you have rate limiting set on your stream? It may be that you are
> trying to process records that have passed the retention period within
> Kafka.
>
> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
> wrote:
>
>>
>> Hi,
>>
>> I am trying to run a spark application ingesting data from Kafka using
>> Spark Structured Streaming and the library
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>> issue where, during execution of all my micro-batches, the Kafka consumer
>> is not able to fetch the offsets and is having its offsets reset, as shown
>> below in this log
>>
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>
>>
>> It is reasonable if this resetting happens once in the application, due
>> to the fact that the offsets stored in my checkpoint are no longer valid
>> and we will have to reset our offsets to a new value. But I am seeing this
>> reset happening for every micro-batch execution in my streaming job. In
>> the end the streaming query progress emits the following
>>
>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>   "name" : null,
>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>   "batchId" : 189,
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "addBatch" : 127,
>> "getBatch" : 0,
>> "getEndOffset" : 0,
>> "queryPlanning" : 24,
>> "setOffsetRange" : 36,
>> "triggerExecution" : 1859,
>> "walCommit" : 1032
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>> "startOffset" : {
>>   "my_kafka_topic" : {
>> "23" : 1206926686,
>> "8" : 1158514946,
>> "17" : 1258387219,
>> "11" : 1263091642,
>> "2" : 1226741128,
>> "20" : 1229560889,
>> "5" : 1170304913,
>> "14" : 1207333901,
>> "4" : 1274242728,
>> "13" : 1336386658,
>> "22" : 1260210993,
>> "7" : 1288639296,
>> "16" : 1247462229,
>> "10" : 1093157103,
>> "1" : 1219904858,
>> "19" : 1116269615,
>> "9" : 1238935018,
>> "18" : 1069224544,
>> "12" : 1256018541,
>> "3" : 1251150202,
>> "21" : 1256774117,
>> "15" : 1170591375,
>> "6" : 1185108169,
>> "24" : 1202342095,
>> "0" : 1165356330
>>   }
>> },
>> "endOffset" : {
>>   "my_kafka_topic" : {
>> "23" : 1206928043,
>> "8" : 1158516721,
>> "17" : 1258389219,
>> "11" : 1263093490,
>> "2" : 1226743225,
>> "20" : 1229562962,
>> "5" : 1170307882,
>> "14" : 1207335736,
>> "4" : 1274245585,
>> "13" : 1336388570,
>> "22" : 1260213582,
>> "7" : 1288641384,
>> "16" : 1247464311,
>> "10" : 1093159186,
>> "1" : 1219906407,
>> "19" : 1116271435,
>> "9" : 1238936994,
>> "18" : 1069226913,
>> "12" : 1256020926,
>> "3" : 1251152579,
>> "21" : 1256776910,
>> "15" : 1170593216,
>> "6" : 1185110032,
>> "24" : 1202344538,
>> "0" : 1165358262
>>   }
>> },
>> "numInputRows" : 0,
>> "inputRowsPerSecond" : 0.0,
>> 

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Do you have rate limiting set on your stream? It may be that you are trying
to process records that have passed the retention period within Kafka.

On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
wrote:

>
> Hi,
>
> I am trying to run a spark application ingesting data from Kafka using
> Spark Structured Streaming and the library
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
> issue where, during execution of all my micro-batches, the Kafka consumer
> is not able to fetch the offsets and is having its offsets reset, as shown
> below in this log
>
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-5 to offset 1168959116.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-1 to offset 1218619371.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-8 to offset 1157205346.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-21 to offset 1255403059.
>
>
> It is reasonable if this resetting happens once in the application, due
> to the fact that the offsets stored in my checkpoint are no longer valid
> and we will have to reset our offsets to a new value. But I am seeing this
> reset happening for every micro-batch execution in my streaming job. In
> the end the streaming query progress emits the following
>
> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>   "name" : null,
>   "timestamp" : "2019-09-10T15:55:00.000Z",
>   "batchId" : 189,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "addBatch" : 127,
> "getBatch" : 0,
> "getEndOffset" : 0,
> "queryPlanning" : 24,
> "setOffsetRange" : 36,
> "triggerExecution" : 1859,
> "walCommit" : 1032
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
> "startOffset" : {
>   "my_kafka_topic" : {
> "23" : 1206926686,
> "8" : 1158514946,
> "17" : 1258387219,
> "11" : 1263091642,
> "2" : 1226741128,
> "20" : 1229560889,
> "5" : 1170304913,
> "14" : 1207333901,
> "4" : 1274242728,
> "13" : 1336386658,
> "22" : 1260210993,
> "7" : 1288639296,
> "16" : 1247462229,
> "10" : 1093157103,
> "1" : 1219904858,
> "19" : 1116269615,
> "9" : 1238935018,
> "18" : 1069224544,
> "12" : 1256018541,
> "3" : 1251150202,
> "21" : 1256774117,
> "15" : 1170591375,
> "6" : 1185108169,
> "24" : 1202342095,
> "0" : 1165356330
>   }
> },
> "endOffset" : {
>   "my_kafka_topic" : {
> "23" : 1206928043,
> "8" : 1158516721,
> "17" : 1258389219,
> "11" : 1263093490,
> "2" : 1226743225,
> "20" : 1229562962,
> "5" : 1170307882,
> "14" : 1207335736,
> "4" : 1274245585,
> "13" : 1336388570,
> "22" : 1260213582,
> "7" : 1288641384,
> "16" : 1247464311,
> "10" : 1093159186,
> "1" : 1219906407,
> "19" : 1116271435,
> "9" : 1238936994,
> "18" : 1069226913,
> "12" : 1256020926,
> "3" : 1251152579,
> "21" : 1256776910,
> "15" : 1170593216,
> "6" : 1185110032,
> "24" : 1202344538,
> "0" : 1165358262
>   }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
> "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>   }
> }
>
>
> In the above StreamingQueryProgress event the numInputRows field is zero,
> and this is the case for all micro-batch executions; no data is being
> produced whatsoever. So basically for each batch my offsets are being reset
> and each batch is producing zero rows. Since there is no work being done
> and since dynamic allocation is enabled, all my executors get killed... I
> have tried deleting my checkpoint and starting my application from scratch
> and I am still facing the same issue. What could possibly be wrong with
> this?... What lines of investigation should I take? If you are interested
> in getting Stack Overflow points you can answer my question in SO here
> 

Re: Spark kafka streaming job stopped

2019-06-11 Thread Amit Sharma
Please provide an update if anyone knows.

On Monday, June 10, 2019, Amit Sharma  wrote:

>
> We have a spark kafka streaming job running on a standalone spark cluster.
> We have the below kafka architecture:
>
> 1. Two clusters running in two data centers.
> 2. There is an LTM (load balancer) on top of each data center.
> 3. There is a GSLB on top of the LTMs.
>
> I observed that whenever any of the nodes in the kafka cluster is down, our
> spark streaming job stops. We are using the GSLB URL in our code to connect
> to Kafka, not the IP address. Please let me know if this is expected
> behavior; if not, what config do we need to change?
>
> Thanks
> Amit
>


Re: Spark Kafka Batch Write guarantees

2019-04-01 Thread hemant singh
Thanks Shixiong, I read in the documentation as well that duplicates might
exist because of task retries.

On Mon, 1 Apr 2019 at 9:43 PM, Shixiong(Ryan) Zhu 
wrote:

> The Kafka sink doesn’t support transactions. You may see partial data or
> duplicated data if a Spark task fails.
>
> On Wed, Mar 27, 2019 at 1:15 AM hemant singh  wrote:
>
>> We are using spark batch to write a DataFrame to a Kafka topic, via the
>> write function with write.format("kafka").
>> Does spark provide a guarantee similar to the one it provides when saving
>> a DataFrame to disk, i.e. that partial data is not written to Kafka:
>> either the full DataFrame is saved, or, if the job fails, no data is
>> written to the Kafka topic?
>>
>> Thanks.
>>
> --
>
> Best Regards,
> Ryan
>


Re: Spark Kafka Batch Write guarantees

2019-04-01 Thread Shixiong(Ryan) Zhu
The Kafka sink doesn’t support transactions. You may see partial data or
duplicated data if a Spark task fails.
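
A minimal sketch of the write path under discussion (broker and topic names
are placeholders, and df is assumed to already have key/value columns).
Because the write is not transactional, a retried task can re-send records,
so downstream consumers may need to tolerate duplicates:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")  // Kafka rows need a value (and optional key) column
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "my_topic")
  .save()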

On Wed, Mar 27, 2019 at 1:15 AM hemant singh  wrote:

> We are using spark batch to write a DataFrame to a Kafka topic, via the
> write function with write.format("kafka").
> Does spark provide a guarantee similar to the one it provides when saving
> a DataFrame to disk, i.e. that partial data is not written to Kafka:
> either the full DataFrame is saved, or, if the job fails, no data is
> written to the Kafka topic?
>
> Thanks.
>
-- 

Best Regards,
Ryan


Re: Spark-Kafka integration - build failing with sbt

2017-06-19 Thread Cody Koeninger
org.apache.spark.streaming.kafka.KafkaUtils is in the
spark-streaming-kafka-0-8 project.
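
In other words, the sbt artifact and the import have to match (the version
below is a placeholder):

// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0"

// code: the 0-8 artifact provides this package; the 0-10 artifact uses
// org.apache.spark.streaming.kafka010 instead
import org.apache.spark.streaming.kafka.KafkaUtils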

On Mon, Jun 19, 2017 at 1:01 PM, karan alang  wrote:
> Hi Cody - I do have an additional basic question.
>
> When I tried to compile the code in Eclipse, I was not able to do that,
>
> e.g.
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> gave errors saying KafkaUtils was not part of the package.
> However, when I used sbt to compile, the compilation went through fine.
>
> So, I assume additional libraries are being downloaded when I provide the
> appropriate packages in libraryDependencies?
> Which ones would have helped compile this?
>
>
>
> On Sat, Jun 17, 2017 at 2:53 PM, karan alang  wrote:
>>
>> Thanks, Cody... yes, I was able to fix that.
>>
>> On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger 
>> wrote:
>>>
>>> There are different projects for different versions of kafka,
>>> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>>>
>>> See
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>>
>>> On Fri, Jun 16, 2017 at 6:51 PM, karan alang 
>>> wrote:
>>> > I'm trying to compile kafka & Spark Streaming integration code i.e.
>>> > reading
>>> > from Kafka using Spark Streaming,
>>> >   and the sbt build is failing with error -
>>> >
>>> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
>>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>>> >
>>> >   Scala version -> 2.10.7
>>> >   Spark Version -> 2.1.0
>>> >   Kafka version -> 0.9
>>> >   sbt version -> 0.13
>>> >
>>> > Contents of sbt files is as shown below ->
>>> >
>>> > 1)
>>> >   vi spark_kafka_code/project/plugins.sbt
>>> >
>>> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>>> >
>>> >  2)
>>> >   vi spark_kafka_code/sparkkafka.sbt
>>> >
>>> > import AssemblyKeys._
>>> > assemblySettings
>>> >
>>> > name := "SparkKafka Project"
>>> >
>>> > version := "1.0"
>>> > scalaVersion := "2.11.7"
>>> >
>>> > val sparkVers = "2.1.0"
>>> >
>>> > // Base Spark-provided dependencies
>>> > libraryDependencies ++= Seq(
>>> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>>> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>>> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>>> >
>>> > mergeStrategy in assembly := {
>>> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
>>> > MergeStrategy.discard
>>> >   case m if m.toLowerCase.startsWith("META-INF")  =>
>>> > MergeStrategy.discard
>>> >   case "reference.conf"   =>
>>> > MergeStrategy.concat
>>> >   case m if m.endsWith("UnusedStubClass.class")   =>
>>> > MergeStrategy.discard
>>> >   case _ => MergeStrategy.first
>>> > }
>>> >
>>> >   i launch sbt, and then try to create an eclipse project, complete
>>> > error is
>>> > as shown below -
>>> >
>>> >   -
>>> >
>>> >   sbt
>>> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
>>> > [info] Loading project definition from
>>> >
>>> > /Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
>>> > [info] Set current project to SparkKafka Project (in build
>>> >
>>> > file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
>>> >> eclipse
>>> > [info] About to create Eclipse project files for your project(s).
>>> > [info] Updating
>>> >
>>> > {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
>>> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
>>> > [warn] module not found:
>>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
>>> > [warn]  local: tried
>>> > [warn]
>>> >
>>> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [warn]  activator-launcher-local: tried
>>> > [warn]
>>> >
>>> > /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [warn]  activator-local: tried
>>> > [warn]
>>> >
>>> > /Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [warn]  public: tried
>>> > [warn]
>>> >
>>> > https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>>> > [warn]  typesafe-releases: tried
>>> > [warn]
>>> >
>>> > http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>>> > [warn]  typesafe-ivy-releasez: tried
>>> > [warn]
>>> >
>>> > http://repo.typesafe.com/typesafe/ivy-releases/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [info] Resolving jline#jline;2.12.1 ...
>>> > [warn] ::
>>> > [warn] ::  UNRESOLVED 

Re: Spark-Kafka integration - build failing with sbt

2017-06-19 Thread karan alang
Hi Cody - I do have an additional basic question.

When I tried to compile the code in Eclipse, I was not able to do that,

e.g.
import org.apache.spark.streaming.kafka.KafkaUtils

gave errors saying KafkaUtils was not part of the package.
However, when I used sbt to compile, the compilation went through fine.

So, I assume additional libraries are being downloaded when I provide the
appropriate packages in libraryDependencies?
Which ones would have helped compile this?



On Sat, Jun 17, 2017 at 2:53 PM, karan alang  wrote:

> Thanks, Cody... yes, I was able to fix that.
>
> On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger 
> wrote:
>
>> There are different projects for different versions of kafka,
>> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>>
>> See
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>
>> On Fri, Jun 16, 2017 at 6:51 PM, karan alang 
>> wrote:
>> > I'm trying to compile kafka & Spark Streaming integration code i.e.
>> reading
>> > from Kafka using Spark Streaming,
>> >   and the sbt build is failing with error -
>> >
>> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>> >
>> >   Scala version -> 2.10.7
>> >   Spark Version -> 2.1.0
>> >   Kafka version -> 0.9
>> >   sbt version -> 0.13
>> >
>> > Contents of sbt files is as shown below ->
>> >
>> > 1)
>> >   vi spark_kafka_code/project/plugins.sbt
>> >
>> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>> >
>> >  2)
>> >   vi spark_kafka_code/sparkkafka.sbt
>> >
>> > import AssemblyKeys._
>> > assemblySettings
>> >
>> > name := "SparkKafka Project"
>> >
>> > version := "1.0"
>> > scalaVersion := "2.11.7"
>> >
>> > val sparkVers = "2.1.0"
>> >
>> > // Base Spark-provided dependencies
>> > libraryDependencies ++= Seq(
>> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>> >
>> > mergeStrategy in assembly := {
>> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
>> MergeStrategy.discard
>> >   case m if m.toLowerCase.startsWith("META-INF")  =>
>> MergeStrategy.discard
>> >   case "reference.conf"   =>
>> MergeStrategy.concat
>> >   case m if m.endsWith("UnusedStubClass.class")   =>
>> MergeStrategy.discard
>> >   case _ => MergeStrategy.first
>> > }
>> >
>> >   i launch sbt, and then try to create an eclipse project, complete
>> error is
>> > as shown below -
>> >
>> >   -
>> >
>> >   sbt
>> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
>> > [info] Loading project definition from
>> > /Users/karanalang/Documents/Technology/Coursera_spark_scala/
>> spark_kafka_code/project
>> > [info] Set current project to SparkKafka Project (in build
>> > file:/Users/karanalang/Documents/Technology/Coursera_spark_
>> scala/spark_kafka_code/)
>> >> eclipse
>> > [info] About to create Eclipse project files for your project(s).
>> > [info] Updating
>> > {file:/Users/karanalang/Documents/Technology/Coursera_spark_
>> scala/spark_kafka_code/}spark_kafka_code...
>> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
>> > [warn] module not found:
>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
>> > [warn]  local: tried
>> > [warn]
>> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-streami
>> ng-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [warn]  activator-launcher-local: tried
>> > [warn]
>> > /Users/karanalang/.activator/repository/org.apache.spark/spa
>> rk-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [warn]  activator-local: tried
>> > [warn]
>> > /Users/karanalang/Documents/Technology/SCALA/activator-dist-
>> 1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.
>> 11/2.1.0/ivys/ivy.xml
>> > [warn]  public: tried
>> > [warn]
>> > https://repo1.maven.org/maven2/org/apache/spark/spark-stream
>> ing-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>> > [warn]  typesafe-releases: tried
>> > [warn]
>> > http://repo.typesafe.com/typesafe/releases/org/apache/spark/
>> spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>> > [warn]  typesafe-ivy-releasez: tried
>> > [warn]
>> > http://repo.typesafe.com/typesafe/ivy-releases/org.apache.
>> spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [info] Resolving jline#jline;2.12.1 ...
>> > [warn] ::
>> > [warn] ::  UNRESOLVED DEPENDENCIES ::
>> > [warn] ::
>> > [warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not
>> found
>> > [warn] ::
>> > [warn]
>> > [warn] Note: Unresolved dependencies path:
>> > [warn] 

Re: Spark-Kafka integration - build failing with sbt

2017-06-17 Thread karan alang
Thanks, Cody... yes, I was able to fix that.

On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger  wrote:

> There are different projects for different versions of kafka,
> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>
> See
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> On Fri, Jun 16, 2017 at 6:51 PM, karan alang 
> wrote:
> > I'm trying to compile kafka & Spark Streaming integration code i.e.
> reading
> > from Kafka using Spark Streaming,
> >   and the sbt build is failing with error -
> >
> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> >
> >   Scala version -> 2.10.7
> >   Spark Version -> 2.1.0
> >   Kafka version -> 0.9
> >   sbt version -> 0.13
> >
> > Contents of sbt files is as shown below ->
> >
> > 1)
> >   vi spark_kafka_code/project/plugins.sbt
> >
> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
> >
> >  2)
> >   vi spark_kafka_code/sparkkafka.sbt
> >
> > import AssemblyKeys._
> > assemblySettings
> >
> > name := "SparkKafka Project"
> >
> > version := "1.0"
> > scalaVersion := "2.11.7"
> >
> > val sparkVers = "2.1.0"
> >
> > // Base Spark-provided dependencies
> > libraryDependencies ++= Seq(
> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
> >
> > mergeStrategy in assembly := {
> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
> MergeStrategy.discard
> >   case m if m.toLowerCase.startsWith("META-INF")  =>
> MergeStrategy.discard
> >   case "reference.conf"   => MergeStrategy.concat
> >   case m if m.endsWith("UnusedStubClass.class")   =>
> MergeStrategy.discard
> >   case _ => MergeStrategy.first
> > }
> >
> >   i launch sbt, and then try to create an eclipse project, complete
> error is
> > as shown below -
> >
> >   -
> >
> >   sbt
> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
> > [info] Loading project definition from
> > /Users/karanalang/Documents/Technology/Coursera_spark_
> scala/spark_kafka_code/project
> > [info] Set current project to SparkKafka Project (in build
> > file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/)
> >> eclipse
> > [info] About to create Eclipse project files for your project(s).
> > [info] Updating
> > {file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/}spark_kafka_code...
> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> > [warn] module not found:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> > [warn]  local: tried
> > [warn]
> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-
> streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  activator-launcher-local: tried
> > [warn]
> > /Users/karanalang/.activator/repository/org.apache.spark/
> spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  activator-local: tried
> > [warn]
> > /Users/karanalang/Documents/Technology/SCALA/activator-
> dist-1.3.10/repository/org.apache.spark/spark-streaming-
> kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  public: tried
> > [warn]
> > https://repo1.maven.org/maven2/org/apache/spark/spark-
> streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> > [warn]  typesafe-releases: tried
> > [warn]
> > http://repo.typesafe.com/typesafe/releases/org/apache/
> spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-
> kafka_2.11-2.1.0.pom
> > [warn]  typesafe-ivy-releasez: tried
> > [warn]
> > http://repo.typesafe.com/typesafe/ivy-releases/org.
> apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [info] Resolving jline#jline;2.12.1 ...
> > [warn] ::
> > [warn] ::  UNRESOLVED DEPENDENCIES ::
> > [warn] ::
> > [warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not
> found
> > [warn] ::
> > [warn]
> > [warn] Note: Unresolved dependencies path:
> > [warn] org.apache.spark:spark-streaming-kafka_2.11:2.1.0
> > (/Users/karanalang/Documents/Technology/Coursera_spark_
> scala/spark_kafka_code/sparkkafka.sbt#L12-16)
> > [warn]   +- sparkkafka-project:sparkkafka-project_2.11:1.0
> > [trace] Stack trace suppressed: run last *:update for the full output.
> > [error] (*:update) sbt.ResolveException: unresolved dependency:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> > [info] Updating
> > {file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/}spark_kafka_code...
> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> > [warn] module not found:
> > 

Re: Spark-Kafka integration - build failing with sbt

2017-06-17 Thread Cody Koeninger
There are different projects for different versions of kafka,
spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10

See

http://spark.apache.org/docs/latest/streaming-kafka-integration.html
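
So, concretely, the unresolved "spark-streaming-kafka" dependency line would
become one of the versioned artifacts (a sketch; pick the one matching your
broker version):

// for Kafka 0.8/0.9 brokers
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVers
// or, for Kafka 0.10+ brokers
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVers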

On Fri, Jun 16, 2017 at 6:51 PM, karan alang  wrote:
> I'm trying to compile kafka & Spark Streaming integration code i.e. reading
> from Kafka using Spark Streaming,
>   and the sbt build is failing with error -
>
>   [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>
>   Scala version -> 2.10.7
>   Spark Version -> 2.1.0
>   Kafka version -> 0.9
>   sbt version -> 0.13
>
> Contents of sbt files is as shown below ->
>
> 1)
>   vi spark_kafka_code/project/plugins.sbt
>
>   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>
>  2)
>   vi spark_kafka_code/sparkkafka.sbt
>
> import AssemblyKeys._
> assemblySettings
>
> name := "SparkKafka Project"
>
> version := "1.0"
> scalaVersion := "2.11.7"
>
> val sparkVers = "2.1.0"
>
> // Base Spark-provided dependencies
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>
> mergeStrategy in assembly := {
>   case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
>   case m if m.toLowerCase.startsWith("META-INF")  => MergeStrategy.discard
>   case "reference.conf"   => MergeStrategy.concat
>   case m if m.endsWith("UnusedStubClass.class")   => MergeStrategy.discard
>   case _ => MergeStrategy.first
> }
>
>   i launch sbt, and then try to create an eclipse project, complete error is
> as shown below -
>
>   -
>
>   sbt
> [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
> [info] Loading project definition from
> /Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
> [info] Set current project to SparkKafka Project (in build
> file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
>> eclipse
> [info] About to create Eclipse project files for your project(s).
> [info] Updating
> {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
> [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> [warn] module not found:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> [warn]  local: tried
> [warn]
> /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-launcher-local: tried
> [warn]
> /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-local: tried
> [warn]
> /Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  public: tried
> [warn]
> https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> [warn]  typesafe-releases: tried
> [warn]
> http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> [warn]  typesafe-ivy-releasez: tried
> [warn]
> http://repo.typesafe.com/typesafe/ivy-releases/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [info] Resolving jline#jline;2.12.1 ...
> [warn] ::
> [warn] ::  UNRESOLVED DEPENDENCIES ::
> [warn] ::
> [warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> [warn] ::
> [warn]
> [warn] Note: Unresolved dependencies path:
> [warn] org.apache.spark:spark-streaming-kafka_2.11:2.1.0
> (/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/sparkkafka.sbt#L12-16)
> [warn]   +- sparkkafka-project:sparkkafka-project_2.11:1.0
> [trace] Stack trace suppressed: run last *:update for the full output.
> [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> [info] Updating
> {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
> [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> [warn] module not found:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> [warn]  local: tried
> [warn]
> /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-launcher-local: tried
> [warn]
> /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-local: tried
> [warn]
> 

Re: spark kafka consumer with kerberos

2017-03-31 Thread Saisai Shao
Hi Bill,

Normally a Kerberos principal and keytab should be enough, because the
keytab can effectively stand in for the password. Did you configure
SASL/GSSAPI or SASL/PLAIN for KafkaClient?
http://kafka.apache.org/documentation.html#security_sasl
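
For reference, a typical SASL/GSSAPI KafkaClient JAAS entry looks roughly
like this (keytab path and principal below are placeholders, not the
poster's actual values):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./v.keytab"
  principal="someuser@EXAMPLE.COM"
  serviceName="kafka";
};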

Actually this is more of a Kafka question, and it is normally a
configuration issue, so I would suggest you ask this question on the Kafka
mailing list.

Thanks
Saisai


On Fri, Mar 31, 2017 at 10:28 PM, Bill Schwanitz  wrote:

> Saisai,
>
> Yea, that seems to have helped. Looks like the Kerberos ticket from when I
> submit does not get passed to the executor?
>
> ... 3 more
> Caused by: org.apache.kafka.common.KafkaException:
> javax.security.auth.login.LoginException: Unable to obtain password from
> user
>
> at org.apache.kafka.common.network.SaslChannelBuilder.
> configure(SaslChannelBuilder.java:86)
> at org.apache.kafka.common.network.ChannelBuilders.
> create(ChannelBuilders.java:70)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(
> ClientUtils.java:83)
> at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:623)
> ... 14 more
> Caused by: javax.security.auth.login.LoginException: Unable to obtain
> password from user
>
>
> On Fri, Mar 31, 2017 at 9:08 AM, Saisai Shao 
> wrote:
>
>> Hi Bill,
>>
>> The exception is from the executor side. From the gist you provided, it
>> looks like the issue is that you only configured the java options on the
>> driver side; I think you should also configure this on the executor side.
>> You could refer to here
>> (https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster).
>>
>> --files key.conf#key.conf,v.keytab#v.keytab
>> --driver-java-options "-Djava.security.auth.login.config=./key.conf"
>> --conf 
>> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"
>>
>>
>> On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz 
>> wrote:
>>
>>> I'm working on a poc spark job to pull data from a kafka topic with
>>> kerberos-enabled (required) brokers.
>>>
>>> The code seems to connect to kafka and enter a polling mode. When I toss
>>> something onto the topic I get an exception which I just can't seem to
>>> figure out. Any ideas?
>>>
>>> I have a full gist up at
>>> https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de with a
>>> lot of details. If I use the hdfs/spark client code for just normal
>>> operations everything works fine, but for some reason the streaming code
>>> is having issues. I have verified the KafkaClient object is in the jaas
>>> config. The keytab is good, etc.
>>>
>>> Guessing I'm doing something wrong; I just have not figured out what
>>> yet! Any thoughts?
>>>
>>> The exception:
>>>
>>> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID
>>> 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed
>>> to construct kafka consumer
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
>>> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>>> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.kafka.common.KafkaException:
>>> org.apache.kafka.common.KafkaException: Jaas configuration not found
>>> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
>>> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
>>> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>>> ... 14 more
>>> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration
>>> not found
>>> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
>>> at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
>>> at 

Re: spark kafka consumer with kerberos

2017-03-31 Thread Bill Schwanitz
Saisai,

Yea that seems to have helped. Looks like the kerberos ticket when I submit
does not get passed to the executor?

... 3 more
Caused by: org.apache.kafka.common.KafkaException:
javax.security.auth.login.LoginException: Unable to obtain password from
user

at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 14 more
Caused by: javax.security.auth.login.LoginException: Unable to obtain
password from user


On Fri, Mar 31, 2017 at 9:08 AM, Saisai Shao  wrote:

> Hi Bill,
>
> The exception is from the executor side. From the gist you provided, it
> looks like you only configured the Java options on the driver side; you
> should also configure them on the executor side. You could refer here:
> https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster
>
> --files key.conf#key.conf,v.keytab#v.keytab
> --driver-java-options "-Djava.security.auth.login.config=./key.conf"
> --conf 
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"
>
>
> On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz  wrote:
>
>> I'm working on a poc spark job to pull data from a kafka topic with
>> kerberos enabled ( required ) brokers.
>>
>> The code seems to connect to kafka and enter a polling mode. When I toss
>> something onto the topic I get an exception which I just can't seem to
>> figure out. Any ideas?
>>
>> I have a full gist up at
>> https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de
>> with a lot of details. If I use the
>> hdfs/spark client code for just normal operations everything works fine but
>> for some reason the streaming code is having issues. I have verified the
>> KafkaClient object is in the jaas config. The keytab is good etc.
>>
>> Guessing I'm doing something wrong I just have not figured out what yet!
>> Any thoughts?
>>
>> The exception:
>>
>> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to
>> construct kafka consumer
>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
>> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.kafka.common.KafkaException: Jaas configuration not found
>> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
>> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
>> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>> ... 14 more
>> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration
>> not found
>> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
>> at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
>> at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
>> at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
>> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
>> ... 17 more
>> Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in
>> this configuration.
>> at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
>> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
>> ... 21 more
>>
>
>


Re: spark kafka consumer with kerberos

2017-03-31 Thread Saisai Shao
Hi Bill,

The exception is from the executor side. From the gist you provided, it looks
like you only configured the Java options on the driver side; you should also
configure them on the executor side. You could refer here:
https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster

--files key.conf#key.conf,v.keytab#v.keytab
--driver-java-options "-Djava.security.auth.login.config=./key.conf"
--conf 
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"


On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz  wrote:

> I'm working on a poc spark job to pull data from a kafka topic with
> kerberos enabled ( required ) brokers.
>
> The code seems to connect to kafka and enter a polling mode. When I toss
> something onto the topic I get an exception which I just can't seem to
> figure out. Any ideas?
>
> I have a full gist up at
> https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de
> with a lot of details. If I use the
> hdfs/spark client code for just normal operations everything works fine but
> for some reason the streaming code is having issues. I have verified the
> KafkaClient object is in the jaas config. The keytab is good etc.
>
> Guessing I'm doing something wrong I just have not figured out what yet!
> Any thoughts?
>
> The exception:
>
> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to
> construct kafka consumer
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Jaas configuration not found
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
> ... 14 more
> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not
> found
> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
> at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
> at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
> ... 17 more
> Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in
> this configuration.
> at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
> ... 21 more
>


Re: [Spark Kafka] API Doc pages for Kafka 0.10 not current

2017-02-28 Thread Cody Koeninger
The kafka-0-8 and kafka-0-10 integrations have conflicting
dependencies.  Last time I checked, Spark's doc publication puts
everything all in one classpath, so publishing them both together
won't work.  I thought there was already a Jira ticket related to
this, but a quick look didn't turn it up.

On Mon, Feb 27, 2017 at 3:01 PM, Afshartous, Nick
 wrote:
>
> Hello,
>
>
> Looks like the API docs linked from the Spark Kafka 0.10 Integration page
> are not current.
>
>
> For instance, on the page
>
>
>
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
> the code examples show the new API (i.e. class ConsumerStrategies).
> However, following the links
>
>
> API Docs --> (Scala | Java)
>
>
> leads to API pages that do not have the ConsumerStrategies class. The API
> doc package names also have streaming.kafka as opposed to streaming.kafka10.
>
>
> --
>
> Nick

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Kafka] How to update batch size of input dynamically for spark kafka consumer?

2017-01-03 Thread Cody Koeninger
You can't change the batch time, but you can limit the number of items
in the batch

http://spark.apache.org/docs/latest/configuration.html

spark.streaming.backpressure.enabled

spark.streaming.kafka.maxRatePerPartition
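
A minimal sketch of wiring those up (the app name, the cap of 1000
records/sec/partition, and the 7-second interval from the question below are
illustrative, not recommendations):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("rate-limited-stream")                        // placeholder name
      .set("spark.streaming.backpressure.enabled", "true")      // auto-tunes the ingest rate
      .set("spark.streaming.kafka.maxRatePerPartition", "1000") // hard cap: records/sec/partition
    // The batch interval itself stays fixed; only the per-batch volume is limited.
    val ssc = new StreamingContext(conf, Seconds(7))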

On Tue, Jan 3, 2017 at 4:00 AM, 周家帅  wrote:
> Hi,
>
> I am an intermediate spark user and have some experience in large data
> processing. I posted this question on StackOverflow but received no
> response. My problem is as follows:
>
> I use createDirectStream in my spark streaming application. I set the batch
> interval to 7 seconds, and most of the time the batch job can finish within
> about 5 seconds. However, in very rare cases, the batch job takes 60
> seconds, and this delays some batches of jobs. To cut down the total delay
> for these batches, I hope I can process, in one batch, more of the streaming
> data that has spread over the delayed jobs. This would help the streaming
> return to normal as soon as possible.
>
> So, I want to know whether there is some method to dynamically update/merge
> the input batch size for spark and kafka when delay appears.
>
> Many thanks for your help.
>
>
> --
> Jiashuai Zhou
>
> School of Electronics Engineering and Computer Science,
> Peking University
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark kafka integration issues

2016-09-14 Thread Cody Koeninger
Yeah, an updated version of that blog post is available at

https://github.com/koeninger/kafka-exactly-once

On Wed, Sep 14, 2016 at 11:35 AM, Mukesh Jha  wrote:
> Thanks for the reply Cody.
>
> I found the below article on the same, very helpful. Thanks for the details,
> much appreciated.
>
> http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
>
> On Tue, Sep 13, 2016 at 8:14 PM, Cody Koeninger  wrote:
>>
>> 1.  see
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>  look for HasOffsetRanges.  If you really want the info per-message
>> rather than per-partition, createRDD has an overload that takes a
>> messageHandler from MessageAndMetadata to whatever you need
>>
>> 2. createRDD takes type parameters for the key and value decoder, so
>> specify them there
>>
>> 3. you can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
>> There is a spark-streaming-kafka-0-10 package with additional features
>> that only works on brokers 0.10 or higher.  A pull request for
>> documenting it has been merged, but not deployed.
>>
>> On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha 
>> wrote:
>> > Hello fellow sparkers,
>> >
>> > I'm using spark to consume messages from kafka in a non-streaming
>> > fashion, with spark-streaming-kafka-0-8_2.10 & Spark v2.0.
>> >
>> > I have a few queries; please get back if you have any clues:
>> >
>> > 1) Is there any way to get the topic, partition & offset information
>> > for each item from the KafkaRDD? I'm using
>> > KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder] to
>> > create my kafka RDD.
>> > 2) How do I pass my custom Decoder instead of using the String or Byte
>> > decoder? Are there any examples for the same?
>> > 3) Is there a newer version to consume from kafka-0.10 & kafka-0.9
>> > clusters?
>> >
>> > --
>> > Thanks & Regards,
>> >
>> > Mukesh Jha
>
>
>
>
> --
>
>
> Thanks & Regards,
>
> Mukesh Jha

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark kafka integration issues

2016-09-14 Thread Mukesh Jha
Thanks for the reply Cody.

I found the below article on the same, very helpful. Thanks for the
details, much appreciated.

http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

On Tue, Sep 13, 2016 at 8:14 PM, Cody Koeninger  wrote:

> 1.  see http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>  look for HasOffsetRanges.  If you really want the info per-message
> rather than per-partition, createRDD has an overload that takes a
> messageHandler from MessageAndMetadata to whatever you need
>
> 2. createRDD takes type parameters for the key and value decoder, so
> specify them there
>
> 3. you can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
> There is a spark-streaming-kafka-0-10 package with additional features
> that only works on brokers 0.10 or higher.  A pull request for
> documenting it has been merged, but not deployed.
>
> On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha 
> wrote:
> > Hello fellow sparkers,
> >
> > I'm using spark to consume messages from kafka in a non-streaming
> > fashion, with spark-streaming-kafka-0-8_2.10 & Spark v2.0.
> >
> > I have a few queries; please get back if you have any clues:
> >
> > 1) Is there any way to get the topic, partition & offset information
> > for each item from the KafkaRDD? I'm using
> > KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder] to
> > create my kafka RDD.
> > 2) How do I pass my custom Decoder instead of using the String or Byte
> > decoder? Are there any examples for the same?
> > 3) Is there a newer version to consume from kafka-0.10 & kafka-0.9
> > clusters?
> >
> > --
> > Thanks & Regards,
> >
> > Mukesh Jha
>



-- 


Thanks & Regards,

*Mukesh Jha *


Re: Spark kafka integration issues

2016-09-13 Thread Cody Koeninger
1.  see 
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 look for HasOffsetRanges.  If you really want the info per-message
rather than per-partition, createRDD has an overload that takes a
messageHandler from MessageAndMetadata to whatever you need

2. createRDD takes type parameters for the key and value decoder, so
specify them there

3. you can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
There is a spark-streaming-kafka-0-10 package with additional features
that only works on brokers 0.10 or higher.  A pull request for
documenting it has been merged, but not deployed.
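
A rough sketch of points 1 and 2 against the 0.8 API; the broker address,
topic name, offsets, and the byte-array decoder swap are illustrative
assumptions, and sc is an existing SparkContext:

    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")  // placeholder brokers

    // 1) createRDD takes explicit offset ranges, so topic/partition/offset
    //    info is known per partition; HasOffsetRanges exposes it back.
    val ranges = Array(OffsetRange("mytopic", 0, fromOffset = 0L, untilOffset = 1000L))
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, ranges)
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }

    // 2) Swapping decoders is just a matter of the type parameters, e.g.
    //    raw bytes for the value instead of a String:
    val bytesRdd = KafkaUtils.createRDD[String, Array[Byte], StringDecoder, DefaultDecoder](
      sc, kafkaParams, ranges)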

On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha  wrote:
> Hello fellow sparkers,
>
> I'm using spark to consume messages from kafka in a non-streaming fashion,
> with spark-streaming-kafka-0-8_2.10 & Spark v2.0.
>
> I have a few queries; please get back if you have any clues:
>
> 1) Is there any way to get the topic, partition & offset information
> for each item from the KafkaRDD? I'm using
> KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder] to create
> my kafka RDD.
> 2) How do I pass my custom Decoder instead of using the String or Byte
> decoder? Are there any examples for the same?
> 3) Is there a newer version to consume from kafka-0.10 & kafka-0.9 clusters?
>
> --
> Thanks & Regards,
>
> Mukesh Jha

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Kafka stream processing time increasing gradually

2016-06-22 Thread Roshan Singh
Thanks for the detailed explanation. Just tested it, worked like a charm.

On Mon, Jun 20, 2016 at 1:02 PM, N B  wrote:

> It's actually necessary to retire keys that become "Zero" or "Empty" so to
> speak. In your case, the key is "imageURL" and values are a dictionary, one
> of whose fields is "count" that you are maintaining. For simplicity and
> illustration's sake I will assume imageURL to be a string like "abc". Your
> slide duration is 60 and window duration is 1800 seconds.
>
> Now consider the following chain of events in your stream.
>
> batch 1 : "abc"
> batch 2 : "xyz"
> batch 3 : "abc"
>
> and now for the rest of the stream, the keys "abc" or "xyz" never occur.
>
> At the end of the third batch, the generated window rdd has
> { "abc" -> count = 2, "xyz" -> count = 1 }.
> When the first batch falls off after 1800 seconds, it will become
> { "abc -> count = 1, "xyz" -> count = 1 }.
> 60 seconds later, it will become
> { "abc" -> count = 1, "xyz" -> count = 0 }
> and a further 60 seconds later, the 3rd batch is removed from the window
> and the new window rdd becomes
> { "abc" -> count = 0, "xyz" -> count = 0 }.
>
> I hope you can see what is wrong with this. These keys will be perpetually
> held in memory even though there is no need for them to be there. This
> growth in the size of the generated window rdd is what's giving rise to the
> deteriorating processing time in your case.
>
> A filter function that's equivalent to "count != 0" will suffice to
> remember only those keys that have not become "Zero".
>
> HTH,
> NB
>
>
>
> On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh 
> wrote:
>
>> Hi,
>> According to the docs (
>> https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
>> filterFunc can be used to retain expiring keys. I do not want to retain any
>> expiring key, so I do not understand how this can help me stabilize it.
>> Please correct me if this is not the case.
>>
>> I am also specifying both reduceFunc and invReduceFunc. Can you share
>> sample code of what you are using?
>>
>> Thanks.
>>
>> On Fri, Jun 17, 2016 at 3:43 AM, N B  wrote:
>>
>>> We had this same issue with the reduceByKeyAndWindow API that you are
>>> using. To fix this issue, you have to use a different flavor of that
>>> API, specifically the two versions that allow you to pass a 'filter function'
>>> to them. Putting in the filter functions helped stabilize our application
>>> too.
>>>
>>> HTH
>>> NB
>>>
>>>
>>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh >> > wrote:
>>>
 Hi all,
 I have a python streaming job which is supposed to run 24x7. I am
 unable to stabilize it. The job just counts no of links shared in a 30
 minute sliding window. I am using reduceByKeyAndWindow operation with a
 batch of 30 seconds, slide interval of 60 seconds.

 The kafka queue has a rate of nearly 2200 messages/second which can
 increase to 3000 but the mean is 2200.

 I have played around with batch size, slide interval, and by increasing
 parallelism with no fruitful result. These just delay the destabilization.

 GC time is usually between 60-100 ms.

 I also noticed that the jobs were not distributed to other nodes in the
 spark UI, for which I have configured spark.locality.wait as 100ms.
 After which I have noticed that the job is getting distributed properly.

 I have a cluster of 6 slaves and one master each with 16 cores and 15gb
 of ram.

 Code and configuration: http://pastebin.com/93DMSiji

 Streaming screenshot: http://imgur.com/psNfjwJ

 I need help in debugging the issue. Any help will be appreciated.

 --
 Roshan Singh


>>>
>>
>>
>> --
>> Roshan Singh
>> http://roshansingh.in
>>
>
>


-- 
Roshan Singh
http://roshansingh.in


Re: Spark Kafka stream processing time increasing gradually

2016-06-20 Thread N B
It's actually necessary to retire keys that become "Zero" or "Empty" so to
speak. In your case, the key is "imageURL" and values are a dictionary, one
of whose fields is "count" that you are maintaining. For simplicity and
illustration's sake I will assume imageURL to be a string like "abc". Your
slide duration is 60 and window duration is 1800 seconds.

Now consider the following chain of events in your stream.

batch 1 : "abc"
batch 2 : "xyz"
batch 3 : "abc"

and now for the rest of the stream, the keys "abc" or "xyz" never occur.

At the end of the third batch, the generated window rdd has
{ "abc" -> count = 2, "xyz" -> count = 1 }.
When the first batch falls off after 1800 seconds, it will become
{ "abc -> count = 1, "xyz" -> count = 1 }.
60 seconds later, it will become
{ "abc" -> count = 1, "xyz" -> count = 0 }
and a further 60 seconds later, the 3rd batch is removed from the window
and the new window rdd becomes
{ "abc" -> count = 0, "xyz" -> count = 0 }.

I hope you can see what is wrong with this. These keys will be perpetually
held in memory even though there is no need for them to be there. This
growth in the size of the generated window rdd is what's giving rise to the
deteriorating processing time in your case.

A filter function that's equivalent to "count != 0" will suffice to
remember only those keys that have not become "Zero".
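
In Scala terms, a sketch of the filterFunc variant being described (the pair
stream "urls" and the exact durations are assumptions based on this thread;
the inverse-reduce form also requires checkpointing to be enabled):

    import org.apache.spark.streaming.{Minutes, Seconds}

    val counts = urls.map(u => (u, 1L)).reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,   // reduce: add counts entering the window
      (a: Long, b: Long) => a - b,   // inverse reduce: subtract counts leaving the window
      Minutes(30),                   // window duration
      Seconds(60),                   // slide duration
      filterFunc = { case (_, c) => c != 0 }  // retire keys whose count has reached zero
    )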

HTH,
NB



On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh 
wrote:

> Hi,
> According to the docs (
> https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
> filterFunc can be used to retain expiring keys. I do not want to retain any
> expiring key, so I do not understand how this can help me stabilize it.
> Please correct me if this is not the case.
>
> I am also specifying both reduceFunc and invReduceFunc. Can you share
> sample code of what you are using?
>
> Thanks.
>
> On Fri, Jun 17, 2016 at 3:43 AM, N B  wrote:
>
>> We had this same issue with the reduceByKeyAndWindow API that you are
>> using. To fix this issue, you have to use a different flavor of that
>> API, specifically the two versions that allow you to pass a 'filter function'
>> to them. Putting in the filter functions helped stabilize our application
>> too.
>>
>> HTH
>> NB
>>
>>
>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh 
>> wrote:
>>
>>> Hi all,
>>> I have a python streaming job which is supposed to run 24x7. I am unable
>>> to stabilize it. The job just counts no of links shared in a 30 minute
>>> sliding window. I am using reduceByKeyAndWindow operation with a batch of
>>> 30 seconds, slide interval of 60 seconds.
>>>
>>> The kafka queue has a rate of nearly 2200 messages/second which can
>>> increase to 3000 but the mean is 2200.
>>>
>>> I have played around with batch size, slide interval, and by increasing
>>> parallelism with no fruitful result. These just delay the destabilization.
>>>
>>> GC time is usually between 60-100 ms.
>>>
>>> I also noticed that the jobs were not distributed to other nodes in the
>>> spark UI, for which I have configured spark.locality.wait as 100ms.
>>> After which I have noticed that the job is getting distributed properly.
>>>
>>> I have a cluster of 6 slaves and one master each with 16 cores and 15gb
>>> of ram.
>>>
>>> Code and configuration: http://pastebin.com/93DMSiji
>>>
>>> Streaming screenshot: http://imgur.com/psNfjwJ
>>>
>>> I need help in debugging the issue. Any help will be appreciated.
>>>
>>> --
>>> Roshan Singh
>>>
>>>
>>
>
>
> --
> Roshan Singh
> http://roshansingh.in
>


Re: Spark Kafka stream processing time increasing gradually

2016-06-16 Thread Roshan Singh
Hi,
According to the docs (
https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
filterFunc can be used to retain expiring keys. I do not want to retain any
expiring key, so I do not understand how this can help me stabilize it.
Please correct me if this is not the case.

I am also specifying both reduceFunc and invReduceFunc. Can you share
sample code of what you are using?

Thanks.

On Fri, Jun 17, 2016 at 3:43 AM, N B  wrote:

> We had this same issue with the reduceByKeyAndWindow API that you are
> using. To fix this issue, you have to use a different flavor of that
> API, specifically the two versions that allow you to pass a 'filter function'
> to them. Putting in the filter functions helped stabilize our application
> too.
>
> HTH
> NB
>
>
> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh 
> wrote:
>
>> Hi all,
>> I have a python streaming job which is supposed to run 24x7. I am unable
>> to stabilize it. The job just counts no of links shared in a 30 minute
>> sliding window. I am using reduceByKeyAndWindow operation with a batch of
>> 30 seconds, slide interval of 60 seconds.
>>
>> The kafka queue has a rate of nearly 2200 messages/second which can
>> increase to 3000 but the mean is 2200.
>>
>> I have played around with batch size, slide interval, and by increasing
>> parallelism with no fruitful result. These just delay the destabilization.
>>
>> GC time is usually between 60-100 ms.
>>
>> I also noticed that the jobs were not distributed to other nodes in the
>> spark UI, for which I have configured spark.locality.wait as 100ms.
>> After which I have noticed that the job is getting distributed properly.
>>
>> I have a cluster of 6 slaves and one master each with 16 cores and 15gb
>> of ram.
>>
>> Code and configuration: http://pastebin.com/93DMSiji
>>
>> Streaming screenshot: http://imgur.com/psNfjwJ
>>
>> I need help in debugging the issue. Any help will be appreciated.
>>
>> --
>> Roshan Singh
>>
>>
>


-- 
Roshan Singh
http://roshansingh.in


Re: Spark Kafka stream processing time increasing gradually

2016-06-16 Thread N B
We had this same issue with the reduceByKeyAndWindow API that you are
using. To fix this issue, you have to use a different flavor of that
API, specifically the two versions that allow you to pass a 'filter function'
to them. Putting in the filter functions helped stabilize our application
too.

HTH
NB


On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh 
wrote:

> Hi all,
> I have a python streaming job which is supposed to run 24x7. I am unable
> to stabilize it. The job just counts no of links shared in a 30 minute
> sliding window. I am using reduceByKeyAndWindow operation with a batch of
> 30 seconds, slide interval of 60 seconds.
>
> The kafka queue has a rate of nearly 2200 messages/second which can
> increase to 3000 but the mean is 2200.
>
> I have played around with batch size, slide interval, and by increasing
> parallelism with no fruitful result. These just delay the destabilization.
>
> GC time is usually between 60-100 ms.
>
> I also noticed that the jobs were not distributed to other nodes in the
> spark UI, for which I have configured spark.locality.wait as 100ms.
> After which I have noticed that the job is getting distributed properly.
>
> I have a cluster of 6 slaves and one master each with 16 cores and 15gb of
> ram.
>
> Code and configuration: http://pastebin.com/93DMSiji
>
> Streaming screenshot: http://imgur.com/psNfjwJ
>
> I need help in debugging the issue. Any help will be appreciated.
>
> --
> Roshan Singh
>
>


Re: Spark + Kafka processing trouble

2016-05-31 Thread Malcolm Lockyer
Thanks for the suggestions. I agree that there isn't some magic
configuration setting, or that the sql options have some flaw - I just
intended to explain the frustration of having a non-trivial (but still
simple) Spark streaming job running on tiny amounts of data performing
absolutely horribly.

.count() is something I was adding to try and force calculation and
agree it might not be the best of tests.

On Wed, Jun 1, 2016 at 2:34 AM, Cody Koeninger  wrote:
> There isn't a magic spark configuration setting that would account for
> multiple-second-long fixed overheads, you should be looking at maybe
> 200ms minimum for a streaming batch.  1024 kafka topicpartitions is
> not reasonable for the volume you're talking about.  Unless you have
> really extreme workloads, 32 or 64 is a better starting guess.
>
> Rather than jumping to conclusions about sql operations being the
> problem, start from the very beginning.  Read a stream of messages
> from kafka and just do .foreach(println), at a reasonable batch size
> (say 500ms or a second), and see how that keeps up in your
> environment.  Don't use take(), don't use count(), don't use print(),
> since they may have non-obvious performance implications.
>
> If that works, add on further operations one step at a time and see
> when issues arise.
>
> On Mon, May 30, 2016 at 8:45 PM, Malcolm Lockyer
>  wrote:
>> Hopefully this is not off topic for this list, but I am hoping to
>> reach some people who have used Kafka + Spark before.
>>
>> We are new to Spark and are setting up our first production
>> environment and hitting a speed issue that may be configuration related
>> - and we have little experience in configuring Spark environments.
>>
>> So we've got a Spark streaming job that seems to take an inordinate
>> amount of time to process. I realize that without specifics, it is
>> difficult to trace - however the most basic primitives in Spark are
>> performing horribly. The lazy nature of Spark is making it difficult
>> for me to understand what is happening - any suggestions are very much
>> appreciated.
>>
>> Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
>> Kafka and PostgreSQL, both local. The job is designed to:
>>
>> a) grab some data from Kafka
>> b) correlate with existing data in PostgreSQL
>> c) output data to Kafka
>>
>> I am isolating timings by calling System.nanoTime() before and after
>> something that forces calculation, for example .count() on a
>> DataFrame. It seems like every operation has a MASSIVE fixed overhead
>> and that is stacking up making each iteration on the RDD extremely
>> slow. Slow operations include pulling a single item from the Kafka
>> queue, running a simple query against PostgresSQL, and running a Spark
>> aggregation on a RDD with a handful of rows.
>>
>> The machine is not maxing out on memory, disk or CPU. The machine
>> seems to be doing nothing for a high percentage of the execution time.
>> We have reproduced this behavior on two other machines. So we're
>> suspecting a configuration issue
>>
>> As a concrete example, we have a DataFrame produced by running a JDBC
>> query by mapping over an RDD from Kafka. Calling count() (I guess
>> forcing execution) on this DataFrame when there is *1* item/row (Note:
>> SQL database is EMPTY at this point so this is not a factor) takes 4.5
>> seconds, calling count when there are 10,000 items takes 7 seconds.
>>
>> Can anybody offer experience of something like this happening for
>> them? Any suggestions on how to understand what is going wrong?
>>
>> I have tried tuning the number of Kafka partitions - increasing this
>> seems to increase the concurrency and ultimately number of things
>> processed per minute, but to get something half decent, I'm going to
>> need running with 1024 or more partitions. Is 1024 partitions a
>> reasonable number? What do you use in your environments?
>>
>> I've tried different options for batchDuration. The calculation seems
>> to be batchDuration * Kafka partitions for number of items per
>> iteration, but this is always still extremely slow (many per iteration
>> vs. very few doesn't seem to really improve things). Can you suggest a
>> list of the Spark configuration parameters related to speed that you
>> think are key - preferably with the values you use for those
>> parameters?
>>
>> I'd really really appreciate any help or suggestions as I've been
>> working on this speed issue for 3 days without success and my head is
>> starting to hurt. Thanks in advance.
>>
>>
>>
>> Thanks,
>>
>> --
>>
>> Malcolm Lockyer
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>



-- 

Malcolm Lockyer
M: +64 21 258 6121
Level 10, 99 Queen Street, Auckland, New Zealand
hapara.com  ●  @hapara_team



Re: Spark + Kafka processing trouble

2016-05-31 Thread Cody Koeninger
>  500ms is I believe the minimum batch interval for Spark micro batching.

It's better to test than to believe; I've run 250ms jobs. The same
applies to the comments around JDBC: why assume when you could
(dis)prove it? It's not like it's a lot of effort to set up a minimal
job that does foreach(println) from kafka.
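
For what it's worth, the batch interval is just a Duration, so a sub-500ms
setting is a one-line experiment (250ms here is the figure from this thread,
not a recommendation; "conf" is assumed to be an existing SparkConf):

    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    val ssc = new StreamingContext(conf, Milliseconds(250))  // test, don't assume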

On Tue, May 31, 2016 at 9:59 AM, Mich Talebzadeh
 wrote:
> 500ms is, I believe, the minimum batch interval for Spark micro-batching.
>
> However, a JDBC call uses a Unix file descriptor and a context switch, and
> it does have performance implications. That is irrespective of Kafka; what
> is happening is that one is actually going through Hive JDBC.
>
> It is a classic data access issue. Opening and closing a JDBC connection
> once every 0.5 seconds is very problematic.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 31 May 2016 at 15:34, Cody Koeninger  wrote:
>>
>> There isn't a magic spark configuration setting that would account for
>> multiple-second-long fixed overheads, you should be looking at maybe
>> 200ms minimum for a streaming batch.  1024 kafka topicpartitions is
>> not reasonable for the volume you're talking about.  Unless you have
>> really extreme workloads, 32 or 64 is a better starting guess.
>>
>> Rather than jumping to conclusions about sql operations being the
>> problem, start from the very beginning.  Read a stream of messages
>> from kafka and just do .foreach(println), at a reasonable batch size
>> (say 500ms or a second), and see how that keeps up in your
>> environment.  Don't use take(), don't use count(), don't use print(),
>> since they may have non-obvious performance implications.
>>
>> If that works, add on further operations one step at a time and see
>> when issues arise.
>>
>> On Mon, May 30, 2016 at 8:45 PM, Malcolm Lockyer
>>  wrote:
>> > Hopefully this is not off topic for this list, but I am hoping to
>> > reach some people who have used Kafka + Spark before.
>> >
>> > We are new to Spark and are setting up our first production
>> > environment and hitting a speed issue that may be configuration related
>> > - and we have little experience in configuring Spark environments.
>> >
>> > So we've got a Spark streaming job that seems to take an inordinate
>> > amount of time to process. I realize that without specifics, it is
>> > difficult to trace - however the most basic primitives in Spark are
>> > performing horribly. The lazy nature of Spark is making it difficult
>> > for me to understand what is happening - any suggestions are very much
>> > appreciated.
>> >
>> > Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
>> > Kafka and PostgreSQL, both local. The job is designed to:
>> >
>> > a) grab some data from Kafka
>> > b) correlate with existing data in PostgreSQL
>> > c) output data to Kafka
>> >
>> > I am isolating timings by calling System.nanoTime() before and after
>> > something that forces calculation, for example .count() on a
>> > DataFrame. It seems like every operation has a MASSIVE fixed overhead
>> > and that is stacking up making each iteration on the RDD extremely
>> > slow. Slow operations include pulling a single item from the Kafka
>> > queue, running a simple query against PostgresSQL, and running a Spark
>> > aggregation on a RDD with a handful of rows.
>> >
>> > The machine is not maxing out on memory, disk or CPU. The machine
>> > seems to be doing nothing for a high percentage of the execution time.
>> > We have reproduced this behavior on two other machines. So we're
>> > suspecting a configuration issue
>> >
>> > As a concrete example, we have a DataFrame produced by running a JDBC
>> > query by mapping over an RDD from Kafka. Calling count() (I guess
>> > forcing execution) on this DataFrame when there is *1* item/row (Note:
>> > SQL database is EMPTY at this point so this is not a factor) takes 4.5
>> > seconds, calling count when there are 10,000 items takes 7 seconds.
>> >
>> > Can anybody offer experience of something like this happening for
>> > them? Any suggestions on how to understand what is going wrong?
>> >
>> > I have tried tuning the number of Kafka partitions - increasing this
>> > seems to increase the concurrency and ultimately number of things
>> > processed per minute, but to get something half decent, I'm going to
>> > need running with 1024 or more partitions. Is 1024 partitions a
> > reasonable number? What do you use in your environments?
>> >
>> > I've tried different options for batchDuration. The calculation seems
>> > to be batchDuration * Kafka partitions for number of items per
>> > iteration, but this is always still extremely slow (many per iteration
>> > vs. very few doesn't seem to really improve things). Can you suggest a
>> > list of the Spark configuration parameters related 

Re: Spark + Kafka processing trouble

2016-05-31 Thread Mich Talebzadeh
500ms is, I believe, the minimum batch interval for Spark micro-batching.

However, a JDBC call uses a Unix file descriptor and a context switch, and
it does have performance implications. That is irrespective of Kafka; what
is happening is that one is actually going through Hive JDBC.

It is a classic data access issue. Opening and closing a JDBC connection
once every 0.5 seconds is very problematic.
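
One hedged way to amortize that cost is to open the connection once per
partition rather than once per record; the URL, credentials, and statement
here are placeholders:

    rdd.foreachPartition { rows =>
      // one connection per partition, reused for every row in it
      val conn = java.sql.DriverManager.getConnection(
        "jdbc:postgresql://localhost/db", "user", "pass")
      try {
        val stmt = conn.prepareStatement("INSERT INTO events(payload) VALUES (?)")
        rows.foreach { r => stmt.setString(1, r.toString); stmt.executeUpdate() }
      } finally conn.close()
    }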

HTH

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 31 May 2016 at 15:34, Cody Koeninger  wrote:

> There isn't a magic spark configuration setting that would account for
> multiple-second-long fixed overheads, you should be looking at maybe
> 200ms minimum for a streaming batch.  1024 kafka topicpartitions is
> not reasonable for the volume you're talking about.  Unless you have
> really extreme workloads, 32 or 64 is a better starting guess.
>
> Rather than jumping to conclusions about sql operations being the
> problem, start from the very beginning.  Read a stream of messages
> from kafka and just do .foreach(println), at a reasonable batch size
> (say 500ms or a second), and see how that keeps up in your
> environment.  Don't use take(), don't use count(), don't use print(),
> since they may have non-obvious performance implications.
>
> If that works, add on further operations one step at a time and see
> when issues arise.
>
> On Mon, May 30, 2016 at 8:45 PM, Malcolm Lockyer
>  wrote:
> > Hopefully this is not off topic for this list, but I am hoping to
> > reach some people who have used Kafka + Spark before.
> >
> > We are new to Spark and are setting up our first production
> > environment and hitting a speed issue that may be configuration related
> > - and we have little experience in configuring Spark environments.
> >
> > So we've got a Spark streaming job that seems to take an inordinate
> > amount of time to process. I realize that without specifics, it is
> > difficult to trace - however the most basic primitives in Spark are
> > performing horribly. The lazy nature of Spark is making it difficult
> > for me to understand what is happening - any suggestions are very much
> > appreciated.
> >
> > Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
> > Kafka and PostgreSQL, both local. The job is designed to:
> >
> > a) grab some data from Kafka
> > b) correlate with existing data in PostgreSQL
> > c) output data to Kafka
> >
> > I am isolating timings by calling System.nanoTime() before and after
> > something that forces calculation, for example .count() on a
> > DataFrame. It seems like every operation has a MASSIVE fixed overhead
> > and that is stacking up making each iteration on the RDD extremely
> > slow. Slow operations include pulling a single item from the Kafka
> > queue, running a simple query against PostgresSQL, and running a Spark
> > aggregation on a RDD with a handful of rows.
> >
> > The machine is not maxing out on memory, disk or CPU. The machine
> > seems to be doing nothing for a high percentage of the execution time.
> > We have reproduced this behavior on two other machines. So we're
> > suspecting a configuration issue
> >
> > As a concrete example, we have a DataFrame produced by running a JDBC
> > query by mapping over an RDD from Kafka. Calling count() (I guess
> > forcing execution) on this DataFrame when there is *1* item/row (Note:
> > SQL database is EMPTY at this point so this is not a factor) takes 4.5
> > seconds, calling count when there are 10,000 items takes 7 seconds.
> >
> > Can anybody offer experience of something like this happening for
> > them? Any suggestions on how to understand what is going wrong?
> >
> > I have tried tuning the number of Kafka partitions - increasing this
> > seems to increase the concurrency and ultimately number of things
> > processed per minute, but to get something half decent, I'm going to
> > need running with 1024 or more partitions. Is 1024 partitions a
> > reasonable number? What do you use in your environments?
> >
> > I've tried different options for batchDuration. The calculation seems
> > to be batchDuration * Kafka partitions for number of items per
> > iteration, but this is always still extremely slow (many per iteration
> > vs. very few doesn't seem to really improve things). Can you suggest a
> > list of the Spark configuration parameters related to speed that you
> > think are key - preferably with the values you use for those
> > parameters?
> >
> > I'd really really appreciate any help or suggestions as I've been
> > working on this speed issue for 3 days without success and my head is
> > starting to hurt. Thanks in advance.
> >
> >
> >
> > Thanks,
> >
> > --
> >
> > Malcolm Lockyer
> >
> > -
> 

Re: Spark + Kafka processing trouble

2016-05-31 Thread Cody Koeninger
There isn't a magic spark configuration setting that would account for
multiple-second-long fixed overheads, you should be looking at maybe
200ms minimum for a streaming batch.  1024 kafka topicpartitions is
not reasonable for the volume you're talking about.  Unless you have
really extreme workloads, 32 or 64 is a better starting guess.

Rather than jumping to conclusions about sql operations being the
problem, start from the very beginning.  Read a stream of messages
from kafka and just do .foreach(println), at a reasonable batch size
(say 500ms or a second), and see how that keeps up in your
environment.  Don't use take(), don't use count(), don't use print(),
since they may have non-obvious performance implications.

If that works, add on further operations one step at a time and see
when issues arise.
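
A minimal baseline of that shape, assuming the 0.8 direct stream and
placeholder broker/topic names:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-baseline").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Milliseconds(500))  // a reasonable starting batch size
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))
    stream.foreachRDD(_.foreach(println))  // raw consume-and-print check, nothing else
    ssc.start()
    ssc.awaitTermination()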

On Mon, May 30, 2016 at 8:45 PM, Malcolm Lockyer
 wrote:
> Hopefully this is not off topic for this list, but I am hoping to
> reach some people who have used Kafka + Spark before.
>
> We are new to Spark and are setting up our first production
> environment and hitting a speed issue that may be configuration related
> - and we have little experience in configuring Spark environments.
>
> So we've got a Spark streaming job that seems to take an inordinate
> amount of time to process. I realize that without specifics, it is
> difficult to trace - however the most basic primitives in Spark are
> performing horribly. The lazy nature of Spark is making it difficult
> for me to understand what is happening - any suggestions are very much
> appreciated.
>
> Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
> Kafka and PostgreSQL, both local. The job is designed to:
>
> a) grab some data from Kafka
> b) correlate with existing data in PostgreSQL
> c) output data to Kafka
>
> I am isolating timings by calling System.nanoTime() before and after
> something that forces calculation, for example .count() on a
> DataFrame. It seems like every operation has a MASSIVE fixed overhead
> and that is stacking up making each iteration on the RDD extremely
> slow. Slow operations include pulling a single item from the Kafka
> queue, running a simple query against PostgresSQL, and running a Spark
> aggregation on a RDD with a handful of rows.
>
> The machine is not maxing out on memory, disk or CPU. The machine
> seems to be doing nothing for a high percentage of the execution time.
> We have reproduced this behavior on two other machines. So we're
> suspecting a configuration issue
>
> As a concrete example, we have a DataFrame produced by running a JDBC
> query by mapping over an RDD from Kafka. Calling count() (I guess
> forcing execution) on this DataFrame when there is *1* item/row (Note:
> SQL database is EMPTY at this point so this is not a factor) takes 4.5
> seconds, calling count when there are 10,000 items takes 7 seconds.
>
> Can anybody offer experience of something like this happening for
> them? Any suggestions on how to understand what is going wrong?
>
> I have tried tuning the number of Kafka partitions - increasing this
> seems to increase the concurrency and ultimately number of things
> processed per minute, but to get something half decent, I'm going to
> need running with 1024 or more partitions. Is 1024 partitions a
> reasonable number? What do you use in your environments?
>
> I've tried different options for batchDuration. The calculation seems
> to be batchDuration * Kafka partitions for number of items per
> iteration, but this is always still extremely slow (many per iteration
> vs. very few doesn't seem to really improve things). Can you suggest a
> list of the Spark configuration parameters related to speed that you
> think are key - preferably with the values you use for those
> parameters?
>
> I'd really really appreciate any help or suggestions as I've been
> working on this speed issue for 3 days without success and my head is
> starting to hurt. Thanks in advance.
>
>
>
> Thanks,
>
> --
>
> Malcolm Lockyer
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Kafka processing trouble

2016-05-31 Thread Alonso Isidoro Roman
Mich's idea is quite good; if I were you, I would follow it...

Alonso Isidoro Roman
https://about.me/alonso.isidoro.roman


2016-05-31 6:37 GMT+02:00 Mich Talebzadeh :

> How are you getting your data from the database? Are you using JDBC?
>
> Can you actually call the database first (assuming the same data), put it
> in a temp table in Spark, cache it for the duration of the window length,
> and use the data from the cached table?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 31 May 2016 at 04:19, Malcolm Lockyer 
> wrote:
>
>> On Tue, May 31, 2016 at 3:14 PM, Darren Govoni 
>> wrote:
>> > Well that could be the problem. A SQL database is essentially a big
>> > synchronizer. If you have a lot of spark tasks all bottlenecking on a
>> single
>> > database socket (is the database clustered or colocated with spark
>> workers?)
>> > then you will have blocked threads on the database server.
>>
>> Totally agree this could be a big killer to scaling up, we are
>> planning to migrate. But in the meantime we are seeing such big issues
>> with test data of only a few records (1, 2, 1024 etc.) produced to
>> Kafka. Currently the database is NOT busy (CPU, memory and IO usage
>> from the DB is tiny).
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark + Kafka processing trouble

2016-05-30 Thread Mich Talebzadeh
How are you getting your data from the database? Are you using JDBC?

Can you actually call the database first (assuming the same data), put it in
a temp table in Spark, cache it for the duration of the window length, and
use the data from the cached table?
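
A sketch of that idea with the 1.x API; the URL and table name are
placeholders:

    // Load the reference data once over JDBC, then cache it so each
    // streaming batch reads the in-memory copy instead of the database.
    val ref = sqlContext.read.format("jdbc")
      .option("url", "jdbc:postgresql://localhost/db")
      .option("dbtable", "reference_data")
      .load()
    ref.registerTempTable("reference_data")
    sqlContext.cacheTable("reference_data")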

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 31 May 2016 at 04:19, Malcolm Lockyer  wrote:

> On Tue, May 31, 2016 at 3:14 PM, Darren Govoni 
> wrote:
> > Well that could be the problem. A SQL database is essentially a big
> > synchronizer. If you have a lot of spark tasks all bottlenecking on a
> single
> > database socket (is the database clustered or colocated with spark
> workers?)
> > then you will have blocked threads on the database server.
>
> Totally agree this could be a big killer to scaling up, we are
> planning to migrate. But in the meantime we are seeing such big issues
> with test data of only a few records (1, 2, 1024 etc.) produced to
> Kafka. Currently the database is NOT busy (CPU, memory and IO usage
> from the DB is tiny).
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
On Tue, May 31, 2016 at 3:14 PM, Darren Govoni  wrote:
> Well that could be the problem. A SQL database is essentially a big
> synchronizer. If you have a lot of spark tasks all bottlenecking on a single
> database socket (is the database clustered or colocated with spark workers?)
> then you will have blocked threads on the database server.

Totally agree this could be a big killer to scaling up, we are
planning to migrate. But in the meantime we are seeing such big issues
with test data of only a few records (1, 2, 1024 etc.) produced to
Kafka. Currently the database is NOT busy (CPU, memory and IO usage
from the DB is tiny).

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Kafka processing trouble

2016-05-30 Thread Darren Govoni


Well that could be the problem. A SQL database is essentially a big synchronizer. 
If you have a lot of spark tasks all bottlenecking on a single database socket 
(is the database clustered or colocated with spark workers?) then you will have 
blocked threads on the database server.


Sent from my Verizon Wireless 4G LTE smartphone

-------- Original message --------
From: Malcolm Lockyer <malcolm.lock...@hapara.com> 
Date: 05/30/2016  10:40 PM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: Re: Spark + Kafka processing trouble 

On Tue, May 31, 2016 at 1:56 PM, Darren Govoni <dar...@ontrenet.com> wrote:
> So you are calling a SQL query (to a single database) within a spark
> operation distributed across your workers?

Yes, but currently with very small sets of data (1-10,000) and on a
single (dev) machine right now.





(sorry didn't reply to the list)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
On Tue, May 31, 2016 at 1:56 PM, Darren Govoni  wrote:
> So you are calling a SQL query (to a single database) within a spark
> operation distributed across your workers?

Yes, but currently with very small sets of data (1-10,000) and on a
single (dev) machine right now.





(sorry didn't reply to the list)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark + Kafka processing trouble

2016-05-30 Thread Darren Govoni


So you are calling a SQL query (to a single database) within a spark operation 
distributed across your workers? 


Sent from my Verizon Wireless 4G LTE smartphone

-------- Original message --------
From: Malcolm Lockyer  
Date: 05/30/2016  9:45 PM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: Spark + Kafka processing trouble 

Hopefully this is not off topic for this list, but I am hoping to
reach some people who have used Kafka + Spark before.

We are new to Spark and are setting up our first production
environment and hitting a speed issue that may be configuration related
- and we have little experience in configuring Spark environments.

So we've got a Spark streaming job that seems to take an inordinate
amount of time to process. I realize that without specifics, it is
difficult to trace - however the most basic primitives in Spark are
performing horribly. The lazy nature of Spark is making it difficult
for me to understand what is happening - any suggestions are very much
appreciated.

Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
Kafka and PostgreSQL, both local. The job is designed to:

a) grab some data from Kafka
b) correlate with existing data in PostgreSQL
c) output data to Kafka

I am isolating timings by calling System.nanoTime() before and after
something that forces calculation, for example .count() on a
DataFrame. It seems like every operation has a MASSIVE fixed overhead
and that is stacking up making each iteration on the RDD extremely
slow. Slow operations include pulling a single item from the Kafka
queue, running a simple query against PostgreSQL, and running a Spark
aggregation on a RDD with a handful of rows.

The machine is not maxing out on memory, disk or CPU. The machine
seems to be doing nothing for a high percentage of the execution time.
We have reproduced this behavior on two other machines. So we're
suspecting a configuration issue.

As a concrete example, we have a DataFrame produced by running a JDBC
query by mapping over an RDD from Kafka. Calling count() (I guess
forcing execution) on this DataFrame when there is *1* item/row (Note:
SQL database is EMPTY at this point so this is not a factor) takes 4.5
seconds, calling count when there are 10,000 items takes 7 seconds.

Can anybody offer experience of something like this happening for
them? Any suggestions on how to understand what is going wrong?

I have tried tuning the number of Kafka partitions - increasing this
seems to increase the concurrency and ultimately number of things
processed per minute, but to get something half decent, I'm going to
need running with 1024 or more partitions. Is 1024 partitions a
reasonable number? What do you use in your environments?

I've tried different options for batchDuration. The calculation seems
to be batchDuration * Kafka partitions for number of items per
iteration, but this is always still extremely slow (many per iteration
vs. very few doesn't seem to really improve things). Can you suggest a
list of the Spark configuration parameters related to speed that you
think are key - preferably with the values you use for those
parameters?

I'd really really appreciate any help or suggestions as I've been
working on this speed issue for 3 days without success and my head is
starting to hurt. Thanks in advance.



Thanks,

--

Malcolm Lockyer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Shahbaz
   - Do you happen to see how busy the nodes are in terms of CPU, and how
   much heap each executor is allocated.
   - If there is enough capacity, you may want to increase the number of cores
   per executor to 2 and do the needed heap tweaking (see the sketch after
   this list).
   - How much time did it take to process the 4M+ events (in the Spark UI, you
   can look at the Duration column)?
   - I believe the reader is quite fast; however, processing could be slower. If
   you click on the job, it gives you a breakdown of execution, result
   serialization etc., so you may want to look at that and drive from there.
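
For what it's worth, a hypothetical submit invocation for the second point;
the class name, jar name and all numbers are illustrative assumptions, not
recommendations:

./bin/spark-submit \
  --class com.example.StreamingJob \
  --master yarn \
  --num-executors 8 \
  --executor-cores 2 \
  --executor-memory 4g \
  streaming-job.jar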


Regards,
Shahbaz

On Sun, Mar 6, 2016 at 9:26 PM, Vinti Maheshwari 
wrote:

> I have 2 machines in my cluster with the below specifications:
> 128 GB RAM and 8 cores machine
>
> Regards,
> ~Vinti
>
> On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari 
> wrote:
>
>> Thanks Supreeth and Shahbaz. I will try adding
>> spark.streaming.kafka.maxRatePerPartition.
>>
>> Hi Shahbaz,
>>
>> Please see comments, inline:
>>
>>
>>- Which version of Spark you are using. ==> *1.5.2*
>>- How big is the Kafka Cluster ==> *2 brokers*
>>- What is the Message Size and type.==>
>> *String, 9,550 bytes (around) *
>>- How big is the spark cluster (How many executors ,How many cores
>>Per Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
>>- What does your Spark Job looks like ==>
>>
>>
>>val messages = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](
>>  ssc, kafkaParams, topicsSet)val inputStream = messages.map(_._2)
>>
>>
>>  val parsedStream = inputStream
>>.map(line => {
>>  val splitLines = line.split(",")
>>  (splitLines(1), splitLines.slice(2, 
>> splitLines.length).map((_.trim.toLong)))
>>})
>>
>>  val state: DStream[(String, Array[Long])] = 
>> parsedStream.updateStateByKey(
>>(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>>  prev.map(_ +: current).orElse(Some(current))
>>.flatMap(as => Try(as.map(BDV(_)).reduce(_ + 
>> _).toArray).toOption)
>>})
>>  state.checkpoint(Duration(25000))
>>  state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) //saving to Hbase
>>  ssc
>>}
>>
>>
>> spark.streaming.backpressure.enabled set it to true and try?
>>  ==>
>>
>>
>> *yes, i had enabled it.*
>> Regards,
>> ~Vinti
>>
>> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz  wrote:
>>
>>> Hello,
>>>
>>>- Which version of Spark you are using.
>>>- How big is the Kafka Cluster
>>>- What is the Message Size and type.
>>>- How big is the spark cluster (How many executors ,How many cores
>>>Per Executor)
>>>- What does your Spark Job looks like .
>>>
>>> spark.streaming.backpressure.enabled set it to true and try?
>>>
>>>
>>> Regards,
>>> Shahbaz
>>> +91-9986850670
>>>
>>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth 
>>> wrote:
>>>
 Try setting spark.streaming.kafka.maxRatePerPartition, this can help
 control the number of messages read from Kafka per partition on the spark
 streaming consumer.

 -S


 On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari 
 wrote:

 Hello,

 I am trying to figure out why my kafka+spark job is running slow. I
 found that spark is consuming all the messages out of kafka into a single
 batch itself and not sending any messages to the other batches.

 2016/03/05 21:57:05   0 events         -     -  queued
 2016/03/05 21:57:00   0 events         -     -  queued
 2016/03/05 21:56:55   0 events         -     -  queued
 2016/03/05 21:56:50   0 events         -     -  queued
 2016/03/05 21:56:45   0 events         -     -  queued
 2016/03/05 21:56:40   4039573 events   6 ms  -  processing

 Does anyone know how this behavior can be changed so that the number of
 messages are load balanced across all the batches?

 Thanks,
 Vinti


>>>
>>
>


Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
I have 2 machines in my cluster with the below specifications:
128 GB RAM and 8 cores machine

Regards,
~Vinti

On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari 
wrote:

> Thanks Supreeth and Shahbaz. I will try adding
> spark.streaming.kafka.maxRatePerPartition.
>
> Hi Shahbaz,
>
> Please see comments, inline:
>
>
>- Which version of Spark you are using. ==> *1.5.2*
>- How big is the Kafka Cluster ==> *2 brokers*
>- What is the Message Size and type.==>
> *String, 9,550 bytes (around) *
>- How big is the spark cluster (How many executors ,How many cores Per
>Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
>- What does your Spark Job looks like ==>
>
>
>val messages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](
>  ssc, kafkaParams, topicsSet)val inputStream = messages.map(_._2)
>
>
>  val parsedStream = inputStream
>.map(line => {
>  val splitLines = line.split(",")
>  (splitLines(1), splitLines.slice(2, 
> splitLines.length).map((_.trim.toLong)))
>})
>
>  val state: DStream[(String, Array[Long])] = 
> parsedStream.updateStateByKey(
>(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>  prev.map(_ +: current).orElse(Some(current))
>.flatMap(as => Try(as.map(BDV(_)).reduce(_ + 
> _).toArray).toOption)
>})
>  state.checkpoint(Duration(25000))
>  state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) //saving to Hbase
>  ssc
>}
>
>
> spark.streaming.backpressure.enabled set it to true and try?
>  ==>
>
>
> *yes, i had enabled it.*
> Regards,
> ~Vinti
>
> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz  wrote:
>
>> Hello,
>>
>>- Which version of Spark you are using.
>>- How big is the Kafka Cluster
>>- What is the Message Size and type.
>>- How big is the spark cluster (How many executors ,How many cores
>>Per Executor)
>>- What does your Spark Job looks like .
>>
>> spark.streaming.backpressure.enabled set it to true and try?
>>
>>
>> Regards,
>> Shahbaz
>> +91-9986850670
>>
>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth  wrote:
>>
>>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>>> control the number of messages read from Kafka per partition on the spark
>>> streaming consumer.
>>>
>>> -S
>>>
>>>
>>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to figure out why my kafka+spark job is running slow. I
>>> found that spark is consuming all the messages out of kafka into a single
>>> batch itself and not sending any messages to the other batches.
>>>
>>> 2016/03/05 21:57:05
>>> 
>>> 0 events - - queued 2016/03/05 21:57:00
>>> 
>>> 0 events - - queued 2016/03/05 21:56:55
>>> 
>>> 0 events - - queued 2016/03/05 21:56:50
>>> 
>>> 0 events - - queued 2016/03/05 21:56:45
>>> 
>>> 0 events - - queued 2016/03/05 21:56:40
>>> 
>>> 4039573 events 6 ms - processing
>>>
>>> Does anyone know how this behavior can be changed so that the number of
>>> messages are load balanced across all the batches?
>>>
>>> Thanks,
>>> Vinti
>>>
>>>
>>
>


Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
Thanks Supreeth and Shahbaz. I will try adding
spark.streaming.kafka.maxRatePerPartition.

Hi Shahbaz,

Please see comments, inline:


   - Which version of Spark you are using. ==> *1.5.2*
   - How big is the Kafka Cluster ==> *2 brokers*
   - What is the Message Size and type.==>
*String, 9,550 bytes (around) *
   - How big is the spark cluster (How many executors ,How many cores Per
   Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
   - What does your Spark Job looks like ==>


val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
val inputStream = messages.map(_._2)

val parsedStream = inputStream
  .map(line => {
    val splitLines = line.split(",")
    (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
  })

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
state.checkpoint(Duration(25000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase
ssc
}


spark.streaming.backpressure.enabled set it to true and try?
 ==>


*yes, i had enabled it.*
Regards,
~Vinti

On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz  wrote:

> Hello,
>
>- Which version of Spark you are using.
>- How big is the Kafka Cluster
>- What is the Message Size and type.
>- How big is the spark cluster (How many executors ,How many cores Per
>Executor)
>- What does your Spark Job looks like .
>
> spark.streaming.backpressure.enabled set it to true and try?
>
>
> Regards,
> Shahbaz
> +91-9986850670
>
> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth  wrote:
>
>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>> control the number of messages read from Kafka per partition on the spark
>> streaming consumer.
>>
>> -S
>>
>>
>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari 
>> wrote:
>>
>> Hello,
>>
>> I am trying to figure out why my kafka+spark job is running slow. I found
>> that spark is consuming all the messages out of kafka into a single batch
>> itself and not sending any messages to the other batches.
>>
>> 2016/03/05 21:57:05
>> 
>> 0 events - - queued 2016/03/05 21:57:00
>> 
>> 0 events - - queued 2016/03/05 21:56:55
>> 
>> 0 events - - queued 2016/03/05 21:56:50
>> 
>> 0 events - - queued 2016/03/05 21:56:45
>> 
>> 0 events - - queued 2016/03/05 21:56:40
>> 
>> 4039573 events 6 ms - processing
>>
>> Does anyone know how this behavior can be changed so that the number of
>> messages are load balanced across all the batches?
>>
>> Thanks,
>> Vinti
>>
>>
>


Re: Spark + Kafka all messages being used in 1 batch

2016-03-05 Thread Supreeth
Try setting spark.streaming.kafka.maxRatePerPartition, this can help control 
the number of messages read from Kafka per partition on the spark streaming 
consumer.
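
For concreteness, a configuration sketch; the rate cap and batch interval are
illustrative assumptions, not tuned recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("rate-limited-kafka-stream")
  // Cap the direct stream at N records per second per Kafka partition.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // Since Spark 1.5, backpressure adapts the ingest rate to processing speed.
  .set("spark.streaming.backpressure.enabled", "true")
val ssc = new StreamingContext(conf, Seconds(5))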

-S


> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari  wrote:
> 
> Hello,
> 
> I am trying to figure out why my kafka+spark job is running slow. I found 
> that spark is consuming all the messages out of kafka into a single batch 
> itself and not sending any messages to the other batches.
> 
> 2016/03/05 21:57:05 0 events - - queued 2016/03/05 21:57:00 0 events - - 
> queued 2016/03/05 21:56:55 0 events - - queued 2016/03/05 21:56:50 0 events - 
> - queued 2016/03/05 21:56:45 0 events - - queued 2016/03/05 21:56:40 4039573 
> events 6 ms - processing
> 
> Does anyone know how this behavior can be changed so that the number of 
> messages are load balanced across all the batches?
> 
> Thanks,
> Vinti


Re: Spark Kafka Direct Error

2015-11-24 Thread swetha kasireddy
Is it possible that the Kafka offset API is somehow returning the wrong
offsets? Each time, the job fails for different partitions with an
error similar to the one I get below.

Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
most recent failure: Lost task 20.3 in stage 117.0 (TID 2114, 10.227.64.52):
java.lang.AssertionError: assertion failed: Ran out of messages before
reaching ending offset 221572238 for topic hubble_stream partition 88 start
221563725. This should not happen, and indicates that messages may have been
lost

On Tue, Nov 24, 2015 at 6:31 AM, Cody Koeninger  wrote:

> No, the direct stream only communicates with Kafka brokers, not Zookeeper
> directly.  It asks the leader for each topicpartition what the highest
> available offsets are, using the Kafka offset api.
>
> On Mon, Nov 23, 2015 at 11:36 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Does Kafka direct query the offsets from the zookeeper directly? From
>> where does it get the offsets? There is data in those offsets, but somehow
>> Kafka Direct does not seem to pick it up. Other Consumers that use Zoo
>> Keeper Quorum of that Stream seems to be fine. Only Kafka Direct seems to
>> have issues. How does Kafka Direct know which offsets to query after
>> getting the initial batches from  "auto.offset.reset" -> "largest"?
>>
>> On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger 
>> wrote:
>>
>>> No, that means that at the time the batch was scheduled, the kafka
>>> leader reported the ending offset was 221572238, but during processing,
>>> kafka stopped returning messages before reaching that ending offset.
>>>
>>> That probably means something got screwed up with Kafka - e.g. you lost
>>> a leader and lost messages in the process.
>>>
>>> On Mon, Nov 23, 2015 at 12:57 PM, swetha 
>>> wrote:
>>>
 Hi,

 I see the following error in my Spark Kafka Direct. Would this mean that
 Kafka Direct is not able to catch up with the messages and is failing?

 Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
 most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
 10.227.64.52):
 java.lang.AssertionError: assertion failed: Ran out of messages before
 reaching ending offset 221572238 for topic hubble_stream partition 88
 start
 221563725. This should not happen, and indicates that messages may have
 been
 lost

 Thanks,
 Swetha



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: Spark Kafka Direct Error

2015-11-24 Thread Cody Koeninger
Anything's possible, but that sounds pretty unlikely to me.
Are the partitions it's failing for all on the same leader?
Have there been any leader rebalances?
Do you have enough log retention?
If you log the offset for each message as it's processed, when do you see
the problem?

On Tue, Nov 24, 2015 at 10:28 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Is it possible that the kafka offset api is somehow returning the wrong
> offsets. Because each time the job fails for different partitions with an
> error similar to the error that I get below.
>
> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
> 10.227.64.52):
> java.lang.AssertionError: assertion failed: Ran out of messages before
> reaching ending offset 221572238 for topic hubble_stream partition 88 start
> 221563725. This should not happen, and indicates that messages may have
> been
> lost
>
> On Tue, Nov 24, 2015 at 6:31 AM, Cody Koeninger 
> wrote:
>
>> No, the direct stream only communicates with Kafka brokers, not Zookeeper
>> directly.  It asks the leader for each topicpartition what the highest
>> available offsets are, using the Kafka offset api.
>>
>> On Mon, Nov 23, 2015 at 11:36 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Does Kafka direct query the offsets from the zookeeper directly? From
>>> where does it get the offsets? There is data in those offsets, but somehow
>>> Kafka Direct does not seem to pick it up. Other Consumers that use Zoo
>>> Keeper Quorum of that Stream seems to be fine. Only Kafka Direct seems to
>>> have issues. How does Kafka Direct know which offsets to query after
>>> getting the initial batches from  "auto.offset.reset" -> "largest"?
>>>
>>> On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger 
>>> wrote:
>>>
 No, that means that at the time the batch was scheduled, the kafka
 leader reported the ending offset was 221572238, but during
 processing, kafka stopped returning messages before reaching that ending
 offset.

 That probably means something got screwed up with Kafka - e.g. you lost
 a leader and lost messages in the process.

 On Mon, Nov 23, 2015 at 12:57 PM, swetha 
 wrote:

> Hi,
>
> I see the following error in my Spark Kafka Direct. Would this mean
> that
> Kafka Direct is not able to catch up with the messages and is failing?
>
> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4
> times,
> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
> 10.227.64.52):
> java.lang.AssertionError: assertion failed: Ran out of messages before
> reaching ending offset 221572238 for topic hubble_stream partition 88
> start
> 221563725. This should not happen, and indicates that messages may
> have been
> lost
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>>
>>
>


Re: Spark Kafka Direct Error

2015-11-24 Thread swetha kasireddy
I see the assertion error when I compare the offset ranges as shown below.
How do I log the offset for each message?


kafkaStream.transform { rdd =>
  // Get the offset ranges in the RDD
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    LOGGER.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}" +
      " Queried offsets")
  }
  val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
    // For each partition, get size of the range in the partition,
    // and the number of items in the partition
    val off = offsetRanges(i)
    val all = iter.toSeq
    val partSize = all.size
    val rangeSize = off.untilOffset - off.fromOffset
    Iterator((partSize, rangeSize))
  }.collect

  // Verify whether the number of elements in each partition
  // matches the corresponding offset range
  collected.foreach { case (partSize, rangeSize) =>
    assert(partSize == rangeSize, "offset ranges are wrong")
  }
}
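
One way to log a per-message offset, as a sketch reusing the names above; it
assumes offsets are contiguous within each partition of a direct-stream RDD
(which holds for non-compacted topics) and that LOGGER is usable on executors:

kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  rdd.mapPartitionsWithIndex { (i, iter) =>
    val off = offsetRanges(i)
    // Each message's offset is the partition's fromOffset plus its position.
    iter.zipWithIndex.map { case (msg, idx) =>
      s"${off.topic}/${off.partition} offset ${off.fromOffset + idx}: $msg"
    }
  }.foreach(line => LOGGER.info(line))
}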


On Tue, Nov 24, 2015 at 8:33 AM, Cody Koeninger  wrote:

> Anything's possible, but that sounds pretty unlikely to me.
> Are the partitions it's failing for all on the same leader?
> Have there been any leader rebalances?
> Do you have enough log retention?
> If you log the offset for each message as it's processed, when do you see
> the problem?
>
> On Tue, Nov 24, 2015 at 10:28 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Is it possible that the kafka offset api is somehow returning the wrong
>> offsets. Because each time the job fails for different partitions with an
>> error similar to the error that I get below.
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start
>> 221563725. This should not happen, and indicates that messages may have
>> been
>> lost
>>
>> On Tue, Nov 24, 2015 at 6:31 AM, Cody Koeninger 
>> wrote:
>>
>>> No, the direct stream only communicates with Kafka brokers, not
>>> Zookeeper directly.  It asks the leader for each topicpartition what the
>>> highest available offsets are, using the Kafka offset api.
>>>
>>> On Mon, Nov 23, 2015 at 11:36 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
 Does Kafka direct query the offsets from the zookeeper directly? From
 where does it get the offsets? There is data in those offsets, but somehow
 Kafka Direct does not seem to pick it up. Other Consumers that use Zoo
 Keeper Quorum of that Stream seems to be fine. Only Kafka Direct seems to
 have issues. How does Kafka Direct know which offsets to query after
 getting the initial batches from  "auto.offset.reset" -> "largest"?

 On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger 
 wrote:

> No, that means that at the time the batch was scheduled, the kafka
> leader reported the ending offset was 221572238, but during
> processing, kafka stopped returning messages before reaching that ending
> offset.
>
> That probably means something got screwed up with Kafka - e.g. you
> lost a leader and lost messages in the process.
>
> On Mon, Nov 23, 2015 at 12:57 PM, swetha 
> wrote:
>
>> Hi,
>>
>> I see the following error in my Spark Kafka Direct. Would this mean
>> that
>> Kafka Direct is not able to catch up with the messages and is failing?
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4
>> times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start
>> 221563725. This should not happen, and indicates that messages may
>> have been
>> lost
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

>>>
>>
>


Re: Spark Kafka Direct Error

2015-11-23 Thread swetha kasireddy
Does Kafka Direct query the offsets from ZooKeeper directly? From where
does it get the offsets? There is data at those offsets, but somehow Kafka
Direct does not seem to pick it up. Other consumers that use the ZooKeeper
quorum of that stream seem to be fine; only Kafka Direct seems to have
issues. How does Kafka Direct know which offsets to query after getting the
initial batches from "auto.offset.reset" -> "largest"?

On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger  wrote:

> No, that means that at the time the batch was scheduled, the kafka leader
> reported the ending offset was 221572238, but during processing, kafka
> stopped returning messages before reaching that ending offset.
>
> That probably means something got screwed up with Kafka - e.g. you lost a
> leader and lost messages in the process.
>
> On Mon, Nov 23, 2015 at 12:57 PM, swetha 
> wrote:
>
>> Hi,
>>
>> I see the following error in my Spark Kafka Direct. Would this mean that
>> Kafka Direct is not able to catch up with the messages and is failing?
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start
>> 221563725. This should not happen, and indicates that messages may have
>> been
>> lost
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark Kafka Direct Error

2015-11-23 Thread Cody Koeninger
No, that means that at the time the batch was scheduled, the kafka leader
reported the ending offset was 221572238, but during processing, kafka
stopped returning messages before reaching that ending offset.

That probably means something got screwed up with Kafka - e.g. you lost a
leader and lost messages in the process.

On Mon, Nov 23, 2015 at 12:57 PM, swetha  wrote:

> Hi,
>
> I see the following error in my Spark Kafka Direct. Would this mean that
> Kafka Direct is not able to catch up with the messages and is failing?
>
> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
> 10.227.64.52):
> java.lang.AssertionError: assertion failed: Ran out of messages before
> reaching ending offset 221572238 for topic hubble_stream partition 88 start
> 221563725. This should not happen, and indicates that messages may have
> been
> lost
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Sabarish Sasidharan
If you are writing to S3, also make sure that you are using the direct
output committer. I don't have streaming jobs but it helps in my machine
learning jobs. Also, though more partitions help in processing faster, they
do slow down writes to S3. So you might want to coalesce before writing to
S3.
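
A rough sketch of the coalesce suggestion, assuming a DStream named stream;
the partition count and bucket path are illustrative:

stream.foreachRDD { rdd =>
  // Write a few larger files per batch instead of one file per task.
  rdd.coalesce(8)
     .saveAsTextFile(s"s3n://my-bucket/out/batch-${System.currentTimeMillis}")
}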

Regards
Sab
On 29-Oct-2015 6:21 pm, "Afshartous, Nick" <nafshart...@turbine.com> wrote:

> < Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> Thanks for your input.  The 3 minute window was chosen because we write the
> output of each batch into S3.  And with smaller batch time intervals there
> were many small files being written to S3, something to avoid.  That was
> the explanation of the developer who made this decision (who's no longer on
> the team).   We're in the process of re-evaluating.
> --
>  Nick
>
> -Original Message-
> From: Adrian Tanase [mailto:atan...@adobe.com]
> Sent: Wednesday, October 28, 2015 4:53 PM
> To: Afshartous, Nick <nafshart...@turbine.com>
> Cc: user@spark.apache.org
> Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>
> Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> You could also try increasing the parallelism via repartition to ensure
> smaller tasks that can safely fit in working memory.
>
> Sent from my iPhone
>
> > On 28 Oct 2015, at 17:45, Afshartous, Nick <nafshart...@turbine.com>
> wrote:
> >
> >
> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
> job and seeing a problem.  This is running in AWS/Yarn and the streaming
> batch interval is set to 3 minutes and this is a ten node cluster.
> >
> > Testing at 30,000 events per second we are seeing the streaming job get
> stuck (stack trace below) for over an hour.
> >
> > Thanks on any insights or suggestions.
> > --
> >  Nick
> >
> > org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
> > onsToPair(JavaDStreamLike.scala:43)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
> > erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
> > erDriver.main(StreamingKafkaConsumerDriver.java:71)
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
> > ava:57)
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
> > orImpl.java:43)
> > java.lang.reflect.Method.invoke(Method.java:606)
> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
> > Master.scala:480)
> >
> > Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
> > additional commands, e-mail: user-h...@spark.apache.org
> >
>
> Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Adrian Tanase
You can decouple the batch interval and the window sizes. If during processing 
you’re aggregating data and your operations benefit from an inverse function, 
then you can process windows of data optimally.

E.g. You could set a global batch interval of 10 seconds. You can process the 
incoming data from Kafka, aggregating the input.
Then you can create a window of 3 minutes (both length and slide) over the 
partial results. In this case the inverse function is not helpful as all the 
data is new in every window.

You can even coalesce the final DStream to avoid writing many small files. For 
example, you could be writing FEWER files MORE OFTEN and achieve a similar effect.
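
As a hypothetical sketch of this approach (topic, broker, key extraction and
paths are all assumptions, not the poster's actual job):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("windowed-kafka-writer")
val ssc = new StreamingContext(conf, Seconds(10)) // small global batch interval

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events")).map(_._2)

// Partial aggregation per 10-second batch.
val partials = lines.map(line => (line.split(",")(0), 1L)).reduceByKey(_ + _)

// Re-aggregate over a 3-minute tumbling window (length == slide, so every
// record is counted exactly once per window).
val windowed = partials.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b, Minutes(3), Minutes(3))

windowed.foreachRDD { rdd =>
  // Coalesce so each window writes a few larger files rather than many small ones.
  rdd.coalesce(4).saveAsTextFile(s"s3n://my-bucket/windows/${System.currentTimeMillis}")
}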

All of this is of course hypothetical since I don’t know what processing you 
are applying to the data coming from Kafka. More like food for thought.

-adrian





On 10/29/15, 2:50 PM, "Afshartous, Nick" <nafshart...@turbine.com> wrote:

>< Does it work as expected with smaller batch or smaller load? Could it be 
>that it's accumulating too many events over 3 minutes?
>
>Thanks for your input.  The 3 minute window was chosen because we write the 
>output of each batch into S3.  And with smaller batch time intervals there 
>were many small files being written to S3, something to avoid.  That was the 
>explanation of the developer who made this decision (who's no longer on the 
>team).   We're in the process of re-evaluating.
>--
> Nick
>
>-Original Message-
>From: Adrian Tanase [mailto:atan...@adobe.com]
>Sent: Wednesday, October 28, 2015 4:53 PM
>To: Afshartous, Nick <nafshart...@turbine.com>
>Cc: user@spark.apache.org
>Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>
>Does it work as expected with smaller batch or smaller load? Could it be that 
>it's accumulating too many events over 3 minutes?
>
>You could also try increasing the parallelism via repartition to ensure 
>smaller tasks that can safely fit in working memory.
>
>Sent from my iPhone
>
>> On 28 Oct 2015, at 17:45, Afshartous, Nick <nafshart...@turbine.com> wrote:
>>
>>
>> Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)  job 
>> and seeing a problem.  This is running in AWS/Yarn and the streaming batch 
>> interval is set to 3 minutes and this is a ten node cluster.
>>
>> Testing at 30,000 events per second we are seeing the streaming job get 
>> stuck (stack trace below) for over an hour.
>>
>> Thanks on any insights or suggestions.
>> --
>>  Nick
>>
>> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
>> onsToPair(JavaDStreamLike.scala:43)
>> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
>> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> erDriver.main(StreamingKafkaConsumerDriver.java:71)
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
>> ava:57)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
>> orImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:606)
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
>> Master.scala:480)
>>
>> Notice: This communication is for the intended recipient(s) only and may 
>> contain confidential, proprietary, legally protected or privileged 
>> information of Turbine, Inc. If you are not the intended recipient(s), 
>> please notify the sender at once and delete this communication. Unauthorized 
>> use of the information in this communication is strictly prohibited and may 
>> be unlawful. For those recipients under contract with Turbine, Inc., the 
>> information in this communication is subject to the terms and conditions of 
>> any applicable contracts or agreements.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>> additional commands, e-mail: user-h...@spark.apache.org
>>
>
>Notice: This communication is for the intended recipient(s) only and may 
>contain confidential, proprietary, legally protected or privileged information 
>of Turbine, Inc. If you are not the intended recipient(s), please notify the 
>sender at once and delete this communication. Unauthorized use of the 
>information in this communication is strictly prohibited and may be unlawful. 
>For those recipients under contract with Turbine, Inc., the information in 
>this communication is subject to the terms and conditions of any applicable 
>contracts or agreements.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Cody Koeninger
If you're writing to s3, want to avoid small files, and don't actually need
3 minute latency... you may want to consider just running a regular spark
job (using KafkaUtils.createRDD) at scheduled intervals rather than a
streaming job.
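
A minimal sketch of that batch alternative; the broker, topic and offsets are
illustrative, and in practice the from/until offsets would be loaded from
wherever the previous run stored them:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf().setAppName("kafka-batch-to-s3"))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// One explicit offset range per topic-partition for this scheduled run.
val offsetRanges = Array(OffsetRange("events", 0, 100000L, 200000L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

// One consolidated write per run instead of many small streaming files.
rdd.map(_._2).coalesce(8).saveAsTextFile("s3n://my-bucket/batches/run-0001")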

On Thu, Oct 29, 2015 at 8:16 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> If you are writing to S3, also make sure that you are using the direct
> output committer. I don't have streaming jobs but it helps in my machine
> learning jobs. Also, though more partitions help in processing faster, they
> do slow down writes to S3. So you might want to coalesce before writing to
> S3.
>
> Regards
> Sab
> On 29-Oct-2015 6:21 pm, "Afshartous, Nick" <nafshart...@turbine.com>
> wrote:
>
>> < Does it work as expected with smaller batch or smaller load? Could it
>> be that it's accumulating too many events over 3 minutes?
>>
>> Thanks for your input.  The 3 minute window was chosen because we write
>> the output of each batch into S3.  And with smaller batch time intervals
>> there were many small files being written to S3, something to avoid.  That
>> was the explanation of the developer who made this decision (who's no
>> longer on the team).   We're in the process of re-evaluating.
>> --
>>  Nick
>>
>> -Original Message-
>> From: Adrian Tanase [mailto:atan...@adobe.com]
>> Sent: Wednesday, October 28, 2015 4:53 PM
>> To: Afshartous, Nick <nafshart...@turbine.com>
>> Cc: user@spark.apache.org
>> Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>>
>> Does it work as expected with smaller batch or smaller load? Could it be
>> that it's accumulating too many events over 3 minutes?
>>
>> You could also try increasing the parallelism via repartition to ensure
>> smaller tasks that can safely fit in working memory.
>>
>> Sent from my iPhone
>>
>> > On 28 Oct 2015, at 17:45, Afshartous, Nick <nafshart...@turbine.com>
>> wrote:
>> >
>> >
>> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
>> job and seeing a problem.  This is running in AWS/Yarn and the streaming
>> batch interval is set to 3 minutes and this is a ten node cluster.
>> >
>> > Testing at 30,000 events per second we are seeing the streaming job get
>> stuck (stack trace below) for over an hour.
>> >
>> > Thanks on any insights or suggestions.
>> > --
>> >  Nick
>> >
>> > org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
>> > onsToPair(JavaDStreamLike.scala:43)
>> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> > erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
>> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> > erDriver.main(StreamingKafkaConsumerDriver.java:71)
>> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
>> > ava:57)
>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
>> > orImpl.java:43)
>> > java.lang.reflect.Method.invoke(Method.java:606)
>> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
>> > Master.scala:480)
>> >
>> > Notice: This communication is for the intended recipient(s) only and
>> may contain confidential, proprietary, legally protected or privileged
>> information of Turbine, Inc. If you are not the intended recipient(s),
>> please notify the sender at once and delete this communication.
>> Unauthorized use of the information in this communication is strictly
>> prohibited and may be unlawful. For those recipients under contract with
>> Turbine, Inc., the information in this communication is subject to the
>> terms and conditions of any applicable contracts or agreements.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>> > additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> Notice: This communication is for the intended recipient(s) only and may
>> contain confidential, proprietary, legally protected or privileged
>> information of Turbine, Inc. If you are not the intended recipient(s),
>> please notify the sender at once and delete this communication.
>> Unauthorized use of the information in this communication is strictly
>> prohibited and may be unlawful. For those recipients under contract with
>> Turbine, Inc., the information in this communication is subject to the
>> terms and conditions of any applicable contracts or agreements.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread srungarapu vamsi
Other than @Adrian's suggestions, check whether the processing delay is more
than the batch interval.

On Thu, Oct 29, 2015 at 2:23 AM, Adrian Tanase  wrote:

> Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> You could also try increasing the parallelism via repartition to ensure
> smaller tasks that can safely fit in working memory.
>
> Sent from my iPhone
>
> > On 28 Oct 2015, at 17:45, Afshartous, Nick 
> wrote:
> >
> >
> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
> job and seeing a problem.  This is running in AWS/Yarn and the streaming
> batch interval is set to 3 minutes and this is a ten node cluster.
> >
> > Testing at 30,000 events per second we are seeing the streaming job get
> stuck (stack trace below) for over an hour.
> >
> > Thanks on any insights or suggestions.
> > --
> >  Nick
> >
> >
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
> >
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> >
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > java.lang.reflect.Method.invoke(Method.java:606)
> >
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
> >
> > Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
/Vamsi


Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-28 Thread Adrian Tanase
Does it work as expected with smaller batch or smaller load? Could it be that 
it's accumulating too many events over 3 minutes?

You could also try increasing the parallelism via repartition to ensure smaller 
tasks that can safely fit in working memory.

Sent from my iPhone

> On 28 Oct 2015, at 17:45, Afshartous, Nick  wrote:
> 
> 
> Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)  job and 
> seeing a problem.  This is running in AWS/Yarn and the streaming batch 
> interval is set to 3 minutes and this is a ten node cluster.
> 
> Testing at 30,000 events per second we are seeing the streaming job get stuck 
> (stack trace below) for over an hour.
> 
> Thanks on any insights or suggestions.
> --
>  Nick
> 
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
> 
> Notice: This communication is for the intended recipient(s) only and may 
> contain confidential, proprietary, legally protected or privileged 
> information of Turbine, Inc. If you are not the intended recipient(s), please 
> notify the sender at once and delete this communication. Unauthorized use of 
> the information in this communication is strictly prohibited and may be 
> unlawful. For those recipients under contract with Turbine, Inc., the 
> information in this communication is subject to the terms and conditions of 
> any applicable contracts or agreements.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-Kafka Connector issue

2015-09-29 Thread Cody Koeninger
Show the output of bin/kafka-topics.sh --list.  Show the actual code with
the topic name hardcoded in the set, not loaded from an external file you
didn't show.  Show the full stacktrace you're getting.

On Mon, Sep 28, 2015 at 10:03 PM, Ratika Prasad <rpra...@couponsinc.com>
wrote:

> Yes, the queues are created and get listed as well, and I have posted a few
> messages which I am able to read using Kafka-consumer.sh --from-beginning,
> yet Spark fails with "No leader offset for Set".
>
> Tried changing the offset.storage to Kafka from zookeeper.
>
> Kindly help
>
> Sent from Outlook <http://taps.io/outlookmobile>
>
> _
> From: Cody Koeninger <c...@koeninger.org>
> Sent: Tuesday, September 29, 2015 12:33 am
> Subject: Re: Spark-Kafka Connector issue
> To: Ratika Prasad <rpra...@couponsinc.com>
> Cc: <user@spark.apache.org>
>
>
>
> Did you actually create TestTopic?  See if it shows up using
> bin/kafka-topics.sh --list, and if not, create it using bin/kafka-topics.sh
> --create
>
> On Mon, Sep 28, 2015 at 1:20 PM, Ratika Prasad <rpra...@couponsinc.com>
> wrote:
>
>> Thanks for your reply.
>>
>>
>>
>> I invoked my program with the broker ip and host and it triggered as
>> expected but I see the below error
>>
>>
>>
>> ./bin/spark-submit --class
>> org.stream.processing.JavaKafkaStreamEventProcessing --master local
>> spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>> 172.28.161.32:9092 TestTopic
>>
>> 15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>>
>> 15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as
>> local[n], n > 1 in local mode if you have receivers to get data, otherwise
>> Spark jobs will not get resources to process the received data.
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.nio.channels.ClosedChannelException
>>
>> org.apache.spark.SparkException: Couldn't find leader offsets for Set
>> ([TestTopic,0])
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>
>> at scala.util.Either.fold(Either.scala:97)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>>
>> at
>> org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:497)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>> When I ran the below to check the offsets, I get this:
>>
>>
>>
>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic
>> TestTopic --group test-consumer-group --zookeeper localhost:2181
>>
>> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>> KeeperErrorCode = NoNode for
>> /consumers/test-consumer-group/offsets/TestTopic/0.
>>
>>
>>
>> Also I just added the below configs to my
>> kafka/config/consumer.properties and restarted Kafka
>>
>>
>>
>> auto.offset.reset=smallest
>>
>> offsets.storage=zookeeper
>>
>> offsets.channel.backoff.ms=1

Re: spark kafka partitioning

2015-08-21 Thread Gaurav Agarwal
When I send messages from a Kafka topic having three partitions,

Spark will listen for them when I call KafkaUtils.createStream or
createDirectStream with local[4].
Now I want to see whether Spark will create partitions when it receives
messages from Kafka using the DStream: how, where, and which method of
the Spark API do I have to look at to find out?

On 8/21/15, Gaurav Agarwal gaurav130...@gmail.com wrote:
 Hello

 Regarding Spark Streaming and Kafka Partitioning

 When i send message on kafka topic with 3 partitions and listens on
 kafkareceiver with local value[4] . how will i come to know in Spark
 Streaming that different Dstreams are created according to partitions of
 kafka messages .

 Thanks


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark kafka partitioning

2015-08-20 Thread ayan guha
If you have 1 topic, that means you have 1 DStream, which will have a
series of RDDs for each batch interval. In receiver-based integration,
there is no direct relationship between Kafka partitions and Spark partitions.
In the direct approach, 1 Spark partition will be created for each Kafka
partition.
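
A small sketch illustrating the 1:1 mapping of the direct approach, assuming
an existing ssc and kafkaParams and an illustrative topic name:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))

stream.foreachRDD { rdd =>
  // For a 3-partition topic this prints 3: one Spark partition per Kafka partition.
  println(s"Spark partitions in this batch: ${rdd.partitions.length}")
}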

On Fri, Aug 21, 2015 at 12:48 PM, Gaurav Agarwal gaurav130...@gmail.com
wrote:

 Hello

 Regarding Spark Streaming and Kafka Partitioning

 When i send message on kafka topic with 3 partitions and listens on
 kafkareceiver with local value[4] . how will i come to know in Spark
 Streaming that different Dstreams are created according to partitions of
 kafka messages .

 Thanks




-- 
Best Regards,
Ayan Guha


Re: spark kafka partitioning

2015-08-20 Thread Cody Koeninger
I'm not clear on your question, can you rephrase it?  Also, are you talking
about createStream or createDirectStream?

On Thu, Aug 20, 2015 at 9:48 PM, Gaurav Agarwal gaurav130...@gmail.com
wrote:

 Hello

 Regarding Spark Streaming and Kafka Partitioning

 When i send message on kafka topic with 3 partitions and listens on
 kafkareceiver with local value[4] . how will i come to know in Spark
 Streaming that different Dstreams are created according to partitions of
 kafka messages .

 Thanks




Re: spark-kafka directAPI vs receivers based API

2015-08-10 Thread Cody Koeninger
For direct stream questions:

https://github.com/koeninger/kafka-exactly-once

Yes, it is used in production.


For general Spark Streaming questions:

http://spark.apache.org/docs/latest/streaming-programming-guide.html


On Mon, Aug 10, 2015 at 7:51 AM, Mohit Durgapal durgapalmo...@gmail.com
wrote:

 Hi All,

 I just wanted to know how the direct API for Spark Streaming compares with
 the earlier receiver-based API. Has anyone used the direct API approach in
 production, or is it still being used for PoCs?

 Also, since I'm new to Spark, could anyone share a starting point where
 I could find working code for both of the above APIs?

 Also, in my use case I want to analyse a data stream (comma-separated
 string) and aggregate over certain fields based on their types. Ideally I
 would like to push that aggregated data to a column-family based
 datastore (like HBase, which we are using currently). But first I'd like to
 find out how to aggregate that data and how streaming works: whether it
 polls and fetches data in batches or continuously listens to the Kafka
 queue for any new message. And how can I configure my application for
 either case? I hope my questions make sense.


 Regards
 Mohit



Re: Spark Kafka Direct Streaming

2015-07-07 Thread Tathagata Das
When you enable checkpointing by setting the checkpoint directory, you
enable metadata checkpointing. Data checkpointing kicks in only if you are
using a DStream operation that requires it, or you are enabling Write Ahead
Logs to prevent data loss on driver failure.

More discussion -
https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/
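
For example, a minimal recovery sketch; the checkpoint path and batch interval
are assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpointed-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir) // enables metadata checkpointing
  // ... build the Kafka DStream and output operations here ...
  ssc
}

// On a driver restart this rebuilds the context from checkpointed metadata;
// on the first run it calls createContext().
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()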

On Tue, Jul 7, 2015 at 7:42 AM, abi_pat present.boiling2...@gmail.com
wrote:

 Hi,

 I am using the new experimental Direct Stream API. Everything is working
 fine but when it comes to fault tolerance, I am not sure how to achieve it.
 Presently my Kafka config map looks like this

 configMap.put("zookeeper.connect", "192.168.51.98:2181");
 configMap.put("group.id", UUID.randomUUID().toString());
 configMap.put("auto.offset.reset", "smallest");
 configMap.put("auto.commit.enable", "true");
 configMap.put("topics", "IPDR31");
 configMap.put("kafka.consumer.id", "kafkasparkuser");
 configMap.put("bootstrap.servers", "192.168.50.124:9092");
 Set<String> topic = new HashSet<String>();
 topic.add("IPDR31");

 JavaPairInputDStream<byte[], byte[]> kafkaData =
     KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
         DefaultDecoder.class, DefaultDecoder.class, configMap, topic);

 Questions -

 Q1- Is my Kafka configuration correct or should it be changed?

 Q2- I also looked into checkpointing, but in my use case, data
 checkpointing is not required while metadata checkpointing is required. Can I
 achieve this, i.e. enable metadata checkpointing and not data
 checkpointing?



 Thanks
 Abhishek Patel



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Streaming-tp23685.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark + Kafka

2015-04-01 Thread bit1...@163.com
Please make sure that you have allocated more cores than the number of receivers.
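
For example, a minimal sketch; the ZooKeeper address, group and topic are
assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver-based stream occupies one core full-time, so at least one
// more core is needed to actually process the received batches.
val conf = new SparkConf().setAppName("kafka-receiver-app").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("my-topic" -> 1))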



 
From: James King
Date: 2015-04-01 15:21
To: user
Subject: Spark + Kafka
I have a simple setup/runtime of Kafka and Spark.

I have a command line consumer displaying arrivals to the Kafka topic. So I know 
messages are being received.

But when I try to read from Kafka topic I get no messages, here are some logs 
below.

I'm thinking there aren't enough threads. How do I check that?

Thank you.

2015-04-01 08:56:50 INFO  JobScheduler:59 - Starting job streaming job 
142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO  JobScheduler:59 - Finished job streaming job 
142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO  JobScheduler:59 - Total delay: 0.002 s for time 
142787141 ms (execution: 0.000 s)
2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event 
ClearMetadata(142787141 ms)
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 
142787141 ms
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: 
[]
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs: 
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older 
than 1427871405000 ms: 
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old 
RDDs: [1427871405000 ms - 8]
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
2015-04-01 08:56:50 INFO  BlockRDD:59 - Removing RDD 8 from persistence list
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message 
RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message 
RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message 
(0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
2015-04-01 08:56:50 INFO  BlockManager:59 - Removing RDD 8
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD 8, 
response is 0
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled message 
(0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 INFO  KafkaInputDStream:59 - Removing blocks of RDD 
BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time 142787141 ms
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that were older 
than 1427871405000 ms: 1427871405000 ms
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for time 
142787141 ms
2015-04-01 08:56:50 INFO  ReceivedBlockTracker:59 - Deleting batches 
ArrayBuffer(142787140 ms)
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0 to 
Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, 
runningTasks: 0
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (0.499181 ms) ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 WARN  TaskSchedulerImpl:71 - Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient resources
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, 
runningTasks: 0
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (0.886121 ms) ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message 
ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from 
Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated: 
app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
2015-04-01 08:56:52 INFO  SparkDeploySchedulerBackend:59 - Executor 
app-20150401065621-0007/0 removed: Command exited with code 1
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message RemoveExecutor(0,Unknown executor exit code (1)) from 
Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to remove 
non-existent executor 0
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code (1)) from 
Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message 

Re: Spark + Kafka

2015-04-01 Thread James King
Thanks Saisai,

Sure will do.

But just a quick note that when I set the master as local[*], Spark started
showing Kafka messages as expected, so the problem in my view was to do
with not enough threads to process the incoming data.

Thanks.


On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

 Would you please share your code snippet, so we can identify if there is
 anything wrong in your code.

 Besides, would you please grep your driver's debug log to see if there's any
 debug log like Stream xxx received block xxx; this means that Spark
 Streaming keeps receiving data from sources like Kafka.



Re: Spark + Kafka

2015-04-01 Thread Saisai Shao
Would you please share your code snippet, so we can identify if there is
anything wrong in your code.

Besides, would you please grep your driver's debug log to see if there's any
debug log like Stream xxx received block xxx; this means that Spark
Streaming keeps receiving data from sources like Kafka.
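
For example, something along these lines against the driver log (the log
file path is assumed):

grep "received block" /path/to/driver.log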



RE: Spark + Kafka

2015-04-01 Thread Shao, Saisai
OK, it seems there's nothing strange in your code. So maybe we need to narrow
down the cause: would you please run the KafkaWordCount example that ships with
Spark to see if it works? If it does, then we should focus on your
implementation; otherwise Kafka potentially has some problems.
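
For reference, the stock example can be launched roughly like this (the
ZooKeeper quorum, consumer group, topic and thread count below are
placeholders; check the usage string of KafkaWordCount for your Spark
version):

bin/run-example org.apache.spark.examples.streaming.KafkaWordCount \
  zoo01:2181 my-consumer-group test 1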

Thanks
Jerry

From: James King [mailto:jakwebin...@gmail.com]
Sent: Wednesday, April 1, 2015 6:59 PM
To: Saisai Shao
Cc: bit1...@163.com; user
Subject: Re: Spark + Kafka

This is the code. And I couldn't find anything like the log you suggested.

public KafkaLogConsumer(int duration, String master) {
    JavaStreamingContext spark = createSparkContext(duration, master);

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1);

    JavaPairDStream<String, String> input =
        KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
    input.print();

    spark.start();
    spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
    SparkConf sparkConf = new SparkConf()
        .setAppName(this.getClass().getSimpleName())
        .setMaster(master);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
        Durations.seconds(duration));
    return ssc;
}


Re: Spark + Kafka

2015-04-01 Thread James King
Thank you bit1129,

From looking at the web UI I can see 2 cores.

I am also looking at http://spark.apache.org/docs/1.2.1/configuration.html
but can't see an obvious configuration for the number of receivers. Can you
help, please?


On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com bit1...@163.com wrote:

 Please make sure that you have given Spark more cores than the number of receivers.





Re: Spark + Kafka

2015-04-01 Thread James King
This is the code. And I couldn't find anything like the log you suggested.

public KafkaLogConsumer(int duration, String master) {
    JavaStreamingContext spark = createSparkContext(duration, master);

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1);

    JavaPairDStream<String, String> input =
        KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
    input.print();

    spark.start();
    spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
    SparkConf sparkConf = new SparkConf()
        .setAppName(this.getClass().getSimpleName())
        .setMaster(master);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
        Durations.seconds(duration));
    return ssc;
}


Re: Spark + Kafka

2015-03-19 Thread James King
Thanks Khanderao.

On Wed, Mar 18, 2015 at 7:18 PM, Khanderao Kand Gmail 
khanderao.k...@gmail.com wrote:

 I have used various versions of Spark (1.0, 1.2.1) without any issues.
 Though I have not used Kafka significantly with 1.3.0, preliminary
 testing revealed no issues.

 - khanderao



  On Mar 18, 2015, at 2:38 AM, James King jakwebin...@gmail.com wrote:
 
  Hi All,
 
  Which build of Spark is best when using Kafka?
 
  Regards
  jk



Re: Spark + Kafka

2015-03-19 Thread James King
Many thanks all for the good responses, appreciated.






Re: Spark + Kafka

2015-03-18 Thread Jeffrey Jedele
Probably 1.3.0 - it has some improvements in the included Kafka receiver
for streaming.

https://spark.apache.org/releases/spark-release-1-3-0.html

Regards,
Jeff

2015-03-18 10:38 GMT+01:00 James King jakwebin...@gmail.com:

 Hi All,

 Which build of Spark is best when using Kafka?

 Regards
 jk



Re: Spark + Kafka

2015-03-18 Thread James King
Thanks Jeff, I'm planning to use it in standalone mode; OK, I will use the
Hadoop 2.4 package. Ciao!











Re: Spark + Kafka

2015-03-18 Thread Jeffrey Jedele
What you call sub-categories are packages pre-built to run on certain
Hadoop environments. It really depends on where you want to run Spark. As
far as I know, this is mainly about the included HDFS binding - so if you
just want to play around with Spark, any of the packages should be fine. I
wouldn't use the source package though, because you'd have to compile it yourself.

PS: Make sure to use Reply to all. If you're not including the mailing
list in the response, I'm the only one who will get your message.

Regards,
Jeff

2015-03-18 10:49 GMT+01:00 James King jakwebin...@gmail.com:

 Any sub-category recommendations: Hadoop, MapR, CDH?








Re: Spark + Kafka

2015-03-18 Thread Khanderao Kand Gmail
I have used various versions of Spark (1.0, 1.2.1) without any issues. Though I
have not used Kafka significantly with 1.3.0, preliminary testing revealed
no issues.

- khanderao 



 On Mar 18, 2015, at 2:38 AM, James King jakwebin...@gmail.com wrote:
 
 Hi All,
 
 Which build of Spark is best when using Kafka?
 
 Regards
 jk




Re: spark kafka batch integration

2014-12-15 Thread Cody Koeninger
For an alternative take on a similar idea, see

https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka

An advantage of the approach I'm taking is that the lower and upper offsets
of the RDD are known in advance, so it's deterministic.

I haven't had a need to write to Kafka from Spark yet, so that's an obvious
advantage of your library.

I think the existing Kafka DStream is inadequate for a number of use cases,
and would really like to see some combination of these approaches make it
into the Spark codebase.
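
This offset-range idea is essentially what later shipped in Spark 1.3's
direct Kafka API. A minimal sketch using that later API's names
(KafkaUtils.createRDD and OffsetRange from spark-streaming-kafka; the
broker, topic and offsets are placeholders):

import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class DeterministicKafkaRead {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("KafkaBatchRead").setMaster("local[2]"));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "broker1:9092");

        // Both offset bounds are fixed before the job runs, so recomputing
        // a failed partition rereads exactly the same messages.
        OffsetRange[] ranges = {
            OffsetRange.create("testTopic", 0, 0L, 1000L)
        };

        JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
            sc, String.class, String.class,
            StringDecoder.class, StringDecoder.class,
            kafkaParams, ranges);

        System.out.println("messages read: " + rdd.count());
        sc.stop();
    }
}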

On Sun, Dec 14, 2014 at 2:41 PM, Koert Kuipers ko...@tresata.com wrote:

 hello all,
 we at Tresata wrote a library to provide batch integration between
 Spark and Kafka (distributed write of an RDD to Kafka, distributed read of
 an RDD from Kafka). Our main use cases are (in lambda architecture jargon):
 * periodic appends to the immutable master dataset on HDFS from Kafka using
 Spark
 * making non-streaming data available in Kafka with periodic data drops from
 HDFS using Spark; this is to facilitate merging the speed and batch layers
 in Spark Streaming
 * distributed writes from Spark Streaming

 see here:
 https://github.com/tresata/spark-kafka

 best,
 koert



Re: spark kafka batch integration

2014-12-15 Thread Koert Kuipers
Thanks! I will take a look at your code; I didn't realize there was already
something out there.

Good point about the upper offsets; I will add that feature to our version as
well, if you don't mind.

I was thinking about making it deterministic across task failures transparently
(even if no upper offsets are provided) by making a call to get the latest
offsets for all partitions, and filtering the RDD based on that to make sure
nothing beyond those offsets ends up in the RDD. I haven't had time to test
whether that works and is robust.





Re: Spark / Kafka connector - CDH5 distribution

2014-10-07 Thread Abraham Jacob
Thanks Sean,

Sorry in my earlier question I meant to type CDH5.1.3 not CDH5.1.2

I presume it's included in spark-streaming_2.10-1.0.0-cdh5.1.3

But for some reason Eclipse complains that import
org.apache.spark.streaming.kafka cannot be resolved, even though I have
included the spark-streaming_2.10-1.0.0-cdh5.1.3.jar file in the project.

Where can I find it in the CDH5.1.3 spark distribution?




On Tue, Oct 7, 2014 at 3:40 PM, Sean Owen so...@cloudera.com wrote:

 Yes, it is the entire Spark distribution.
 On Oct 7, 2014 11:36 PM, Abraham Jacob abe.jac...@gmail.com wrote:

 Hi All,

 Does anyone know if CDH5.1.2 packages the spark streaming kafka connector
 under the spark externals project?



 --
 ~




-- 
~


Re: Spark / Kafka connector - CDH5 distribution

2014-10-07 Thread Abraham Jacob
Never mind... my bad... made a typo.

looks good.

Thanks,





-- 
~


Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

2014-06-11 Thread gaurav.dasgupta
Thanks Tobias for replying.

The problem was that I had to provide the dependency jars' paths to the
StreamingContext within the code. Providing all the jar paths resolved
my problem. Refer to the code snippet below:

JavaStreamingContext ssc = new JavaStreamingContext(args[0],
    "SparkStreamExample", new Duration(1000),
    System.getenv("SPARK_HOME"), new String[] {
        JavaStreamingContext.jarOfClass(SparkStreamExample.class)[0],
        "/usr/local/spark-0.9.1-bin-hadoop2/external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar",
        "/usr/lib/hbase/lib/zookeeper.jar",
        "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar",
        "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar",
        "/usr/local/scala/lib/scala-library.jar",
        "/usr/local/shark-0.9.1-bin-hadoop2/lib_managed/jars/com.yammer.metrics/metrics-core/metrics-core-2.1.2.jar",
        "/usr/local/hbase.jar" });

The question is: isn't there any other way of doing this? The above
approach doesn't seem good to me. For example, what if I execute the
application on some other cluster where the dependency paths are different? It
is also not feasible to parametrize these jar paths as user arguments.

Any advice will be appreciated.

Regards,
Gaurav
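
One way to avoid hard-coding cluster-specific paths is to discover the
dependency jars at runtime from a configurable directory and hand them to
SparkConf.setJars - a sketch, where the app.jar.dir property and its
default directory are assumptions:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JarAwareContext {
    public static JavaStreamingContext create(String master) {
        // Hypothetical convention: each cluster sets -Dapp.jar.dir=... to
        // point at its own copy of the dependency jars.
        String jarDir = System.getProperty("app.jar.dir", "/usr/local/app/lib");
        List<String> jars = new ArrayList<String>();
        jars.add(JavaStreamingContext.jarOfClass(JarAwareContext.class)[0]);
        File[] files = new File(jarDir).listFiles();
        if (files != null) {
            for (File f : files) {
                if (f.getName().endsWith(".jar")) {
                    jars.add(f.getAbsolutePath());
                }
            }
        }
        SparkConf conf = new SparkConf()
            .setMaster(master)
            .setAppName("SparkStreamExample")
            .setJars(jars.toArray(new String[0]));
        return new JavaStreamingContext(conf, new Duration(1000));
    }
}

Building a single assembly (uber) jar that shades all dependencies in, so
only one jar path is ever needed, is another common way around this.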

On Mon, Jun 9, 2014 at 6:23 AM, Tobias Pfeiffer wrote:

 Gaurav,

 I am not sure that the * expands to what you expect it to do.
 Normally bash expands * to a space-separated string, not
 colon-separated. Try specifying all the jars manually, maybe?

 Tobias


Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

2014-06-08 Thread Tobias Pfeiffer
Gaurav,

I am not sure that the * expands to what you expect it to do.
Normally bash expands * to a space-separated string, not a
colon-separated one. Try specifying all the jars manually, maybe?

Tobias
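
To make the expansion issue concrete (jar and class names are placeholders):
unquoted, the shell turns the glob into a space-separated list, which breaks
-cp; quoted, the glob reaches the JVM intact, and Java 6+ itself expands a
trailing * classpath entry to every jar in that directory:

# Broken: the shell expands the glob to "a.jar b.jar ..." (space-separated)
java -cp /usr/lib/hbase/lib/* MyMainClass

# Works: the JVM sees the literal "*" and expands it per directory
java -cp "/usr/local/SparkStreamExample.jar:/usr/lib/hbase/lib/*" MyMainClass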

On Thu, Jun 5, 2014 at 6:45 PM, Gaurav Dasgupta gaurav.d...@gmail.com wrote:
 Hi,

 I have written my own custom Spark streaming code which connects to a Kafka
 server and fetches data. I have tested the code in local mode and it works
 fine. But when I execute the same code in YARN mode, I get a
 KafkaReceiver class-not-found exception. I am providing the Spark
 Kafka jar on the classpath and have ensured that the path is correct for
 all the nodes in my cluster.

 I am using the Spark 0.9.1 Hadoop pre-built package, deployed on all the
 nodes (10-node cluster) in the YARN cluster.
 I am using the following command to run my code in YARN mode:

 SPARK_YARN_MODE=true
 SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
 SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp
 /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/
 SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic
 NewTestTable 1

 Below is the error message I am getting:

 14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set
 2.0 with 1 tasks
 14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID
 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
 14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as
 2971 bytes in 2 ms
 14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
 14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ClassNotFoundException
 java.lang.ClassNotFoundException:
 org.apache.spark.streaming.kafka.KafkaReceiver
 at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:247)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
 at
 java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
 at
 org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
 at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 at
 org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
 at
 java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at