Hi,

Have you tried running this in local mode, as opposed to on Kubernetes, to
see whether it works?
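
One way to make that comparison concrete is to check, in each environment,
whether the Kafka integration class named in the stack trace is visible and
which jar it comes from. The following is only a minimal sketch using plain
JDK calls; the class name ClasspathCheck and the method locate are made up
for illustration and are not part of Spark. It inspects the classpath of the
JVM it runs in, so on Kubernetes it would have to run inside the driver or
executor pod to say anything about that pod.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    /** Reports where a class is loaded from, or that it cannot be found. */
    static String locate(String className) {
        try {
            // initialize=false: only resolve the class, do not run static initializers
            Class<?> c = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source
            return className + " -> " + (src == null ? "bootstrap/JDK" : src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> NOT FOUND";
        }
    }

    public static void main(String[] args) {
        // The class reported as missing in the executor stack trace
        System.out.println(locate("org.apache.spark.streaming.kafka010.KafkaRDDPartition"));
        // A class that is always present, as a sanity check
        System.out.println(locate("java.lang.String"));
    }
}
```

If the first line reports NOT FOUND only on Kubernetes, the jar providing
that class is probably missing from the image or from the application jar
used there.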


HTH


View my LinkedIn profile:
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>







On Mon, 6 Sept 2021 at 11:16, Stelios Philippou <stevo...@gmail.com> wrote:

> Hello Jacek,
>
> Yes, this is a Spark Streaming application.
> I have removed all the code and created a new project with just the base
> code needed to open a stream and loop over it, to see what I am doing
> wrong.
>
> Not adding the packages results in the following error:
>
> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>
>
> This should not really be the case, because the class should be included in
> the Kubernetes pod. Is there any way I can confirm this?
>
>
> My simple class is as follows:
>
>
> streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));
>
> stream = KafkaUtils.createDirectStream(streamingContext,
>    LocationStrategies.PreferConsistent(),
>    ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>
> stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
>    try {
>       rdd.foreachPartition(partition -> {
>          while (partition.hasNext()) {
>             ConsumerRecord<String, byte[]> consumerRecord = partition.next();
>             LOGGER.info("WORKING " + consumerRecord.topic()
>                + consumerRecord.partition() + ": " + consumerRecord.offset());
>          }
>       });
>    } catch (Exception e) {
>       e.printStackTrace();
>    }
> });
>
> streamingContext.start();
> try {
>    streamingContext.awaitTermination();
> } catch (InterruptedException e) {
>    e.printStackTrace();
> } finally {
>    streamingContext.stop();
>    javaSparkContext.stop();
> }
>
>
> That is all there is to the class, which is a Spring Boot @Component.
>
> My pom is as follows:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>
>   <groupId>com.kafka</groupId>
>   <artifactId>SimpleKafkaStream</artifactId>
>   <version>1.0</version>
>
>   <packaging>jar</packaging>
>
>   <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>     <maven.compiler.source>8</maven.compiler.source>
>     <maven.compiler.target>8</maven.compiler.target>
>     <start-class>com.kafka.Main</start-class>
>   </properties>
>
>   <parent>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-starter-parent</artifactId>
>     <version>2.4.2</version>
>     <relativePath/>
>   </parent>
>
>   <dependencies>
>     <dependency>
>       <groupId>org.springframework.boot</groupId>
>       <artifactId>spring-boot-starter</artifactId>
>       <exclusions>
>         <exclusion>
>           <groupId>org.springframework.boot</groupId>
>           <artifactId>spring-boot-starter-logging</artifactId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_2.12</artifactId>
>       <version>3.1.2</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>       <version>3.1.2</version>
>       <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming_2.12</artifactId>
>       <version>3.1.2</version>
>     </dependency>
>
>   </dependencies>
>
>   <build>
>     <plugins>
>       <plugin>
>         <groupId>org.springframework.boot</groupId>
>         <artifactId>spring-boot-maven-plugin</artifactId>
>       </plugin>
>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-compiler-plugin</artifactId>
>         <version>3.8.1</version>
>         <configuration>
>           <source>1.8</source>
>           <target>1.8</target>
>         </configuration>
>       </plugin>
>
>     </plugins>
>   </build>
>
> </project>
>
> It is a simple pom, and whether or not the spark-streaming-kafka-0-10_2.12
> scope is set to provided, it still gives the same error.
>
> I have also tried to build an uber jar to test with, using the build
> section below, but I was still unable to make it work:
>
> <build>
>   <plugins>
>     <plugin>
>       <groupId>org.springframework.boot</groupId>
>       <artifactId>spring-boot-maven-plugin</artifactId>
>       <configuration>
>         <fork>true</fork>
>         <mainClass>com.kafka.Main</mainClass>
>       </configuration>
>       <executions>
>         <execution>
>           <goals>
>             <goal>repackage</goal>
>           </goals>
>         </execution>
>       </executions>
>     </plugin>
>     <plugin>
>       <artifactId>maven-assembly-plugin</artifactId>
>       <version>3.2.0</version>
>       <configuration>
>         <descriptorRefs>
>           <descriptorRef>dependencies</descriptorRef>
>         </descriptorRefs>
>         <archive>
>           <manifest>
>             <addClasspath>true</addClasspath>
>             <mainClass>com.kafka.Main</mainClass>
>           </manifest>
>         </archive>
>       </configuration>
>       <executions>
>         <execution>
>           <id>make-assembly</id>
>           <phase>package</phase>
>           <goals>
>             <goal>single</goal>
>           </goals>
>         </execution>
>       </executions>
>     </plugin>
>
>     <plugin>
>       <groupId>org.apache.maven.plugins</groupId>
>       <artifactId>maven-compiler-plugin</artifactId>
>       <version>3.8.1</version>
>       <configuration>
>         <source>1.8</source>
>         <target>1.8</target>
>       </configuration>
>     </plugin>
>
>   </plugins>
>
> </build>
>
> I am open to any suggestions as to why this is not working and what needs
> to be done.
>
>
> Thank you for your time,
>
> Stelios
>
> On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> No idea still, but noticed
>> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
>> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
>> \" that bothers me quite a lot.
>>
>> First of all, it's a Spark Streaming (not Structured Streaming) app.
>> Correct? Please upgrade at your earliest convenience since it's no longer
>> in active development (if supported at all).
>>
>> Secondly, why are these jars listed explicitly since they're part of
>> Spark? You should not really be doing such risky config changes (unless
>> you've got no other choice and you know what you're doing).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <stevo...@gmail.com>
>> wrote:
>>
>>> Yes, you are right.
>>> I am using Spring Boot for this.
>>>
>>> The same setup does work for the job that does not involve any Kafka
>>> events. But there I am not sending any extra jars, so nothing is replaced
>>> and we are using the default ones.
>>>
>>> If I do not use userClassPathFirst, which forces the service to use the
>>> newer version, I end up with the same problem.
>>>
>>> We are using protobuf v3+, so we need to push that version, since Spark
>>> core uses an older one.
>>>
>>> So all we should really need is the following: --jars
>>> "protobuf-java-3.17.3.jar" \
>>> and here we need userClassPathFirst=true in order to use the latest
>>> version.
>>>
>>>
>>> With only this jar, which is what works locally, or with no jars defined
>>> at all, we end up with the following error:
>>>
>>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>     at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>     at java.base/java.lang.Class.forName0(Native Method)
>>>     at java.base/java.lang.Class.forName(Unknown Source)
>>>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>
>>>
>>>
>>>
>>> This can be resolved by passing more jars.
>>>
>>>
>>> Any idea about this error?
>>>
>>> K8 does not seem to like this. Spring should be the one responsible for
>>> the versions, but it seems K8 does not like these versions.
>>>
>>> Perhaps a misconfiguration on K8?
>>>
>>> I did not set that up, so I am not aware of what was done there.
>>>
>>>
>>>
>>> Downgrading to Java 8 on my K8 might not be so easy. I want to explore
>>> whether there is something else before doing that, as we would need to
>>> spin up new instances of K8 to check it.
>>>
>>>
>>>
>>> Thank you for the time taken
>>>
>>>
>>>
>>>
>>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>>> Hi Stelios,
>>>>
>>>> I've never seen this error before, but a couple of things caught
>>>> my attention that I would look at closer to chase the root cause of the
>>>> issue.
>>>>
>>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>> Application run failed" seem to indicate that you're using Spring Boot
>>>> (that I know almost nothing about so take the following with a pinch of
>>>> salt :))
>>>>
>>>> Spring Boot manages the classpath by itself and together with another
>>>> interesting option in your
>>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>>> much this exception:
>>>>
>>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>> could be due to casting compatible types from two different
>>>> classloaders?
>>>>
>>>> Just a thought but wanted to share as I think it's worth investigating.
>>>>
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>>
>>>>
>>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <stevo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have been facing the following issue for some time now, and I was
>>>>> wondering if someone might have some insight into how I can resolve it.
>>>>>
>>>>> The code (Java 11) works correctly on my local machine, but whenever I
>>>>> try to launch it on K8 I get the following error.
>>>>>
>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>>> initializing SparkContext.
>>>>>
>>>>> java.util.ServiceConfigurationError:
>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>>
>>>>>
>>>>> I have a Spark job that monitors some directories and handles the data
>>>>> accordingly.
>>>>>
>>>>> That part works correctly on K8, and the SparkContext has no issue
>>>>> being initialized there.
>>>>>
>>>>>
>>>>> This is the spark-submit for that:
>>>>>
>>>>>
>>>>> spark-submit \
>>>>> --master=k8s://https://url:port \
>>>>> --deploy-mode cluster \
>>>>> --name a-name \
>>>>> --conf spark.driver.userClassPathFirst=true  \
>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>> --conf spark.kubernetes.namespace=spark \
>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>> --driver-memory 525m --executor-memory 525m \
>>>>> --num-executors 1 --executor-cores 1 \
>>>>> target/SparkStream.jar continuous-merge
>>>>>
>>>>>
>>>>> My issue arises when I try to launch the service that listens to Kafka
>>>>> events and stores them in HDFS.
>>>>>
>>>>>
>>>>> spark-submit \
>>>>> --master=k8s://https://url:port \
>>>>> --deploy-mode cluster \
>>>>> --name consume-data \
>>>>> --conf spark.driver.userClassPathFirst=true  \
>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>>> --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>>> --conf spark.kubernetes.namespace=spark \
>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>> --driver-memory 1g --executor-memory 1g \
>>>>> --num-executors 1 --executor-cores 1 \
>>>>> target/SparkStream.jar consume
>>>>>
>>>>>
>>>>> Could it be that I am launching the application incorrectly, or perhaps
>>>>> that my K8 is not configured correctly?
>>>>>
>>>>>
>>>>>
>>>>> I have stripped my code down to the bare bones and still end up with
>>>>> the following issue:
>>>>>
>>>>>
>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error initializing SparkContext.
>>>>> java.util.ServiceConfigurationError: org.apache.spark.scheduler.ExternalClusterManager: org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a subtype
>>>>>     at java.base/java.util.ServiceLoader.fail(Unknown Source)
>>>>>     at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown Source)
>>>>>     at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown Source)
>>>>>     at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)
>>>>>     at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)
>>>>>     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:
>>>>>
>>>>>
>>>>> 21/08/31 07:28:42 WARN  
>>>>> org.springframework.context.annotation.AnnotationConfigApplicationContext:
>>>>> Exception encountered during context initialization - cancelling refresh
>>>>> attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>> factory method failed; nested exception is
>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>> java.util.ServiceConfigurationError:
>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>> 21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>>> Application run failed
>>>>>
>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>> factory method failed; nested exception is
>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>> java.util.ServiceConfigurationError:
>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Could it be that I am launching the Kafka application incorrectly, with
>>>>> all the extra jars added?
>>>>>
>>>>> It is just that those seem to be needed; I get other errors when I do
>>>>> not include them.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Any help will be greatly appreciated.
>>>>>
>>>>>
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Stelios
>>>>>
>>>>>
>>>>>
>>>>>
