spark3.0 read kudu data

2020-12-07 Thread 冯宝利
Hi:
Recently we have been upgrading Spark from 2.4 to 3.0, and during performance 
testing we found some performance problems. A comparative test shows that 
Spark 3.0 reads Kudu data much more slowly than 2.4: for the same amount of 
data, Spark 2.4 normally takes 0.1-1 s, while Spark 3.0 takes 1 to 2 minutes. 
Both versions of Spark use the same spark-submit parameters and run in local 
mode, and the Kudu clusters, tables and query conditions are identical.
The only difference is the kudu-spark package: Spark 2.4 uses kudu-spark2_2.11 
(Scala 2.11), while Spark 3.0 uses kudu-spark3_2.12 (Scala 2.12; this package 
is built from Kudu 1.13, compiled against Spark 3.0.0 and Scala 2.12 in the 
pom.xml file). Our cluster uses CDH 6.3.1 and the Kudu version is 1.10.
Given this situation, what can be optimized, or what would you suggest, to 
improve the performance of reading Kudu data?
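
For reference, the read path being compared is just the standard kudu-spark 
DataFrame read; a minimal PySpark sketch (master addresses, table name and 
filter below are placeholders, not our actual workload) looks roughly like this:

    # Minimal sketch, assuming the kudu-spark package is on the classpath;
    # master addresses, table name and filter are placeholders.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = (spark.read
          .format("org.apache.kudu.spark.kudu")
          .option("kudu.master", "kudu-master1:7051,kudu-master2:7051")
          .option("kudu.table", "impala::default.some_table")
          .load())
    df.filter("some_column = 'some_value'").show()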
Thanks!


Is there a better way to read kerberized impala tables by spark jdbc?

2020-12-07 Thread eab...@163.com
Hi:

I want to use spark jdbc to read kerberized impala tables, like:
```
val impalaUrl = 
"jdbc:impala://:21050;AuthMech=1;KrbRealm=REALM.COM;KrbHostFQDN=;KrbServiceName=impala"
spark.read.jdbc(impalaUrl)
```
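
For context, a fuller form of that read looks roughly like the sketch below 
(the host, table name and driver class are illustrative assumptions, not taken 
from our actual job):
```
# Minimal PySpark sketch; host, table and driver class are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
impala_url = ("jdbc:impala://impala-host:21050;AuthMech=1;KrbRealm=REALM.COM;"
              "KrbHostFQDN=impala-host;KrbServiceName=impala")
df = (spark.read
      .format("jdbc")
      .option("url", impala_url)
      .option("dbtable", "some_db.some_table")                 # hypothetical table
      .option("driver", "com.cloudera.impala.jdbc41.Driver")   # assumed Impala JDBC driver class
      .load())
```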

As we know, Spark reads the Impala data in the executors rather than in the 
driver, so it throws an exception: javax.security.sasl.SaslException: GSS initiate failed

```
Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism 
level: Failed to find any Kerberos tgt)
at 
sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
at 
sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
at 
sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
at 
sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
at 
sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
at 
sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
... 20 common frames omitted

``` 

One way to solve this problem is to set jaas.conf via the 
"java.security.auth.login.config" property.

This is jaas.conf:

```
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  doNotPrompt=true
  useTicketCache=true
  principal="test"
  keyTab="/home/keytab/user.keytab";
   };

```

Then set spark.executor.extraJavaOptions like :
```
--conf 
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark-jdbc-impala/conf/jaas.conf
 -Djavax.security.auth.useSubjectCredsOnly=false" 
```

This approach requires absolute paths for the jaas.conf and keytab files; in 
other words, these files must be placed at the same path on every node. Is 
there a better way?

Please help.

Regards




eab...@163.com


Spark in hybrid cloud in AWS & GCP

2020-12-07 Thread Bin Fan
Dear Spark users,

If you are interested in running Spark in a hybrid cloud, check out the talks 
from AWS & GCP at the virtual Data Orchestration Summit on Dec. 8-9, 2020, and 
register for free.

The summit's speaker lineup spans creators and committers of Alluxio,
Spark, Presto, Tensorflow, and K8s, as well as data engineers and software 
engineers building cloud-native data and AI platforms at Amazon, Alibaba, 
Comcast, Facebook, Google, ING Bank, Microsoft, Tencent, and more!


- Bin Fan


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Amit Joshi
Hi Gabor,

Please find the logs attached; these are truncated logs.

Command used:
spark-submit --verbose --packages
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.typesafe:config:1.4.0
--master yarn --deploy-mode cluster --class com.stream.Main --num-executors
2 --driver-memory 2g --executor-cores 1 --executor-memory 4g --files
gs://x/jars_application.conf,gs://x/log4j.properties
gs://x/a-synch-r-1.0-SNAPSHOT.jar
For this I used a snapshot jar, not a fat jar.


Regards
Amit

On Mon, Dec 7, 2020 at 10:15 PM Gabor Somogyi 
wrote:

> Well, I can't do miracle without cluster and logs access.
> What I don't understand why you need fat jar?! Spark libraries normally
> need provided scope because it must exist on all machines...
> I would take a look at the driver and executor logs which contains the
> consumer configs + I would take a look at the exact version of the consumer
> (this is printed also in the same log)
>
> G
>
>
> On Mon, Dec 7, 2020 at 5:07 PM Amit Joshi 
> wrote:
>
>> Hi Gabor,
>>
>> The code is very simple Kafka consumption of data.
>> I guess, it may be the cluster.
>> Can you please point out the possible problems to look for in the cluster?
>>
>> Regards
>> Amit
>>
>> On Monday, December 7, 2020, Gabor Somogyi 
>> wrote:
>>
>>> + Adding back user list.
>>>
>>> I've had a look at the Spark code and it's not
>>> modifying "partition.assignment.strategy" so the problem
>>> must be either in your application or in your cluster setup.
>>>
>>> G
>>>
>>>
>>> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi 
>>> wrote:
>>>
 It's super interesting because that field has default value:
 *org.apache.kafka.clients.consumer.RangeAssignor*

 On Mon, 7 Dec 2020, 10:51 Amit Joshi, 
 wrote:

> Hi,
>
> Thnks for the reply.
> I did tried removing the client version.
> But got the same exception.
>
>
> Thnks
>
> On Monday, December 7, 2020, Gabor Somogyi 
> wrote:
>
>> +1 on the mentioned change, Spark uses the following kafka-clients
>> library:
>>
>> 2.4.1
>>
>> G
>>
>>
>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
>> gschiavonsp...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I think the issue is that you are overriding the kafka-clients that
>>> comes with  spark-sql-kafka-0-10_2.12
>>>
>>>
>>> I'd try removing the kafka-clients and see if it works
>>>
>>>
>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
>>> wrote:
>>>
 Hi All,

 I am running the Spark Structured Streaming along with Kafka.
 Below is the pom.xml

 
 1.8
 1.8
 UTF-8
 
 2.12.10
 3.0.1
 

 
 org.apache.kafka
 kafka-clients
 2.1.0
 

 
 org.apache.spark
 spark-core_2.12
 ${sparkVersion}
 provided
 
 
 
 org.apache.spark
 spark-sql_2.12
 ${sparkVersion}
 provided
 
 
 
 org.apache.spark
 spark-sql-kafka-0-10_2.12
 ${sparkVersion}
 

 Building the fat jar with shade plugin. The jar is running as expected 
 in my local setup with the command

 *spark-submit --master local[*] --class com.stream.Main 
 --num-executors 3 --driver-memory 2g --executor-cores 2 
 --executor-memory 3g prism-event-synch-rta.jar*

 But when I am trying to run same jar in spark cluster using yarn with 
 command:

 *spark-submit --master yarn --deploy-mode cluster --class 
 com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 
 1 --executor-memory 4g  gs://jars/prism-event-synch-rta.jar*

 Getting the this exception:




 *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
 $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
   at 
 org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
  by: org.apache.kafka.common.config.ConfigException: Missing required 
 configuration "partition.assignment.strategy" which has no default 
 value. at 
 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*

 I have tried setting up the "partition.assignment.strategy", then also 
 its not working.

 Please help.


 Regards

 Amit Joshi



Re: Spark UI Storage Memory

2020-12-07 Thread Amit Sharma
Any suggestions, please.

Thanks
Amit

On Fri, Dec 4, 2020 at 2:27 PM Amit Sharma  wrote:

> Is there any memory leak in spark 2.3.3 version as mentioned in below
> Jira.
> https://issues.apache.org/jira/browse/SPARK-29055.
>
> Please let me know how to solve it.
>
> Thanks
> Amit
>
> On Fri, Dec 4, 2020 at 1:55 PM Amit Sharma  wrote:
>
>> Can someone help me on this please.
>>
>>
>> Thanks
>> Amit
>>
>> On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma  wrote:
>>
>>> Hi, I have a Spark streaming job. When I am checking the Executors tab,
>>> there is a Storage Memory column. It displays used memory / total memory.
>>> What is "used" memory: is it memory currently in use, or memory used so far?
>>> How would I know how much memory is unused at a given point in time?
>>>
>>>
>>> Thanks
>>> Amit
>>>
>>


Re: Caching

2020-12-07 Thread Lalwani, Jayesh
  *   Jayesh, but during logical plan spark would be knowing to use the same DF 
twice so it will optimize the query.

No. That would mean that Spark would need to cache DF1. Spark won’t cache 
dataframes unless you ask it to, even if it knows that the same dataframe is 
being used twice, because caching dataframes introduces memory overhead and 
Spark is not going to do it preemptively. It will combine processing of 
various dataframes within a stage; however, in your case you are doing an 
aggregation, which creates a new stage.

You can check the execution plan if you like
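
For example, a minimal PySpark sketch of caching DF1 explicitly and then 
inspecting the plan (file paths and column names below are placeholders, not 
the original code):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df1 = spark.read.csv("data.csv", header=True)        # hypothetical input
    df2 = df1.groupBy("key").agg({"value": "sum"})        # hypothetical aggregation

    df1.cache()   # explicit: Spark will not cache DF1 on its own
    df3 = (spark.read.csv("other.csv", header=True)
           .join(df1, "key")
           .join(df2, "key"))

    df3.explain()                                         # check whether DF1's scan appears once or twice
    df3.write.mode("overwrite").parquet("/tmp/out")       # the single action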

From: Amit Sharma 
Reply-To: "resolve...@gmail.com" 
Date: Monday, December 7, 2020 at 1:47 PM
To: "Lalwani, Jayesh" , "user@spark.apache.org" 

Subject: RE: [EXTERNAL] Caching


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Jayesh, but during logical plan spark would be knowing to use the same DF twice 
so it will optimize the query.


Thanks
Amit

On Mon, Dec 7, 2020 at 1:16 PM Lalwani, Jayesh 
mailto:jlalw...@amazon.com>> wrote:
Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2, 
without caching,  Spark will read the CSV twice: Once to load it for DF1, and 
once to load it for DF2. When you add a cache on DF1 or DF2, it reads from CSV 
only once.

You might want to look at doing a windowed  query on DF1 to avoid joining DF1 
with DF2. This should give you better or similar  performance when compared to  
cache because Spark will optimize for cache the data during shuffle.

From: Amit Sharma mailto:resolve...@gmail.com>>
Reply-To: "resolve...@gmail.com" 
mailto:resolve...@gmail.com>>
Date: Monday, December 7, 2020 at 12:47 PM
To: Theodoros Gkountouvas 
mailto:theo.gkountou...@futurewei.com>>, 
"user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: RE: [EXTERNAL] Caching


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Thanks for the information. I am using  spark 2.3.3 There are few more questions

1. Yes I am using DF1 two times but at the end action is one on DF3. In that 
case action of DF1 should be just 1 or it depends how many times this dataframe 
is used in transformation.

I believe even if we use a dataframe multiple times for transformation , use 
caching should be based on actions. In my case action is one save call on DF3. 
Please correct me if i am wrong.

Thanks
Amit

On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas 
mailto:theo.gkountou...@futurewei.com>> wrote:
Hi Amit,

One action might use the same DataFrame more than once. You can look at your 
LogicalPlan by executing DF3.explain (arguments different depending the version 
of Spark you are using) and see how many times you need to compute DF2 or DF1. 
Given the information you have provided I suspect that DF1 is used more than 
once (one time at  DF2 and another one at DF3). So, Spark is going to cache it 
the first time and it will load it from cache instead of running it again the 
second time.

I hope this helped,
Theo.

From: Amit Sharma mailto:resolve...@gmail.com>>
Sent: Monday, December 7, 2020 11:32 AM
To: user@spark.apache.org
Subject: Caching

Hi All, I am using caching in my code. I have a DF like
val  DF1 = read csv.
val DF2 = DF1.groupBy().agg().select(.)

Val DF3 =  read csv .join(DF1).join(DF2)
  DF3 .save.

If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1 
action only why do I need to cache.

Thanks
Amit




Re: Caching

2020-12-07 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
You are using the same csv twice?

Sent from my iPhone

> On 7 Dec 2020, at 18:32, Amit Sharma wrote:
> 
> 
> Hi All, I am using caching in my code. I have a DF like
> val  DF1 = read csv.
> val DF2 = DF1.groupBy().agg().select(.)
> 
> Val DF3 =  read csv .join(DF1).join(DF2)
>   DF3 .save.
> 
> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1 
> action only why do I need to cache.
> 
> Thanks
> Amit
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Caching

2020-12-07 Thread Amit Sharma
Jayesh, but when building the logical plan, Spark would know that the same DF
is used twice, so it will optimize the query.


Thanks
Amit

On Mon, Dec 7, 2020 at 1:16 PM Lalwani, Jayesh  wrote:

> Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2,
> without caching,  Spark will read the CSV twice: Once to load it for DF1,
> and once to load it for DF2. When you add a cache on DF1 or DF2, it reads
> from CSV only once.
>
>
>
> You might want to look at doing a windowed  query on DF1 to avoid joining
> DF1 with DF2. This should give you better or similar  performance when
> compared to  cache because Spark will optimize for cache the data during
> shuffle.
>
>
>
> *From: *Amit Sharma 
> *Reply-To: *"resolve...@gmail.com" 
> *Date: *Monday, December 7, 2020 at 12:47 PM
> *To: *Theodoros Gkountouvas , "
> user@spark.apache.org" 
> *Subject: *RE: [EXTERNAL] Caching
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Thanks for the information. I am using  spark 2.3.3 There are few more
> questions
>
>
>
> 1. Yes I am using DF1 two times but at the end action is one on DF3. In
> that case action of DF1 should be just 1 or it depends how many times this
> dataframe is used in transformation.
>
>
>
> I believe even if we use a dataframe multiple times for transformation ,
> use caching should be based on actions. In my case action is one save call
> on DF3. Please correct me if i am wrong.
>
>
>
> Thanks
>
> Amit
>
>
>
> On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
> theo.gkountou...@futurewei.com> wrote:
>
> Hi Amit,
>
>
>
> One action might use the same DataFrame more than once. You can look at
> your LogicalPlan by executing DF3.explain (arguments different depending
> the version of Spark you are using) and see how many times you need to
> compute DF2 or DF1. Given the information you have provided I suspect that
> DF1 is used more than once (one time at  DF2 and another one at DF3). So,
> Spark is going to cache it the first time and it will load it from cache
> instead of running it again the second time.
>
>
>
> I hope this helped,
>
> Theo.
>
>
>
> *From:* Amit Sharma 
> *Sent:* Monday, December 7, 2020 11:32 AM
> *To:* user@spark.apache.org
> *Subject:* Caching
>
>
>
> Hi All, I am using caching in my code. I have a DF like
>
> val  DF1 = read csv.
>
> val DF2 = DF1.groupBy().agg().select(.)
>
>
>
> Val DF3 =  read csv .join(DF1).join(DF2)
>
>   DF3 .save.
>
>
>
> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1
> action only why do I need to cache.
>
>
>
> Thanks
>
> Amit
>
>
>
>
>
>


Re: Caching

2020-12-07 Thread Amit Sharma
Sean, you mean that if a df is used more than once in transformations then we
should use cache. But frankly that is also not always true, because in many
places, even when a df is used like that, it gives the same result with
caching and without caching. How do we decide whether we should use cache or not?


Thanks
Amit

On Mon, Dec 7, 2020 at 1:01 PM Sean Owen  wrote:

> No, it's not true that one action means every DF is evaluated once. This
> is a good counterexample.
>
> On Mon, Dec 7, 2020 at 11:47 AM Amit Sharma  wrote:
>
>> Thanks for the information. I am using  spark 2.3.3 There are few more
>> questions
>>
>> 1. Yes I am using DF1 two times but at the end action is one on DF3. In
>> that case action of DF1 should be just 1 or it depends how many times this
>> dataframe is used in transformation.
>>
>> I believe even if we use a dataframe multiple times for transformation ,
>> use caching should be based on actions. In my case action is one save call
>> on DF3. Please correct me if i am wrong.
>>
>> Thanks
>> Amit
>>
>> On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
>> theo.gkountou...@futurewei.com> wrote:
>>
>>> Hi Amit,
>>>
>>>
>>>
>>> One action might use the same DataFrame more than once. You can look at
>>> your LogicalPlan by executing DF3.explain (arguments different depending
>>> the version of Spark you are using) and see how many times you need to
>>> compute DF2 or DF1. Given the information you have provided I suspect that
>>> DF1 is used more than once (one time at  DF2 and another one at DF3). So,
>>> Spark is going to cache it the first time and it will load it from cache
>>> instead of running it again the second time.
>>>
>>>
>>>
>>> I hope this helped,
>>>
>>> Theo.
>>>
>>>
>>>
>>> *From:* Amit Sharma 
>>> *Sent:* Monday, December 7, 2020 11:32 AM
>>> *To:* user@spark.apache.org
>>> *Subject:* Caching
>>>
>>>
>>>
>>> Hi All, I am using caching in my code. I have a DF like
>>>
>>> val  DF1 = read csv.
>>>
>>> val DF2 = DF1.groupBy().agg().select(.)
>>>
>>>
>>>
>>> Val DF3 =  read csv .join(DF1).join(DF2)
>>>
>>>   DF3 .save.
>>>
>>>
>>>
>>> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing
>>> 1 action only why do I need to cache.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Amit
>>>
>>>
>>>
>>>
>>>
>>


Re: Caching

2020-12-07 Thread Lalwani, Jayesh
Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2, 
without caching,  Spark will read the CSV twice: Once to load it for DF1, and 
once to load it for DF2. When you add a cache on DF1 or DF2, it reads from CSV 
only once.

You might want to look at doing a windowed query on DF1 to avoid joining DF1 
with DF2, as sketched below. This should give you better or similar performance 
compared to caching, because Spark effectively caches the data during the shuffle.
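
A minimal sketch of that windowed alternative (column names are placeholders, 
assuming the aggregate is a per-key sum):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()

    # Instead of aggregating DF1 into DF2 and joining DF2 back,
    # compute the aggregate over a window partitioned by the join key.
    df1 = spark.read.csv("data.csv", header=True)        # hypothetical input
    w = Window.partitionBy("key")
    df1_with_totals = df1.withColumn("total_value", F.sum("value").over(w))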

From: Amit Sharma 
Reply-To: "resolve...@gmail.com" 
Date: Monday, December 7, 2020 at 12:47 PM
To: Theodoros Gkountouvas , 
"user@spark.apache.org" 
Subject: RE: [EXTERNAL] Caching


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Thanks for the information. I am using  spark 2.3.3 There are few more questions

1. Yes I am using DF1 two times but at the end action is one on DF3. In that 
case action of DF1 should be just 1 or it depends how many times this dataframe 
is used in transformation.

I believe even if we use a dataframe multiple times for transformation , use 
caching should be based on actions. In my case action is one save call on DF3. 
Please correct me if i am wrong.

Thanks
Amit

On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas 
mailto:theo.gkountou...@futurewei.com>> wrote:
Hi Amit,

One action might use the same DataFrame more than once. You can look at your 
LogicalPlan by executing DF3.explain (arguments different depending the version 
of Spark you are using) and see how many times you need to compute DF2 or DF1. 
Given the information you have provided I suspect that DF1 is used more than 
once (one time at  DF2 and another one at DF3). So, Spark is going to cache it 
the first time and it will load it from cache instead of running it again the 
second time.

I hope this helped,
Theo.

From: Amit Sharma mailto:resolve...@gmail.com>>
Sent: Monday, December 7, 2020 11:32 AM
To: user@spark.apache.org
Subject: Caching

Hi All, I am using caching in my code. I have a DF like
val  DF1 = read csv.
val DF2 = DF1.groupBy().agg().select(.)

Val DF3 =  read csv .join(DF1).join(DF2)
  DF3 .save.

If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1 
action only why do I need to cache.

Thanks
Amit




Re: Caching

2020-12-07 Thread Sean Owen
No, it's not true that one action means every DF is evaluated once. This is
a good counterexample.

On Mon, Dec 7, 2020 at 11:47 AM Amit Sharma  wrote:

> Thanks for the information. I am using  spark 2.3.3 There are few more
> questions
>
> 1. Yes I am using DF1 two times but at the end action is one on DF3. In
> that case action of DF1 should be just 1 or it depends how many times this
> dataframe is used in transformation.
>
> I believe even if we use a dataframe multiple times for transformation ,
> use caching should be based on actions. In my case action is one save call
> on DF3. Please correct me if i am wrong.
>
> Thanks
> Amit
>
> On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
> theo.gkountou...@futurewei.com> wrote:
>
>> Hi Amit,
>>
>>
>>
>> One action might use the same DataFrame more than once. You can look at
>> your LogicalPlan by executing DF3.explain (arguments different depending
>> the version of Spark you are using) and see how many times you need to
>> compute DF2 or DF1. Given the information you have provided I suspect that
>> DF1 is used more than once (one time at  DF2 and another one at DF3). So,
>> Spark is going to cache it the first time and it will load it from cache
>> instead of running it again the second time.
>>
>>
>>
>> I hope this helped,
>>
>> Theo.
>>
>>
>>
>> *From:* Amit Sharma 
>> *Sent:* Monday, December 7, 2020 11:32 AM
>> *To:* user@spark.apache.org
>> *Subject:* Caching
>>
>>
>>
>> Hi All, I am using caching in my code. I have a DF like
>>
>> val  DF1 = read csv.
>>
>> val DF2 = DF1.groupBy().agg().select(.)
>>
>>
>>
>> Val DF3 =  read csv .join(DF1).join(DF2)
>>
>>   DF3 .save.
>>
>>
>>
>> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1
>> action only why do I need to cache.
>>
>>
>>
>> Thanks
>>
>> Amit
>>
>>
>>
>>
>>
>


Re: Caching

2020-12-07 Thread Amit Sharma
Thanks for the information. I am using Spark 2.3.3. There are a few more
questions:

1. Yes, I am using DF1 two times, but in the end there is one action, on DF3.
In that case, should DF1 count as having just one action, or does it depend on
how many times this dataframe is used in transformations?

I believe that even if we use a dataframe multiple times in transformations,
caching should be based on actions. In my case the only action is one save
call on DF3. Please correct me if I am wrong.

Thanks
Amit

On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
theo.gkountou...@futurewei.com> wrote:

> Hi Amit,
>
>
>
> One action might use the same DataFrame more than once. You can look at
> your LogicalPlan by executing DF3.explain (arguments different depending
> the version of Spark you are using) and see how many times you need to
> compute DF2 or DF1. Given the information you have provided I suspect that
> DF1 is used more than once (one time at  DF2 and another one at DF3). So,
> Spark is going to cache it the first time and it will load it from cache
> instead of running it again the second time.
>
>
>
> I hope this helped,
>
> Theo.
>
>
>
> *From:* Amit Sharma 
> *Sent:* Monday, December 7, 2020 11:32 AM
> *To:* user@spark.apache.org
> *Subject:* Caching
>
>
>
> Hi All, I am using caching in my code. I have a DF like
>
> val  DF1 = read csv.
>
> val DF2 = DF1.groupBy().agg().select(.)
>
>
>
> Val DF3 =  read csv .join(DF1).join(DF2)
>
>   DF3 .save.
>
>
>
> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1
> action only why do I need to cache.
>
>
>
> Thanks
>
> Amit
>
>
>
>
>


RE: Caching

2020-12-07 Thread Theodoros Gkountouvas
Hi Amit,

One action might use the same DataFrame more than once. You can look at your 
logical plan by executing DF3.explain (arguments differ depending on the version 
of Spark you are using) and see how many times you need to compute DF2 or DF1. 
Given the information you have provided, I suspect that DF1 is used more than 
once (once for DF2 and again for DF3). So, Spark is going to cache it 
the first time, and it will load it from cache instead of running it again the 
second time.

I hope this helped,
Theo.

From: Amit Sharma 
Sent: Monday, December 7, 2020 11:32 AM
To: user@spark.apache.org
Subject: Caching

Hi All, I am using caching in my code. I have a DF like
val  DF1 = read csv.
val DF2 = DF1.groupBy().agg().select(.)

Val DF3 =  read csv .join(DF1).join(DF2)
  DF3 .save.

If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1 
action only why do I need to cache.

Thanks
Amit




Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Gabor Somogyi
Well, I can't do miracles without access to the cluster and logs.
What I don't understand is why you need a fat jar?! Spark libraries normally
need provided scope because they must already exist on all machines...
I would take a look at the driver and executor logs, which contain the
consumer configs, and I would also check the exact version of the consumer
(this is printed in the same log).

G


On Mon, Dec 7, 2020 at 5:07 PM Amit Joshi  wrote:

> Hi Gabor,
>
> The code is very simple Kafka consumption of data.
> I guess, it may be the cluster.
> Can you please point out the possible problems to look for in the cluster?
>
> Regards
> Amit
>
> On Monday, December 7, 2020, Gabor Somogyi 
> wrote:
>
>> + Adding back user list.
>>
>> I've had a look at the Spark code and it's not
>> modifying "partition.assignment.strategy" so the problem
>> must be either in your application or in your cluster setup.
>>
>> G
>>
>>
>> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi 
>> wrote:
>>
>>> It's super interesting because that field has default value:
>>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>>
>>> On Mon, 7 Dec 2020, 10:51 Amit Joshi,  wrote:
>>>
 Hi,

 Thnks for the reply.
 I did tried removing the client version.
 But got the same exception.


 Thnks

 On Monday, December 7, 2020, Gabor Somogyi 
 wrote:

> +1 on the mentioned change, Spark uses the following kafka-clients
> library:
>
> 2.4.1
>
> G
>
>
> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
> gschiavonsp...@gmail.com> wrote:
>
>> Hi,
>>
>> I think the issue is that you are overriding the kafka-clients that
>> comes with  spark-sql-kafka-0-10_2.12
>>
>>
>> I'd try removing the kafka-clients and see if it works
>>
>>
>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am running the Spark Structured Streaming along with Kafka.
>>> Below is the pom.xml
>>>
>>> 
>>> 1.8
>>> 1.8
>>> UTF-8
>>> 
>>> 2.12.10
>>> 3.0.1
>>> 
>>>
>>> 
>>> org.apache.kafka
>>> kafka-clients
>>> 2.1.0
>>> 
>>>
>>> 
>>> org.apache.spark
>>> spark-core_2.12
>>> ${sparkVersion}
>>> provided
>>> 
>>> 
>>> 
>>> org.apache.spark
>>> spark-sql_2.12
>>> ${sparkVersion}
>>> provided
>>> 
>>> 
>>> 
>>> org.apache.spark
>>> spark-sql-kafka-0-10_2.12
>>> ${sparkVersion}
>>> 
>>>
>>> Building the fat jar with shade plugin. The jar is running as expected 
>>> in my local setup with the command
>>>
>>> *spark-submit --master local[*] --class com.stream.Main --num-executors 
>>> 3 --driver-memory 2g --executor-cores 2 --executor-memory 3g 
>>> prism-event-synch-rta.jar*
>>>
>>> But when I am trying to run same jar in spark cluster using yarn with 
>>> command:
>>>
>>> *spark-submit --master yarn --deploy-mode cluster --class 
>>> com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 
>>> --executor-memory 4g  gs://jars/prism-event-synch-rta.jar*
>>>
>>> Getting the this exception:
>>>
>>> 
>>>
>>>
>>> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
>>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>>>  by: org.apache.kafka.common.config.ConfigException: Missing required 
>>> configuration "partition.assignment.strategy" which has no default 
>>> value. at 
>>> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>>>
>>> I have tried setting up the "partition.assignment.strategy", then also 
>>> its not working.
>>>
>>> Please help.
>>>
>>>
>>> Regards
>>>
>>> Amit Joshi
>>>
>>>


Caching

2020-12-07 Thread Amit Sharma
Hi All, I am using caching in my code. I have a DF like
val  DF1 = read csv.
val DF2 = DF1.groupBy().agg().select(.)

Val DF3 =  read csv .join(DF1).join(DF2)
  DF3 .save.

If I do not cache DF1 or DF2 it takes longer. But I am doing only one action,
so why do I need to cache?

Thanks
Amit


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Amit Joshi
Hi Gabor,

The code is a very simple Kafka consumption of data.
I guess it may be the cluster.
Can you please point out the possible problems to look for in the cluster?
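
For reference, a standard Structured Streaming Kafka source looks roughly like 
the sketch below (brokers and topic are placeholders, not the actual config):

    # Minimal sketch of a Structured Streaming Kafka source;
    # bootstrap servers and topic are placeholders.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")
          .option("subscribe", "some-topic")
          .load())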

Regards
Amit

On Monday, December 7, 2020, Gabor Somogyi 
wrote:

> + Adding back user list.
>
> I've had a look at the Spark code and it's not modifying 
> "partition.assignment.strategy"
> so the problem
> must be either in your application or in your cluster setup.
>
> G
>
>
> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi 
> wrote:
>
>> It's super interesting because that field has default value:
>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>
>> On Mon, 7 Dec 2020, 10:51 Amit Joshi,  wrote:
>>
>>> Hi,
>>>
>>> Thnks for the reply.
>>> I did tried removing the client version.
>>> But got the same exception.
>>>
>>>
>>> Thnks
>>>
>>> On Monday, December 7, 2020, Gabor Somogyi 
>>> wrote:
>>>
 +1 on the mentioned change, Spark uses the following kafka-clients
 library:

 2.4.1

 G


 On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
 gschiavonsp...@gmail.com> wrote:

> Hi,
>
> I think the issue is that you are overriding the kafka-clients that
> comes with  spark-sql-kafka-0-10_2.12
>
>
> I'd try removing the kafka-clients and see if it works
>
>
> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
> wrote:
>
>> Hi All,
>>
>> I am running the Spark Structured Streaming along with Kafka.
>> Below is the pom.xml
>>
>> 
>> 1.8
>> 1.8
>> UTF-8
>> 
>> 2.12.10
>> 3.0.1
>> 
>>
>> 
>> org.apache.kafka
>> kafka-clients
>> 2.1.0
>> 
>>
>> 
>> org.apache.spark
>> spark-core_2.12
>> ${sparkVersion}
>> provided
>> 
>> 
>> 
>> org.apache.spark
>> spark-sql_2.12
>> ${sparkVersion}
>> provided
>> 
>> 
>> 
>> org.apache.spark
>> spark-sql-kafka-0-10_2.12
>> ${sparkVersion}
>> 
>>
>> Building the fat jar with shade plugin. The jar is running as expected 
>> in my local setup with the command
>>
>> *spark-submit --master local[*] --class com.stream.Main --num-executors 
>> 3 --driver-memory 2g --executor-cores 2 --executor-memory 3g 
>> prism-event-synch-rta.jar*
>>
>> But when I am trying to run same jar in spark cluster using yarn with 
>> command:
>>
>> *spark-submit --master yarn --deploy-mode cluster --class 
>> com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 
>> --executor-memory 4g  gs://jars/prism-event-synch-rta.jar*
>>
>> Getting the this exception:
>>
>>  
>>
>>
>> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>>  by: org.apache.kafka.common.config.ConfigException: Missing required 
>> configuration "partition.assignment.strategy" which has no default 
>> value. at 
>> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>>
>> I have tried setting up the "partition.assignment.strategy", then also 
>> its not working.
>>
>> Please help.
>>
>>
>> Regards
>>
>> Amit Joshi
>>
>>


Re: substitution invocator for a variable in PyCharm sql

2020-12-07 Thread Mich Talebzadeh
Thanks Russell, f-string interpolation helped. Replace Scala's 's' with
Python's 'f'!

Mich


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 7 Dec 2020 at 14:13, Russell Spitzer 
wrote:

> The feature you are looking for is called "String Interpolation" and is
> available in python 3.6. It uses a different syntax than scala's
> https://www.programiz.com/python-programming/string-interpolation
>
> On Mon, Dec 7, 2020 at 7:05 AM Mich Talebzadeh 
> wrote:
>
>> In Spark/Scala you can use 's' substitution invocator for a variable in
>> sql call, for example
>>
>> var sqltext =
>>   s"""
>> INSERT INTO TABLE ${broadcastStagingConfig.broadcastTable}
>> PARTITION (broadcastId = ${broadcastStagingConfig.broadcastValue},brand)
>> SELECT
>>   ocis_mrg_pty_id AS partyId
>> , target_mobile_no AS phoneNumber
>> , brand
>> FROM ${tag}
>>WHERE
>>   length(target_mobile_no) =
>> ${broadcastStagingConfig.mobileNoLength}
>>AND
>>   substring(target_mobile_no,1,1) =
>> ${broadcastStagingConfig.ukMobileNoStart}
>> """
>> spark.sql(sqltext)
>>
>> However, in PySpark the same fails
>>
>> rows = spark.sql(s"""SELECT COUNT(1) FROM
>> ${fullyQualifiedTableName}""").collect()[0][0]
>>
>>  ^
>> SyntaxError: invalid syntax
>>
>> What is the correct substitute invocation in PyCharm if any?
>>
>> Thanks,
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: substitution invocator for a variable in PyCharm sql

2020-12-07 Thread Russell Spitzer
The feature you are looking for is called "string interpolation" and is
available in Python 3.6 and later. It uses a different syntax than Scala's:
https://www.programiz.com/python-programming/string-interpolation
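
For example, the failing statement from the original post can be written with 
an f-string roughly like this (the table-name variable is a placeholder):

    # Python 3.6+ f-string interpolation; the table name is a placeholder.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    fully_qualified_table_name = "some_db.some_table"

    rows = spark.sql(f"SELECT COUNT(1) FROM {fully_qualified_table_name}").collect()[0][0]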

On Mon, Dec 7, 2020 at 7:05 AM Mich Talebzadeh 
wrote:

> In Spark/Scala you can use 's' substitution invocator for a variable in
> sql call, for example
>
> var sqltext =
>   s"""
> INSERT INTO TABLE ${broadcastStagingConfig.broadcastTable}
> PARTITION (broadcastId = ${broadcastStagingConfig.broadcastValue},brand)
> SELECT
>   ocis_mrg_pty_id AS partyId
> , target_mobile_no AS phoneNumber
> , brand
> FROM ${tag}
>WHERE
>   length(target_mobile_no) =
> ${broadcastStagingConfig.mobileNoLength}
>AND
>   substring(target_mobile_no,1,1) =
> ${broadcastStagingConfig.ukMobileNoStart}
> """
> spark.sql(sqltext)
>
> However, in PySpark the same fails
>
> rows = spark.sql(s"""SELECT COUNT(1) FROM
> ${fullyQualifiedTableName}""").collect()[0][0]
>
>  ^
> SyntaxError: invalid syntax
>
> What is the correct substitute invocation in PyCharm if any?
>
> Thanks,
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Gabor Somogyi
+ Adding back user list.

I've had a look at the Spark code and it's not
modifying "partition.assignment.strategy" so the problem
must be either in your application or in your cluster setup.

G


On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi 
wrote:

> It's super interesting because that field has default value:
> *org.apache.kafka.clients.consumer.RangeAssignor*
>
> On Mon, 7 Dec 2020, 10:51 Amit Joshi,  wrote:
>
>> Hi,
>>
>> Thnks for the reply.
>> I did tried removing the client version.
>> But got the same exception.
>>
>>
>> Thnks
>>
>> On Monday, December 7, 2020, Gabor Somogyi 
>> wrote:
>>
>>> +1 on the mentioned change, Spark uses the following kafka-clients
>>> library:
>>>
>>> 2.4.1
>>>
>>> G
>>>
>>>
>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon 
>>> wrote:
>>>
 Hi,

 I think the issue is that you are overriding the kafka-clients that
 comes with  spark-sql-kafka-0-10_2.12


 I'd try removing the kafka-clients and see if it works


 On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
 wrote:

> Hi All,
>
> I am running the Spark Structured Streaming along with Kafka.
> Below is the pom.xml
>
> 
> 1.8
> 1.8
> UTF-8
> 
> 2.12.10
> 3.0.1
> 
>
> 
> org.apache.kafka
> kafka-clients
> 2.1.0
> 
>
> 
> org.apache.spark
> spark-core_2.12
> ${sparkVersion}
> provided
> 
> 
> 
> org.apache.spark
> spark-sql_2.12
> ${sparkVersion}
> provided
> 
> 
> 
> org.apache.spark
> spark-sql-kafka-0-10_2.12
> ${sparkVersion}
> 
>
> Building the fat jar with shade plugin. The jar is running as expected in 
> my local setup with the command
>
> *spark-submit --master local[*] --class com.stream.Main --num-executors 3 
> --driver-memory 2g --executor-cores 2 --executor-memory 3g 
> prism-event-synch-rta.jar*
>
> But when I am trying to run same jar in spark cluster using yarn with 
> command:
>
> *spark-submit --master yarn --deploy-mode cluster --class com.stream.Main 
> --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 
> 4g  gs://jars/prism-event-synch-rta.jar*
>
> Getting the this exception:
>
>   
>
>
> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>  by: org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "partition.assignment.strategy" which has no default value. 
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>
> I have tried setting up the "partition.assignment.strategy", then also 
> its not working.
>
> Please help.
>
>
> Regards
>
> Amit Joshi
>
>


substitution invocator for a variable in PyCharm sql

2020-12-07 Thread Mich Talebzadeh
In Spark/Scala you can use 's' substitution invocator for a variable in sql
call, for example

var sqltext =
  s"""
INSERT INTO TABLE ${broadcastStagingConfig.broadcastTable}
PARTITION (broadcastId = ${broadcastStagingConfig.broadcastValue},brand)
SELECT
  ocis_mrg_pty_id AS partyId
, target_mobile_no AS phoneNumber
, brand
FROM ${tag}
   WHERE
  length(target_mobile_no) =
${broadcastStagingConfig.mobileNoLength}
   AND
  substring(target_mobile_no,1,1) =
${broadcastStagingConfig.ukMobileNoStart}
"""
spark.sql(sqltext)

However, in PySpark the same fails

rows = spark.sql(s"""SELECT COUNT(1) FROM
${fullyQualifiedTableName}""").collect()[0][0]

   ^
SyntaxError: invalid syntax

What is the correct substitute invocation in PyCharm if any?

Thanks,



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Amit Joshi
Hi All,

Thanks for the reply.
I did try removing the client version, but got the same exception.

One point, though: there are some dependent artifacts I am using which contain
a reference to the Kafka client version as well. I am trying to make an uber
jar, which will choose the closest version.

Thanks


On Monday, December 7, 2020, Gabor Somogyi 
wrote:

> +1 on the mentioned change, Spark uses the following kafka-clients library:
>
> 2.4.1
>
> G
>
>
> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon 
> wrote:
>
>> Hi,
>>
>> I think the issue is that you are overriding the kafka-clients that comes
>> with  spark-sql-kafka-0-10_2.12
>>
>>
>> I'd try removing the kafka-clients and see if it works
>>
>>
>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am running the Spark Structured Streaming along with Kafka.
>>> Below is the pom.xml
>>>
>>> 
>>> 1.8
>>> 1.8
>>> UTF-8
>>> 
>>> 2.12.10
>>> 3.0.1
>>> 
>>>
>>> 
>>> org.apache.kafka
>>> kafka-clients
>>> 2.1.0
>>> 
>>>
>>> 
>>> org.apache.spark
>>> spark-core_2.12
>>> ${sparkVersion}
>>> provided
>>> 
>>> 
>>> 
>>> org.apache.spark
>>> spark-sql_2.12
>>> ${sparkVersion}
>>> provided
>>> 
>>> 
>>> 
>>> org.apache.spark
>>> spark-sql-kafka-0-10_2.12
>>> ${sparkVersion}
>>> 
>>>
>>> Building the fat jar with shade plugin. The jar is running as expected in 
>>> my local setup with the command
>>>
>>> *spark-submit --master local[*] --class com.stream.Main --num-executors 3 
>>> --driver-memory 2g --executor-cores 2 --executor-memory 3g 
>>> prism-event-synch-rta.jar*
>>>
>>> But when I am trying to run same jar in spark cluster using yarn with 
>>> command:
>>>
>>> *spark-submit --master yarn --deploy-mode cluster --class com.stream.Main 
>>> --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 
>>> 4g  gs://jars/prism-event-synch-rta.jar*
>>>
>>> Getting the this exception:
>>>
>>> 
>>>
>>>
>>> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
>>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>>>  by: org.apache.kafka.common.config.ConfigException: Missing required 
>>> configuration "partition.assignment.strategy" which has no default value. 
>>> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>>>
>>> I have tried setting up the "partition.assignment.strategy", then also its 
>>> not working.
>>>
>>> Please help.
>>>
>>>
>>> Regards
>>>
>>> Amit Joshi
>>>
>>>


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Gabor Somogyi
+1 on the mentioned change, Spark uses the following kafka-clients library:

2.4.1

G


On Mon, Dec 7, 2020 at 9:30 AM German Schiavon 
wrote:

> Hi,
>
> I think the issue is that you are overriding the kafka-clients that comes
> with  spark-sql-kafka-0-10_2.12
>
>
> I'd try removing the kafka-clients and see if it works
>
>
> On Sun, 6 Dec 2020 at 08:01, Amit Joshi  wrote:
>
>> Hi All,
>>
>> I am running the Spark Structured Streaming along with Kafka.
>> Below is the pom.xml
>>
>> 
>> 1.8
>> 1.8
>> UTF-8
>> 
>> 2.12.10
>> 3.0.1
>> 
>>
>> 
>> org.apache.kafka
>> kafka-clients
>> 2.1.0
>> 
>>
>> 
>> org.apache.spark
>> spark-core_2.12
>> ${sparkVersion}
>> provided
>> 
>> 
>> 
>> org.apache.spark
>> spark-sql_2.12
>> ${sparkVersion}
>> provided
>> 
>> 
>> 
>> org.apache.spark
>> spark-sql-kafka-0-10_2.12
>> ${sparkVersion}
>> 
>>
>> Building the fat jar with shade plugin. The jar is running as expected in my 
>> local setup with the command
>>
>> *spark-submit --master local[*] --class com.stream.Main --num-executors 3 
>> --driver-memory 2g --executor-cores 2 --executor-memory 3g 
>> prism-event-synch-rta.jar*
>>
>> But when I am trying to run same jar in spark cluster using yarn with 
>> command:
>>
>> *spark-submit --master yarn --deploy-mode cluster --class com.stream.Main 
>> --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g 
>>  gs://jars/prism-event-synch-rta.jar*
>>
>> Getting the this exception:
>>
>>  
>>
>>
>> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>>  by: org.apache.kafka.common.config.ConfigException: Missing required 
>> configuration "partition.assignment.strategy" which has no default value. at 
>> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>>
>> I have tried setting up the "partition.assignment.strategy", then also its 
>> not working.
>>
>> Please help.
>>
>>
>> Regards
>>
>> Amit Joshi
>>
>>


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread German Schiavon
Hi,

I think the issue is that you are overriding the kafka-clients that comes
with  spark-sql-kafka-0-10_2.12


I'd try removing the kafka-clients and see if it works


On Sun, 6 Dec 2020 at 08:01, Amit Joshi  wrote:

> Hi All,
>
> I am running the Spark Structured Streaming along with Kafka.
> Below is the pom.xml
>
> 
> 1.8
> 1.8
> UTF-8
> 
> 2.12.10
> 3.0.1
> 
>
> 
> org.apache.kafka
> kafka-clients
> 2.1.0
> 
>
> 
> org.apache.spark
> spark-core_2.12
> ${sparkVersion}
> provided
> 
> 
> 
> org.apache.spark
> spark-sql_2.12
> ${sparkVersion}
> provided
> 
> 
> 
> org.apache.spark
> spark-sql-kafka-0-10_2.12
> ${sparkVersion}
> 
>
> Building the fat jar with shade plugin. The jar is running as expected in my 
> local setup with the command
>
> *spark-submit --master local[*] --class com.stream.Main --num-executors 3 
> --driver-memory 2g --executor-cores 2 --executor-memory 3g 
> prism-event-synch-rta.jar*
>
> But when I am trying to run same jar in spark cluster using yarn with command:
>
> *spark-submit --master yarn --deploy-mode cluster --class com.stream.Main 
> --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g  
> gs://jars/prism-event-synch-rta.jar*
>
> Getting the this exception:
>
>   
>
>
> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>  by: org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "partition.assignment.strategy" which has no default value. at 
> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>
> I have tried setting up the "partition.assignment.strategy", then also its 
> not working.
>
> Please help.
>
>
> Regards
>
> Amit Joshi
>
>