Unsubscribe

2021-09-06 Thread Du Li



Re: Spark Pair RDD write to Hive

2021-09-06 Thread Anil Dasari
2nd try

From: Anil Dasari 
Date: Sunday, September 5, 2021 at 10:42 AM
To: "user@spark.apache.org" 
Subject: Spark Pair RDD write to Hive

Hello,

I have a use case where users, grouped by group id, need to be persisted to Hive tables.

// pseudo code looks like below
usersRDD = sc.parallelize(..)
usersPairRDD = usersRDD.map(u => (u.groupId, u))
groupedUsers = usersPairRDD.groupByKey()

Can I save the groupedUsers RDD into Hive tables, where each table name is the key of the
corresponding groupedUsers entry?

I want to avoid the approach below, as it is not a scalable solution: parallelism is
limited by the driver cores –

groupIds = usersRDD.map(u => u.groupId).distinct.collect.toList

groupIds.par.map(id => {
  val rdd = usersRDD.filter(u => u.groupId == id).cache()
  // create dataframe from rdd
  // persist df to hive table using df.write.saveAsTable
})

Could you suggest a better approach? Thanks in advance.

-
Anil
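
A minimal sketch of one commonly suggested alternative, assuming the users can be
turned into a DataFrame and that a single Hive table partitioned by group id is
acceptable in place of one table per group (the case class, sample data, and table
name below are illustrative):

import org.apache.spark.sql.SparkSession

// Sketch only: write every group in one distributed pass by partitioning a single
// Hive table on groupId, instead of looping over groups on the driver.
case class User(groupId: String, name: String)

object UsersByGroup {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("users-by-group")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val usersRDD = spark.sparkContext.parallelize(Seq(
      User("g1", "alice"), User("g1", "bob"), User("g2", "carol")))

    usersRDD.toDF()
      .write
      .mode("append")
      .partitionBy("groupId")          // one Hive partition per group
      .format("parquet")
      .saveAsTable("users_by_group")   // hypothetical table name

    spark.stop()
  }
}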


Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Stelios Philippou
My local spark-submit:

 ~/development/SimpleKafkaStream  spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.10
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z


On K8s I have one image for Java 8 and one for Java 11.

The K8s Java 8 Docker env:

/opt/spark/bin/spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_302

The K8s Java 11 env:

:/opt/spark/work-dir$ /opt/spark/bin/spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.11


I will downgrade now to check against 3.1.1 as you mentioned, but as this is a
minor version difference I don't believe there should be any issues there.

On Mon, 6 Sept 2021 at 16:12, Mich Talebzadeh 
wrote:

>
>1. which version of Spark the docker is built for
>2. Which version of spark-submit you are using to submit this job
>
>
>
>
> On Mon, 6 Sept 2021 at 14:07, Stelios Philippou 
> wrote:
>
>> Yes on Local mode both from intelli and using spark-submit on my machine
>> and on a windows machine work.
>>
>> I have noticed the following error when adding this in the above
>> spark-submit for k8
>>
>> --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
>>
>>
>> :: resolving dependencies ::
>> org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0
>>
>> confs: [default]
>>
>> Exception in thread "main" java.io.FileNotFoundException:
>> /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
>> (No such file or directory)
>>
>>
>>
>> is there some way to verify that the k8 installation is correct ?
>>
>> Other spark processes that do not have streaming involved do work
>> correctly.
>>
>> On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>>
>>> Have you tried this on local mode as opposed to Kubernetes to see if it
>>> works?
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou 
>>> wrote:
>>>
 Hello Jacek,

 Yes this is a spark-streaming.
  I have removed all code and created a new project with just the base
 code that is enough to open a stream and loop over it to see what i am
 doing wrong.

 Not adding the packages would result me in the following error

 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
 task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
 java.lang.ClassNotFoundException:
 org.apache.spark.streaming.kafka010.KafkaRDDPartition

 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

 at java.lang.Class.forName0(Native Method)

 at java.lang.Class.forName(Class.java:348)

 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)


 Which should not really be the case cause this should be included in
 the kubernetes pod. Anyway I can confirm this ?


 So my simple class is as follow :


 streamingContext = new JavaStreamingContext(javaSparkContext, 
 Durations.seconds(5));

 stream = KafkaUtils.createDirectStream(streamingContext, 
 LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaConfiguration));

 stream.foreachRDD((VoidFunction>>) 
 rdd -> {
try {
   rdd.foreachPartition(partition -> {
  while (partition.hasNext()) {
 ConsumerRecord consumerRecord = 
 partition.next();
 

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Mich Talebzadeh
   1. Which version of Spark is the Docker image built for?
   2. Which version of spark-submit are you using to submit this job?







On Mon, 6 Sept 2021 at 14:07, Stelios Philippou  wrote:

> Yes on Local mode both from intelli and using spark-submit on my machine
> and on a windows machine work.
>
> I have noticed the following error when adding this in the above
> spark-submit for k8
>
> --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
>
>
> :: resolving dependencies ::
> org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0
>
> confs: [default]
>
> Exception in thread "main" java.io.FileNotFoundException:
> /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
> (No such file or directory)
>
>
>
> is there some way to verify that the k8 installation is correct ?
>
> Other spark processes that do not have streaming involved do work
> correctly.
>
> On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh 
> wrote:
>
>>
>> Hi,
>>
>>
>> Have you tried this on local mode as opposed to Kubernetes to see if it
>> works?
>>
>>
>> HTH
>>
>>
>>
>> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou 
>> wrote:
>>
>>> Hello Jacek,
>>>
>>> Yes this is a spark-streaming.
>>>  I have removed all code and created a new project with just the base
>>> code that is enough to open a stream and loop over it to see what i am
>>> doing wrong.
>>>
>>> Not adding the packages would result me in the following error
>>>
>>> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>>> java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>
>>> at java.lang.Class.forName0(Native Method)
>>>
>>> at java.lang.Class.forName(Class.java:348)
>>>
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>
>>>
>>> Which should not really be the case cause this should be included in the
>>> kubernetes pod. Anyway I can confirm this ?
>>>
>>>
>>> So my simple class is as follow :
>>>
>>>
>>> streamingContext = new JavaStreamingContext(javaSparkContext, 
>>> Durations.seconds(5));
>>>
>>> stream = KafkaUtils.createDirectStream(streamingContext, 
>>> LocationStrategies.PreferConsistent(),
>>>ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>>>
>>> stream.foreachRDD((VoidFunction>>) 
>>> rdd -> {
>>>try {
>>>   rdd.foreachPartition(partition -> {
>>>  while (partition.hasNext()) {
>>> ConsumerRecord consumerRecord = 
>>> partition.next();
>>> LOGGER.info("WORKING " + consumerRecord.topic() 
>>> +consumerRecord.partition() + ": "+consumerRecord.offset());
>>>  }
>>>   });
>>>} catch (Exception e) {
>>>   e.printStackTrace();
>>>}
>>> });
>>>
>>> streamingContext.start();
>>> try {
>>>streamingContext.awaitTermination();
>>> } catch (InterruptedException e) {
>>>e.printStackTrace();
>>> } finally {
>>>streamingContext.stop();
>>>javaSparkContext.stop();
>>> }
>>>
>>>
>>> This is all there is too the class which is a java boot @Component.
>>>
>>> Now in order my pom is as such
>>>
>>> 
>>> http://maven.apache.org/POM/4.0.0;
>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>>   4.0.0
>>>
>>>   com.kafka
>>>   SimpleKafkaStream
>>>   1.0
>>>
>>>   jar
>>>
>>>   
>>> UTF-8
>>> 
>>> UTF-8
>>> 8
>>> 8
>>> com.kafka.Main
>>>   
>>>
>>>   
>>> org.springframework.boot
>>> spring-boot-starter-parent
>>> 2.4.2
>>> 
>>>   
>>>
>>>   
>>> 
>>>   org.springframework.boot
>>>   spring-boot-starter
>>>   
>>> 
>>>   org.springframework.boot
>>>   

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Stelios Philippou
Yes, local mode works, both from IntelliJ and using spark-submit, on my machine
and on a Windows machine.

I have noticed the following error when adding this to the above
spark-submit for K8s:

--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \


:: resolving dependencies ::
org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0

confs: [default]

Exception in thread "main" java.io.FileNotFoundException:
/opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
(No such file or directory)



Is there some way to verify that the K8s installation is correct?

Other Spark jobs that do not involve streaming work correctly.
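
A commonly suggested workaround for that specific Ivy error on Kubernetes, where the
default Ivy home under /opt/spark is often not writable, is to point Spark at a
writable Ivy directory. A sketch only (the rest of the submit command stays as in the
original; /tmp/.ivy2 is an assumed writable path inside the container):

spark-submit \
  --conf spark.jars.ivy=/tmp/.ivy2 \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
  ...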

On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh 
wrote:

>
> Hi,
>
>
> Have you tried this on local mode as opposed to Kubernetes to see if it
> works?
>
>
> HTH
>
>
>
> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou 
> wrote:
>
>> Hello Jacek,
>>
>> Yes this is a spark-streaming.
>>  I have removed all code and created a new project with just the base
>> code that is enough to open a stream and loop over it to see what i am
>> doing wrong.
>>
>> Not adding the packages would result me in the following error
>>
>> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:348)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>>
>> Which should not really be the case cause this should be included in the
>> kubernetes pod. Anyway I can confirm this ?
>>
>>
>> So my simple class is as follow :
>>
>>
>> streamingContext = new JavaStreamingContext(javaSparkContext, 
>> Durations.seconds(5));
>>
>> stream = KafkaUtils.createDirectStream(streamingContext, 
>> LocationStrategies.PreferConsistent(),
>>ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>>
>> stream.foreachRDD((VoidFunction>>) 
>> rdd -> {
>>try {
>>   rdd.foreachPartition(partition -> {
>>  while (partition.hasNext()) {
>> ConsumerRecord consumerRecord = partition.next();
>> LOGGER.info("WORKING " + consumerRecord.topic() 
>> +consumerRecord.partition() + ": "+consumerRecord.offset());
>>  }
>>   });
>>} catch (Exception e) {
>>   e.printStackTrace();
>>}
>> });
>>
>> streamingContext.start();
>> try {
>>streamingContext.awaitTermination();
>> } catch (InterruptedException e) {
>>e.printStackTrace();
>> } finally {
>>streamingContext.stop();
>>javaSparkContext.stop();
>> }
>>
>>
>> This is all there is too the class which is a java boot @Component.
>>
>> Now in order my pom is as such
>>
>> 
>> http://maven.apache.org/POM/4.0.0;
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>   4.0.0
>>
>>   com.kafka
>>   SimpleKafkaStream
>>   1.0
>>
>>   jar
>>
>>   
>> UTF-8
>> 
>> UTF-8
>> 8
>> 8
>> com.kafka.Main
>>   
>>
>>   
>> org.springframework.boot
>> spring-boot-starter-parent
>> 2.4.2
>> 
>>   
>>
>>   
>> 
>>   org.springframework.boot
>>   spring-boot-starter
>>   
>> 
>>   org.springframework.boot
>>   spring-boot-starter-logging
>> 
>>   
>> 
>>
>> 
>>   org.apache.spark
>>   spark-core_2.12
>>   3.1.2
>> 
>>
>> 
>>   org.apache.spark
>>   spark-streaming-kafka-0-10_2.12
>>   3.1.2
>>   provided
>> 
>>
>> 
>>   org.apache.spark
>>   spark-streaming_2.12
>>   3.1.2
>> 
>>
>>   
>>
>>   
>> 
>>   
>> org.springframework.boot
>> spring-boot-maven-plugin
>>   
>>
>>   
>> org.apache.maven.plugins
>> maven-compiler-plugin
>> 3.8.1
>> 
>>   1.8
>>   1.8
>> 
>>   
>>
>> 
>>   
>>
>> 
>>
>> a simple pom that even the  spark-streaming-kafka-0-10_2.12 scope is
>> provided or not it would stilly give the same error.
>>
>> I have tried 

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Mich Talebzadeh
Hi,


Have you tried this on local mode as opposed to Kubernetes to see if it
works?


HTH






On Mon, 6 Sept 2021 at 11:16, Stelios Philippou  wrote:

> Hello Jacek,
>
> Yes this is a spark-streaming.
>  I have removed all code and created a new project with just the base code
> that is enough to open a stream and loop over it to see what i am doing
> wrong.
>
> Not adding the packages would result me in the following error
>
> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
> java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:348)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>
>
> Which should not really be the case cause this should be included in the
> kubernetes pod. Anyway I can confirm this ?
>
>
> So my simple class is as follow :
>
>
> streamingContext = new JavaStreamingContext(javaSparkContext, 
> Durations.seconds(5));
>
> stream = KafkaUtils.createDirectStream(streamingContext, 
> LocationStrategies.PreferConsistent(),
>ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>
> stream.foreachRDD((VoidFunction>>) rdd 
> -> {
>try {
>   rdd.foreachPartition(partition -> {
>  while (partition.hasNext()) {
> ConsumerRecord consumerRecord = partition.next();
> LOGGER.info("WORKING " + consumerRecord.topic() 
> +consumerRecord.partition() + ": "+consumerRecord.offset());
>  }
>   });
>} catch (Exception e) {
>   e.printStackTrace();
>}
> });
>
> streamingContext.start();
> try {
>streamingContext.awaitTermination();
> } catch (InterruptedException e) {
>e.printStackTrace();
> } finally {
>streamingContext.stop();
>javaSparkContext.stop();
> }
>
>
> This is all there is too the class which is a java boot @Component.
>
> Now in order my pom is as such
>
> 
> http://maven.apache.org/POM/4.0.0;
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>   4.0.0
>
>   com.kafka
>   SimpleKafkaStream
>   1.0
>
>   jar
>
>   
> UTF-8
> UTF-8
> 8
> 8
> com.kafka.Main
>   
>
>   
> org.springframework.boot
> spring-boot-starter-parent
> 2.4.2
> 
>   
>
>   
> 
>   org.springframework.boot
>   spring-boot-starter
>   
> 
>   org.springframework.boot
>   spring-boot-starter-logging
> 
>   
> 
>
> 
>   org.apache.spark
>   spark-core_2.12
>   3.1.2
> 
>
> 
>   org.apache.spark
>   spark-streaming-kafka-0-10_2.12
>   3.1.2
>   provided
> 
>
> 
>   org.apache.spark
>   spark-streaming_2.12
>   3.1.2
> 
>
>   
>
>   
> 
>   
> org.springframework.boot
> spring-boot-maven-plugin
>   
>
>   
> org.apache.maven.plugins
> maven-compiler-plugin
> 3.8.1
> 
>   1.8
>   1.8
> 
>   
>
> 
>   
>
> 
>
> a simple pom that even the  spark-streaming-kafka-0-10_2.12 scope is
> provided or not it would stilly give the same error.
>
> I have tried to build an uber jar in order to test with that but i was
> still unable to make it work as such :
>
> 
>   
> 
>   org.springframework.boot
>   spring-boot-maven-plugin
>   
> true
> com.kafka.Main
>   
>   
> 
>   
> repackage
>   
> 
>   
> 
> 
>   maven-assembly-plugin
>   3.2.0
>   
> 
>   dependencies
> 
> 
>   
> true
> com.kafka.Main
>   
> 
>   
>   
> 
>   make-assembly
>   package
>   
> single
>   
> 
>   
> 
>
> 
>   org.apache.maven.plugins
>   maven-compiler-plugin
>   3.8.1
>   
> 1.8
> 1.8
>   
> 
>
>   
>
> 
>
>  I am open to any suggestions and implementations in why this is not
> working and what needs to be done.
>
>
> Thank you for your time,
>
> Stelios
>

Re: Get application metric from Spark job

2021-09-06 Thread Aurélien Mazoyer
Hi Akshay,

Thank you for your reply. Sounds like a good idea, but I unfortunately have
a 2.6 cluster. Do you know if there would be another solution that would
run on 2.6 or if I have no other choice than migrating to 3?

Regards,

Aurélien
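
For what it's worth, one option that already works on Spark 2.x is a named
accumulator: it can be incremented from executor code, and its running value is
visible on the driver, in the Spark UI, and through the stage endpoints of the
status REST API while the job runs. A minimal sketch (input path and names are
illustrative):

import org.apache.spark.sql.SparkSession

object MetricsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("metrics-demo").getOrCreate()
    val sc = spark.sparkContext

    // A named accumulator acts as an application-level counter.
    val processedRecords = sc.longAccumulator("processedRecords")

    sc.textFile("hdfs:///data/input")      // hypothetical input path
      .map { line =>
        processedRecords.add(1)            // incremented inside executor code
        line.length
      }
      .count()

    // The running value can be read on the driver at any point.
    println(s"processed so far: ${processedRecords.value}")

    spark.stop()
  }
}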

On Thu, 2 Sep 2021 at 20:12, Haryani, Akshay  wrote:

> Hi Aurélien,
>
>
>
> Spark has endpoints to expose the spark application metrics. These
> endpoints can be used as a rest API. You can read more about these here:
> https://spark.apache.org/docs/3.1.1/monitoring.html#rest-api
>
>
>
> Additionally,
>
> If you want to build your own custom metrics, you can explore Spark custom
> plugins. Using a custom plugin, you can track your own custom metrics and
> plug them into the Spark metrics system. Please note that plugins are supported
> on Spark versions 3.0 and above.
>
>
>
>
>
> --
>
> Thanks & Regards,
>
> Akshay Haryani
>
>
>
> *From: *Aurélien Mazoyer 
> *Date: *Thursday, September 2, 2021 at 8:36 AM
> *To: *user@spark.apache.org 
> *Subject: *Get application metric from Spark job
>
> Hi community,
>
>
>
> I would like to collect information about the execution of a Spark job
> while it is running. Could I define some kind of application metrics (such
> as a counter that would be incremented in my code) that I could retrieve
> regularly while the job is running?
>
>
> Thank you for help,
>
>
>
> Aurelien
>


Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-06 Thread Stelios Philippou
Hello Jacek,

Yes, this is a Spark Streaming application.
I have removed all the code and created a new project with just the base code,
enough to open a stream and loop over it, to see what I am doing wrong.

Not adding the packages results in the following error:

21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka010.KafkaRDDPartition

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:348)

at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)


This should not really be the case, because that class should be included in the
Kubernetes pod. Is there any way I can confirm this?


So my simple class is as follows:


// streamingContext, stream and javaSparkContext are fields of the @Component below.
// Assumes a Kafka stream with String keys and values.
streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));

stream = KafkaUtils.createDirectStream(streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaConfiguration));

stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
   try {
      rdd.foreachPartition(partition -> {
         while (partition.hasNext()) {
            ConsumerRecord<String, String> consumerRecord = partition.next();
            LOGGER.info("WORKING " + consumerRecord.topic()
                + consumerRecord.partition() + ": " + consumerRecord.offset());
         }
      });
   } catch (Exception e) {
      e.printStackTrace();
   }
});

streamingContext.start();
try {
   streamingContext.awaitTermination();
} catch (InterruptedException e) {
   e.printStackTrace();
} finally {
   streamingContext.stop();
   javaSparkContext.stop();
}


This is all there is to the class, which is a Spring Boot @Component.

My pom is as follows:


<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.kafka</groupId>
  <artifactId>SimpleKafkaStream</artifactId>
  <version>1.0</version>

  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <start-class>com.kafka.Main</start-class>
  </properties>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.2</version>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>3.1.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

    </plugins>
  </build>

</project>



A simple pom; whether the spark-streaming-kafka-0-10_2.12 dependency is scoped as
provided or not, it still gives the same error.

I have tried to build an uber jar in order to test with that, but I was
still unable to make it work, as such:


  

  org.springframework.boot
  spring-boot-maven-plugin
  
true
com.kafka.Main
  
  

  
repackage
  

  


  maven-assembly-plugin
  3.2.0
  

  dependencies


  
true
com.kafka.Main
  

  
  

  make-assembly
  package
  
single
  

  



  org.apache.maven.plugins
  maven-compiler-plugin
  3.8.1
  
1.8
1.8
  


  



 I am open to any suggestions as to why this is not working and what needs to
be done.


Thank you for your time,

Stelios
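
On the question of how to confirm whether the class is present in the Kubernetes
image: one quick check, assuming the /opt/spark layout shown earlier and that unzip
is available in the image, is to look for the Kafka integration classes in the
image's jars directory. spark-streaming-kafka-0-10 is an external integration
module, so in a stock Spark distribution it is typically not bundled there, which is
consistent with the ClassNotFoundException and means the jar has to be shipped in
the application jar or supplied via --packages/--jars:

# Run inside the driver/executor pod, or in a container started from the image.
ls /opt/spark/jars | grep -i kafka

# Or search every bundled jar for the exact class the executor failed to load:
for j in /opt/spark/jars/*.jar; do
  unzip -l "$j" 2>/dev/null \
    | grep -q 'org/apache/spark/streaming/kafka010/KafkaRDDPartition' \
    && echo "$j"
done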

On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski  wrote:

> Hi,
>
> No idea still, but noticed
> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
> \" that bothers me quite a lot.
>
> First of all, it's a Spark Streaming (not Structured Streaming) app.
> Correct? Please upgrade at your earliest convenience since it's no longer
> in active development (if supported at all).
>
> Secondly, why are these jars listed explicitly since they're part of
> Spark? You should not really be doing such risky config changes (unless
> you've got no other choice and you know what you're doing).
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 

Re: Appending a static dataframe to a stream create Parquet file fails

2021-09-06 Thread Jungtaek Lim
I'd recommend getting in touch with Delta Lake community (Google Groups)
https://groups.google.com/forum/#!forum/delta-users to get more feedback
from experts about Delta Lake specific issues.



On Mon, Sep 6, 2021 at 1:56 AM  wrote:

> Hi Jungtaek,
>  thanks for your reply. I was afraid that the problem is not only on my
> side but rather of conceptual nature. I guess I have to rethink my
> approach. However, because you mentioned DeltaLake. I have the same
> problem, but the other way around, with DeltaLake. I cannot write with a
> stream to a DeltaLake created from a static dataframe.
>
> Anyhow, best regards
>   Eugen
>
> On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
>
> Hi,
>
> The file stream sink maintains the metadata in the output directory. The
> metadata retains the list of files written by the streaming query, and
> Spark reads the metadata on listing the files to read.
>
> This is to guarantee end-to-end exactly once on writing files in the
> streaming query. There could be failure on the streaming query and some
> files may be partially written. Metadata will help to skip reading these
> files and only read files which are correctly written.
>
> This leads to a major restriction: you can't write to the output directory
> from multiple queries. For your case, Spark will only read the files which
> are written from the streaming query.
>
> There are 3rd party projects dealing with transactional write from
> multiple writes, (alphabetically) Apache Iceberg, Delta Lake, and so on.
> You may want to check them out.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Sep 2, 2021 at 10:04 PM  wrote:
>
> Hi all,
>   I recently stumbled upon a rather strange problem with streaming
> sources in one of my tests. I am writing a Parquet file from a streaming
> source and subsequently try to append the same data but this time from a
> static dataframe. Surprisingly, the number of rows in the Parquet file
> remains the same after the append operation.
> Here is the relevant code
>
>   "Appending data from static dataframe" must "produce twice as much data" in 
> {
>
> logLinesStream.writeStream
>
>   .format("parquet")
>
>   .option("path", path.toString)
>
>   .outputMode("append")
>
>   .start()
>
>   .processAllAvailable()
>
> spark.read.format("parquet").load(path.toString).count mustBe 1159
>
>
> logLinesDF.write.format("parquet").mode("append").save(path.toString)
>
> spark.read.format("parquet").load(path.toString).count mustBe 2*1159
>
>   }
>
>
> Does anyone have an idea what I am doing wrong here?
>
> thanks in advance
>  Eugen Wintersberger
>
>
>