Re: Re: spark+kafka+dynamic resource allocation

2023-01-30 Thread Mich Talebzadeh
Sure, I suggest that you add a note to that Jira and express your interest.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jan 2023 at 01:56, Lingzhe Sun  wrote:

> Hi Mich,
>
> Thanks for the information. I myself think this open issue should have a
> higher priority as more streaming applications are being built with Spark.
> Hope this becomes a feature in the near future.
>
> --
> BR
> Lingzhe Sun
>
>
> *From:* Mich Talebzadeh 
> *Date:* 2023-01-30 02:14
> *To:* Lingzhe Sun 
> *CC:* ashok34...@yahoo.com; User 
> *Subject:* Re: Re: spark+kafka+dynamic resource allocation
> Hi,
>
> Spark Structured Streaming currently does not support dynamic allocation
> (see SPARK-24815: Structured Streaming should support dynamic allocation
> <https://issues.apache.org/jira/browse/SPARK-24815>), which is still open.
>
>
> Autoscaling in Cloud offerings
> <https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#autoscaling_and_spark_streaming>
> like Google Dataproc does not support Spark Structured Streaming either.
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 29 Jan 2023 at 01:49, Lingzhe Sun  wrote:
>
>> Thank you for the response. But the reference does not seem to be
>> answering any of those questions.
>>
>> BR
>> Lingzhe Sun
>>
>>
>> *From:* ashok34...@yahoo.com
>> *Date:* 2023-01-29 04:01
>> *To:* User ; Lingzhe Sun 
>> *Subject:* Re: spark+kafka+dynamic resource allocation
>> Hi,
>>
>> Worth checking this link
>>
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>> On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun <
>> lingzhe@hirain.com> wrote:
>>
>>
>> Hi all,
>>
>> I'm wondering if dynamic resource allocation works in spark+kafka
>> streaming applications. Here're some questions:
>>
>>    - Will structured streaming be supported?
>>    - Is the number of consumers always equal to the number of partitions
>>    of the subscribed topic (let's say there's only one topic)?
>>    - If consumers are evenly distributed across executors, will a newly
>>    added executor (through dynamic resource allocation) trigger a consumer
>>    reassignment?
>>    - Would it be simply a bad idea to use dynamic resource allocation in a
>>    streaming app, because there's no way to scale down the number of
>>    executors unless no data is coming in?
>>
>> Any thoughts are welcomed.
>>
>> Lingzhe Sun
>> Hirain Technology
>>
>>


Re: Re: spark+kafka+dynamic resource allocation

2023-01-29 Thread Lingzhe Sun
Hi Mich,

Thanks for the information. I myself think this open issue should have a higher 
priority as more streaming applications are being built with Spark. Hope this 
becomes a feature in the near future.



BR
Lingzhe Sun
 
From: Mich Talebzadeh
Date: 2023-01-30 02:14
To: Lingzhe Sun
CC: ashok34...@yahoo.com; User
Subject: Re: Re: spark+kafka+dynamic resource allocation
Hi,

Spark Structured Streaming currently does not support dynamic allocation (see 
SPARK-24815: Structured Streaming should support dynamic allocation), which is 
still open.

Autoscaling in Cloud offerings like Google Dataproc does not support Spark 
Structured Streaming either

HTH

   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh
 
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
 


On Sun, 29 Jan 2023 at 01:49, Lingzhe Sun  wrote:
Thank you for the response. But the reference does not seem to be answering any 
of those questions.

BR
Lingzhe Sun
 
From: ashok34...@yahoo.com
Date: 2023-01-29 04:01
To: User; Lingzhe Sun
Subject: Re: spark+kafka+dynamic resource allocation
Hi,

Worth checking this link

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun 
 wrote: 


Hi all,

I'm wondering if dynamic resource allocation works in spark+kafka streaming 
applications. Here're some questions:
- Will structured streaming be supported?
- Is the number of consumers always equal to the number of partitions of the 
subscribed topic (let's say there's only one topic)?
- If consumers are evenly distributed across executors, will a newly added 
executor (through dynamic resource allocation) trigger a consumer reassignment?
- Would it be simply a bad idea to use dynamic resource allocation in a 
streaming app, because there's no way to scale down the number of executors 
unless no data is coming in?
Any thoughts are welcomed.

Lingzhe Sun 
Hirain Technology


Re: Re: spark+kafka+dynamic resource allocation

2023-01-29 Thread Mich Talebzadeh
Hi,

Spark Structured Streaming currently does not support dynamic allocation
(see SPARK-24815: Structured Streaming should support dynamic allocation
<https://issues.apache.org/jira/browse/SPARK-24815>), which is still open.


Autoscaling in Cloud offerings
<https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#autoscaling_and_spark_streaming>
like Google Dataproc does not support Spark Structured Streaming either.
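
For reference, a minimal sketch (not from this thread) of the standard dynamic
allocation settings as they are used for batch/SQL workloads; per SPARK-24815
above, they are not supported for Structured Streaming queries. The app name
and executor bounds are illustrative only.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-allocation-sketch")  // illustrative name
  // Honoured by batch/SQL jobs; Structured Streaming support is tracked in SPARK-24815
  .config("spark.dynamicAllocation.enabled", "true")
  // Spark 3.0+: allows executors to be released without an external shuffle service
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  .getOrCreate()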


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 29 Jan 2023 at 01:49, Lingzhe Sun  wrote:

> Thank you for the response. But the reference does not seem to be
> answering any of those questions.
>
> BR
> Lingzhe Sun
>
>
> *From:* ashok34...@yahoo.com
> *Date:* 2023-01-29 04:01
> *To:* User ; Lingzhe Sun 
> *Subject:* Re: spark+kafka+dynamic resource allocation
> Hi,
>
> Worth checking this link
>
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun <
> lingzhe@hirain.com> wrote:
>
>
> Hi all,
>
> I'm wondering if dynamic resource allocation works in spark+kafka
> streaming applications. Here're some questions:
>
>    - Will structured streaming be supported?
>    - Is the number of consumers always equal to the number of partitions
>    of the subscribed topic (let's say there's only one topic)?
>    - If consumers are evenly distributed across executors, will a newly
>    added executor (through dynamic resource allocation) trigger a consumer
>    reassignment?
>    - Would it be simply a bad idea to use dynamic resource allocation in a
>    streaming app, because there's no way to scale down the number of
>    executors unless no data is coming in?
>
> Any thoughts are welcomed.
>
> Lingzhe Sun
> Hirain Technology
>
>


Re: Re: spark+kafka+dynamic resource allocation

2023-01-28 Thread Lingzhe Sun
Thank you for the response. But the reference does not seem to be answering any 
of those questions.

BR
Lingzhe Sun
 
From: ashok34...@yahoo.com
Date: 2023-01-29 04:01
To: User; Lingzhe Sun
Subject: Re: spark+kafka+dynamic resource allocation
Hi,

Worth checking this link

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun 
 wrote: 


Hi all,

I'm wondering if dynamic resource allocation works in spark+kafka streaming 
applications. Here're some questions:
- Will structured streaming be supported?
- Is the number of consumers always equal to the number of partitions of the 
subscribed topic (let's say there's only one topic)?
- If consumers are evenly distributed across executors, will a newly added 
executor (through dynamic resource allocation) trigger a consumer reassignment?
- Would it be simply a bad idea to use dynamic resource allocation in a 
streaming app, because there's no way to scale down the number of executors 
unless no data is coming in?
Any thoughts are welcomed.

Lingzhe Sun 
Hirain Technology


Re: spark+kafka+dynamic resource allocation

2023-01-28 Thread ashok34...@yahoo.com.INVALID
 Hi,
Worth checking this link
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On Saturday, 28 January 2023 at 06:18:28 GMT, Lingzhe Sun 
 wrote:  
 
Hi all,
I'm wondering if dynamic resource allocation works in spark+kafka streaming 
applications. Here're some questions:   
   - Will structured streaming be supported?
   - Is the number of consumers always equal to the number of partitions of 
the subscribed topic (let's say there's only one topic)?
   - If consumers are evenly distributed across executors, will a newly added 
executor (through dynamic resource allocation) trigger a consumer reassignment?
   - Would it be simply a bad idea to use dynamic resource allocation in a 
streaming app, because there's no way to scale down the number of executors 
unless no data is coming in?
Any thoughts are welcomed.
Lingzhe Sun
Hirain Technology

spark+kafka+dynamic resource allocation

2023-01-27 Thread Lingzhe Sun
Hi all,

I'm wondering if dynamic resource allocation works in spark+kafka streaming 
applications. Here're some questions:
- Will structured streaming be supported?
- Is the number of consumers always equal to the number of partitions of the 
subscribed topic (let's say there's only one topic)?
- If consumers are evenly distributed across executors, will a newly added 
executor (through dynamic resource allocation) trigger a consumer reassignment?
- Would it be simply a bad idea to use dynamic resource allocation in a 
streaming app, because there's no way to scale down the number of executors 
unless no data is coming in?
Any thoughts are welcomed.
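
For context (not an answer from the thread), a minimal Structured Streaming
read from Kafka; the broker address, topic name and minPartitions value are
placeholders. By default the Kafka source creates one Spark partition per
Kafka topic partition, and the optional minPartitions option can request a
finer split.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-read-sketch").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
  .option("subscribe", "my_topic")                    // placeholder topic
  .option("minPartitions", "8")                       // optional, illustrative
  .load()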

Lingzhe Sun 
Hirain Technology


RE: Spark kafka structured streaming - how to prevent dataloss

2022-03-22 Thread Gnanasoundari Soundarajan
Hi all,

Any suggestion?

Regards,
Gnana

From: Gnanasoundari Soundarajan 
Sent: Tuesday, March 8, 2022 10:02 PM
To: user@spark.apache.org
Subject: Spark kafka structured streaming - how to prevent dataloss

Hi,

In Spark, checkpoints are used to keep track of Kafka offsets. If there is any 
data loss, can we edit the checkpoint files to reduce the data loss? Please 
suggest best practices for reducing data loss under exceptional scenarios.

Regards,
Gnana


Spark kafka structured streaming - how to prevent dataloss

2022-03-08 Thread Gnanasoundari Soundarajan
Hi,

In Spark, checkpoints are used to keep track of Kafka offsets. If there is any 
data loss, can we edit the checkpoint files to reduce the data loss? Please 
suggest best practices for reducing data loss under exceptional scenarios.
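
Not from the thread, but as context for the question: checkpoint files are
managed by Spark and are generally not meant to be edited by hand. Below is a
minimal sketch of how offsets are normally tracked, and how a query can be
restarted from explicit offsets instead of a damaged checkpoint (broker,
topic, offsets and paths are placeholders).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-sketch").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
  .option("subscribe", "my_topic")                    // placeholder topic
  // Illustrative: resume from explicit offsets (-2 means earliest for that partition)
  .option("startingOffsets", """{"my_topic":{"0":42,"1":-2}}""")
  .load()

df.writeStream
  .format("console")
  // Offsets are committed to this location as part of each micro-batch
  .option("checkpointLocation", "/tmp/checkpoints/my_query")  // placeholder path
  .start()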

Regards,
Gnana


RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
Ahh, ok.  So, Kafka 3.1 is supported for Spark 3.2.1.  Thank you very much.

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:50 PM
To: Michael Williams (SSI) 
Cc: user@spark.apache.org
Subject: Re: Spark Kafka Integration

these are the old and new ones

For spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar  --> kafka-clients-3.1.0.jar
<https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar>
commons-pool2-2.9.0.jar  --> commons-pool2-2.11.1.jar
<https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar>
spark-streaming_2.12-3.1.1.jar  --> spark-streaming_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.12/3.2.1/spark-streaming_2.12-3.2.1.jar>
spark-sql-kafka-0-10_2.12-3.1.0.jar -> spark-sql-kafka-0-10_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.1/spark-sql-kafka-0-10_2.12-3.2.1.jar>



HTH


 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!kJE9FXhR63484NpSMIlBVr4evPJX7uzsB8-Yyaij23Vi19p8rIhJ9VGv_odk5bK3Zlx4tlQ$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!kJE9FXhR63484NpSMIlBVr4evPJX7uzsB8-Yyaij23Vi19p8rIhJ9VGv_odk5bK3nARtlMw$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:40, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
I believe it is 3.1, but if there is a different version that “works better” 
with spark, any advice would be appreciated.  Our entire team is totally new to 
spark and kafka (this is a poc trial).

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, February 25, 2022 2:30 PM
To: Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Kafka Integration

and what version of kafka do you have 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar



HTH


 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other propert

RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
Thank you, that is good to know.

From: Sean Owen [mailto:sro...@gmail.com]
Sent: Friday, February 25, 2022 2:46 PM
To: Michael Williams (SSI) 
Cc: Mich Talebzadeh ; user@spark.apache.org
Subject: Re: Spark Kafka Integration

Spark 3.2.1 is compiled vs Kafka 2.8.0; the forthcoming Spark 3.3 against Kafka 
3.1.0.
It may well be mutually compatible though.

On Fri, Feb 25, 2022 at 2:40 PM Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
I believe it is 3.1, but if there is a different version that “works better” 
with spark, any advice would be appreciated.  Our entire team is totally new to 
spark and kafka (this is a poc trial).

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, February 25, 2022 2:30 PM
To: Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Kafka Integration

and what version of kafka do you have 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar



HTH


 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.



This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not d

Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
these are the old and new ones

For spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar  --> kafka-clients-3.1.0.jar
<https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar>
commons-pool2-2.9.0.jar  --> commons-pool2-2.11.1.jar
<https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar>
spark-streaming_2.12-3.1.1.jar  --> spark-streaming_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.12/3.2.1/spark-streaming_2.12-3.2.1.jar>
spark-sql-kafka-0-10_2.12-3.1.0.jar -> spark-sql-kafka-0-10_2.12-3.2.1.jar
<https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.1/spark-sql-kafka-0-10_2.12-3.2.1.jar>
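
As an alternative sketch (not from the thread): rather than tracking the
individual jars by hand, the connector and its transitive dependencies
(kafka-clients, commons-pool2, the token provider) can be resolved by Maven
coordinates, either with spark-submit --packages or via the equivalent
spark.jars.packages setting. The coordinates below assume Spark 3.2.1 with
Scala 2.12.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-packages-sketch")  // illustrative name
  // Equivalent to: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1
  .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
  .getOrCreate()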


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 20:40, Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> I believe it is 3.1, but if there is a different version that “works
> better” with spark, any advice would be appreciated.  Our entire team is
> totally new to spark and kafka (this is a poc trial).
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, February 25, 2022 2:30 PM
> *To:* Michael Williams (SSI) 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Kafka Integration
>
>
>
> and what version of kafka do you have 2.7?
>
>
>
> for spark 3.1.1 I needed these jar files to make it work
>
>
>
> kafka-clients-2.7.0.jar
> commons-pool2-2.9.0.jar
> spark-streaming_2.12-3.1.1.jar
> spark-sql-kafka-0-10_2.12-3.1.0.jar
>
>
>
> HTH
>
>
>
>view my Linkedin profile
> <https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
> <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
> wrote:
>
> What is the use case? Is this for spark structured streaming?
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
> <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deli

Re: Spark Kafka Integration

2022-02-25 Thread Sean Owen
Spark 3.2.1 is compiled vs Kafka 2.8.0; the forthcoming Spark 3.3 against
Kafka 3.1.0.
It may well be mutually compatible though.

On Fri, Feb 25, 2022 at 2:40 PM Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> I believe it is 3.1, but if there is a different version that “works
> better” with spark, any advice would be appreciated.  Our entire team is
> totally new to spark and kafka (this is a poc trial).
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, February 25, 2022 2:30 PM
> *To:* Michael Williams (SSI) 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Kafka Integration
>
>
>
> and what version of kafka do you have 2.7?
>
>
>
> for spark 3.1.1 I needed these jar files to make it work
>
>
>
> kafka-clients-2.7.0.jar
> commons-pool2-2.9.0.jar
> spark-streaming_2.12-3.1.1.jar
> spark-sql-kafka-0-10_2.12-3.1.0.jar
>
>
>
> HTH
>
>
>
>view my Linkedin profile
> <https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
> <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
> wrote:
>
> What is the use case? Is this for spark structured streaming?
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
> <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>


RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
Thank you

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:35 PM
To: Michael Williams (SSI) 
Cc: Sean Owen ; user@spark.apache.org
Subject: Re: Spark Kafka Integration

please see my earlier reply for 3.1.1 tested and worked in Google Dataproc 
environment

Also this article of mine may be useful

Processing Change Data Capture with Spark Structured 
Streaming<https://urldefense.com/v3/__https:/www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/__;!!IPetXT4!gikpPwpJyYjmHQoX8PFvKTkBOOZ8_AsRuHBnyxqAZxWiejSbWOTRn16Kh1Sd8F-4--GdsLI$>

HTH




 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gikpPwpJyYjmHQoX8PFvKTkBOOZ8_AsRuHBnyxqAZxWiejSbWOTRn16Kh1Sd8F-4OpVyh2c$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gikpPwpJyYjmHQoX8PFvKTkBOOZ8_AsRuHBnyxqAZxWiejSbWOTRn16Kh1Sd8F-4-AqCCmk$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:30, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
The use case is for spark structured streaming (a spark app will be launched by 
a worker service that monitors the kafka topic for new messages, once the 
messages are consumed, the spark app will terminate), but if there is a hitch 
here, it is that the Spark environment includes the MS dotnet for Spark 
wrapper, which means that each spark app will consume from one kafka topic and 
will be written in C#.  If possible, I’d really like to be able to manually 
download the necessary jars and do the kafka client installation as part of the 
docker image build, so that the dependencies already exist on disk.  If that 
makes any sense.

Thank you

From: Mich Talebzadeh 
[mailto:mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>]
Sent: Friday, February 25, 2022 2:16 PM
To: Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Kafka Integration

What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!lCrzhG1VJ33tiaN9wEQbbz1YK22GuptNP0ttoU3MuEFoo5yhyYOunxqT6ntBaiGS-IYaQ48$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!lCrzhG1VJ33tiaN9wEQbbz1YK22GuptNP0ttoU3MuEFoo5yhyYOunxqT6ntBaiGSmSLsUws$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any a

RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
I believe it is 3.1, but if there is a different version that “works better” 
with spark, any advice would be appreciated.  Our entire team is totally new to 
spark and kafka (this is a poc trial).

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:30 PM
To: Michael Williams (SSI) 
Cc: user@spark.apache.org
Subject: Re: Spark Kafka Integration

and what version of kafka do you have 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar



HTH


 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6MRS0hIg$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!gBJNAXItekUwDwvWFDOm6TyHbSrwbIYzHfz3Lgdat86WXOH09jgo72Z1eIt0YwL6-ShVdmU$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.



This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.


Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
please see my earlier reply for 3.1.1 tested and worked in Google Dataproc
environment

Also this article of mine may be useful

Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 20:30, Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> The use case is for spark structured streaming (a spark app will be
> launched by a worker service that monitors the kafka topic for new
> messages, once the messages are consumed, the spark app will terminate),
> but if there is a hitch here, it is that the Spark environment includes the
> MS dotnet for Spark wrapper, which means that each spark app will consume
> from one kafka topic and will be written in C#.  If possible, I’d really
> like to be able to manually download the necessary jars and do the kafka
> client installation as part of the docker image build, so that the
> dependencies already exist on disk.  If that makes any sense.
>
>
>
> Thank you
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, February 25, 2022 2:16 PM
> *To:* Michael Williams (SSI) 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Kafka Integration
>
>
>
> What is the use case? Is this for spark structured streaming?
>
>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!lCrzhG1VJ33tiaN9wEQbbz1YK22GuptNP0ttoU3MuEFoo5yhyYOunxqT6ntBaiGS-IYaQ48$>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
> <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!lCrzhG1VJ33tiaN9wEQbbz1YK22GuptNP0ttoU3MuEFoo5yhyYOunxqT6ntBaiGSmSLsUws$>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>


Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
and what version of kafka do you have 2.7?

for spark 3.1.1 I needed these jar files to make it work

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar


HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
wrote:

> What is the use case? Is this for spark structured streaming?
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
>> After reviewing Spark's Kafka Integration guide, it indicates that
>> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
>> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
>> cleanest, most repeatable (reliable) way to acquire these jars for
>> including in a Spark Docker image without introducing version compatibility
>> issues?
>>
>>
>>
>> Thank you,
>>
>> Mike
>>
>>
>> This electronic message may contain information that is Proprietary,
>> Confidential, or legally privileged or protected. It is intended only for
>> the use of the individual(s) and entity named in the message. If you are
>> not an intended recipient of this message, please notify the sender
>> immediately and delete the material from your computer. Do not deliver,
>> distribute or copy this message and do not disclose its contents or take
>> any action in reliance on the information it contains. Thank You.
>>
>


RE: Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
The use case is for spark structured streaming (a spark app will be launched by 
a worker service that monitors the kafka topic for new messages, once the 
messages are consumed, the spark app will terminate), but if there is a hitch 
here, it is that the Spark environment includes the MS dotnet for Spark 
wrapper, which means that each spark app will consume from one kafka topic and 
will be written in C#.  If possible, I’d really like to be able to manually 
download the necessary jars and do the kafka client installation as part of the 
docker image build, so that the dependencies already exist on disk.  If that 
makes any sense.
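
As a sketch of the "jars already on disk" approach (not from the thread): once
the connector jars have been downloaded into the image at build time, a session
can reference them by local path through spark.jars. The paths and versions
below are placeholders.

import org.apache.spark.sql.SparkSession

// Placeholder paths; the jars are assumed to have been fetched from Maven
// Central during the Docker image build.
val localJars = Seq(
  "/opt/spark/extra-jars/spark-sql-kafka-0-10_2.12-3.2.1.jar",
  "/opt/spark/extra-jars/kafka-clients-2.8.0.jar",
  "/opt/spark/extra-jars/commons-pool2-2.11.1.jar",
  "/opt/spark/extra-jars/spark-token-provider-kafka-0-10_2.12-3.2.1.jar"
).mkString(",")

val spark = SparkSession.builder()
  .appName("local-jars-sketch")     // illustrative name
  .config("spark.jars", localJars)  // comma-separated list of local jar paths
  .getOrCreate()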

Thank you

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, February 25, 2022 2:16 PM
To: Michael Williams (SSI) 
Cc: user@spark.apache.org
Subject: Re: Spark Kafka Integration

What is the use case? Is this for spark structured streaming?

HTH




 
   view my Linkedin 
profile<https://urldefense.com/v3/__https:/www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/__;!!IPetXT4!lCrzhG1VJ33tiaN9wEQbbz1YK22GuptNP0ttoU3MuEFoo5yhyYOunxqT6ntBaiGS-IYaQ48$>

 
https://en.everybodywiki.com/Mich_Talebzadeh<https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!IPetXT4!lCrzhG1VJ33tiaN9wEQbbz1YK22GuptNP0ttoU3MuEFoo5yhyYOunxqT6ntBaiGSmSLsUws$>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) 
mailto:michael.willi...@ssigroup.com>> wrote:
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike


This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.



This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.


Re: Spark Kafka Integration

2022-02-25 Thread Sean Owen
That .jar is available on Maven, though typically you depend on it in your
app, and compile an uber JAR which will contain it and all its dependencies.
You can I suppose manage to compile an uber JAR from that dependency itself
with tools if needed.

On Fri, Feb 25, 2022 at 1:37 PM Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>


Re: Spark Kafka Integration

2022-02-25 Thread Mich Talebzadeh
What is the use case? Is this for spark structured streaming?

HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
> This electronic message may contain information that is Proprietary,
> Confidential, or legally privileged or protected. It is intended only for
> the use of the individual(s) and entity named in the message. If you are
> not an intended recipient of this message, please notify the sender
> immediately and delete the material from your computer. Do not deliver,
> distribute or copy this message and do not disclose its contents or take
> any action in reliance on the information it contains. Thank You.
>


Spark Kafka Integration

2022-02-25 Thread Michael Williams (SSI)
After reviewing Spark's Kafka Integration guide, it indicates that 
spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for Spark 
3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the cleanest, 
most repeatable (reliable) way to acquire these jars for including in a Spark 
Docker image without introducing version compatibility issues?

Thank you,
Mike



This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.


RE: Running spark Kafka streaming jo in Azure HDInsight

2021-10-06 Thread Muhammed Favas
Hi,

Yeah, but I verified the version of Spark in HDInsight and the one I used in 
the code, and both are the same.

HDInsight(4.0) has spark version 3.0 and Scala version 2.12


I used the Livy API in HDInsight to submit the job. This is an API available in 
HDInsight to submit jobs remotely. I passed all dependent jars when calling the 
submit.


Regards,
Favas

From: Stelios Philippou 
Sent: Wednesday, October 6, 2021 16:51 PM
To: Muhammed Favas 
Cc: user@spark.apache.org
Subject: Re: Running spark Kafka streaming jo in Azure HDInsight

Hi Favas,

The error states that you are using different libraries version.


Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V


Keep in mind that Spark uses its internal libraries for the majority of this. 
So those two must be aligned between Spark and your code.

Can you verify that your HDVersion is indeed 3.0 ?

Also how are you submitting the job ?


On Wed, 6 Oct 2021 at 14:10, Muhammed Favas 
mailto:favas.muham...@expeedsoftware.com>> 
wrote:
Hi,

I am facing some dependency issue in running a spark streaming job in Azure 
HDInsight. The job is connecting to a kafka broker which is hosted in a LAN and 
has public IP access to it.

Spark job pom.xml set up – Spark version 3.0.0, Scala version 2.12


<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.12</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.0.0</version>
</dependency>

HDInsight version - Spark 3.0 (HDI 4.0)
I am using the Livy API to start the job in Azure remotely. Below is the list of 
files passed in the “jars” option in Livy:

kafka-clients-2.7.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/kafka-clients-2.7.0.jar>,
spark-streaming-kafka-0-10_2.12-3.0.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-streaming-kafka-0-10_2.12-3.0.0.jar>,
spark-token-provider-kafka-0-10_2.12-3.0.0.jar<https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-token-provider-kafka-0-10_2.12-3.0.0.jar>

The job is starting in azure spark cluster, but it is not receiving data from 
my kafka broker. Here is the error I am getting


Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

at 
org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)

at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)

at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)

at 
org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)

at 
org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)

at scala.collection.Iterator.foreach(Iterator.scala:941)

at scala.collection.Iterator.foreach$(Iterator.scala:941)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

at 
scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)

at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)

at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)

at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)

at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)

at 
scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)

at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)

at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)

at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)

at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)

at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Here is the Scala code which is used to connect to the broker.


import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe



val kafkaParams = Map[String, Object](
"bootstrap.servers" -> kafkaServer,
"key.deseria

Re: Running spark Kafka streaming job in Azure HDInsight

2021-10-06 Thread Stelios Philippou
Hi Favas,

The error states that you are using different libraries version.

Exception in thread "streaming-start" java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V



Keep in mind that Spark uses its own internal libraries for the majority of
this, so those two must be aligned between Spark and your code.
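
A minimal pom sketch of that alignment, assuming Maven and the Spark 3.0.0 / Scala 2.12
build mentioned in the thread; the idea is to let the Spark Kafka connector bring in its
own matching kafka-clients instead of pinning a different kafka-clients (e.g. 2.7.0)
separately:

<!-- Sketch only: rely on the kafka-clients version that the connector itself
     declares, rather than shipping a separately pinned kafka-clients jar. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.0.0</version>
  <!-- pulls in a compatible kafka-clients transitively; avoid also passing a
       differently versioned kafka-clients jar in the Livy "jars" list -->
</dependency>

The same rule applies at submit time: if a kafka-clients jar is listed explicitly for
Livy, it should be the one resolved from this dependency, otherwise the executors can
load an API-incompatible KafkaConsumer and fail with a NoSuchMethodError like the one above.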

Can you verify that your HDVersion is indeed 3.0 ?

Also how are you submitting the job ?


On Wed, 6 Oct 2021 at 14:10, Muhammed Favas <
favas.muham...@expeedsoftware.com> wrote:

> Hi,
>
>
>
> I am facing some dependency issue in running a spark streaming job in
> Azure HDInsight. The job is connecting to a kafka broker which is hosted in
> a LAN and has public IP access to it.
>
>
>
> Spark job pom.xml set up – Spark version 3.0.0, Scala version 2.12
>
> <dependency>
>   <groupId>org.scala-lang</groupId>
>   <artifactId>scala-library</artifactId>
>   <version>2.12.12</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-common</artifactId>
>   <version>2.7.4</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>   <version>3.0.0</version>
> </dependency>
>
>
>
> HDInsight version - Spark 3.0 (HDI 4.0)
>
> I am using Livy API to start job in azure remotely. Below is the list of
> files passed in “jars” option in livy
>
>
>
> kafka-clients-2.7.0.jar
> 
> ,
>
> spark-streaming-kafka-0-10_2.12-3.0.0.jar
> 
> ,
>
> spark-token-provider-kafka-0-10_2.12-3.0.0.jar
> 
>
>
>
> The job is starting in azure spark cluster, but it is not receiving data
> from my kafka broker. Here is the error I am getting
>
>
>
> Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
> org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
>
> at 
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)
>
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>
> at 
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
>
> at 
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
>
> at scala.collection.Iterator.foreach(Iterator.scala:941)
>
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>
> at 
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
>
> at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>
> at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>
> at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>
> at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>
> at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>
> at 
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
>
> at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
>
> at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
>
> at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
>
> at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
>
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>
>
>
> Here is the scala code which used to connect to broker.
>
>
>
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
>
>
> val kafkaParams = *Map*[String, Object](
> "bootstrap.servers" -> kafkaServer,
> "key.deserializer" -> *classOf*[StringDeserializer],
> "value.deserializer" -> *classOf*[StringDeserializer],
> "group.id" -> connectionID,
> "auto.offset.reset" -> "earliest",
> "enable.auto.commit" -> (true: java.lang.Boolean),
> "partition.assignment.strategy" 
> ->"org.apache.kafka.clients.consumer.RangeAssignor"
>   )
>
>   val topics = *Array*(connectionID)
>  

Running spark Kafka streaming job in Azure HDInsight

2021-10-06 Thread Muhammed Favas
Hi,

I am facing some dependency issue in running a spark streaming job in Azure 
HDInsight. The job is connecting to a kafka broker which is hosted in a LAN and 
has public IP access to it.

Spark job pom.xml set up - Spark version 3.0.0, Scala version 2.12

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.12</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.0.0</version>
</dependency>

HDInsight version - Spark 3.0 (HDI 4.0)
I am using the Livy API to start the job in Azure remotely. Below is the list of 
files passed in the "jars" option in Livy:

kafka-clients-2.7.0.jar,
spark-streaming-kafka-0-10_2.12-3.0.0.jar,
spark-token-provider-kafka-0-10_2.12-3.0.0.jar

The job is starting in azure spark cluster, but it is not receiving data from 
my kafka broker. Here is the error I am getting


Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

at 
org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)

at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)

at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)

at 
org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)

at 
org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)

at scala.collection.Iterator.foreach(Iterator.scala:941)

at scala.collection.Iterator.foreach$(Iterator.scala:941)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

at 
scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)

at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)

at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)

at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)

at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)

at 
scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)

at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)

at 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)

at 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)

at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)

at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Here is the Scala code which is used to connect to the broker.


import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe



val kafkaParams = Map[String, Object](
"bootstrap.servers" -> kafkaServer,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> connectionID,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (true: java.lang.Boolean),
"partition.assignment.strategy" 
->"org.apache.kafka.clients.consumer.RangeAssignor"
  )

  val topics = Array(connectionID)
  val inputMsg = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
  )

The same code with the above list of dependencies is working in my local Hadoop 
cluster, which runs on YARN.

Please help me to figure out what could be the specific issue in HDInsight 
cluster.

Regards,
Favas



Re: Spark Kafka Streaming With Transactional Messages

2020-09-16 Thread jianyangusa
I have the same issue. Do you have a solution? Maybe Spark Streaming does not support
transactional messages; I use Kafka Streams to retrieve the transactional messages.
Maybe we can ask Spark to support this feature.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Amit Joshi
Hi Jungtaek,

Thanks for the input. I tried it and it worked.
I had got confused earlier after reading some blogs.

Regards
Amit

On Friday, August 28, 2020, Jungtaek Lim 
wrote:

> Hi Amit,
>
> if I remember correctly, you don't need to restart the query to reflect
> the newly added topic and partition, if your subscription covers the topic
> (like subscribe pattern). Please try it out.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
> wrote:
>
>> Any pointers will be appreciated.
>>
>> On Thursday, August 27, 2020, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am trying to understand the effect of adding topics and partitions to
>>> a topic in kafka, which is being consumed by spark structured streaming
>>> applications.
>>>
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added topic?
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added partition to a topic?
>>>
>>> Kafka consumers have a meta data refresh property that works without
>>> restarting.
>>>
>>> Thanks advance.
>>>
>>> Regards
>>> Amit Joshi
>>>
>>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Gabor Somogyi
Hi Amit,

The answer is no.

G


On Fri, Aug 28, 2020 at 9:16 AM Jungtaek Lim 
wrote:

> Hi Amit,
>
> if I remember correctly, you don't need to restart the query to reflect
> the newly added topic and partition, if your subscription covers the topic
> (like subscribe pattern). Please try it out.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
> wrote:
>
>> Any pointers will be appreciated.
>>
>> On Thursday, August 27, 2020, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am trying to understand the effect of adding topics and partitions to
>>> a topic in kafka, which is being consumed by spark structured streaming
>>> applications.
>>>
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added topic?
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added partition to a topic?
>>>
>>> Kafka consumers have a meta data refresh property that works without
>>> restarting.
>>>
>>> Thanks advance.
>>>
>>> Regards
>>> Amit Joshi
>>>
>>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Jungtaek Lim
Hi Amit,

if I remember correctly, you don't need to restart the query to reflect the
newly added topic and partition, if your subscription covers the topic
(like subscribe pattern). Please try it out.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)
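
For reference, a minimal sketch of the pattern-based subscription mentioned above; the
broker address, topic pattern and checkpoint path are placeholders, not values taken
from this thread:

import org.apache.spark.sql.SparkSession

object PatternSubscribeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pattern-subscribe").getOrCreate()

    // subscribePattern is a regex; topics created later that match it are picked
    // up by the running query, and new partitions of already-subscribed topics
    // are discovered as well, without restarting the application.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribePattern", "events-.*")
      .option("startingOffsets", "latest")
      .load()

    stream.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/ckpt/pattern-subscribe")
      .start()
      .awaitTermination()
  }
}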

On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
wrote:

> Any pointers will be appreciated.
>
> On Thursday, August 27, 2020, Amit Joshi 
> wrote:
>
>> Hi All,
>>
>> I am trying to understand the effect of adding topics and partitions to a
>> topic in kafka, which is being consumed by spark structured streaming
>> applications.
>>
>> Do we have to restart the spark structured streaming application to read
>> from the newly added topic?
>> Do we have to restart the spark structured streaming application to read
>> from the newly added partition to a topic?
>>
>> Kafka consumers have a meta data refresh property that works without
>> restarting.
>>
>> Thanks advance.
>>
>> Regards
>> Amit Joshi
>>
>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-27 Thread Amit Joshi
Any pointers will be appreciated.

On Thursday, August 27, 2020, Amit Joshi  wrote:

> Hi All,
>
> I am trying to understand the effect of adding topics and partitions to a
> topic in kafka, which is being consumed by spark structured streaming
> applications.
>
> Do we have to restart the spark structured streaming application to read
> from the newly added topic?
> Do we have to restart the spark structured streaming application to read
> from the newly added partition to a topic?
>
> Kafka consumers have a meta data refresh property that works without
> restarting.
>
> Thanks advance.
>
> Regards
> Amit Joshi
>


[Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-27 Thread Amit Joshi
Hi All,

I am trying to understand the effect of adding topics and partitions to a
topic in kafka, which is being consumed by spark structured streaming
applications.

Do we have to restart the spark structured streaming application to read
from the newly added topic?
Do we have to restart the spark structured streaming application to read
from the newly added partition to a topic?

Kafka consumers have a meta data refresh property that works without
restarting.

Thanks advance.

Regards
Amit Joshi


Re: [Spark-Kafka-Streaming] Verifying the approach for multiple queries

2020-08-09 Thread tianlangstudio
Hello!
What about processing and grouping the data first, then writing the grouped data to 
Kafka topics A and B? Another Spark application can then read topic A or B and 
process it further, much like a classic ETL flow.
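
A hedged sketch of that idea, assuming every record carries a "type" field; the broker
address, topic names ("mixed-json", "typed-*") and checkpoint path are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RouteByTypeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("route-by-type").getOrCreate()

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "mixed-json")
      .load()

    // Derive a per-row destination topic from the record's type field; when the
    // DataFrame written to the Kafka sink has a "topic" column and no "topic"
    // option is set, each row goes to the topic named in that column.
    val routed = raw.selectExpr("CAST(value AS STRING) AS value")
      .withColumn("topic", concat(lit("typed-"), get_json_object(col("value"), "$.type")))

    routed.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("checkpointLocation", "/tmp/ckpt/route-by-type")
      .start()
      .awaitTermination()
  }
}

Downstream jobs with known schemas can then subscribe only to the typed topic they care
about, instead of re-parsing the mixed stream.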

 
TianlangStudio
Some of the biggest lies: I will start tomorrow/Others are better than me/I am 
not good enough/I don't have time/This is the way I am
 


--
From: Amit Joshi 
Date: Monday, 10 August 2020, 02:37
To: user 
Subject: [Spark-Kafka-Streaming] Verifying the approach for multiple queries

Hi,

I have a scenario where a Kafka topic is being written with different types of 
JSON records.
I have to regroup the records based on the type, then fetch the schema, parse the 
records, and write them out as Parquet.
I have tried Structured Streaming, but the dynamic schema is a constraint.
So I have used DStreams, even though I know the approach I have taken may not be 
good.
If anyone can please let me know whether the approach will scale, and its possible 
pros and cons.
I am collecting the grouped records and then forming a DataFrame again for each 
grouped record type.
createKeyValue -> this creates the key-value pair with schema information.
stream.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = rdd.map(createKeyValue).reduceByKey((x,y) => x ++ y).collect()
  result.foreach(x=> println(x._1))
  result.map(x=> {
val spark = 
SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val df = x._2 toDF("value")
df.select(from_json($"value", x._1._2, Map.empty[String,String]).as("data"))
  .select($"data.*")
  //.withColumn("entity", lit("invoice"))
  .withColumn("year",year($"TimeUpdated"))
  .withColumn("month",month($"TimeUpdated"))
  .withColumn("day",dayofmonth($"TimeUpdated"))
  
.write.partitionBy("name","year","month","day").mode("append").parquet(path)
  })
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} 


[Spark-Kafka-Streaming] Verifying the approach for multiple queries

2020-08-09 Thread Amit Joshi
Hi,

I have a scenario where a Kafka topic is being written with different types
of JSON records.
I have to regroup the records based on the type, then fetch the schema,
parse the records, and write them out as Parquet.
I have tried Structured Streaming, but the dynamic schema is a constraint.
So I have used DStreams, even though I know the approach I have taken may not
be good.
If anyone can please let me know whether the approach will scale, and its
possible pros and cons.
I am collecting the grouped records and then forming a DataFrame again for
each grouped record type.
createKeyValue -> this creates the key-value pair with schema
information.

stream.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = rdd.map(createKeyValue).reduceByKey((x,y) => x ++ y).collect()
  result.foreach(x=> println(x._1))
  result.map(x=> {
val spark =
SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val df = x._2 toDF("value")
df.select(from_json($"value", x._1._2, Map.empty[String,String]).as("data"))
  .select($"data.*")
  //.withColumn("entity", lit("invoice"))
  .withColumn("year",year($"TimeUpdated"))
  .withColumn("month",month($"TimeUpdated"))
  .withColumn("day",dayofmonth($"TimeUpdated"))
  
.write.partitionBy("name","year","month","day").mode("append").parquet(path)
  })
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


Spark Kafka Streaming With Transactional Messages

2020-05-21 Thread nimmi.cv
I am using Spark 2.4 and using createDirectStream to read from a Kafka topic. The
topic has messages written by a transactional producer.

I am getting the following error 
"requirement failed: Got wrong record for
spark-executor-FtsTopicConsumerGrp7 test11-1 even after seeking to offset 85
got offset 86 instead. If this is a compacted topic, consider enabling
spark.streaming.kafka.allowNonConsecutiveOffsets"


When I enable spark.streaming.kafka.allowNonConsecutiveOffsets, I am
getting the following error
java.lang.IllegalArgumentException: requirement failed: Failed to get
records for compacted spark-executor-FtsTopicConsumerGrpTESTING_5
fts.analytics-0 after polling for 1
at scala.Predef$.require(Predef.scala:224)

Also I set kafka.isolation.level="read_committed".

Any help on this will be appreciated.
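
One option worth trying, sketched below and not a confirmed fix: transactional producers
leave gaps (commit markers) in the offset sequence, and the Structured Streaming Kafka
source tolerates those gaps better than the DStream createDirectStream path. The broker
address and checkpoint path are placeholders; the topic name is taken from the error above.

import org.apache.spark.sql.SparkSession

object ReadCommittedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("read-committed").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "fts.analytics")
      // kafka.-prefixed options are passed through to the underlying consumer;
      // read_committed skips aborted transactional records.
      .option("kafka.isolation.level", "read_committed")
      .load()

    df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/ckpt/read-committed")
      .start()
      .awaitTermination()
  }
}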






--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Kafka Streaming with Offset Gaps

2020-05-21 Thread nimmi.cv
Did you resolve this issue ?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: HDP 3.1 spark Kafka dependency

2020-03-18 Thread Zahid Rahman
I have found many library incompatibility issues myself, including JVM headless
issues where I had to uninstall the headless JVM and install the full JDK, and
work through them. Anyway, this page shows the same error as yours;
you may get away with making the changes to your pom.xml as suggested:
https://stackoverflow.com/questions/41303037/why-does-spark-application-fail-with-classnotfoundexception-failed-to-find-dat
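
In case it helps, "ClassNotFoundException: kafka.DefaultSource" usually means the
Structured Streaming Kafka connector is simply not on the classpath. A hedged sketch of
the missing pom entry; the HDP version string is copied from the other Spark artifacts
in the quoted pom below, so treat it as an assumption:

<!-- Sketch: DataFrameReader.format("kafka") is provided by spark-sql-kafka-0-10,
     which is not among the dependencies listed in the pom below. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.2.3.1.0.0-78</version>
</dependency>

Alternatively the connector can be supplied at submit time, for example with
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 (again a
sketch; the vendor build of the artifact may be required on HDP).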

Good Luck !

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Wed, 18 Mar 2020 at 16:36, William R  wrote:

> Hi,
>
> I am finding difficulty in getting the proper Kafka lib's for spark. The
> version of HDP is 3.1 and i tried the below lib's but it produces the below
> issues.
>
> *POM entry :*
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>2.0.0.3.1.0.0-78</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.11</artifactId>
>   <version>2.0.0.3.1.0.0-78</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_${scala.compat.version}</artifactId>
>   <version>${spark.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.11</artifactId>
>   <version>2.3.2.3.1.0.0-78</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming_2.11</artifactId>
>   <version>2.3.2.3.1.0.0-78</version>
> </dependency>
>
> *Issues while spark-submit :*
>
> Exception in thread "main" java.lang.ClassNotFoundException: Failed to
> find data source: kafka. Please find packages at
> http://spark.apache.org/third-party-projects.html
> at
> org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639)
> at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
> at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
> at com.example.ReadDataFromKafka$.main(ReadDataFromKafka.scala:18)
> at com.example.ReadDataFromKafka.main(ReadDataFromKafka.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
>
> Could someone help me if i am doing something wrong ?
>
> *Spark Submit:*
>
> export
> KAFKA_KERBEROS_PARAMS="-Djava.security.auth.login.config=kafka.consumer.properties"
> export
> KAFKA_OPTS="-Djava.security.auth.login.config=kafka.consumer.properties"
> export SPARK_KAFKA_VERSION=NONE
>
> spark-submit --conf
> "spark.driver.extraJavaOptions=-Djava.security.auth.login.conf=kafka.consumer.properties"
> --files "kafka.consumer.properties" --class com.example.ReadDataFromKafka
> HelloKafka-1.0-SNAPSHOT.jar
>
> *Consumer Code : *
> https://sparkbyexamples.com/spark/spark-batch-processing-produce-consume-kafka-topic/
>
>
> Regards,
> William R
>
>
>
>


HDP 3.1 spark Kafka dependency

2020-03-18 Thread William R
Hi,

I am having difficulty getting the proper Kafka libraries for Spark. The version
of HDP is 3.1, and I tried the libraries below, but they produce the issues
below.

*POM entry :*

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0.3.1.0.0-78</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>2.0.0.3.1.0.0-78</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.compat.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.2.3.1.0.0-78</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.3.2.3.1.0.0-78</version>
</dependency>

*Issues while spark-submit :*

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find
data source: kafka. Please find packages at
http://spark.apache.org/third-party-projects.html
at
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.example.ReadDataFromKafka$.main(ReadDataFromKafka.scala:18)
at com.example.ReadDataFromKafka.main(ReadDataFromKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)


Could someone help me if i am doing something wrong ?

*Spark Submit:*

export
KAFKA_KERBEROS_PARAMS="-Djava.security.auth.login.config=kafka.consumer.properties"
export
KAFKA_OPTS="-Djava.security.auth.login.config=kafka.consumer.properties"
export SPARK_KAFKA_VERSION=NONE

spark-submit --conf
"spark.driver.extraJavaOptions=-Djava.security.auth.login.conf=kafka.consumer.properties"
--files "kafka.consumer.properties" --class com.example.ReadDataFromKafka
HelloKafka-1.0-SNAPSHOT.jar

*Consumer Code : *
https://sparkbyexamples.com/spark/spark-batch-processing-produce-consume-kafka-topic/


Regards,
William R


Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Thanks Dhaval, that fixed the issue. The constant resetting of Kafka
offsets misled me about the root cause. Please feel free to answer the SO
question here
<https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>
if you would like to.
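
For anyone hitting the same symptom, a minimal sketch of the relevant part; paths,
broker address and topic are placeholders. The file sink keeps a commit log under the
output path's _spark_metadata directory and under the checkpoint location, so reusing
either one from an older query can make every new batch look as if it was already
processed (zero input rows, offsets repeatedly re-resolved).

import org.apache.spark.sql.SparkSession

object FreshFileSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-parquet").getOrCreate()

    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_kafka_topic")
      .load()

    kafka.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      // Output path and checkpoint location are dedicated to this query;
      // neither is shared with, or left over from, a previous query.
      .option("path", "s3://my-bucket/data/kafka/my_kafka_topic_v2")
      .option("checkpointLocation", "s3://my-bucket/checkpoints/my_kafka_topic_v2")
      .start()
      .awaitTermination()
  }
}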





On Wed, Sep 11, 2019 at 9:03 PM Dhaval Patel 
wrote:

> Hi Charles,
>
> Can you check is any of the case related to output directory and
> checkpoint location mentioned in below link is applicable in your case?
>
> https://kb.databricks.com/streaming/file-sink-streaming.html
>
> Regards
> Dhaval
>
> On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz  wrote:
>
>> Hey Charles,
>> If you are using maxOffsetsPerTrigger, you will likely rest the offsets
>> every microbatch, because:
>>  1. Spark will figure out a range of offsets to process (let's call them
>> x and y)
>>  2. If these offsets have fallen out of the retention period, Spark will
>> try to set the offset to x which is less than z > y > x.
>>  3. Since z > y, Spark will not process any of the data
>>  4. Goto 1
>>
>> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh 
>> wrote:
>>
>>> Hi Sandish,
>>>
>>> as I have said if the offset reset happens only once that would make
>>> sense. But I am not sure how to explain why the offset reset is happening
>>> for every micro-batch...
>>> ideally once the offset reset happens the app should move to a valid
>>> offset and start consuming data. but in my case for every batch the offset
>>> is getting reset and no data is ever getting generated.
>>>
>>> Thanks,
>>> Charles
>>>
>>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
>>> wrote:
>>>
>>>> You can see this kind of error, if there is consumer lag more than
>>>> Kafka retention period.
>>>> You will not see any failures if below option is not set.
>>>>
>>>> Set failOnDataLoss=true option to see failures.
>>>>
>>>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
>>>> wrote:
>>>>
>>>>> The only form of rate limiting I have set is *maxOffsetsPerTrigger *
>>>>> and *fetch.message.max.bytes. *
>>>>>
>>>>> *"*may be that you are trying to process records that have passed the
>>>>> retention period within Kafka.*"*
>>>>> If the above is true then I should have my offsets reset only once
>>>>> ideally when my application starts. But mu offsets are resetting for every
>>>>> batch. if my application is using offsets that are no longer available in
>>>>> Kafka it will reset to earliest or latest offset available in Kafka and 
>>>>> the
>>>>> next request made to Kafka should provide proper data. But in case for all
>>>>> micro-batches the offsets are getting reseted and the batch is producing 
>>>>> no
>>>>> data.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>>>>>
>>>>>> Do you have rate limiting set on your stream? It may be that you are
>>>>>> trying to process records that have passed the retention period within
>>>>>> Kafka.
>>>>>>
>>>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to run a spark application ingesting data from Kafka
>>>>>>> using the Spark structured streaming and the spark library
>>>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very 
>>>>>>> weird
>>>>>>> issue where during execution of all my micro-batches the Kafka consumer 
>>>>>>> is
>>>>>>> not able to fetch the offsets and its having its offsets reset as show
>>>>>>> below in this log
>>>>>>>
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>>> groupId=spark-kafka-source-5496988b-

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Dhaval Patel
Hi Charles,

Can you check whether any of the cases related to the output directory and checkpoint
location mentioned in the link below are applicable in your case?

https://kb.databricks.com/streaming/file-sink-streaming.html

Regards
Dhaval

On Wed, Sep 11, 2019 at 9:29 PM Burak Yavuz  wrote:

> Hey Charles,
> If you are using maxOffsetsPerTrigger, you will likely rest the offsets
> every microbatch, because:
>  1. Spark will figure out a range of offsets to process (let's call them x
> and y)
>  2. If these offsets have fallen out of the retention period, Spark will
> try to set the offset to x which is less than z > y > x.
>  3. Since z > y, Spark will not process any of the data
>  4. Goto 1
>
> On Wed, Sep 11, 2019, 6:09 PM Charles vinodh 
> wrote:
>
>> Hi Sandish,
>>
>> as I have said if the offset reset happens only once that would make
>> sense. But I am not sure how to explain why the offset reset is happening
>> for every micro-batch...
>> ideally once the offset reset happens the app should move to a valid
>> offset and start consuming data. but in my case for every batch the offset
>> is getting reset and no data is ever getting generated.
>>
>> Thanks,
>> Charles
>>
>> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
>> wrote:
>>
>>> You can see this kind of error, if there is consumer lag more than Kafka
>>> retention period.
>>> You will not see any failures if below option is not set.
>>>
>>> Set failOnDataLoss=true option to see failures.
>>>
>>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
>>> wrote:
>>>
>>>> The only form of rate limiting I have set is *maxOffsetsPerTrigger *
>>>> and *fetch.message.max.bytes. *
>>>>
>>>> *"*may be that you are trying to process records that have passed the
>>>> retention period within Kafka.*"*
>>>> If the above is true then I should have my offsets reset only once
>>>> ideally when my application starts. But mu offsets are resetting for every
>>>> batch. if my application is using offsets that are no longer available in
>>>> Kafka it will reset to earliest or latest offset available in Kafka and the
>>>> next request made to Kafka should provide proper data. But in case for all
>>>> micro-batches the offsets are getting reseted and the batch is producing no
>>>> data.
>>>>
>>>>
>>>>
>>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>>>>
>>>>> Do you have rate limiting set on your stream? It may be that you are
>>>>> trying to process records that have passed the retention period within
>>>>> Kafka.
>>>>>
>>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to run a spark application ingesting data from Kafka
>>>>>> using the Spark structured streaming and the spark library
>>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very 
>>>>>> weird
>>>>>> issue where during execution of all my micro-batches the Kafka consumer 
>>>>>> is
>>>>>> not able to fetch the offsets and its having its offsets reset as show
>>>>>> below in this log
>>>>>>
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>>
>>>>>>
>

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Hey Charles,
If you are using maxOffsetsPerTrigger, you will likely reset the offsets
every microbatch, because:
 1. Spark will figure out a range of offsets to process (let's call them x
and y)
 2. If these offsets have fallen out of the retention period, the earliest
offset still available in Kafka is some z with z > y > x, but Spark will
still try to start from x.
 3. Since z > y, Spark will not process any of the data
 4. Goto 1

On Wed, Sep 11, 2019, 6:09 PM Charles vinodh  wrote:

> Hi Sandish,
>
> as I have said if the offset reset happens only once that would make
> sense. But I am not sure how to explain why the offset reset is happening
> for every micro-batch...
> ideally once the offset reset happens the app should move to a valid
> offset and start consuming data. but in my case for every batch the offset
> is getting reset and no data is ever getting generated.
>
> Thanks,
> Charles
>
> On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
> wrote:
>
>> You can see this kind of error, if there is consumer lag more than Kafka
>> retention period.
>> You will not see any failures if below option is not set.
>>
>> Set failOnDataLoss=true option to see failures.
>>
>> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
>> wrote:
>>
>>> The only form of rate limiting I have set is *maxOffsetsPerTrigger *and
>>> *fetch.message.max.bytes. *
>>>
>>> *"*may be that you are trying to process records that have passed the
>>> retention period within Kafka.*"*
>>> If the above is true then I should have my offsets reset only once
>>> ideally when my application starts. But mu offsets are resetting for every
>>> batch. if my application is using offsets that are no longer available in
>>> Kafka it will reset to earliest or latest offset available in Kafka and the
>>> next request made to Kafka should provide proper data. But in case for all
>>> micro-batches the offsets are getting reseted and the batch is producing no
>>> data.
>>>
>>>
>>>
>>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>>>
>>>> Do you have rate limiting set on your stream? It may be that you are
>>>> trying to process records that have passed the retention period within
>>>> Kafka.
>>>>
>>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>>>> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to run a spark application ingesting data from Kafka using
>>>>> the Spark structured streaming and the spark library
>>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>>>>> issue where during execution of all my micro-batches the Kafka consumer is
>>>>> not able to fetch the offsets and its having its offsets reset as show
>>>>> below in this log
>>>>>
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>>
>>>>>
>>>>> It is reasonable if this resetting happens once in application due to
>>>>> the fact that the offsets stored in my checkpoint are no longer valid and
>>>>> will have to reset our offsets to a new value. But I am seeing this reset
>>>>> happening for every micro batch execution in my streaming job. In at the
>>>>> end the streaming query progress emits the following
>>>>>
>>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made 
>>>>> progress: {
>>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
&g

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Hi Sandish,

As I have said, if the offset reset happened only once that would make sense.
But I am not sure how to explain why the offset reset is happening for
every micro-batch...
Ideally, once the offset reset happens the app should move to a valid offset
and start consuming data. But in my case, for every batch the offset is
getting reset and no data is ever getting generated.

Thanks,
Charles

On Wed, Sep 11, 2019 at 5:44 PM Sandish Kumar HN 
wrote:

> You can see this kind of error, if there is consumer lag more than Kafka
> retention period.
> You will not see any failures if below option is not set.
>
> Set failOnDataLoss=true option to see failures.
>
> On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
> wrote:
>
>> The only form of rate limiting I have set is *maxOffsetsPerTrigger *and
>> *fetch.message.max.bytes. *
>>
>> *"*may be that you are trying to process records that have passed the
>> retention period within Kafka.*"*
>> If the above is true then I should have my offsets reset only once
>> ideally when my application starts. But mu offsets are resetting for every
>> batch. if my application is using offsets that are no longer available in
>> Kafka it will reset to earliest or latest offset available in Kafka and the
>> next request made to Kafka should provide proper data. But in case for all
>> micro-batches the offsets are getting reseted and the batch is producing no
>> data.
>>
>>
>>
>> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>>
>>> Do you have rate limiting set on your stream? It may be that you are
>>> trying to process records that have passed the retention period within
>>> Kafka.
>>>
>>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I am trying to run a spark application ingesting data from Kafka using
>>>> the Spark structured streaming and the spark library
>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>>>> issue where during execution of all my micro-batches the Kafka consumer is
>>>> not able to fetch the offsets and its having its offsets reset as show
>>>> below in this log
>>>>
>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>>
>>>>
>>>> It is reasonable if this resetting happens once in application due to
>>>> the fact that the offsets stored in my checkpoint are no longer valid and
>>>> will have to reset our offsets to a new value. But I am seeing this reset
>>>> happening for every micro batch execution in my streaming job. In at the
>>>> end the streaming query progress emits the following
>>>>
>>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: 
>>>> {
>>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>>   "name" : null,
>>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>>   "batchId" : 189,
>>>>   "numInputRows" : 0,
>>>>   "inputRowsPerSecond" : 0.0,
>>>>   "processedRowsPerSecond" : 0.0,
>>>>   "durationMs" : {
>>>> "addBatch" : 127,
>>>> "getBatch" : 0,
>>>> "getEndOffset" : 0,
>>>> "queryPlanning" : 24,
>>>> "setOffsetRange" : 36,
>>>> "triggerExecution" : 1859,
>>>

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Sandish Kumar HN
You can see this kind of error if the consumer lag is greater than the Kafka
retention period.
You will not see any failures if the option below is not set.

Set the failOnDataLoss=true option to see the failures.
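
A short sketch of where that option goes; the broker address, topic and paths are
placeholders. With it set, a query whose requested offsets have already been deleted by
retention fails loudly instead of quietly resetting and reading nothing:

import org.apache.spark.sql.SparkSession

object FailOnDataLossSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("fail-on-data-loss").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_kafka_topic")
      .option("failOnDataLoss", "true")          // surface lost/expired offsets as errors
      .option("maxOffsetsPerTrigger", "500000")  // optional rate limit per micro-batch
      .load()

    df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/ckpt/fail-on-data-loss")
      .start()
      .awaitTermination()
  }
}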

On Wed, Sep 11, 2019 at 3:24 PM Charles vinodh 
wrote:

> The only form of rate limiting I have set is *maxOffsetsPerTrigger *and
> *fetch.message.max.bytes. *
>
> *"*may be that you are trying to process records that have passed the
> retention period within Kafka.*"*
> If the above is true then I should have my offsets reset only once ideally
> when my application starts. But mu offsets are resetting for every batch.
> if my application is using offsets that are no longer available in Kafka it
> will reset to earliest or latest offset available in Kafka and the
> next request made to Kafka should provide proper data. But in case for all
> micro-batches the offsets are getting reseted and the batch is producing no
> data.
>
>
>
> On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:
>
>> Do you have rate limiting set on your stream? It may be that you are
>> trying to process records that have passed the retention period within
>> Kafka.
>>
>> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to run a spark application ingesting data from Kafka using
>>> the Spark structured streaming and the spark library
>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>>> issue where during execution of all my micro-batches the Kafka consumer is
>>> not able to fetch the offsets and its having its offsets reset as show
>>> below in this log
>>>
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>>
>>>
>>> It is reasonable if this resetting happens once in application due to
>>> the fact that the offsets stored in my checkpoint are no longer valid and
>>> will have to reset our offsets to a new value. But I am seeing this reset
>>> happening for every micro batch execution in my streaming job. In at the
>>> end the streaming query progress emits the following
>>>
>>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>>   "name" : null,
>>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>>   "batchId" : 189,
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>> "addBatch" : 127,
>>> "getBatch" : 0,
>>> "getEndOffset" : 0,
>>> "queryPlanning" : 24,
>>> "setOffsetRange" : 36,
>>> "triggerExecution" : 1859,
>>> "walCommit" : 1032
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>> "startOffset" : {
>>>   "my_kafka_topic" : {
>>> "23" : 1206926686,
>>> "8" : 1158514946,
>>> "17" : 1258387219,
>>> "11" : 1263091642,
>>> "2" : 1226741128,
>>> "20" : 1229560889,
>>> "5" : 1170304913,
>>> "14" : 1207333901,
>>> "4" : 1274242728,
>>> &q

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
The only form of rate limiting I have set is *maxOffsetsPerTrigger* and
*fetch.message.max.bytes*.

*"It may be that you are trying to process records that have passed the
retention period within Kafka."*
If the above is true then I should have my offsets reset only once, ideally
when my application starts. But my offsets are resetting for every batch.
If my application is using offsets that are no longer available in Kafka, it
will reset to the earliest or latest offset available in Kafka, and the
next request made to Kafka should provide proper data. But in this case, for all
micro-batches the offsets are getting reset and each batch is producing no
data.



On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz  wrote:

> Do you have rate limiting set on your stream? It may be that you are
> trying to process records that have passed the retention period within
> Kafka.
>
> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
> wrote:
>
>>
>> Hi,
>>
>> I am trying to run a spark application ingesting data from Kafka using
>> the Spark structured streaming and the spark library
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>> issue where during execution of all my micro-batches the Kafka consumer is
>> not able to fetch the offsets and its having its offsets reset as show
>> below in this log
>>
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>
>>
>> It is reasonable if this resetting happens once in application due to the
>> fact that the offsets stored in my checkpoint are no longer valid and will
>> have to reset our offsets to a new value. But I am seeing this reset
>> happening for every micro batch execution in my streaming job. In at the
>> end the streaming query progress emits the following
>>
>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>   "name" : null,
>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>   "batchId" : 189,
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "addBatch" : 127,
>> "getBatch" : 0,
>> "getEndOffset" : 0,
>> "queryPlanning" : 24,
>> "setOffsetRange" : 36,
>> "triggerExecution" : 1859,
>> "walCommit" : 1032
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>> "startOffset" : {
>>   "my_kafka_topic" : {
>> "23" : 1206926686,
>> "8" : 1158514946,
>> "17" : 1258387219,
>> "11" : 1263091642,
>> "2" : 1226741128,
>> "20" : 1229560889,
>> "5" : 1170304913,
>> "14" : 1207333901,
>> "4" : 1274242728,
>> "13" : 1336386658,
>> "22" : 1260210993,
>> "7" : 1288639296,
>> "16" : 1247462229,
>> "10" : 1093157103,
>> "1" : 1219904858,
>> "19" : 1116269615,
>> "9" : 1238935018,
>> "18" : 1069224544,
>> "12" : 1256018541,
>> "3" : 1251150202,
>> "21" : 1256774117,
>> "15" : 1170591375,
>> "6"

Re: Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Burak Yavuz
Do you have rate limiting set on your stream? It may be that you are trying
to process records that have passed the retention period within Kafka.

On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh 
wrote:

>
> Hi,
>
> I am trying to run a spark application ingesting data from Kafka using the
> Spark structured streaming and the spark library
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
> issue where during execution of all my micro-batches the Kafka consumer is
> not able to fetch the offsets and its having its offsets reset as show
> below in this log
>
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-5 to offset 1168959116.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-1 to offset 1218619371.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-8 to offset 1157205346.
> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>  Resetting offset for partition my-topic-21 to offset 1255403059.
>
>
> It is reasonable if this resetting happens once in application due to the
> fact that the offsets stored in my checkpoint are no longer valid and will
> have to reset our offsets to a new value. But I am seeing this reset
> happening for every micro batch execution in my streaming job. In at the
> end the streaming query progress emits the following
>
> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>   "name" : null,
>   "timestamp" : "2019-09-10T15:55:00.000Z",
>   "batchId" : 189,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "addBatch" : 127,
> "getBatch" : 0,
> "getEndOffset" : 0,
> "queryPlanning" : 24,
> "setOffsetRange" : 36,
> "triggerExecution" : 1859,
> "walCommit" : 1032
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
> "startOffset" : {
>   "my_kafka_topic" : {
> "23" : 1206926686,
> "8" : 1158514946,
> "17" : 1258387219,
> "11" : 1263091642,
> "2" : 1226741128,
> "20" : 1229560889,
> "5" : 1170304913,
> "14" : 1207333901,
> "4" : 1274242728,
> "13" : 1336386658,
> "22" : 1260210993,
> "7" : 1288639296,
> "16" : 1247462229,
> "10" : 1093157103,
> "1" : 1219904858,
> "19" : 1116269615,
> "9" : 1238935018,
> "18" : 1069224544,
> "12" : 1256018541,
> "3" : 1251150202,
> "21" : 1256774117,
> "15" : 1170591375,
> "6" : 1185108169,
> "24" : 1202342095,
> "0" : 1165356330
>   }
> },
> "endOffset" : {
>   "my_kafka_topic" : {
> "23" : 1206928043,
> "8" : 1158516721,
> "17" : 1258389219,
> "11" : 1263093490,
> "2" : 1226743225,
> "20" : 1229562962,
> "5" : 1170307882,
> "14" : 1207335736,
> "4" : 1274245585,
> "13" : 1336388570,
> "22" : 1260213582,
> "7" : 1288641384,
> "16" : 1247464311,
> "10" : 1093159186,
> "1" : 1219906407,
> "19" : 1116271435,
> "9" : 1238936994,
> "18" : 1069226913,
> "12" : 1256020926,
> "3" : 1251152579,
> "21&quo

Spark Kafka Streaming making progress but there is no data to be consumed

2019-09-11 Thread Charles vinodh
Hi,

I am trying to run a spark application ingesting data from Kafka using the
Spark structured streaming and the spark library
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
issue where, during execution of all my micro-batches, the Kafka consumer is
not able to fetch the offsets and is having its offsets reset, as shown
below in this log:

19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-5 to offset 1168959116.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-1 to offset 1218619371.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-8 to offset 1157205346.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-21 to offset 1255403059.


It is reasonable if this resetting happens once in the application, due to the
fact that the offsets stored in my checkpoint are no longer valid and the
offsets will have to be reset to a new value. But I am seeing this reset
happening for every micro-batch execution in my streaming job. At the
end the streaming query progress emits the following:

19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
  "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
  "name" : null,
  "timestamp" : "2019-09-10T15:55:00.000Z",
  "batchId" : 189,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"addBatch" : 127,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 24,
"setOffsetRange" : 36,
"triggerExecution" : 1859,
"walCommit" : 1032
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaV2[Subscribe[my_kafka_topic]]",
"startOffset" : {
  "my_kafka_topic" : {
"23" : 1206926686,
"8" : 1158514946,
"17" : 1258387219,
"11" : 1263091642,
"2" : 1226741128,
"20" : 1229560889,
"5" : 1170304913,
"14" : 1207333901,
"4" : 1274242728,
"13" : 1336386658,
"22" : 1260210993,
"7" : 1288639296,
"16" : 1247462229,
"10" : 1093157103,
"1" : 1219904858,
"19" : 1116269615,
"9" : 1238935018,
"18" : 1069224544,
"12" : 1256018541,
"3" : 1251150202,
"21" : 1256774117,
"15" : 1170591375,
"6" : 1185108169,
"24" : 1202342095,
"0" : 1165356330
  }
},
"endOffset" : {
  "my_kafka_topic" : {
"23" : 1206928043,
"8" : 1158516721,
"17" : 1258389219,
"11" : 1263093490,
"2" : 1226743225,
"20" : 1229562962,
"5" : 1170307882,
"14" : 1207335736,
"4" : 1274245585,
"13" : 1336388570,
"22" : 1260213582,
"7" : 1288641384,
"16" : 1247464311,
"10" : 1093159186,
"1" : 1219906407,
"19" : 1116271435,
"9" : 1238936994,
"18" : 1069226913,
"12" : 1256020926,
"3" : 1251152579,
"21" : 1256776910,
"15" : 1170593216,
"6" : 1185110032,
"24" : 1202344538,
"0" : 1165358262
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
  }
}


In the above StreamingQueryProgress event the numInputRows field is zero,
and this is the case for all micro-batch executions; no data is being
produced whatsoever. So basically for each batch my offsets are being reset
and each batch produces zero rows. Since there is no work being done
and since dynamic allocation is enabled, all my executors get killed... I have
tried deleting my checkpoint and starting my application from scratch, and I
am still facing the same issue. What could possibly be wrong with this? What
lines of investigation should I take? If you are interested in getting
Stack Overflow points, you can answer my question on SO here
<https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.


Thanks,
Charles
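
For reference, a minimal sketch of the kind of query described above, assuming
Spark 2.4 with the spark-sql-kafka-0-10 source; the brokers, paths, topic name
and trigger interval are placeholders, not values taken from a working fix.
startingOffsets only applies on the very first run (before a checkpoint exists),
and failOnDataLoss=false is the usual knob when checkpointed offsets no longer
match what the brokers hold:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object KafkaToS3Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-to-s3-sketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") // placeholder brokers
      .option("subscribe", "my_kafka_topic")
      .option("startingOffsets", "latest")   // only used on the first run, before a checkpoint exists
      .option("failOnDataLoss", "false")     // tolerate offsets that have been aged out of Kafka
      .load()

    df.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic")                // placeholder sink path
      .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic") // placeholder
      .trigger(Trigger.ProcessingTime("60 seconds"))                                // placeholder interval
      .start()
      .awaitTermination()
  }
}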


Re: How to programmatically pause and resume Spark/Kafka structured streaming?

2019-08-06 Thread Gourav Sengupta
Hi
There is a method to iterate only once in Spark. I use it for reading files
using streaming. Maybe you can try that.
Regards,
Gourav

On Tue, 6 Aug 2019, 21:50 kant kodali,  wrote:

> If I stop and start while processing the batch what will happen? will that
> batch gets canceled and gets reprocessed again when I click start? Does
> that mean I need to worry about duplicates in the downstream? Kafka
> consumers have a pause and resume and they work just fine so I am not sure
> why Spark doesn't expose that.
>
>
> On Mon, Aug 5, 2019 at 10:54 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> exactly my question, I was also looking for ways to gracefully exit spark
>> structured streaming.
>>
>>
>> Regards,
>> Gourav
>>
>> On Tue, Aug 6, 2019 at 3:43 AM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am trying to see if there is a way to pause a spark stream that
>>> process data from Kafka such that my application can take some actions
>>> while the stream is paused and resume when the application completes those
>>> actions.
>>>
>>> Thanks!
>>>
>>
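
One possible reading of the "iterate only once" approach mentioned above is the
one-shot trigger; a sketch, not necessarily what is being used here (source,
sink and paths are placeholders). Trigger.Once processes whatever data is
available as a single batch and then stops, so the job can be scheduled
externally and effectively "resumed" by running it again against the same
checkpoint:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object RunOnceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("run-once-sketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "input-topic")                // placeholder
      .load()

    df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("parquet")
      .option("path", "/tmp/output")                            // placeholder
      .option("checkpointLocation", "/tmp/checkpoints/run-once") // placeholder
      .trigger(Trigger.Once())                                   // process available data once, then stop
      .start()
      .awaitTermination()
  }
}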


Re: How to programmatically pause and resume Spark/Kafka structured streaming?

2019-08-06 Thread kant kodali
If I stop and start while processing a batch, what will happen? Will that
batch get canceled and be reprocessed again when I start it back up? Does
that mean I need to worry about duplicates downstream? Kafka
consumers have pause and resume and they work just fine, so I am not sure
why Spark doesn't expose that.


On Mon, Aug 5, 2019 at 10:54 PM Gourav Sengupta 
wrote:

> Hi,
>
> exactly my question, I was also looking for ways to gracefully exit spark
> structured streaming.
>
>
> Regards,
> Gourav
>
> On Tue, Aug 6, 2019 at 3:43 AM kant kodali  wrote:
>
>> Hi All,
>>
>> I am trying to see if there is a way to pause a spark stream that process
>> data from Kafka such that my application can take some actions while the
>> stream is paused and resume when the application completes those actions.
>>
>> Thanks!
>>
>


Re: How to programmatically pause and resume Spark/Kafka structured streaming?

2019-08-05 Thread Gourav Sengupta
Hi,

exactly my question, I was also looking for ways to gracefully exit spark
structured streaming.


Regards,
Gourav

On Tue, Aug 6, 2019 at 3:43 AM kant kodali  wrote:

> Hi All,
>
> I am trying to see if there is a way to pause a spark stream that process
> data from Kafka such that my application can take some actions while the
> stream is paused and resume when the application completes those actions.
>
> Thanks!
>


How to programmatically pause and resume Spark/Kafka structured streaming?

2019-08-05 Thread kant kodali
Hi All,

I am trying to see if there is a way to pause a Spark stream that processes
data from Kafka, such that my application can take some actions while the
stream is paused and resume it when the application completes those actions.

Thanks!
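
Structured Streaming does not expose a pause()/resume() API, so what follows is
only a hedged sketch of the usual workaround: stop the query and later start a
new one with the same checkpoint location. The in-flight micro-batch is abandoned
on stop() and re-planned from the checkpointed offsets on restart; a sink that
tracks committed batches (for example the file sink's metadata log) should not
show duplicates to Spark readers. Paths, topic and brokers are placeholders.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object PauseResumeSketch {
  // Placeholder checkpoint and output paths; adapt to the real query.
  val checkpointDir = "/tmp/checkpoints/my-query"

  def startQuery(df: DataFrame): StreamingQuery =
    df.writeStream
      .format("parquet")
      .option("path", "/tmp/output")                 // placeholder sink
      .option("checkpointLocation", checkpointDir)   // same location on every restart
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("pause-resume-sketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "input-topic")                // placeholder
      .load()

    var query = startQuery(df)

    // "Pause": stop the query; offsets committed so far live in the checkpoint.
    query.stop()

    // ... run whatever actions the application needs while "paused" ...

    // "Resume": start a new query against the same checkpoint location.
    query = startQuery(df)
    query.awaitTermination()
  }
}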


Spark Kafka Streaming stopped

2019-06-14 Thread Amit Sharma
We are using Spark Kafka streaming. We have 6 nodes in the Kafka cluster; if any
of the nodes goes down, we get the below exception and streaming
stops.
ERROR DirectKafkaInputDStream:70 -
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([techops-prod2,4],
[techops-prod2,0]))

Please let me know if we missed any setting so that streaming does not
stop even if a couple of Kafka nodes are down.


Thanks
Amit
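
A sketch under stated assumptions: the DirectKafkaInputDStream error above comes
from the 0.8 direct API, where leader-offset lookups are retried a limited number
of times. The settings below are the first ones I would check; all values and the
broker endpoint are illustrative, not a confirmed fix. spark.streaming.kafka.maxRetries
controls the driver-side retries when looking up leader offsets (default 1), and
refresh.leader.backoff.ms is the backoff before leader metadata is refreshed after
a NotLeaderForPartition error.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ResilientDirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-direct-sketch")
      .set("spark.streaming.kafka.maxRetries", "5") // driver retries for leader-offset lookups (default 1)

    val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "gslb.example.com:9092", // placeholder broker endpoint
      "refresh.leader.backoff.ms" -> "2000"              // back off before re-resolving leaders after a leader change
    )

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("techops-prod2"))

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}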


Re: Spark kafka streaming job stopped

2019-06-11 Thread Amit Sharma
Please provide an update if anyone knows.

On Monday, June 10, 2019, Amit Sharma  wrote:

>
> We have spark kafka sreaming job running on standalone spark cluster. We
> have below kafka architecture
>
> 1. Two cluster running on two data centers.
> 2. There is LTM on top on each data center (load balance)
> 3. There is GSLB on top of LTM.
>
> I observed when ever any of the node in kafka cluster is down  our spark
> stream job stopped. We are using GLSB url in our code to connect to Kafka
> not the IP address. Please let me know is it expected behavior if not then
> what config we need to change.
>
> Thanks
> Amit
>


Fwd: Spark kafka streaming job stopped

2019-06-10 Thread Amit Sharma
We have a Spark Kafka streaming job running on a standalone Spark cluster. We
have the below Kafka architecture:

1. Two cluster running on two data centers.
2. There is LTM on top on each data center (load balance)
3. There is GSLB on top of LTM.

I observed that whenever any of the nodes in the Kafka cluster is down, our Spark
streaming job stops. We are using the GSLB URL in our code to connect to Kafka,
not the IP address. Please let me know if this is expected behavior, and if not,
what config we need to change.

Thanks
Amit


Re: Spark Kafka Batch Write guarantees

2019-04-01 Thread hemant singh
Thanks Shixiong, I read in the documentation as well that duplicates might exist
because of task retries.

On Mon, 1 Apr 2019 at 9:43 PM, Shixiong(Ryan) Zhu 
wrote:

> The Kafka source doesn’t support transaction. You may see partial data or
> duplicated data if a Spark task fails.
>
> On Wed, Mar 27, 2019 at 1:15 AM hemant singh  wrote:
>
>> We are using spark batch to write Dataframe to Kafka topic. The spark
>> write function with write.format(source = Kafka).
>> Does spark provide similar guarantee like it provides with saving
>> dataframe to disk; that partial data is not written to Kafka i.e. full
>> dataframe is saved or if job fails no data is written to Kafka topic.
>>
>> Thanks.
>>
> --
>
> Best Regards,
> Ryan
>


Re: Spark Kafka Batch Write guarantees

2019-04-01 Thread Shixiong(Ryan) Zhu
The Kafka source doesn’t support transaction. You may see partial data or
duplicated data if a Spark task fails.

On Wed, Mar 27, 2019 at 1:15 AM hemant singh  wrote:

> We are using spark batch to write Dataframe to Kafka topic. The spark
> write function with write.format(source = Kafka).
> Does spark provide similar guarantee like it provides with saving
> dataframe to disk; that partial data is not written to Kafka i.e. full
> dataframe is saved or if job fails no data is written to Kafka topic.
>
> Thanks.
>
-- 

Best Regards,
Ryan


Spark Kafka Batch Write guarantees

2019-03-27 Thread hemant singh
We are using Spark batch to write a Dataframe to a Kafka topic, using the Spark
write function with write.format(source = Kafka).
Does Spark provide a guarantee similar to the one it provides when saving a
dataframe to disk, i.e. that partial data is not written to Kafka: either the full
dataframe is written, or, if the job fails, no data is written to the Kafka topic?

Thanks.
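
A minimal sketch of the batch write being discussed, assuming Spark 2.x with the
spark-sql-kafka-0-10 sink; brokers and topic are placeholders. As noted in the
replies, the Kafka sink is not transactional, so a failed and retried task can
leave partial or duplicate records; giving each record an explicit key (or a
unique id in the payload) at least gives downstream consumers something to
deduplicate on.

import org.apache.spark.sql.SparkSession

object BatchWriteToKafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("batch-to-kafka-sketch").getOrCreate()
    import spark.implicits._

    // Keying each record with a unique id helps downstream deduplication on retries.
    val df = Seq(("order-1", """{"amount":10}"""), ("order-2", """{"amount":20}"""))
      .toDF("key", "value")

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("topic", "my-output-topic")                // placeholder
      .save()
  }
}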


Re: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

2019-02-13 Thread Jungtaek Lim
Adding to Gabor's answer: in Spark 3.0 end users can even provide the full
group id (please refer to SPARK-26350 [1]), but you may find it more convenient
to use the group id prefix Gabor described (please refer to SPARK-26121 [2]) to
grant permission to broader ranges of groups.

1. https://issues.apache.org/jira/browse/SPARK-26350
2. https://issues.apache.org/jira/browse/SPARK-26121

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Feb 13, 2019 at 6:36 PM, Gabor Somogyi wrote:

> Hi Thomas,
>
> The issue occurs when the user does not have the READ permission on the
> consumer groups.
>
> In DStreams group ID is configured in application, for example:
> https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-app/blob/161bf02eb3677aac604d63499041f72231d0e371/src/main/scala/com/cloudera/spark/examples/DirectKafkaWordCount.scala#L59
>
> In Strucuted Streaming the group ID is generated by Spark internally.
>
> Either one has to give access to "spark-kafka-source-*" group or in Spark
> 3.0 this prefix can be configured with "groupidprefix" parameter.
>
> BR,
> G
>
>
> On Wed, Feb 13, 2019 at 3:58 AM Allu Thomas
>  wrote:
>
>> Hi There,
>>
>> My use case is to read a simple json message from Kafka queue using Spark
>> Structured Streaming. But I’m getting the following error message when I
>> run  my Kafka consumer. I don’t get this error when using Spark direct
>> stream. The issue is happening only with structured streaming. Any help
>> would be greatly appreciated.
>>
>>
>> Exception in thread "main"
>> org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to
>> access group:
>> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>> === Streaming Query ===
>> Identifier: [id = 6ab10eab-4f71-435c-8705-820e66cee47e, runId =
>> 48430367-9e14-450b-b8e0-27199b536403]
>> Current Committed Offsets: {}
>> Current Available Offsets: {}
>>
>>
>> Current State: ACTIVE
>> Thread State: RUNNABLE
>>
>>
>> Logical Plan:
>> KafkaSource[Subscribe[cla-claim-raw]]
>> at org.apache.spark.sql.execution.streaming.StreamExecution.org
>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
>> Caused by: org.apache.kafka.common.errors.GroupAuthorizationException:
>> Not authorized to access group:
>> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>>
>> Thanks,
>> Thomas Thomas
>>
>
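
A hedged sketch of the Spark 3.0 options mentioned above (they do not exist in
Spark 2.x): either grant the Kafka ACL READ access on the generated
"spark-kafka-source-*" groups, or control the group id from the application side.
The option names follow SPARK-26121/SPARK-26350; the topic, brokers and group
names below are placeholders.

import org.apache.spark.sql.SparkSession

object GroupIdOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("group-id-sketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "cla-claim-raw")
      // Spark 3.0+: prefix for the internally generated consumer group ids,
      // so the ACL can be granted on "my-authorized-prefix-*" instead of "spark-kafka-source-*".
      .option("groupIdPrefix", "my-authorized-prefix")
      // Spark 3.0+ alternative: pin the group id entirely (SPARK-26350). Use with care,
      // since concurrent queries sharing one group id will interfere with each other.
      // .option("kafka.group.id", "my-authorized-group")
      .load()

    df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}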


Re: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

2019-02-13 Thread Gabor Somogyi
Hi Thomas,

The issue occurs when the user does not have the READ permission on the
consumer groups.

In DStreams group ID is configured in application, for example:
https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-app/blob/161bf02eb3677aac604d63499041f72231d0e371/src/main/scala/com/cloudera/spark/examples/DirectKafkaWordCount.scala#L59

In Structured Streaming the group ID is generated by Spark internally.

Either one has to give access to "spark-kafka-source-*" group or in Spark
3.0 this prefix can be configured with "groupidprefix" parameter.

BR,
G


On Wed, Feb 13, 2019 at 3:58 AM Allu Thomas
 wrote:

> Hi There,
>
> My use case is to read a simple json message from Kafka queue using Spark
> Structured Streaming. But I’m getting the following error message when I
> run  my Kafka consumer. I don’t get this error when using Spark direct
> stream. The issue is happening only with structured streaming. Any help
> would be greatly appreciated.
>
>
> Exception in thread "main"
> org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to
> access group:
> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
> === Streaming Query ===
> Identifier: [id = 6ab10eab-4f71-435c-8705-820e66cee47e, runId =
> 48430367-9e14-450b-b8e0-27199b536403]
> Current Committed Offsets: {}
> Current Available Offsets: {}
>
>
> Current State: ACTIVE
> Thread State: RUNNABLE
>
>
> Logical Plan:
> KafkaSource[Subscribe[cla-claim-raw]]
> at org.apache.spark.sql.execution.streaming.StreamExecution.org
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not
> authorized to access group:
> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>
> Thanks,
> Thomas Thomas
>


Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

2019-02-12 Thread Allu Thomas
Hi There,

My use case is to read a simple json message from Kafka queue using Spark 
Structured Streaming. But I’m getting the following error message when I run  
my Kafka consumer. I don’t get this error when using Spark direct stream. The 
issue is happening only with structured streaming. Any help would be greatly 
appreciated.


Exception in thread "main" 
org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to 
access group: 
spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
=== Streaming Query ===
Identifier: [id = 6ab10eab-4f71-435c-8705-820e66cee47e, runId = 
48430367-9e14-450b-b8e0-27199b536403]
Current Committed Offsets: {}
Current Available Offsets: {}
 
Current State: ACTIVE
Thread State: RUNNABLE
 
Logical Plan:
KafkaSource[Subscribe[cla-claim-raw]]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not 
authorized to access group: 
spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

Thanks,
Thomas Thomas

Spark Kafka Streaming with Offset Gaps

2018-12-19 Thread Rishabh Pugalia
I have an app that uses Kafka Streaming to pull data from `input` topic and
push to `output` topic with `processing.guarantee=exactly_once`. Due to
`exactly_once`, gaps (transaction markers) are created in Kafka. Let's call
this app `kafka-streamer`.

Now I've another app that listens to this output topic (actually they are
multiple topics with a Pattern/Regex) and processes the data using
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html.
Let's call this app `spark-streamer`.

Due to the gaps, the first thing that happens is spark streaming fails. To
fix this I enabled `spark.streaming.kafka.allowNonConsecutiveOffsets=true`
in the spark config before creating the StreamingContext. Now let's look at
the issues I faced when I started `spark-streamer` (I also went
through some of the spark-streaming-kafka code in the limited amount of
time I had):

1. Once `spark-streamer` starts if there are unconsumed offsets present in
the topic partition, it does poll them but won't process (create RDDs)
until some new message is pushed to the topic partition after the app is
started. Code:
https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L160
- I can see we poll the data but I'm not sure where the code is to process
it. But anyway, when I run the app I'm pretty sure the data doesn't get
processed (but it does get polled in `compactedStart()`) until
`compactedNext()` is called.
2. In `compactedNext()` if no data is polled within 120s (default timeout),
we throw an exception and my app literally crashes. Code:
https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L178
- Why do we throw an exception and not keep polling just like a normal
KafkaConsumer would do/behave ?

Would be of great help if somebody can help me out with the 2 questions
listed above!

-- 
Thanks and Best Regards,
Rishabh
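
A sketch of the settings referenced above, assuming the spark-streaming-kafka-0-10
DStream API; brokers, group id, pattern and all values are illustrative.
allowNonConsecutiveOffsets tolerates the transaction-marker gaps, and
spark.streaming.kafka.consumer.poll.ms is the poll timeout whose default (derived
from spark.network.timeout, 120s) is where the compactedNext() exception comes
from; raising it only delays that failure rather than removing it.

import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object OffsetGapsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark-streamer-sketch")
      .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") // tolerate transaction-marker gaps
      .set("spark.streaming.kafka.consumer.poll.ms", "300000")         // poll timeout; defaults to spark.network.timeout (120s)

    val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",           // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-streamer",                  // placeholder
      "isolation.level" -> "read_committed",           // skip records from aborted transactions
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("output-.*"), kafkaParams)) // placeholder pattern

    stream.map(_.value()).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}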




spark kafka consumer with kerberos - login error

2018-06-19 Thread Amol Zambare
I am working on a Spark job which reads from a Kafka topic and writes to HDFS;
however, while submitting the job using the spark-submit command I am getting the
following error.


Error log

 Caused by: org.apache.kafka.common.KafkaException: 
javax.security.auth.login.LoginException: Could not login: the client is being 
asked for a password, but the Kafka client code does not currently support 
obtaining a password from the user. Make sure -Djava.security.auth.login.config 
property passed to JVM and the client is configured to use a ticket cache 
(using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are 
using FQDN of the Kafka broker you are trying to connect to. not available to 
garner  authentication information from the user


I am passing the user keytab and kafka_client_jaas.conf file to spark submit 
command as suggested in Hortonworks documentation or 
https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster


Passing following parameters to spark-submit

--files user.keytab,kafka_client_jaas.conf \

--driver-java-options 
"-Djava.security.auth.login.config=kafka_client_jaas.conf" \

--conf 
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf"
 \

Version information

Spark version - 2.2.0.2.6.4.0-91
Kafka version - 0.10.1

Any help is much appreciated.

Thanks,
Amol
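
A hedged sketch of a keytab-based kafka_client_jaas.conf; the principal, keytab
file name and service name below are placeholders, not values from this thread.
The error usually means the KafkaClient login entry is falling back to
ticket-cache or password login, so pointing it at the keytab shipped via --files
is the first thing to check. The keyTab path must match the file name as it
appears in the driver/executor working directory, and the service name can
alternatively be given as the sasl.kerberos.service.name consumer property.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./user.keytab"
  principal="user@EXAMPLE.COM"
  serviceName="kafka";
};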



Re: [Structured-Streaming][Beginner] Out of order messages with Spark kafka readstream from a specific partition

2018-05-10 Thread Cody Koeninger
As long as you aren't doing any spark operations that involve a
shuffle, the order you see in spark should be the same as the order in
the partition.

Can you link to a minimal code example that reproduces the issue?

On Wed, May 9, 2018 at 7:05 PM, karthikjay <aswin8...@gmail.com> wrote:
> On the producer side, I make sure data for a specific user lands on the same
> partition. On the consumer side, I use a regular Spark kafka readstream and
> read the data. I also use a console write stream to print out the spark
> kafka DataFrame. What I observer is, the data for a specific user (even
> though in the same partition) arrives out of order in the console.
>
> I also verified the data ordering by running a simple Kafka consumer in Java
> and the data seems to be ordered. What am I missing here ?
>
> Thanks,
> JK
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
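
A small check along these lines (a sketch, assuming the built-in "kafka"
structured source; brokers and topic are placeholders): the source exposes
partition and offset columns, so printing them alongside the key makes it easy to
confirm whether rows for one user really arrive out of per-partition offset order
before any shuffle is introduced.

import org.apache.spark.sql.SparkSession

object PartitionOrderCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("partition-order-check").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "user-events")                // placeholder
      .load()

    // Print key, partition and offset next to the value to inspect per-partition ordering.
    df.selectExpr("CAST(key AS STRING) AS key", "partition", "offset", "CAST(value AS STRING) AS value")
      .writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}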



[Structured-Streaming][Beginner] Out of order messages with Spark kafka readstream from a specific partition

2018-05-09 Thread karthikjay
On the producer side, I make sure data for a specific user lands on the same
partition. On the consumer side, I use a regular Spark kafka readstream and
read the data. I also use a console write stream to print out the spark
kafka DataFrame. What I observe is that the data for a specific user (even
though in the same partition) arrives out of order in the console.

I also verified the data ordering by running a simple Kafka consumer in Java
and the data seems to be ordered. What am I missing here ?

Thanks,
JK



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Does structured streaming support Spark Kafka Direct?

2018-04-12 Thread Tathagata Das
The parallelism is same for Structured Streaming. In fact, the Kafka
Structured Streaming source is based on the same principle as DStream's
Kafka Direct, hence it has very similar behavior.


On Tue, Apr 10, 2018 at 11:03 PM, SRK <swethakasire...@gmail.com> wrote:

> hi,
>
> We have code based on Spark Kafka Direct in production and we want to port
> this code to Structured Streaming. Does structured streaming support spark
> kafka direct? What are the configs for parallelism and scalability in
> structured streaming? In Spark Kafka Direct, the number of kafka partitions
> take care of parallelism when doing the consumption. Is it the same case
> with Structured Streaming?
>
> Thanks for the help in Advance!
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
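
A sketch of the structured equivalent, assuming the built-in "kafka" source
(brokers, topic and all values are placeholders): as with the direct DStream,
each Kafka topic partition becomes at least one Spark task per micro-batch, so
parallelism is driven by the partition count. maxOffsetsPerTrigger caps the
records read per batch, and minPartitions (Spark 2.4+) asks the source to split
partitions into more tasks.

import org.apache.spark.sql.SparkSession

object StructuredKafkaParallelismSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("structured-kafka-sketch").getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "my-topic")                   // placeholder
      .option("maxOffsetsPerTrigger", "100000")          // illustrative per-batch rate cap
      .option("minPartitions", "96")                     // illustrative; Spark 2.4+ only
      .load()

    stream.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/structured-kafka-sketch") // placeholder
      .start()
      .awaitTermination()
  }
}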


Does structured streaming support Spark Kafka Direct?

2018-04-11 Thread SRK
hi,

We have code based on Spark Kafka Direct in production and we want to port
this code to Structured Streaming. Does structured streaming support spark
kafka direct? What are the configs for parallelism and scalability in
structured streaming? In Spark Kafka Direct, the number of Kafka partitions
takes care of parallelism when doing the consumption. Is it the same case
with Structured Streaming?

Thanks for the help in Advance!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Problem in Spark-Kafka Connector

2017-12-27 Thread Sitakant Mishra
Hi,

Kindly help me with this problem, for which I will be grateful.

Thanks and Regards,
Sitakanta Mishra

On Tue, Dec 26, 2017 at 12:34 PM, Sitakant Mishra <
sitakanta.mis...@gmail.com> wrote:

> Hi,
>
> I am trying to connect my Spark cluster to a single Kafka Topic which
> running as a separate process in a machine. While submitting the spark
> application, I am getting the following error.
>
>
>
> *17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result
> StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar,
> byteCount=186935315,
> body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar,
> offset=0, length=186935315}} to /129.82.44.156:55168
> ; closing connection*
> *java.nio.channels.ClosedChannelException*
> * at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown
> Source)*
> *17/12/25 16:56:57 INFO TaskSetManager: Starting task 21.0 in stage 0.0
> (TID 21, 129.82.44.156, executor 9, partition 21, PROCESS_LOCAL, 4706
> bytes)*
> *17/12/25 16:56:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 129.82.44.156, executor 9): java.nio.channels.ClosedChannelException*
> * at
> org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)*
> * at
> org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)*
> * at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)*
> * at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)*
> * at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)*
> * at
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)*
> * at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)*
> * at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)*
> * at
> io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)*
> * at
> io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)*
> * at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)*
> * at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)*
> * at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)*
> * at
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)*
> * at java.lang.Thread.run(Thread.java:745)*
>
> *17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result
> StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar,
> byteCount=186935315,
> body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar,
> offset=0, length=186935315}} to /129.82.44.164:45988
> ; closing connection*
> *java.nio.channels.ClosedChannelException*
> * at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown
> Source)*
> *17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result
> StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar,
> byteCount=186935315,
> body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar,
> offset=0, length=186935315}} to /129.82.44.142:56136
> ; closing connection*
>
>
>
> I looked over the web and I found only the following relevant link "
> https://stackoverflow.com/questions/29781489/apache-spark-
> network-errors-between-executors?noredirect=1=1". I tried with the
> suggestion given in the discussion as below.
>
>
> val conf = new 
> SparkConf().setAppName("KafkaInput").set("spark.shuffle.blockTransferService",
> "nio")
>
>
> But still it does not work. I am using "spark-2.2.0-bin-hadoop2.7" version
> of spark. Please help me with this issue and let me know if you need any
> other information from my side.
>
>
>
> Thanks and Regards,
> Sitakanta Mishra
>


Problem in Spark-Kafka Connector

2017-12-26 Thread Sitakant Mishra
Hi,

I am trying to connect my Spark cluster to a single Kafka topic which is
running as a separate process on a machine. While submitting the Spark
application, I am getting the following error.



*17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result
StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar,
byteCount=186935315,
body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar,
offset=0, length=186935315}} to /129.82.44.156:55168
; closing connection*
*java.nio.channels.ClosedChannelException*
* at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown
Source)*
*17/12/25 16:56:57 INFO TaskSetManager: Starting task 21.0 in stage 0.0
(TID 21, 129.82.44.156, executor 9, partition 21, PROCESS_LOCAL, 4706
bytes)*
*17/12/25 16:56:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
129.82.44.156, executor 9): java.nio.channels.ClosedChannelException*
* at
org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)*
* at
org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)*
* at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)*
* at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)*
* at
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)*
* at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)*
* at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)*
* at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)*
* at
io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)*
* at
io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)*
* at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)*
* at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)*
* at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)*
* at
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)*
* at java.lang.Thread.run(Thread.java:745)*

*17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result
StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar,
byteCount=186935315,
body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar,
offset=0, length=186935315}} to /129.82.44.164:45988
; closing connection*
*java.nio.channels.ClosedChannelException*
* at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown
Source)*
*17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result
StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar,
byteCount=186935315,
body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar,
offset=0, length=186935315}} to /129.82.44.142:56136
; closing connection*



I looked over the web and I found only the following relevant link "
https://stackoverflow.com/questions/29781489/apache-
spark-network-errors-between-executors?noredirect=1=1". I tried with the
suggestion given in the discussion as below.


val conf = new 
SparkConf().setAppName("KafkaInput").set("spark.shuffle.blockTransferService",
"nio")


But still it does not work. I am using "spark-2.2.0-bin-hadoop2.7" version
of spark. Please help me with this issue and let me know if you need any
other information from my side.



Thanks and Regards,
Sitakanta Mishra


Spark Kafka API tries to connect to the dead node for every batch, which increases the processing time

2017-10-13 Thread supritht
Hi guys,

I have a 3-node cluster and I am running a Spark streaming job. Consider the
example below:

/*spark-submit* --master yarn-cluster --class
com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint --jars
/opt/client/Spark/spark/lib/streamingClient/kafka-clients-0.8.2.1.jar,/opt/client/Spark/spark/lib/streamingClient/kafka_2.10-0.8.2.1.jar,/opt/client/Spark/spark/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar
/opt/SparkStreamingExample-1.0.jar  /tmp/test 10 test
189.132.190.106:21005,189.132.190.145:21005,10.1.1.1:21005/

In this case, suppose node 10.1.1.1 is down. Then for every window batch,
spark tries to send a request  to all the nodes. 
This code is in the class org.apache.spark.streaming.kafka.KafkaCluster

Function : getPartitionMetadata()
Line : val resp: TopicMetadataResponse = consumer.send(req)

The function getPartitionMetadata() is called from getPartitions() and
findLeaders() which gets called for every batch.

Hence, if the node is down, the connection fails and it waits for the
timeout before continuing, which adds to the processing time.

Question :
Is there any way to avoid this ?
In simple words, i do not want spark to send request to the node that is
down for every batch. How can i achieve this ?






--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-29 Thread Cody Koeninger
etimes . Please see the kafka
>>>> parameters and
>>>> >> > the
>>>> >> > consumer strategy for creating the stream below. Any suggestions
>>>> on how
>>>> >> > to
>>>> >> > run this with better performance would be of great help.
>>>> >> >
>>>> >> > java.lang.AssertionError: assertion failed: Failed to get records
>>>> for
>>>> >> > test
>>>> >> > stream1 72 324027964 after polling for 12
>>>> >> >
>>>> >> > val kafkaParams = Map[String, Object](
>>>> >> >   "bootstrap.servers" -> kafkaBrokers,
>>>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>>>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>>>> >> >   "auto.offset.reset" -> "latest",
>>>> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>>>> >> >   "session.timeout.ms" -> Integer.valueOf(6),
>>>> >> >   "request.timeout.ms" -> Integer.valueOf(9),
>>>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>>>> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>>>> >> >   "group.id" -> "test1"
>>>> >> > )
>>>> >> >
>>>> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
>>>> String](
>>>> >> > ssc,
>>>> >> > LocationStrategies.PreferConsistent,
>>>> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>>>> >> > kafkaParams)
>>>> >> >   )
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> > --
>>>> >> > View this message in context:
>>>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-p
>>>> erformance-while-running-Spark-Kafka-Direct-Streaming-with-K
>>>> afka-10-cluster-tp29108.html
>>>> >> > Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com.
>>>> >> >
>>>> >> > 
>>>> -
>>>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>> >> >
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
Hi Cody,

Following is the way that I am consuming data for a 60 second batch. Do you
see anything that is wrong with the way the data is getting consumed that
can cause slowness in performance?


val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "heartbeat.interval.ms" -> Integer.valueOf(2),
  "session.timeout.ms" -> Integer.valueOf(6),
  "request.timeout.ms" -> Integer.valueOf(9),
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
)

  val hubbleStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
  )

val kafkaStreamRdd = kafkaStream.transform { rdd =>
rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
}

On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> There is no difference in performance even with Cache being enabled.
>
> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with Cache being disabled.
>>
>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> So if you can run with cache enabled for some time, does that
>>> significantly affect the performance issue you were seeing?
>>>
>>> Those settings seem reasonable enough.   If preferred locations is
>>> behaving correctly you shouldn't need cached consumers for all 96
>>> partitions on any one executor, so that maxCapacity setting is
>>> probably unnecessary.
>>>
>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>> <swethakasire...@gmail.com> wrote:
>>> > Because I saw some posts that say that consumer cache  enabled will
>>> have
>>> > concurrentModification exception with reduceByKeyAndWIndow. I see those
>>> > errors as well after running for sometime with cache being enabled.
>>> So, I
>>> > had to disable it. Please see the tickets below.  We have 96
>>> partitions. So
>>> > if I enable cache, would teh following settings help to improve
>>> performance?
>>> >
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>> Integer.valueOf(96),
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>> Integer.valueOf(96),
>>> >
>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>> >
>>> >
>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>> >
>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> >>
>>> >> Why are you setting consumer.cache.enabled to false?
>>> >>
>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com>
>>> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > What would be the appropriate settings to run Spark with Kafka 10?
>>> My
>>> >> > job
>>> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>>> >> > very
>>> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
>>> Kafka 10
>>> >> > . I
>>> >> > see the following error sometimes . Please see the kafka parameters
>>> and
>>> >> > the
>>> >> > consumer strategy for creating the stream below. Any suggestions on
>>> how
>>> >> > to
>>> >> > run this with better performance would be of great help.
>>> >> >
>>> >> > java.lang.AssertionError: assertion failed: Failed to get records
>>> for
>>> >> > test
>>> >> > stream1 72 324027964 after polling for 12
>>> >> >
>>> >> > val kafkaParams = Map[String, Object](
>>> >> >   "bootstrap.servers" -> kafkaBroke

Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
There is no difference in performance even with Cache being enabled.

On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> There is no difference in performance even with Cache being disabled.
>
> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> So if you can run with cache enabled for some time, does that
>> significantly affect the performance issue you were seeing?
>>
>> Those settings seem reasonable enough.   If preferred locations is
>> behaving correctly you shouldn't need cached consumers for all 96
>> partitions on any one executor, so that maxCapacity setting is
>> probably unnecessary.
>>
>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>> <swethakasire...@gmail.com> wrote:
>> > Because I saw some posts that say that consumer cache  enabled will have
>> > concurrentModification exception with reduceByKeyAndWIndow. I see those
>> > errors as well after running for sometime with cache being enabled. So,
>> I
>> > had to disable it. Please see the tickets below.  We have 96
>> partitions. So
>> > if I enable cache, would teh following settings help to improve
>> performance?
>> >
>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>> Integer.valueOf(96),
>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>> Integer.valueOf(96),
>> >
>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>> >
>> >
>> > http://markmail.org/message/n4cdxwurlhf44q5x
>> >
>> > https://issues.apache.org/jira/browse/SPARK-19185
>> >
>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> >>
>> >> Why are you setting consumer.cache.enabled to false?
>> >>
>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com>
>> wrote:
>> >> > Hi,
>> >> >
>> >> > What would be the appropriate settings to run Spark with Kafka 10? My
>> >> > job
>> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> >> > very
>> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
>> Kafka 10
>> >> > . I
>> >> > see the following error sometimes . Please see the kafka parameters
>> and
>> >> > the
>> >> > consumer strategy for creating the stream below. Any suggestions on
>> how
>> >> > to
>> >> > run this with better performance would be of great help.
>> >> >
>> >> > java.lang.AssertionError: assertion failed: Failed to get records for
>> >> > test
>> >> > stream1 72 324027964 after polling for 12
>> >> >
>> >> > val kafkaParams = Map[String, Object](
>> >> >   "bootstrap.servers" -> kafkaBrokers,
>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>> >> >   "auto.offset.reset" -> "latest",
>> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >> >   "group.id" -> "test1"
>> >> > )
>> >> >
>> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
>> String](
>> >> > ssc,
>> >> > LocationStrategies.PreferConsistent,
>> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> >> > kafkaParams)
>> >> >   )
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-p
>> erformance-while-running-Spark-Kafka-Direct-Streaming-with-
>> Kafka-10-cluster-tp29108.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >> >
>> >> > 
>> -
>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >
>> >
>> >
>>
>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
There is no difference in performance even with Cache being disabled.

On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:

> So if you can run with cache enabled for some time, does that
> significantly affect the performance issue you were seeing?
>
> Those settings seem reasonable enough.   If preferred locations is
> behaving correctly you shouldn't need cached consumers for all 96
> partitions on any one executor, so that maxCapacity setting is
> probably unnecessary.
>
> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
> <swethakasire...@gmail.com> wrote:
> > Because I saw some posts that say that consumer cache  enabled will have
> > concurrentModification exception with reduceByKeyAndWIndow. I see those
> > errors as well after running for sometime with cache being enabled. So, I
> > had to disable it. Please see the tickets below.  We have 96 partitions.
> So
> > if I enable cache, would teh following settings help to improve
> performance?
> >
> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
> Integer.valueOf(96),
> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
> Integer.valueOf(96),
> >
> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
> >
> >
> > http://markmail.org/message/n4cdxwurlhf44q5x
> >
> > https://issues.apache.org/jira/browse/SPARK-19185
> >
> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Why are you setting consumer.cache.enabled to false?
> >>
> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > What would be the appropriate settings to run Spark with Kafka 10? My
> >> > job
> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
> >> > very
> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka
> 10
> >> > . I
> >> > see the following error sometimes . Please see the kafka parameters
> and
> >> > the
> >> > consumer strategy for creating the stream below. Any suggestions on
> how
> >> > to
> >> > run this with better performance would be of great help.
> >> >
> >> > java.lang.AssertionError: assertion failed: Failed to get records for
> >> > test
> >> > stream1 72 324027964 after polling for 12
> >> >
> >> > val kafkaParams = Map[String, Object](
> >> >   "bootstrap.servers" -> kafkaBrokers,
> >> >   "key.deserializer" -> classOf[StringDeserializer],
> >> >   "value.deserializer" -> classOf[StringDeserializer],
> >> >   "auto.offset.reset" -> "latest",
> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
> >> >   "session.timeout.ms" -> Integer.valueOf(6),
> >> >   "request.timeout.ms" -> Integer.valueOf(9),
> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
> >> >   "group.id" -> "test1"
> >> > )
> >> >
> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
> String](
> >> > ssc,
> >> > LocationStrategies.PreferConsistent,
> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
> >> > kafkaParams)
> >> >   )
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-
> performance-while-running-Spark-Kafka-Direct-Streaming-
> with-Kafka-10-cluster-tp29108.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >
> >
> >
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread Cody Koeninger
So if you can run with cache enabled for some time, does that
significantly affect the performance issue you were seeing?

Those settings seem reasonable enough.   If preferred locations is
behaving correctly you shouldn't need cached consumers for all 96
partitions on any one executor, so that maxCapacity setting is
probably unnecessary.

On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
<swethakasire...@gmail.com> wrote:
> Because I saw some posts that say that consumer cache  enabled will have
> concurrentModification exception with reduceByKeyAndWIndow. I see those
> errors as well after running for sometime with cache being enabled. So, I
> had to disable it. Please see the tickets below.  We have 96 partitions. So
> if I enable cache, would teh following settings help to improve performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My
>> > job
>> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> > very
>> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10
>> > . I
>> > see the following error sometimes . Please see the kafka parameters and
>> > the
>> > consumer strategy for creating the stream below. Any suggestions on how
>> > to
>> > run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> > test
>> > stream1 72 324027964 after polling for 12
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> kafkaBrokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "auto.offset.reset" -> "latest",
>> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >   "group.id" -> "test1"
>> > )
>> >
>> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> > ssc,
>> > LocationStrategies.PreferConsistent,
>> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> > kafkaParams)
>> >   )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
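
For reference, a hedged sketch pulling the settings discussed in this thread into
one place; every value below is illustrative, not a recommendation. The consumer
cache is left enabled and sized per executor, the poll timeout is raised, and the
heartbeat/session/request timeouts keep the usual relationship for a 60-second
batch: heartbeat well below the session timeout, request timeout above it, and
the session timeout under the broker's group.max.session.timeout.ms.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object CachedConsumerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-010-cached-consumer-sketch")
      .set("spark.streaming.kafka.consumer.cache.enabled", "true")
      .set("spark.streaming.kafka.consumer.cache.initialCapacity", "16") // illustrative
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "64")     // illustrative
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")            // illustrative

    val ssc = new StreamingContext(conf, Seconds(60))

    val kafkaBrokers = "broker1:9092,broker2:9092" // placeholder
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaBrokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "heartbeat.interval.ms" -> Integer.valueOf(20000), // illustrative: roughly a third of the session timeout
      "session.timeout.ms" -> Integer.valueOf(60000),    // illustrative: must fit under group.max.session.timeout.ms
      "request.timeout.ms" -> Integer.valueOf(90000),    // illustrative: larger than the session timeout
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "group.id" -> "test1"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("stream1"), kafkaParams))

    stream.map(r => (r.key(), r.value())).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}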



Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread swetha kasireddy
Because I saw some posts that say that having the consumer cache enabled will cause a
ConcurrentModificationException with reduceByKeyAndWindow. I see those
errors as well after running for some time with the cache enabled, so I
had to disable it. Please see the tickets below. We have 96 partitions. So
if I enable the cache, would the following settings help to improve
performance?

"spark.streaming.kafka.consumer.cache.initialCapacity" -> Integer.*valueOf*
(12),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(15),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185


Also, I have a batch of 60 seconds. What do you suggest the following  to
be?

 session.timeout.ms, heartbeat.interval.ms

On Fri, Aug 25, 2017 at 5:04 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> Because I saw some posts that say that consumer cache  enabled will have
> concurrentModification exception with reduceByKeyAndWIndow. I see those
> errors as well after running for sometime with cache being enabled. So, I
> had to disable it. Please see the tickets below.  We have 96 partitions. So
> if I enable cache, would teh following settings help to improve
> performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*
> (96),
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*
> (96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My
>> job
>> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> very
>> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka
>> 10 . I
>> > see the following error sometimes . Please see the kafka parameters and
>> the
>> > consumer strategy for creating the stream below. Any suggestions on how
>> to
>> > run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> test
>> > stream1 72 324027964 after polling for 12
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> kafkaBrokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "auto.offset.reset" -> "latest",
>> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >   "group.id" -> "test1"
>> > )
>> >
>> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> > ssc,
>> > LocationStrategies.PreferConsistent,
>> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> kafkaParams)
>> >   )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Slower-performance-while-running-Spark
>> -Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread swetha kasireddy
Because I saw some posts that say that having the consumer cache enabled will cause a
ConcurrentModificationException with reduceByKeyAndWindow. I see those
errors as well after running for some time with the cache enabled, so I
had to disable it. Please see the tickets below. We have 96 partitions. So
if I enable the cache, would the following settings help to improve
performance?

"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185

On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Why are you setting consumer.cache.enabled to false?
>
> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
> > Hi,
> >
> > What would be the appropriate settings to run Spark with Kafka 10? My job
> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very
> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10
> . I
> > see the following error sometimes . Please see the kafka parameters and
> the
> > consumer strategy for creating the stream below. Any suggestions on how
> to
> > run this with better performance would be of great help.
> >
> > java.lang.AssertionError: assertion failed: Failed to get records for
> test
> > stream1 72 324027964 after polling for 12
> >
> > val kafkaParams = Map[String, Object](
> >   "bootstrap.servers" -> kafkaBrokers,
> >   "key.deserializer" -> classOf[StringDeserializer],
> >   "value.deserializer" -> classOf[StringDeserializer],
> >   "auto.offset.reset" -> "latest",
> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
> >   "session.timeout.ms" -> Integer.valueOf(6),
> >   "request.timeout.ms" -> Integer.valueOf(9),
> >   "enable.auto.commit" -> (false: java.lang.Boolean),
> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
> >   "group.id" -> "test1"
> > )
> >
> >       val hubbleStream = KafkaUtils.createDirectStream[String, String](
> > ssc,
> > LocationStrategies.PreferConsistent,
> > ConsumerStrategies.Subscribe[String, String](topicsSet,
> kafkaParams)
> >   )
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Slower-performance-while-running-
> Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread Cody Koeninger
Why are you setting consumer.cache.enabled to false?

On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
> Hi,
>
> What would be the appropriate settings to run Spark with Kafka 10? My job
> works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very
> slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10 . I
> see the following error sometimes . Please see the kafka parameters and the
> consumer strategy for creating the stream below. Any suggestions on how to
> run this with better performance would be of great help.
>
> java.lang.AssertionError: assertion failed: Failed to get records for test
> stream1 72 324027964 after polling for 12
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> kafkaBrokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "auto.offset.reset" -> "latest",
>   "heartbeat.interval.ms" -> Integer.valueOf(2),
>   "session.timeout.ms" -> Integer.valueOf(6),
>   "request.timeout.ms" -> Integer.valueOf(9),
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "group.id" -> "test1"
> )
>
>   val hubbleStream = KafkaUtils.createDirectStream[String, String](
> ssc,
> LocationStrategies.PreferConsistent,
> ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>   )
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread SRK
Hi,

What would be the appropriate settings to run Spark with Kafka 10? My job
works fine with Spark with Kafka 8 and a Kafka 8 cluster, but it is very slow
with Kafka 10 when using the Kafka Direct experimental APIs for Kafka 10. I
see the following error sometimes. Please see the Kafka parameters and the
consumer strategy for creating the stream below. Any suggestions on how to
run this with better performance would be of great help.

java.lang.AssertionError: assertion failed: Failed to get records for test
stream1 72 324027964 after polling for 12

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "heartbeat.interval.ms" -> Integer.valueOf(2),
  "session.timeout.ms" -> Integer.valueOf(6),
  "request.timeout.ms" -> Integer.valueOf(9),
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
)

  val hubbleStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
  )





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-22 Thread Cody Koeninger
Kafka rdds need to start from a specified offset, you really don't
want the executors just starting at whatever offset happened to be
latest at the time they ran.

If you need a way to figure out the latest offset at the time the
driver starts up, you can always use a consumer to read the offsets
and then pass that to Assign (just make sure that consumer is closed
before the job starts so you don't get group id conflicts).  You can
even make your own implementation of ConsumerStrategy, which should
allow you to do pretty much whatever you need to get the consumer in
the state you want.
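A minimal sketch of that approach in Scala, reusing ssc, kafkaParams and topicsSet
from the code earlier in this thread; the throwaway group id is a placeholder, and
the assign/seekToEnd/position calls assume a 0.10.x kafka-clients consumer:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Sketch only: look up the latest offsets on the driver, close the probe
// consumer, then pin the stream to those offsets with Assign.
val probeParams = kafkaParams + ("group.id" -> "offset-probe")   // placeholder group id, kept separate
val probe = new KafkaConsumer[String, String](probeParams.asJava)
val partitions = topicsSet.flatMap { topic =>
  probe.partitionsFor(topic).asScala.map(pi => new TopicPartition(pi.topic, pi.partition))
}.toSet
probe.assign(partitions.asJava)
probe.seekToEnd(partitions.asJava)
val latestOffsets = partitions.map(tp => tp -> probe.position(tp)).toMap
probe.close()   // closed before the streaming job starts, to avoid group id conflicts

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](partitions, kafkaParams, latestOffsets)
)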

On Mon, Aug 21, 2017 at 6:57 PM, swetha kasireddy
<swethakasire...@gmail.com> wrote:
> Hi Cody,
>
> I think the Assign is used if we want it to start from a specified offset.
> What if we want it to start it from the latest offset with something like
> returned by "auto.offset.reset" -> "latest",.
>
>
> Thanks!
>
> On Mon, Aug 21, 2017 at 9:06 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Yes, you can start from specified offsets.  See ConsumerStrategy,
>> specifically Assign
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store
>>
>> On Tue, Aug 15, 2017 at 1:18 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > How to force Spark Kafka Direct to start from the latest offset when the
>> > lag
>> > is huge in kafka 10? It seems to be processing from the latest offset
>> > stored
>> > for a group id. One way to do this is to change the group id. But it
>> > would
>> > mean that each time that we need to process the job from the latest
>> > offset
>> > we have to provide a new group id.
>> >
>> > Is there a way to force the job to run from the latest offset in case we
>> > need to and still use the same group id?
>> >
>> > Thanks!
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-21 Thread swetha kasireddy
Hi Cody,

I think Assign is used if we want it to start from a specified offset.
What if we want it to start from the latest offset, with behavior like what
"auto.offset.reset" -> "latest" gives?


Thanks!

On Mon, Aug 21, 2017 at 9:06 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Yes, you can start from specified offsets.  See ConsumerStrategy,
> specifically Assign
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html#your-own-data-store
>
> On Tue, Aug 15, 2017 at 1:18 PM, SRK <swethakasire...@gmail.com> wrote:
> > Hi,
> >
> > How to force Spark Kafka Direct to start from the latest offset when the
> lag
> > is huge in kafka 10? It seems to be processing from the latest offset
> stored
> > for a group id. One way to do this is to change the group id. But it
> would
> > mean that each time that we need to process the job from the latest
> offset
> > we have to provide a new group id.
> >
> > Is there a way to force the job to run from the latest offset in case we
> > need to and still use the same group id?
> >
> > Thanks!
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-
> start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Re: How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-21 Thread Cody Koeninger
Yes, you can start from specified offsets.  See ConsumerStrategy,
specifically Assign

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store

On Tue, Aug 15, 2017 at 1:18 PM, SRK <swethakasire...@gmail.com> wrote:
> Hi,
>
> How to force Spark Kafka Direct to start from the latest offset when the lag
> is huge in kafka 10? It seems to be processing from the latest offset stored
> for a group id. One way to do this is to change the group id. But it would
> mean that each time that we need to process the job from the latest offset
> we have to provide a new group id.
>
> Is there a way to force the job to run from the latest offset in case we
> need to and still use the same group id?
>
> Thanks!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-15 Thread SRK
Hi,

How do I force Spark Kafka Direct to start from the latest offset when the lag
is huge in Kafka 10? It seems to be processing from the last offset stored
for a group id. One way to do this is to change the group id, but that would
mean that each time we need to process the job from the latest offset,
we have to provide a new group id.

Is there a way to force the job to run from the latest offset in case we
need to and still use the same group id?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to make sure that Spark Kafka Direct Streaming job maintains the state upon code deployment?

2017-06-27 Thread SRK
Hi,

We use UpdateStateByKey, reduceByKeyWindow and checkpoint the data.  We
store the offsets in Zookeeper. How to make sure that the state of the job
is maintained upon redeploying the code?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-that-Spark-Kafka-Direct-Streaming-job-maintains-the-state-upon-code-deployment-tp28799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to bootstrap Spark Kafka direct with the previous state in case of a code upgrade

2017-06-20 Thread SRK
Hi,

How do we bootstrap the streaming job with the previous state when we do a
code change and redeploy? We use updateStateByKey to maintain the state and
store session objects and LinkedHashMaps in the checkpoint.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-bootstrap-Spark-Kafka-direct-with-the-previous-state-in-case-of-a-code-upgrade-tp28775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark-Kafka integration - build failing with sbt

2017-06-19 Thread Cody Koeninger
org.apache.spark.streaming.kafka.KafkaUtils
is in the
spark-streaming-kafka-0-8
project
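For reference, a minimal sketch of the dependency line that pulls that class in,
assuming Spark 2.1.0 on Scala 2.11 as in the build file quoted below (use
spark-streaming-kafka-0-10 instead for the newer consumer API):

// build.sbt fragment, sketch only
val sparkVers = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % sparkVers % "provided",
  "org.apache.spark" %% "spark-streaming"           % sparkVers % "provided",
  // an unversioned "spark-streaming-kafka" artifact does not exist for Spark 2.x;
  // the Kafka-versioned artifact is what resolves:
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVers
)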

On Mon, Jun 19, 2017 at 1:01 PM, karan alang  wrote:
> Hi Cody - i do have a additional basic question ..
>
> When i tried to compile the code in Eclipse, i was not able to do that
>
> eg.
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> gave errors saying KafaUtils was not part of the package.
> However, when i used sbt to compile - the compilation went through fine
>
> So, I assume additional libraries are being downloaded when i provide the
> appropriate packages in LibraryDependencies ?
> which ones would have helped compile this ?
>
>
>
> On Sat, Jun 17, 2017 at 2:53 PM, karan alang  wrote:
>>
>> Thanks, Cody .. yes, was able to fix that.
>>
>> On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger 
>> wrote:
>>>
>>> There are different projects for different versions of kafka,
>>> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>>>
>>> See
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>>
>>> On Fri, Jun 16, 2017 at 6:51 PM, karan alang 
>>> wrote:
>>> > I'm trying to compile kafka & Spark Streaming integration code i.e.
>>> > reading
>>> > from Kafka using Spark Streaming,
>>> >   and the sbt build is failing with error -
>>> >
>>> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
>>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>>> >
>>> >   Scala version -> 2.10.7
>>> >   Spark Version -> 2.1.0
>>> >   Kafka version -> 0.9
>>> >   sbt version -> 0.13
>>> >
>>> > Contents of sbt files is as shown below ->
>>> >
>>> > 1)
>>> >   vi spark_kafka_code/project/plugins.sbt
>>> >
>>> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>>> >
>>> >  2)
>>> >   vi spark_kafka_code/sparkkafka.sbt
>>> >
>>> > import AssemblyKeys._
>>> > assemblySettings
>>> >
>>> > name := "SparkKafka Project"
>>> >
>>> > version := "1.0"
>>> > scalaVersion := "2.11.7"
>>> >
>>> > val sparkVers = "2.1.0"
>>> >
>>> > // Base Spark-provided dependencies
>>> > libraryDependencies ++= Seq(
>>> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>>> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>>> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>>> >
>>> > mergeStrategy in assembly := {
>>> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
>>> > MergeStrategy.discard
>>> >   case m if m.toLowerCase.startsWith("META-INF")  =>
>>> > MergeStrategy.discard
>>> >   case "reference.conf"   =>
>>> > MergeStrategy.concat
>>> >   case m if m.endsWith("UnusedStubClass.class")   =>
>>> > MergeStrategy.discard
>>> >   case _ => MergeStrategy.first
>>> > }
>>> >
>>> >   i launch sbt, and then try to create an eclipse project, complete
>>> > error is
>>> > as shown below -
>>> >
>>> >   -
>>> >
>>> >   sbt
>>> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
>>> > [info] Loading project definition from
>>> >
>>> > /Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
>>> > [info] Set current project to SparkKafka Project (in build
>>> >
>>> > file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
>>> >> eclipse
>>> > [info] About to create Eclipse project files for your project(s).
>>> > [info] Updating
>>> >
>>> > {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
>>> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
>>> > [warn] module not found:
>>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
>>> > [warn]  local: tried
>>> > [warn]
>>> >
>>> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [warn]  activator-launcher-local: tried
>>> > [warn]
>>> >
>>> > /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [warn]  activator-local: tried
>>> > [warn]
>>> >
>>> > /Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [warn]  public: tried
>>> > [warn]
>>> >
>>> > https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>>> > [warn]  typesafe-releases: tried
>>> > [warn]
>>> >
>>> > http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>>> > [warn]  typesafe-ivy-releasez: tried
>>> > [warn]
>>> >
>>> > http://repo.typesafe.com/typesafe/ivy-releases/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>>> > [info] Resolving jline#jline;2.12.1 ...
>>> > [warn] ::
>>> > [warn] ::  UNRESOLVED 

Re: Spark-Kafka integration - build failing with sbt

2017-06-19 Thread karan alang
Hi Cody - I do have an additional basic question.

When I tried to compile the code in Eclipse, I was not able to do that.

e.g.
import org.apache.spark.streaming.kafka.KafkaUtils

gave errors saying KafkaUtils was not part of the package.
However, when I used sbt to compile, the compilation went through fine.

So I assume additional libraries are being downloaded when I provide the
appropriate packages in libraryDependencies?
Which ones would have helped compile this?



On Sat, Jun 17, 2017 at 2:53 PM, karan alang  wrote:

> Thanks, Cody .. yes, was able to fix that.
>
> On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger 
> wrote:
>
>> There are different projects for different versions of kafka,
>> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>>
>> See
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>
>> On Fri, Jun 16, 2017 at 6:51 PM, karan alang 
>> wrote:
>> > I'm trying to compile kafka & Spark Streaming integration code i.e.
>> reading
>> > from Kafka using Spark Streaming,
>> >   and the sbt build is failing with error -
>> >
>> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>> >
>> >   Scala version -> 2.10.7
>> >   Spark Version -> 2.1.0
>> >   Kafka version -> 0.9
>> >   sbt version -> 0.13
>> >
>> > Contents of sbt files is as shown below ->
>> >
>> > 1)
>> >   vi spark_kafka_code/project/plugins.sbt
>> >
>> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>> >
>> >  2)
>> >   vi spark_kafka_code/sparkkafka.sbt
>> >
>> > import AssemblyKeys._
>> > assemblySettings
>> >
>> > name := "SparkKafka Project"
>> >
>> > version := "1.0"
>> > scalaVersion := "2.11.7"
>> >
>> > val sparkVers = "2.1.0"
>> >
>> > // Base Spark-provided dependencies
>> > libraryDependencies ++= Seq(
>> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>> >
>> > mergeStrategy in assembly := {
>> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
>> MergeStrategy.discard
>> >   case m if m.toLowerCase.startsWith("META-INF")  =>
>> MergeStrategy.discard
>> >   case "reference.conf"   =>
>> MergeStrategy.concat
>> >   case m if m.endsWith("UnusedStubClass.class")   =>
>> MergeStrategy.discard
>> >   case _ => MergeStrategy.first
>> > }
>> >
>> >   i launch sbt, and then try to create an eclipse project, complete
>> error is
>> > as shown below -
>> >
>> >   -
>> >
>> >   sbt
>> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
>> > [info] Loading project definition from
>> > /Users/karanalang/Documents/Technology/Coursera_spark_scala/
>> spark_kafka_code/project
>> > [info] Set current project to SparkKafka Project (in build
>> > file:/Users/karanalang/Documents/Technology/Coursera_spark_
>> scala/spark_kafka_code/)
>> >> eclipse
>> > [info] About to create Eclipse project files for your project(s).
>> > [info] Updating
>> > {file:/Users/karanalang/Documents/Technology/Coursera_spark_
>> scala/spark_kafka_code/}spark_kafka_code...
>> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
>> > [warn] module not found:
>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
>> > [warn]  local: tried
>> > [warn]
>> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-streami
>> ng-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [warn]  activator-launcher-local: tried
>> > [warn]
>> > /Users/karanalang/.activator/repository/org.apache.spark/spa
>> rk-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [warn]  activator-local: tried
>> > [warn]
>> > /Users/karanalang/Documents/Technology/SCALA/activator-dist-
>> 1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.
>> 11/2.1.0/ivys/ivy.xml
>> > [warn]  public: tried
>> > [warn]
>> > https://repo1.maven.org/maven2/org/apache/spark/spark-stream
>> ing-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>> > [warn]  typesafe-releases: tried
>> > [warn]
>> > http://repo.typesafe.com/typesafe/releases/org/apache/spark/
>> spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
>> > [warn]  typesafe-ivy-releasez: tried
>> > [warn]
>> > http://repo.typesafe.com/typesafe/ivy-releases/org.apache.
>> spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
>> > [info] Resolving jline#jline;2.12.1 ...
>> > [warn] ::
>> > [warn] ::  UNRESOLVED DEPENDENCIES ::
>> > [warn] ::
>> > [warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not
>> found
>> > [warn] ::
>> > [warn]
>> > [warn] Note: Unresolved dependencies path:
>> > [warn] 

Re: Spark-Kafka integration - build failing with sbt

2017-06-17 Thread karan alang
Thanks, Cody .. yes, was able to fix that.

On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger  wrote:

> There are different projects for different versions of kafka,
> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>
> See
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> On Fri, Jun 16, 2017 at 6:51 PM, karan alang 
> wrote:
> > I'm trying to compile kafka & Spark Streaming integration code i.e.
> reading
> > from Kafka using Spark Streaming,
> >   and the sbt build is failing with error -
> >
> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> >
> >   Scala version -> 2.10.7
> >   Spark Version -> 2.1.0
> >   Kafka version -> 0.9
> >   sbt version -> 0.13
> >
> > Contents of sbt files is as shown below ->
> >
> > 1)
> >   vi spark_kafka_code/project/plugins.sbt
> >
> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
> >
> >  2)
> >   vi spark_kafka_code/sparkkafka.sbt
> >
> > import AssemblyKeys._
> > assemblySettings
> >
> > name := "SparkKafka Project"
> >
> > version := "1.0"
> > scalaVersion := "2.11.7"
> >
> > val sparkVers = "2.1.0"
> >
> > // Base Spark-provided dependencies
> > libraryDependencies ++= Seq(
> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
> >
> > mergeStrategy in assembly := {
> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
> MergeStrategy.discard
> >   case m if m.toLowerCase.startsWith("META-INF")  =>
> MergeStrategy.discard
> >   case "reference.conf"   => MergeStrategy.concat
> >   case m if m.endsWith("UnusedStubClass.class")   =>
> MergeStrategy.discard
> >   case _ => MergeStrategy.first
> > }
> >
> >   i launch sbt, and then try to create an eclipse project, complete
> error is
> > as shown below -
> >
> >   -
> >
> >   sbt
> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
> > [info] Loading project definition from
> > /Users/karanalang/Documents/Technology/Coursera_spark_
> scala/spark_kafka_code/project
> > [info] Set current project to SparkKafka Project (in build
> > file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/)
> >> eclipse
> > [info] About to create Eclipse project files for your project(s).
> > [info] Updating
> > {file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/}spark_kafka_code...
> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> > [warn] module not found:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> > [warn]  local: tried
> > [warn]
> > /Users/karanalang/.ivy2/local/org.apache.spark/spark-
> streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  activator-launcher-local: tried
> > [warn]
> > /Users/karanalang/.activator/repository/org.apache.spark/
> spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  activator-local: tried
> > [warn]
> > /Users/karanalang/Documents/Technology/SCALA/activator-
> dist-1.3.10/repository/org.apache.spark/spark-streaming-
> kafka_2.11/2.1.0/ivys/ivy.xml
> > [warn]  public: tried
> > [warn]
> > https://repo1.maven.org/maven2/org/apache/spark/spark-
> streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> > [warn]  typesafe-releases: tried
> > [warn]
> > http://repo.typesafe.com/typesafe/releases/org/apache/
> spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-
> kafka_2.11-2.1.0.pom
> > [warn]  typesafe-ivy-releasez: tried
> > [warn]
> > http://repo.typesafe.com/typesafe/ivy-releases/org.
> apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> > [info] Resolving jline#jline;2.12.1 ...
> > [warn] ::
> > [warn] ::  UNRESOLVED DEPENDENCIES ::
> > [warn] ::
> > [warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not
> found
> > [warn] ::
> > [warn]
> > [warn] Note: Unresolved dependencies path:
> > [warn] org.apache.spark:spark-streaming-kafka_2.11:2.1.0
> > (/Users/karanalang/Documents/Technology/Coursera_spark_
> scala/spark_kafka_code/sparkkafka.sbt#L12-16)
> > [warn]   +- sparkkafka-project:sparkkafka-project_2.11:1.0
> > [trace] Stack trace suppressed: run last *:update for the full output.
> > [error] (*:update) sbt.ResolveException: unresolved dependency:
> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> > [info] Updating
> > {file:/Users/karanalang/Documents/Technology/Coursera_
> spark_scala/spark_kafka_code/}spark_kafka_code...
> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> > [warn] module not found:
> > 

Re: Spark-Kafka integration - build failing with sbt

2017-06-17 Thread Cody Koeninger
There are different projects for different versions of kafka,
spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10

See

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

On Fri, Jun 16, 2017 at 6:51 PM, karan alang  wrote:
> I'm trying to compile kafka & Spark Streaming integration code i.e. reading
> from Kafka using Spark Streaming,
>   and the sbt build is failing with error -
>
>   [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>
>   Scala version -> 2.10.7
>   Spark Version -> 2.1.0
>   Kafka version -> 0.9
>   sbt version -> 0.13
>
> Contents of sbt files is as shown below ->
>
> 1)
>   vi spark_kafka_code/project/plugins.sbt
>
>   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>
>  2)
>   vi spark_kafka_code/sparkkafka.sbt
>
> import AssemblyKeys._
> assemblySettings
>
> name := "SparkKafka Project"
>
> version := "1.0"
> scalaVersion := "2.11.7"
>
> val sparkVers = "2.1.0"
>
> // Base Spark-provided dependencies
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>
> mergeStrategy in assembly := {
>   case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
>   case m if m.toLowerCase.startsWith("META-INF")  => MergeStrategy.discard
>   case "reference.conf"   => MergeStrategy.concat
>   case m if m.endsWith("UnusedStubClass.class")   => MergeStrategy.discard
>   case _ => MergeStrategy.first
> }
>
>   i launch sbt, and then try to create an eclipse project, complete error is
> as shown below -
>
>   -
>
>   sbt
> [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
> [info] Loading project definition from
> /Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
> [info] Set current project to SparkKafka Project (in build
> file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
>> eclipse
> [info] About to create Eclipse project files for your project(s).
> [info] Updating
> {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
> [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> [warn] module not found:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> [warn]  local: tried
> [warn]
> /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-launcher-local: tried
> [warn]
> /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-local: tried
> [warn]
> /Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  public: tried
> [warn]
> https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> [warn]  typesafe-releases: tried
> [warn]
> http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> [warn]  typesafe-ivy-releasez: tried
> [warn]
> http://repo.typesafe.com/typesafe/ivy-releases/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [info] Resolving jline#jline;2.12.1 ...
> [warn] ::
> [warn] ::  UNRESOLVED DEPENDENCIES ::
> [warn] ::
> [warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> [warn] ::
> [warn]
> [warn] Note: Unresolved dependencies path:
> [warn] org.apache.spark:spark-streaming-kafka_2.11:2.1.0
> (/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/sparkkafka.sbt#L12-16)
> [warn]   +- sparkkafka-project:sparkkafka-project_2.11:1.0
> [trace] Stack trace suppressed: run last *:update for the full output.
> [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> [info] Updating
> {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
> [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> [warn] module not found:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> [warn]  local: tried
> [warn]
> /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-launcher-local: tried
> [warn]
> /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-local: tried
> [warn]
> 

Spark-Kafka integration - build failing with sbt

2017-06-16 Thread karan alang
I'm trying to compile Kafka & Spark Streaming integration code, i.e. reading
from Kafka using Spark Streaming,
  and the sbt build is failing with this error -

  [error] (*:update) sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found

  Scala version -> 2.10.7
  Spark Version -> 2.1.0
  Kafka version -> 0.9
  sbt version -> 0.13

Contents of the sbt files are as shown below ->

1)
  vi spark_kafka_code/project/plugins.sbt

  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

 2)
  vi spark_kafka_code/sparkkafka.sbt

import AssemblyKeys._
assemblySettings

name := "SparkKafka Project"

version := "1.0"
scalaVersion := "2.11.7"

val sparkVers = "2.1.0"

// Base Spark-provided dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVers % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("META-INF")  => MergeStrategy.discard
  case "reference.conf"   => MergeStrategy.concat
  case m if m.endsWith("UnusedStubClass.class")   => MergeStrategy.discard
  case _ => MergeStrategy.first
}

  i launch sbt, and then try to create an eclipse project, complete error
is as shown below -

  -

  sbt
[info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
[info] Loading project definition from
/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
[info] Set current project to SparkKafka Project (in build
file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
> eclipse
[info] About to create Eclipse project files for your project(s).
[info] Updating
{file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
[info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
[warn] module not found:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0
[warn]  local: tried
[warn]
/Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-launcher-local: tried
[warn]
/Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-local: tried
[warn]
/Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  public: tried
[warn]
https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]  typesafe-releases: tried
[warn]
http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]  typesafe-ivy-releasez: tried
[warn]
http://repo.typesafe.com/typesafe/ivy-releases/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[info] Resolving jline#jline;2.12.1 ...
[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
[warn] ::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] org.apache.spark:spark-streaming-kafka_2.11:2.1.0
(/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/sparkkafka.sbt#L12-16)
[warn]   +- sparkkafka-project:sparkkafka-project_2.11:1.0
[trace] Stack trace suppressed: run last *:update for the full output.
[error] (*:update) sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
[info] Updating
{file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
[info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
[warn] module not found:
org.apache.spark#spark-streaming-kafka_2.11;2.1.0
[warn]  local: tried
[warn]
/Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-launcher-local: tried
[warn]
/Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  activator-local: tried
[warn]
/Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
[warn]  public: tried
[warn]
https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]  typesafe-releases: tried
[warn]
http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
[warn]  

reading binary file in spark-kafka streaming

2017-04-05 Thread Yogesh Vyas
Hi,

I have a binary file which I read in a Kafka producer and send to a message
queue. I then read this in the Spark-Kafka consumer as a streaming job, but it
is giving me the following error:

UnicodeDecodeError: 'utf8' codec can't decode byte 0xa9 in position 112:
invalid start byte

Can anyone please tell me why this error occurs and how to fix it?

Regards,
Yogesh


Re: spark kafka consumer with kerberos

2017-03-31 Thread Saisai Shao
Hi Bill,

Normally a Kerberos principal and keytab should be enough, because the keytab
effectively takes the place of the password. Did you configure SASL/GSSAPI or
SASL/PLAIN for KafkaClient?
http://kafka.apache.org/documentation.html#security_sasl

Actually this is more of a Kafka question, and it normally comes down to a
configuration issue; I would suggest asking it on the Kafka mailing list.

Thanks
Saisai
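For what it is worth, a minimal sketch of what the KafkaClient entry for
SASL/GSSAPI might look like in the JAAS file shipped with --files (the keytab
name echoes the --files example quoted below; the principal, path and service
name are placeholders):

// key.conf, sketch only
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./v.keytab"
  principal="someuser@EXAMPLE.COM"
  serviceName="kafka";
};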


On Fri, Mar 31, 2017 at 10:28 PM, Bill Schwanitz  wrote:

> Saisai,
>
> Yea that seems to have helped. Looks like the kerberos ticket when I
> submit does not get passed to the executor?
>
> ... 3 more
> Caused by: org.apache.kafka.common.KafkaException:
> javax.security.auth.login.LoginException: Unable to obtain password from
> user
>
> at org.apache.kafka.common.network.SaslChannelBuilder.
> configure(SaslChannelBuilder.java:86)
> at org.apache.kafka.common.network.ChannelBuilders.
> create(ChannelBuilders.java:70)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(
> ClientUtils.java:83)
> at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:623)
> ... 14 more
> Caused by: javax.security.auth.login.LoginException: Unable to obtain
> password from user
>
>
> On Fri, Mar 31, 2017 at 9:08 AM, Saisai Shao 
> wrote:
>
>> Hi Bill,
>>
>> The exception is from executor side. From the gist you provided, looks
>> like the issue is that you only configured java options in driver side, I
>> think you should also configure this in executor side. You could refer to
>> here (https://github.com/hortonworks-spark/skc#running-on-a-
>> kerberos-enabled-cluster).
>>
>> --files key.conf#key.conf,v.keytab#v.keytab
>> --driver-java-options "-Djava.security.auth.login.config=./key.conf"
>> --conf 
>> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"
>>
>>
>> On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz 
>> wrote:
>>
>>> I'm working on a poc spark job to pull data from a kafka topic with
>>> kerberos enabled ( required ) brokers.
>>>
>>> The code seems to connect to kafka and enter a polling mode. When I toss
>>> something onto the topic I get an exception which I just can't seem to
>>> figure out. Any ideas?
>>>
>>> I have a full gist up at https://gist.github.com/bil
>>> sch/17f4a4c4303ed3e004e2234a5904f0de with a lot of details. If I use
>>> the hdfs/spark client code for just normal operations everything works fine
>>> but for some reason the streaming code is having issues. I have verified
>>> the KafkaClient object is in the jaas config. The keytab is good etc.
>>>
>>> Guessing I'm doing something wrong I just have not figured out what yet!
>>> Any thoughts?
>>>
>>> The exception:
>>>
>>> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID
>>> 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed
>>> to construct kafka consumer
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>>> Consumer.java:702)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>>> Consumer.java:557)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>>> Consumer.java:540)
>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.>> t>(CachedKafkaConsumer.scala:47)
>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get
>>> (CachedKafkaConsumer.scala:157)
>>> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>>> r.(KafkaRDD.scala:210)
>>> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRD
>>> D.scala:185)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> Executor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> lExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.kafka.common.KafkaException:
>>> org.apache.kafka.common.KafkaException: Jaas configuration not found
>>> at org.apache.kafka.common.network.SaslChannelBuilder.configure
>>> (SaslChannelBuilder.java:86)
>>> at org.apache.kafka.common.network.ChannelBuilders.create(Chann
>>> elBuilders.java:70)
>>> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(Cl
>>> ientUtils.java:83)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>>> Consumer.java:623)
>>> ... 14 more
>>> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration
>>> not found
>>> at org.apache.kafka.common.security.kerberos.KerberosLogin.getS
>>> erviceName(KerberosLogin.java:299)
>>> at org.apache.kafka.common.security.kerberos.KerberosLogin.conf
>>> igure(KerberosLogin.java:103)
>>> at 

Re: spark kafka consumer with kerberos

2017-03-31 Thread Bill Schwanitz
Saisai,

Yeah, that seems to have helped. It looks like the Kerberos ticket from when I
submit does not get passed to the executor?

... 3 more
Caused by: org.apache.kafka.common.KafkaException:
javax.security.auth.login.LoginException: Unable to obtain password from
user

at
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623)
... 14 more
Caused by: javax.security.auth.login.LoginException: Unable to obtain
password from user


On Fri, Mar 31, 2017 at 9:08 AM, Saisai Shao  wrote:

> Hi Bill,
>
> The exception is from executor side. From the gist you provided, looks
> like the issue is that you only configured java options in driver side, I
> think you should also configure this in executor side. You could refer to
> here (https://github.com/hortonworks-spark/skc#running-
> on-a-kerberos-enabled-cluster).
>
> --files key.conf#key.conf,v.keytab#v.keytab
> --driver-java-options "-Djava.security.auth.login.config=./key.conf"
> --conf 
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"
>
>
> On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz  wrote:
>
>> I'm working on a poc spark job to pull data from a kafka topic with
>> kerberos enabled ( required ) brokers.
>>
>> The code seems to connect to kafka and enter a polling mode. When I toss
>> something onto the topic I get an exception which I just can't seem to
>> figure out. Any ideas?
>>
>> I have a full gist up at https://gist.github.com/bil
>> sch/17f4a4c4303ed3e004e2234a5904f0de with a lot of details. If I use the
>> hdfs/spark client code for just normal operations everything works fine but
>> for some reason the streaming code is having issues. I have verified the
>> KafkaClient object is in the jaas config. The keytab is good etc.
>>
>> Guessing I'm doing something wrong I just have not figured out what yet!
>> Any thoughts?
>>
>> The exception:
>>
>> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to
>> construct kafka consumer
>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>> Consumer.java:702)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>> Consumer.java:557)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>> Consumer.java:540)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.> t>(CachedKafkaConsumer.scala:47)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get
>> (CachedKafkaConsumer.scala:157)
>> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>> r.(KafkaRDD.scala:210)
>> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRD
>> D.scala:185)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.kafka.common.KafkaException: Jaas configuration not found
>> at org.apache.kafka.common.network.SaslChannelBuilder.configure
>> (SaslChannelBuilder.java:86)
>> at org.apache.kafka.common.network.ChannelBuilders.create(
>> ChannelBuilders.java:70)
>> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(Cl
>> ientUtils.java:83)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.(Kafka
>> Consumer.java:623)
>> ... 14 more
>> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration
>> not found
>> at org.apache.kafka.common.security.kerberos.KerberosLogin.
>> getServiceName(KerberosLogin.java:299)
>> at org.apache.kafka.common.security.kerberos.KerberosLogin.
>> configure(KerberosLogin.java:103)
>> at org.apache.kafka.common.security.authenticator.LoginManager.
>> (LoginManager.java:45)
>> at org.apache.kafka.common.security.authenticator.LoginManager.
>> acquireLoginManager(LoginManager.java:68)
>> at org.apache.kafka.common.network.SaslChannelBuilder.configure
>> (SaslChannelBuilder.java:78)
>> ... 17 more
>> Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in
>> this configuration.
>> at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUt
>> ils.java:50)
>> at org.apache.kafka.common.security.kerberos.KerberosLogin.
>> getServiceName(KerberosLogin.java:297)
>> ... 21 more
>>
>
>


Re: spark kafka consumer with kerberos

2017-03-31 Thread Saisai Shao
Hi Bill,

The exception is from the executor side. From the gist you provided, it looks like
the issue is that you only configured the Java options on the driver side; I think
you should also configure this on the executor side. You could refer to here (
https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster
).

--files key.conf#key.conf,v.keytab#v.keytab
--driver-java-options "-Djava.security.auth.login.config=./key.conf"
--conf 
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"


On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz  wrote:

> I'm working on a poc spark job to pull data from a kafka topic with
> kerberos enabled ( required ) brokers.
>
> The code seems to connect to kafka and enter a polling mode. When I toss
> something onto the topic I get an exception which I just can't seem to
> figure out. Any ideas?
>
> I have a full gist up at https://gist.github.com/bilsch/
> 17f4a4c4303ed3e004e2234a5904f0de with a lot of details. If I use the
> hdfs/spark client code for just normal operations everything works fine but
> for some reason the streaming code is having issues. I have verified the
> KafkaClient object is in the jaas config. The keytab is good etc.
>
> Guessing I'm doing something wrong I just have not figured out what yet!
> Any thoughts?
>
> The exception:
>
> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to
> construct kafka consumer
> at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:702)
> at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:557)
> at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:540)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<
> init>(CachedKafkaConsumer.scala:47)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:157)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(
> KafkaRDD.scala:210)
> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Jaas configuration not found
> at org.apache.kafka.common.network.SaslChannelBuilder.
> configure(SaslChannelBuilder.java:86)
> at org.apache.kafka.common.network.ChannelBuilders.
> create(ChannelBuilders.java:70)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(
> ClientUtils.java:83)
> at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:623)
> ... 14 more
> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not
> found
> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(
> KerberosLogin.java:299)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(
> KerberosLogin.java:103)
> at org.apache.kafka.common.security.authenticator.LoginManager.(
> LoginManager.java:45)
> at org.apache.kafka.common.security.authenticator.LoginManager.
> acquireLoginManager(LoginManager.java:68)
> at org.apache.kafka.common.network.SaslChannelBuilder.
> configure(SaslChannelBuilder.java:78)
> ... 17 more
> Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in
> this configuration.
> at org.apache.kafka.common.security.JaasUtils.jaasConfig(
> JaasUtils.java:50)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(
> KerberosLogin.java:297)
> ... 21 more
>


spark kafka consumer with kerberos

2017-03-30 Thread Bill Schwanitz
I'm working on a POC Spark job to pull data from a Kafka topic with
Kerberos-enabled (required) brokers.

The code seems to connect to kafka and enter a polling mode. When I toss
something onto the topic I get an exception which I just can't seem to
figure out. Any ideas?

I have a full gist up at
https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de with a lot
of details. If I use the hdfs/spark client code for just normal operations
everything works fine but for some reason the streaming code is having
issues. I have verified the KafkaClient object is in the jaas config. The
keytab is good etc.

I'm guessing I'm doing something wrong and just have not figured out what yet!
Any thoughts?

The exception:

17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Jaas configuration not found
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 14 more
Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not found
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 17 more
Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in this configuration.
at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
... 21 more


Re: [Spark Kafka] API Doc pages for Kafka 0.10 not current

2017-02-28 Thread Cody Koeninger
The kafka-0-8 and kafka-0-10 integrations have conflicting
dependencies.  Last time I checked, Spark's doc publication puts
everything all in one classpath, so publishing them both together
won't work.  I thought there was already a Jira ticket related to
this, but a quick look didn't turn it up.

On Mon, Feb 27, 2017 at 3:01 PM, Afshartous, Nick
<nafshart...@wbgames.com> wrote:
>
> Hello,
>
>
> Looks like the API docs linked from the Spark Kafka 0.10 Integration page
> are not current.
>
>
> For instance, on the page
>
>
>
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
> the code examples show the new API (i.e. class ConsumerStrategies).
> However, following the links
>
>
> API Docs --> (Scala | Java)
>
>
> leads to API pages that do not have class ConsumerStrategies) .  The API doc
> package names  also have streaming.kafka as opposed to streaming.kafka10.
>
>
> --
>
> Nick

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark Kafka] API Doc pages for Kafka 0.10 not current

2017-02-27 Thread Afshartous, Nick

Hello,


Looks like the API docs linked from the Spark Kafka 0.10 Integration page are 
not current.


For instance, on the page


   https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


the code examples show the new API (i.e. class ConsumerStrategies).  However, 
following the links


API Docs --> (Scala | Java)


leads to API pages that do not have class ConsumerStrategies. The API doc
package names also have streaming.kafka as opposed to streaming.kafka10.


--

Nick


Re: [Spark Kafka] How to update batch size of input dynamically for spark kafka consumer?

2017-01-03 Thread Cody Koeninger
You can't change the batch time, but you can limit the number of items
in the batch

http://spark.apache.org/docs/latest/configuration.html

spark.streaming.backpressure.enabled

spark.streaming.kafka.maxRatePerPartition
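A minimal sketch of wiring those two settings up for the direct stream (the
7-second interval comes from the question below; the rate is a placeholder, and
since maxRatePerPartition is records per second per partition, the per-batch cap
is roughly rate x partitions x 7):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: cap what each batch can pull from Kafka instead of trying to
// change the batch interval at runtime.
val conf = new SparkConf()
  .setAppName("kafka-direct-backpressure")                    // placeholder
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")   // placeholder rate
val ssc = new StreamingContext(conf, Seconds(7))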

On Tue, Jan 3, 2017 at 4:00 AM, 周家帅  wrote:
> Hi,
>
> I am an intermediate spark user and have some experience in large data
> processing. I post this question in StackOverflow but receive no response.
> My problem is as follows:
>
> I use createDirectStream in my spark streaming application. I set the batch
> interval to 7 seconds and most of the time the batch job can finish within
> about 5 seconds. However, in very rare cases, the batch job need cost 60
> seconds and this will delay some batches of jobs. To cut down the total
> delay time for these batches, I hope I can process more streaming data which
> spread over the delayed jobs at one time. This will help the streaming
> return to normal as soon as possible.
>
> So, I want to know there is some method to dynamically update/merge batch
> size of input for spark and kafka when delay appears.
>
> Many thanks for your help.
>
>
> --
> Jiashuai Zhou
>
> School of Electronics Engineering and Computer Science,
> Peking University
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark Kafka] How to update batch size of input dynamically for spark kafka consumer?

2017-01-03 Thread 周家帅
Hi,

I am an intermediate Spark user and have some experience in large data
processing. I posted this question on StackOverflow but received no response.
My problem is as follows:

I use createDirectStream in my Spark Streaming application. I set the batch
interval to 7 seconds, and most of the time the batch job can finish within
about 5 seconds. However, in very rare cases, the batch job takes about 60
seconds, and this delays some batches of jobs. To cut down the total delay
for these batches, I would like to process, in a single batch, the streaming
data that has accumulated across the delayed jobs. This would help the
streaming application return to normal as soon as possible.

So, I want to know whether there is some way to dynamically update/merge the
input batch size for Spark and Kafka when delays appear.

Many thanks for your help.

-- 
Jiashuai Zhou

School of Electronics Engineering and Computer Science,
Peking University


Re: Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Cody Koeninger
If you want finer-grained max rate settings, SPARK-17510 got merged a
while ago.  There's also SPARK-18580, which might help address the
issue of the starting backpressure rate for the first batch.
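
For the restart/first-batch case specifically, a hedged sketch of the relevant
settings (my reading of SPARK-18580 is that it makes the direct Kafka stream
honour spark.streaming.backpressure.initialRate; the numbers below are
placeholders, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  // cap applied before the rate estimator has produced any feedback,
  // e.g. for the first batch after a restart (placeholder value)
  .set("spark.streaming.backpressure.initialRate", "500")
  // hard per-partition ceiling, independent of backpressure (placeholder)
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")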

On Mon, Dec 5, 2016 at 4:18 PM, Liren Ding <sky.gonna.bri...@gmail.com> wrote:
> Hey all,
>
> Does backpressure actually work on Spark Kafka streaming? According to the
> latest spark streaming document:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
> "In Spark 1.5, we have introduced a feature called backpressure that
> eliminate the need to set this rate limit, as Spark Streaming automatically
> figures out the rate limits and dynamically adjusts them if the processing
> conditions change. This backpressure can be enabled by setting the
> configuration parameter spark.streaming.backpressure.enabled to true."
> But I also see a few open spark jira tickets on this option:
> https://issues.apache.org/jira/browse/SPARK-7398
> https://issues.apache.org/jira/browse/SPARK-18371
>
> The case in the second ticket describes a similar issue to the one we have
> here. We use Kafka to send large batches (10~100M) to Spark Streaming, and
> the streaming interval is set to 1~4 minutes. With backpressure set to true,
> the queued active batches still pile up when the average batch processing
> time takes longer than the default interval. After the Spark driver is
> restarted, all queued batches turn into a giant batch, which blocks
> subsequent batches and also has a great chance of failing eventually. The
> only config we found that might help is
> "spark.streaming.kafka.maxRatePerPartition". It does limit the incoming
> batch size, but it is not a perfect solution since it depends on the size of
> each partition as well as the length of the batch interval. For our case,
> hundreds of partitions X minutes of interval still produce a number that is
> too large for each batch. So we still want to figure out how to make
> backpressure work in Spark Kafka streaming, if it is supposed to work there.
> Thanks.
>
>
> Liren
>
>
>
>
>
>
>




Re: Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Richard Startin
I've seen the feature work very well. For tuning, you've got:

spark.streaming.backpressure.pid.proportional (defaults to 1, non-negative) -
  weight for the response to the "error" (the change between the last batch
  and this batch)

spark.streaming.backpressure.pid.integral (defaults to 0.2, non-negative) -
  weight for the response to the accumulation of error. This has a dampening
  effect.

spark.streaming.backpressure.pid.derived (defaults to zero, non-negative) -
  weight for the response to the trend in error. This can cause
  arbitrary/noise-induced fluctuations in batch size, but can also help react
  quickly to increased/reduced capacity.

spark.streaming.backpressure.pid.minRate - the default value is 100 (must be
  positive); batch size won't go below this.

spark.streaming.receiver.maxRate - batch size won't go above this.
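
A minimal sketch of wiring those knobs into a SparkConf; the PID values below
simply restate the defaults, the two rate caps are placeholder numbers rather
than tuning advice, and for the direct Kafka stream the upper bound is
spark.streaming.kafka.maxRatePerPartition rather than the receiver rate:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.pid.proportional", "1.0")
  .set("spark.streaming.backpressure.pid.integral", "0.2")
  .set("spark.streaming.backpressure.pid.derived", "0.0")
  .set("spark.streaming.backpressure.pid.minRate", "100")
  .set("spark.streaming.receiver.maxRate", "10000")          // receiver-based streams
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // direct Kafka streams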


Cheers,

Richard


https://richardstartin.com/



From: Liren Ding <sky.gonna.bri...@gmail.com>
Sent: 05 December 2016 22:18
To: d...@spark.apache.org; user@spark.apache.org
Subject: Back-pressure to Spark Kafka Streaming?

Hey all,

Does backpressure actually work on Spark Kafka streaming? According to the
latest spark streaming document:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
"In Spark 1.5, we have introduced a feature called backpressure that eliminate 
the need to set this rate limit, as Spark Streaming automatically figures out 
the rate limits and dynamically adjusts them if the processing conditions 
change. This backpressure can be enabled by setting the configuration parameter 
spark.streaming.backpressure.enabled to true."
But I also see a few open spark jira tickets on this option:
https://issues.apache.org/jira/browse/SPARK-7398
https://issues.apache.org/jira/browse/SPARK-18371

The case in the second ticket describes a similar issue to the one we have
here. We use Kafka to send large batches (10~100M) to Spark Streaming, and the
streaming interval is set to 1~4 minutes. With backpressure set to true, the
queued active batches still pile up when the average batch processing time
takes longer than the default interval. After the Spark driver is restarted,
all queued batches turn into a giant batch, which blocks subsequent batches and
also has a great chance of failing eventually. The only config we found that
might help is "spark.streaming.kafka.maxRatePerPartition". It does limit the
incoming batch size, but it is not a perfect solution since it depends on the
size of each partition as well as the length of the batch interval. For our
case, hundreds of partitions X minutes of interval still produce a number that
is too large for each batch. So we still want to figure out how to make
backpressure work in Spark Kafka streaming, if it is supposed to work there.
Thanks.


Liren









Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Liren Ding
Hey all,

Does backpressure actually work on Spark Kafka streaming? According to the
latest Spark Streaming document:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
"In Spark 1.5, we have introduced a feature called backpressure that
eliminate the need to set this rate limit, as Spark Streaming automatically
figures out the rate limits and dynamically adjusts them if the processing
conditions change. This backpressure can be enabled by setting the
configuration parameter spark.streaming.backpressure.enabled to true."
But I also see a few open Spark Jira tickets on this option:

https://issues.apache.org/jira/browse/SPARK-7398
https://issues.apache.org/jira/browse/SPARK-18371

The case in the second ticket describes a similar issue to the one we have
here. We use Kafka to send large batches (10~100M) to Spark Streaming, and
the streaming interval is set to 1~4 minutes. With backpressure set to true,
the queued active batches still pile up when the average batch processing
time takes longer than the default interval. After the Spark driver is
restarted, all queued batches turn into a giant batch, which blocks
subsequent batches and also has a great chance of failing eventually. The
only config we found that might help is
"spark.streaming.kafka.maxRatePerPartition". It does limit the incoming
batch size, but it is not a perfect solution since it depends on the size of
each partition as well as the length of the batch interval. For our case,
hundreds of partitions X minutes of interval still produce a number that is
too large for each batch. So we still want to figure out how to make
backpressure work in Spark Kafka streaming, if it is supposed to work there.
Thanks.


Liren


Re: Spark kafka integration issues

2016-09-14 Thread Cody Koeninger
Yeah, an updated version of that blog post is available at

https://github.com/koeninger/kafka-exactly-once

On Wed, Sep 14, 2016 at 11:35 AM, Mukesh Jha  wrote:
> Thanks for the reply Cody.
>
> I found the below article on the same, very helpful. Thanks for the details,
> much appreciated.
>
> http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
>
> On Tue, Sep 13, 2016 at 8:14 PM, Cody Koeninger  wrote:
>>
>> 1.  see
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>  look for HasOffsetRanges.  If you really want the info per-message
>> rather than per-partition, createRDD has an overload that takes a
>> messageHandler from MessageAndMetadata to whatever you need
>>
>> 2. createRDD takes type parameters for the key and value decoder, so
>> specify them there
>>
>> 3. you can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
>> There is a spark-streaming-kafka-0-10 package with additional features
>> that only works on brokers 0.10 or higher.  A pull request for
>> documenting it has been merged, but not deployed.
>>
>> On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha 
>> wrote:
>> > Hello fellow sparkers,
>> >
>> > I'm using Spark to consume messages from Kafka in a non-streaming
>> > fashion. I'm using spark-streaming-kafka-0-8_2.10 & Spark v2.0 to do
>> > the same.
>> >
>> > I have a few queries; please get back if you have any clues on them.
>> >
>> > 1) Is there any way to get the topic, partition & offset information
>> > for each item from the KafkaRDD? I'm using
>> > KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder] to
>> > create my Kafka RDD.
>> > 2) How do I pass my custom Decoder instead of using the String or Byte
>> > decoder? Are there any examples for the same?
>> > 3) Is there a newer version to consume from kafka-0.10 & kafka-0.9
>> > clusters?
>> >
>> > --
>> > Thanks & Regards,
>> >
>> > Mukesh Jha
>
>
>
>
> --
>
>
> Thanks & Regards,
>
> Mukesh Jha
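
To make the numbered answer quoted above concrete, here is a minimal sketch
against the 0.8 direct API. The broker address, topic name and offset bounds
are placeholders, and the messageHandler overload of createRDD (which takes a
function over MessageAndMetadata) is the route to take if per-message metadata
is needed rather than the per-partition ranges shown here:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object KafkaRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("kafka-rdd-sketch"))

    // Placeholders: broker list, topic, partition and offset bounds.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val ranges = Array(OffsetRange("myTopic", 0, 0L, 1000L))

    // Question 2: the key/value decoders go in the type parameters.
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, ranges)

    // Question 1: the returned KafkaRDD implements HasOffsetRanges, exposing
    // topic/partition/offset information per partition (the same cast works
    // on the RDDs handed to foreachRDD by a direct stream).
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
      println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
    }

    sc.stop()
  }
}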




Re: Spark kafka integration issues

2016-09-14 Thread Mukesh Jha
Thanks for the reply Cody.

I found the below article on the same, very helpful. Thanks for the
details, much appreciated.

http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

On Tue, Sep 13, 2016 at 8:14 PM, Cody Koeninger  wrote:

> 1.  see http://spark.apache.org/docs/latest/streaming-kafka-
> integration.html#approach-2-direct-approach-no-receivers
>  look for HasOffsetRanges.  If you really want the info per-message
> rather than per-partition, createRDD has an overload that takes a
> messageHandler from MessageAndMetadata to whatever you need
>
> 2. createRDD takes type parameters for the key and value decoder, so
> specify them there
>
> 3. you can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
> There is a spark-streaming-kafka-0-10 package with additional features
> that only works on brokers 0.10 or higher.  A pull request for
> documenting it has been merged, but not deployed.
>
> On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha 
> wrote:
> > Hello fellow sparkers,
> >
> > I'm using Spark to consume messages from Kafka in a non-streaming
> > fashion. I'm using spark-streaming-kafka-0-8_2.10 & Spark v2.0 to do
> > the same.
> >
> > I have a few queries; please get back if you have any clues on them.
> >
> > 1) Is there any way to get the topic, partition & offset information
> > for each item from the KafkaRDD? I'm using
> > KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder] to
> > create my Kafka RDD.
> > 2) How do I pass my custom Decoder instead of using the String or Byte
> > decoder? Are there any examples for the same?
> > 3) Is there a newer version to consume from kafka-0.10 & kafka-0.9
> > clusters?
> >
> > --
> > Thanks & Regards,
> >
> > Mukesh Jha
>



-- 


Thanks & Regards,

Mukesh Jha

