Spark streaming giving error for version 2.4

2021-03-15 Thread Renu Yadav
Hi Team,


I have upgraded my Spark Streaming application from 2.2 to 2.4 but am getting the below error:


spark-streaming-kafka-0-10_2.11:2.4.0


scala 2.11


Any Idea?



main" java.lang.AbstractMethodError

at
org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)

at
org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)

at
org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)

at
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)

at
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
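
For reference, the Kafka integration coordinates for Spark 2.4.0 on Scala 2.11 would normally look like the sbt sketch below (adjust for your build tool). An AbstractMethodError raised while constructing the StreamingContext usually points at artifacts compiled against different Spark versions ending up on the same classpath, so it is worth checking that every Spark dependency is on 2.4.0:

// build.sbt (sketch, assuming Spark 2.4.0 / Scala 2.11)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0"
)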


Thanks & Regards,

Renu Yadav


How default partitioning in spark is deployed

2021-03-15 Thread Renganathan Mutthiah
Hi,

I have a question with respect to default partitioning in RDD.




case class Animal(id: Int, name: String)

val myRDD = session.sparkContext.parallelize(Array(
  Animal(1, "Lion"), Animal(2, "Elephant"), Animal(3, "Jaguar"),
  Animal(4, "Tiger"), Animal(5, "Chetah")))

Console println myRDD.getNumPartitions

I am running the above piece of code on my laptop, which has 12 logical
cores, and hence I see that 12 partitions are created.

My understanding is that hash partitioning is used to determine which
object goes to which partition, so in this case the formula would be:
hashCode() % 12
But when I examine further, I see that all the objects are put in the last
partition.

myRDD.foreachPartition( e => { println("--"); e.foreach(println) } )

The above code prints the following (the first eleven partitions are empty and
the last one has all the objects; the "--" lines separate the partition contents):
--
--
--
--
--
--
--
--
--
--
--
--
Animal(2,Elephant)
Animal(4,Tiger)
Animal(3,Jaguar)
Animal(5,Chetah)
Animal(1,Lion)
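
For what it's worth, the same distribution can be inspected with explicit partition indices (a small sketch assuming the same myRDD as above):

myRDD
  .mapPartitionsWithIndex { (idx, it) => it.map(a => s"partition $idx -> $a") }
  .collect()
  .foreach(println)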

I don't know why this happens. Can you please help?

Thanks!


Re: How to make bucket listing faster while using S3 with wholeTextFile

2021-03-15 Thread Stephen Coy
Hi there,

At risk of stating the obvious, the first step is to ensure that your Spark 
application and S3 bucket are colocated in the same AWS region.

Steve C

On 16 Mar 2021, at 3:31 am, Alchemist <alchemistsrivast...@gmail.com> wrote:

How to optimize S3 file listing when using wholeTextFile(): We are using
wholeTextFile to read data from S3. As per my understanding, wholeTextFile
first lists the files under the given path. Since we are using S3 as the input
source, listing files in a bucket is single-threaded, and the S3 API for
listing keys only returns them in chunks of 1000 per call. Since we have
millions of files, we are making thousands of API calls. This listing makes
our processing very slow. How can we make the S3 listing faster?

Thanks,

Rachana



Re: Spark Structured Streaming and Kafka message schema evolution

2021-03-15 Thread Jungtaek Lim
If I understand correctly, SQL semantics are strict on column schema.
Reading via the Kafka data source doesn't require you to specify the schema,
as it provides the key and value as binary, but once you deserialize them,
unless you keep the type primitive (e.g. String), you'll need to specify the
schema, as from_json requires you to.

This wouldn't change even if you leverage a Schema Registry - you'll need to
provide a schema which is compatible with all the schemas that the records
are associated with. I guess that's guaranteed if you use the latest version
of the schema and you've only changed the schema in backward-compatible ways.
I admit I haven't dealt with SR in SSS, but if you integrate the schema into
the query plan, a running query is unlikely to pick up the latest schema. It
still wouldn't matter, though, as your query should only rely on the part of
the schema you've integrated, and the latest schema is backward compatible
with the integrated schema.

Hope this helps.

Thanks
Jungtaek Lim (HeartSaVioR)

On Mon, Mar 15, 2021 at 9:25 PM Mich Talebzadeh 
wrote:

> This is just a query.
>
> In general, Kafka Connect requires a means to register the schema so that
> producers and consumers understand it. It also allows schema evolution,
> i.e. changes to the metadata that identifies the structure of data sent via
> a topic.
>
> When we stream a Kafka topic into Spark Structured Streaming (SSS), the
> assumption is that by the time Spark processes that data, its structure
> can be established. With foreachBatch, we create a dataframe on top of
> incoming batches of JSON messages and the dataframe can be interrogated.
> However, the processing may fail if another column is added to the topic
> and the consumer (in this case SSS) is not aware of it. How can this change
> of schema be verified?
>
> Thanks
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
>
>
>
>
>


Re: [k8s] PersistentVolumeClaim support in 3.1.1 on minikube

2021-03-15 Thread Jacek Laskowski
Hi,

I think I found it. I should be using the OnDemand claim name so it gets
replaced with a unique claim per executor (?)
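
i.e. something along these lines (a sketch reusing the variables from the command quoted below; OnDemand is the literal claimName value documented for dynamically created PVCs in 3.1):

  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.claimName=OnDemand \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.storageClass=$PVC_STORAGE_CLASS \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.sizeLimit=$PVC_SIZE_LIMIT \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.mount.path=$MOUNT_PATH \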

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Mon, Mar 15, 2021 at 8:36 PM Jacek Laskowski  wrote:

> Hi,
>
> I've been toying with persistent volumes using Spark 3.1.1 on minikube and
> am wondering whether it's a supported platform. I'd not be surprised if not
> given all the surprises I've been experiencing lately.
>
> Can I use spark-shell or any Spark app in client mode with PVCs with the
> default 2 executors? Should the following work if I removed --num-executors
> 1?
>
> ./bin/spark-shell \
>   --master k8s://$K8S_SERVER \
>   --num-executors 1 \
>   --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.mount.path=$MOUNT_PATH \
>   --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.claimName=$PVC_CLAIM_NAME \
>   --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.storageClass=$PVC_STORAGE_CLASS \
>   --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.sizeLimit=$PVC_SIZE_LIMIT \
>   --conf spark.kubernetes.container.image=$IMAGE_NAME \
>   --conf spark.kubernetes.context=minikube \
>   --conf spark.kubernetes.namespace=spark-demo \
>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>   --verbose
>
> The whole demo is available at
> https://jaceklaskowski.github.io/spark-kubernetes-book/demo/persistentvolumeclaims/
>
> Please help. Thank you!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>


Re: Spark Structured Streaming from GCS files

2021-03-15 Thread Gowrishankar Sunder
Our online services running in GCP collect data from our clients and write
it to GCS under time-partitioned folders like /mm/dd/hh/mm
(current-time) or similar ones. We need these files to be processed in
real-time from Spark. As for the runtime, we plan to run it either on
Dataproc or K8s.

- Gowrishankar Sunder


On Mon, Mar 15, 2021 at 12:13 PM Mich Talebzadeh 
wrote:

>
> Hi,
>
> I looked at the stackoverflow reference.
>
> The first question that comes to my mind is how you are populating these
> GCS buckets. Are you shifting data from on-prem and landing it in the
> buckets, creating a new folder at the given interval?
>
> Where will you be running your Spark Structured Streaming? On Dataproc?
>
> HTH
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
>
>
>
>
>
>
> On Mon, 15 Mar 2021 at 19:00, Gowrishankar Sunder 
> wrote:
>
>> Hi,
>>We have a use case to stream files from GCS time-partitioned folders
>> and perform structured streaming queries on top of them. I have detailed
>> the use cases and requirements in this Stackoverflow question, but
>> at a high level, the problems I am facing are listed below and would like
>> guidance on the best approach to use
>>
>>- Custom source APIs for Structured Streaming are undergoing major
>>changes (including the new Table API support) and the documentation does
>>not capture much details when it comes to building custom sources. I was
>>wondering if the current APIs are expected to remain stable through the
>>targeted 3.2 release and if there are examples on how to use them for my
>>use case.
>>- The default FileStream source looks up a static glob path which might not scale when the job runs
>>for days with multiple time partitions. But it has some really useful
>>features handling files - supports all major source formats (AVRO, 
>> Parquet,
>>JSON etc...), takes care of compression and partitioning large files into
>>sub-tasks - all of which I need to implement again for the current custom
>>source APIs as they stand. I was wondering if I can still somehow make use
>>of them while solving the scaling time partitioning file globbing issue.
>>
>> Thanks
>>
>>
>>


[k8s] PersistentVolumeClaim support in 3.1.1 on minikube

2021-03-15 Thread Jacek Laskowski
Hi,

I've been toying with persistent volumes using Spark 3.1.1 on minikube and
am wondering whether it's a supported platform. I'd not be surprised if not
given all the surprises I've been experiencing lately.

Can I use spark-shell or any Spark app in client mode with PVCs with the
default 2 executors? Should the following work if I removed --num-executors
1?

./bin/spark-shell \
  --master k8s://$K8S_SERVER \
  --num-executors 1 \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.mount.path=$MOUNT_PATH \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.claimName=$PVC_CLAIM_NAME \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.storageClass=$PVC_STORAGE_CLASS \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.sizeLimit=$PVC_SIZE_LIMIT \
  --conf spark.kubernetes.container.image=$IMAGE_NAME \
  --conf spark.kubernetes.context=minikube \
  --conf spark.kubernetes.namespace=spark-demo \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --verbose

The whole demo is available at
https://jaceklaskowski.github.io/spark-kubernetes-book/demo/persistentvolumeclaims/

Please help. Thank you!

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




Re: Spark Structured Streaming from GCS files

2021-03-15 Thread Mich Talebzadeh
Hi,

I looked at the stackoverflow reference.

The first question that comes to my mind is how you are populating these
GCS buckets. Are you shifting data from on-prem and landing it in the
buckets, creating a new folder at the given interval?

Where will you be running your Spark Structured Streaming? On Dataproc?

HTH


LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw









On Mon, 15 Mar 2021 at 19:00, Gowrishankar Sunder 
wrote:

> Hi,
>We have a use case to stream files from GCS time-partitioned folders
> and perform structured streaming queries on top of them. I have detailed
> the use cases and requirements in this Stackoverflow question, but
> at a high level, the problems I am facing are listed below and would like
> guidance on the best approach to use
>
>- Custom source APIs for Structured Streaming are undergoing major
>changes (including the new Table API support) and the documentation does
>not capture much details when it comes to building custom sources. I was
>wondering if the current APIs are expected to remain stable through the
>targeted 3.2 release and if there are examples on how to use them for my
>use case.
>- The default FileStream source looks up a static glob path which might not scale when the job runs
>for days with multiple time partitions. But it has some really useful
>features handling files - supports all major source formats (AVRO, Parquet,
>JSON etc...), takes care of compression and partitioning large files into
>sub-tasks - all of which I need to implement again for the current custom
>source APIs as they stand. I was wondering if I can still somehow make use
>of them while solving the scaling time partitioning file globbing issue.
>
> Thanks
>
>
>


Re: How to make bucket listing faster while using S3 with wholeTextFile

2021-03-15 Thread Ben Kaylor
Not sure of the answer to this, but I am solving similar issues, so I'm
looking for additional feedback on how to do this.

My thought is that if it can't be done via Spark and S3 boto commands, then
have the apps self-report those changes: instead of having just mappers
discovering the keys, you have services reporting to a metadata service that
a new key has been created or modified, giving incremental and more
real-time updates.

Would like to hear more ideas on this, thanks
David




On Mon, Mar 15, 2021, 11:31 AM Alchemist 
wrote:

> *How to optimize S3 file listing when using wholeTextFile()*: We are using
> wholeTextFile to read data from S3. As per my understanding, wholeTextFile
> first lists the files under the given path. Since we are using S3 as the
> input source, listing files in a bucket is single-threaded, and the S3 API
> for listing keys only returns them in chunks of 1000 per call. Since we
> have millions of files, we are making thousands of API calls. This listing
> makes our processing very slow. How can we make the S3 listing faster?
>
> Thanks,
>
> Rachana
>


Spark Structured Streaming from GCS files

2021-03-15 Thread Gowrishankar Sunder
Hi,
   We have a use case to stream files from GCS time-partitioned folders and
perform structured streaming queries on top of them. I have detailed the
use cases and requirements in this Stackoverflow question, but at a high
level, the problems I am facing are listed below and I would like guidance
on the best approach to use:

   - Custom source APIs for Structured Streaming are undergoing major
   changes (including the new Table API support) and the documentation does
   not capture many details when it comes to building custom sources. I was
   wondering if the current APIs are expected to remain stable through the
   targeted 3.2 release and if there are examples of how to use them for my
   use case.
   - The default FileStream source looks up a static glob path, which might
   not scale when the job runs for days with multiple time partitions. But it
   has some really useful features for handling files - it supports all major
   source formats (Avro, Parquet, JSON etc.), and takes care of compression
   and of partitioning large files into sub-tasks - all of which I would need
   to implement again for the current custom source APIs as they stand. I was
   wondering if I can still somehow make use of it while solving the scaling
   issue with time-partitioned file globbing (see the sketch after this list).
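
For reference, using the built-in file source over a GCS path glob looks roughly like the sketch below (bucket name, schema and options are illustrative assumptions; the scaling concern about an ever-growing glob still applies):

import org.apache.spark.sql.types._

val eventSchema = new StructType()
  .add("clientId", StringType)
  .add("payload", StringType)

val events = spark.readStream
  .schema(eventSchema)                  // file sources require an explicit schema
  .option("maxFilesPerTrigger", "500")  // cap files picked up per micro-batch
  .json("gs://my-bucket/*/*/*/*/*/")    // glob over the time-partitioned folders

events.writeStream
  .format("console")
  .option("checkpointLocation", "gs://my-bucket/checkpoints/demo")
  .start()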

Thanks


How to make bucket listing faster while using S3 with wholeTextFile

2021-03-15 Thread Alchemist
How to optimize S3 file listing when using wholeTextFile(): We are using
wholeTextFile to read data from S3. As per my understanding, wholeTextFile
first lists the files under the given path. Since we are using S3 as the input
source, listing files in a bucket is single-threaded, and the S3 API for
listing keys only returns them in chunks of 1000 per call. Since we have
millions of files, we are making thousands of API calls. This listing makes
our processing very slow. How can we make the S3 listing faster?
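
One common workaround (not suggested in this thread, just a sketch) is to parallelise the listing itself across key prefixes before handing the paths to Spark. A rough Scala sketch, assuming a date/hour prefix layout, the AWS SDK for Java v1 on the classpath, and a hypothetical bucket name:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request
import scala.collection.JavaConverters._

// one prefix per day/hour partition (the layout is an assumption)
val prefixes = for (d <- 1 to 31; h <- 0 to 23) yield f"logs/2021/03/$d%02d/$h%02d/"

// list each prefix in its own task instead of one single-threaded scan
val allKeys = spark.sparkContext.parallelize(prefixes, prefixes.size).flatMap { prefix =>
  val s3 = AmazonS3ClientBuilder.defaultClient()   // per-task client; reuse one per JVM in real code
  val keys = scala.collection.mutable.ArrayBuffer[String]()
  var req = new ListObjectsV2Request().withBucketName("my-bucket").withPrefix(prefix)
  var more = true
  while (more) {
    val res = s3.listObjectsV2(req)
    keys ++= res.getObjectSummaries.asScala.map(_.getKey)
    more = res.isTruncated
    if (more) req = req.withContinuationToken(res.getNextContinuationToken)
  }
  keys
}.collect()

The collected keys (or, for very large key sets, the hour-level directories) can then be passed to wholeTextFiles as a comma-separated path list.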
Thanks,
Rachana

Spark Structured Streaming and Kafka message schema evolution

2021-03-15 Thread Mich Talebzadeh
This is just a query.

In general, Kafka Connect requires a means to register the schema so that
producers and consumers understand it. It also allows schema evolution,
i.e. changes to the metadata that identifies the structure of data sent via
a topic.

When we stream a Kafka topic into Spark Structured Streaming (SSS), the
assumption is that by the time Spark processes that data, its structure
can be established. With foreachBatch, we create a dataframe on top of
incoming batches of JSON messages and the dataframe can be interrogated.
However, the processing may fail if another column is added to the topic
and the consumer (in this case SSS) is not aware of it. How can this change
of schema be verified?
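
For context, a foreachBatch sketch along those lines (broker, topic and options are illustrative; here the JSON schema is inferred per micro-batch rather than fixed up front):

import org.apache.spark.sql.{DataFrame, Encoders}

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "my-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // build a dataframe over this micro-batch of JSON messages and interrogate it;
    // a column newly added to the topic would show up in the inferred schema here
    val parsed = spark.read.json(batch.select("json").as(Encoders.STRING))
    parsed.printSchema()
    parsed.show(false)
  }
  .start()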

Thanks

LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw







Re: DB Config data update across multiple Spark Streaming Jobs

2021-03-15 Thread forece85
Any suggestions on this? How can we update configuration data on all executors
without downtime?
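
For reference, one common pattern (a sketch only, not from this thread) is an executor-side singleton that re-reads the configuration after a TTL, so long-running streaming tasks pick up changes without restarting the job:

object ConfigCache {
  @volatile private var cached: Map[String, String] = Map.empty
  @volatile private var loadedAt: Long = 0L
  private val ttlMs = 5 * 60 * 1000   // refresh every 5 minutes (arbitrary)

  // 'loader' is a hypothetical function, e.g. a JDBC lookup of the config table
  def get(loader: () => Map[String, String]): Map[String, String] = synchronized {
    val now = System.currentTimeMillis()
    if (cached.isEmpty || now - loadedAt > ttlMs) {
      cached = loader()
      loadedAt = now
    }
    cached
  }
}

// inside a transformation running on the executors:
// rdd.mapPartitions { it => val cfg = ConfigCache.get(readConfigFromDb); ... }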






Re: spark on k8s driver pod exception

2021-03-15 Thread Attila Zsolt Piros
Sure, that is expected, see the "How it works" section in "Running Spark on
Kubernetes" page
,
quote:

When the application completes, the executor pods terminate and are cleaned
> up, but the driver pod persists logs and remains in “completed” state in
> the Kubernetes API until it’s eventually garbage collected or manually
> cleaned up.



On Mon, Mar 15, 2021 at 8:45 AM 040840219  wrote:

>
> when the driver pod throws an exception, is the driver pod still running?
>
> kubectl logs  wordcount-e3141c7834d3dd68-driver
>
> 21/03/15 07:40:19 DEBUG Analyzer$ResolveReferences: Resolving 'value1 to
> 'value1
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot
> resolve '`value1`' given input columns: [key, value];
> 'Aggregate [key#6], [key#6, count('value1) AS cnt#14]
> +- Project [(id#4 % 5) AS key#6, (id#4 % 10) AS value#7]
>+- Project [value#1 AS id#4]
>   +- LocalRelation [value#1]
>
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
>
> kubectl get pods wordcount-e3141c7834d3dd68-driver
>
> NAMEREADY   STATUSRESTARTS   AGE
> wordcount-e3141c7834d3dd68-driver   1/1 Running   0  2m58s
>
> On 03/12/2021 05:42,Attila Zsolt Piros
>  wrote:
>
> > but  the spark-submit log still  running
>
> Set the "spark.kubernetes.submission.waitAppCompletion" config to false to
> change that. As the doc says:
>
> "spark.kubernetes.submission.waitAppCompletion" : In cluster mode, whether
> to wait for the application to finish before exiting the launcher process.
> When changed to false, the launcher has a "fire-and-forget" behavior when
> launching the Spark job.
>
> On Thu, Mar 11, 2021 at 10:05 PM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>>
>> For getting the logs please read the Accessing Logs part of the *Running Spark on Kubernetes* page.
>>
>> For stopping and generic management of the spark application please read
>> the Spark Application Management, where you find the example:
>>
>> $ spark-submit --kill spark:spark-pi* --master  
>> k8s://https://192.168.2.8:8443
>>
>>
>>
>> On Thu, Mar 11, 2021 at 1:07 PM yxl040840219 
>> wrote:
>>
>>>
>>>
>>>
>>> when running the code in k8s, the driver pod throws an AnalysisException, but
>>> the spark-submit log is still running; how do I get the exception and
>>> stop the pods?
>>>
>>> val spark = SparkSession.builder().getOrCreate()
>>> import spark.implicits._
>>> val df = (0 until 10).toDF("id").selectExpr("id % 5 as key",
>>> "id%10 as value")
>>>   .groupBy("key").agg(count("value1").as("cnt"))
>>> df.show()
>>> spark.stop()
>>>
>>> bin/spark-submit \
>>> --master k8s://https://localhost:9443 \
>>> --deploy-mode cluster \
>>> --name wordcount \
>>> --class k8s.WordCount \
>>> --conf spark.kubernetes.container.image=rspark:v3.1.1 \
>>> --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>> --conf
>>> spark.kubernetes.file.upload.path=hdfs://localhost:8020/data/spark \
>>> /data/spark-example-1.0.0.jar
>>>
>>


Re: spark on k8s driver pod exception

2021-03-15 Thread 040840219


When the driver pod throws an exception, is the driver pod still running?


kubectl logs  wordcount-e3141c7834d3dd68-driver


21/03/15 07:40:19 DEBUG Analyzer$ResolveReferences: Resolving 'value1 to 'value1
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve '`value1`' given input columns: [key, value];
'Aggregate [key#6], [key#6, count('value1) AS cnt#14]
+- Project [(id#4 % 5) AS key#6, (id#4 % 10) AS value#7]
   +- Project [value#1 AS id#4]
  +- LocalRelation [value#1]


at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)


kubectl get pods wordcount-e3141c7834d3dd68-driver


NAMEREADY   STATUSRESTARTS   AGE
wordcount-e3141c7834d3dd68-driver   1/1 Running   0  2m58s


On 03/12/2021 05:42, Attila Zsolt Piros wrote:
> but  the spark-submit log still  running

Set the "spark.kubernetes.submission.waitAppCompletion" config to false to 
change that. As the doc says:

"spark.kubernetes.submission.waitAppCompletion" : In cluster mode, whether to 
wait for the application to finish before exiting the launcher process. When 
changed to false, the launcher has a "fire-and-forget" behavior when launching 
the Spark job.
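
So a fire-and-forget submission just adds that flag to the spark-submit command, e.g.:

  --conf spark.kubernetes.submission.waitAppCompletion=false \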



On Thu, Mar 11, 2021 at 10:05 PM Attila Zsolt Piros 
 wrote:


For getting the logs please read Accessing Logs part of the Running Spark on 
Kubernetes page.

For stopping and generic management of the spark application please read the 
Spark Application Management, where you find the example:


$ spark-submit --kill spark:spark-pi* --master k8s://https://192.168.2.8:8443




On Thu, Mar 11, 2021 at 1:07 PM yxl040840219  wrote:







When running the code in k8s, the driver pod throws an AnalysisException, but the
spark-submit log is still running; how do I get the exception and stop the pods?


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
// the projected column is named "value", so count("value1") below is what
// triggers the AnalysisException shown above
val df = (0 until 10).toDF("id").selectExpr("id % 5 as key", "id % 10 as value")
  .groupBy("key").agg(count("value1").as("cnt"))
df.show()
spark.stop()


bin/spark-submit \
--master k8s://https://localhost:9443 \
--deploy-mode cluster \
--name wordcount \
--class k8s.WordCount \
--conf spark.kubernetes.container.image=rspark:v3.1.1 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.file.upload.path=hdfs://localhost:8020/data/spark \
/data/spark-example-1.0.0.jar