Hello Jacek,

Yes, this is a Spark Streaming app. I have removed all the code and created a new project with just the base code needed to open a stream and loop over it, to see what I am doing wrong.
If I do not add the packages, I get the following error:

21/09/06 08:10:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)

This should not really be the case, because the class should already be included in the Kubernetes pod. Is there any way I can confirm this?

So my simple class is as follows:

streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));
stream = KafkaUtils.createDirectStream(streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topics, kafkaConfiguration));

stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
    try {
        rdd.foreachPartition(partition -> {
            while (partition.hasNext()) {
                ConsumerRecord<String, byte[]> consumerRecord = partition.next();
                LOGGER.info("WORKING " + consumerRecord.topic() + " "
                        + consumerRecord.partition() + ": " + consumerRecord.offset());
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
});

streamingContext.start();
try {
    streamingContext.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    streamingContext.stop();
    javaSparkContext.stop();
}

This is all there is to the class, which is a Spring Boot @Component.
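As a quick sanity check on whether the class really is in the pod, something like the following could be run from the driver before the stream starts (a minimal sketch; `ClasspathCheck` is just an illustrative name, not part of my project):

```java
// ClasspathCheck.java — probe whether a class is visible to this JVM's
// classloader, e.g. from inside the driver pod before starting the stream.
public class ClasspathCheck {

    // Returns true when the named class can be loaded by this JVM.
    static boolean isPresent(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // On the cluster this would be the class from the stack trace above.
        String target = "org.apache.spark.streaming.kafka010.KafkaRDDPartition";
        System.out.println(target + " present on classpath: " + isPresent(target));
    }
}
```

Note this only checks the driver's classloader; the stack trace above comes from an executor, so the same probe would need to run inside a task (e.g. in a foreachPartition) to confirm the executor side.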
Now, for reference, my pom is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafka</groupId>
    <artifactId>SimpleKafkaStream</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <start-class>com.kafka.Main</start-class>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.1.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

a
simple pom where, whether spark-streaming-kafka-0-10_2.12 is scoped as provided or not, it still gives the same error.

I have also tried to build an uber jar to test with, but I was still unable to make it work, as such:

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <fork>true</fork>
                <mainClass>com.kafka.Main</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <mainClass>com.kafka.Main</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

I am open to any suggestions on why this is not working and what needs to be done.

Thank you for your time,
Stelios

On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> No idea still, but noticed
> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
> \" that bothers me quite a lot.
>
> First of all, it's a Spark Streaming (not Structured Streaming) app.
> Correct? Please upgrade at your earliest convenience since it's no longer
> in active development (if supported at all).
>
> Secondly, why are these jars listed explicitly since they're part of
> Spark? You should not really be doing such risky config changes (unless
> you've got no other choice and you know what you're doing).
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>
>
> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <stevo...@gmail.com>
> wrote:
>
>> Yes you are right.
>> I am using Spring Boot for this.
>>
>> The same does work for the event that does not involve any kafka events.
>> But again i am not sending out extra jars there so nothing is replaced and
>> we are using the default ones.
>>
>> If i do not use the userClassPathFirst which will force the service to
>> use the newer version i will end up with the same problem
>>
>> We are using protobuf v3+ and as such we need to push that version since
>> apache core uses an older version.
>>
>> So all we should really need is the following : --jars
>> "protobuf-java-3.17.3.jar" \
>> and here we need the userClassPathFirst=true in order to use the latest
>> version.
>>
>>
>> Using only this jar as it works on local or no jars defined we ended up
>> with the following error.
>>
>> 21/08/31 10:53:40 WARN org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>         at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>         at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>         at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>         at java.base/java.lang.Class.forName0(Native Method)
>>         at java.base/java.lang.Class.forName(Unknown Source)
>>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>> Which can be resolved with passing more jars.
>>
>> Any idea about this error ?
>>
>> K8 does not seem to like this, but Java Spring should be the one that is
>> responsible for the version but it seems K8 does not like this versions.
>> Perhaps miss configuration on K8 ?
>> I haven't set that up so i am not aware of what was done there.
>>
>> For downgrading to java 8 on my K8 might not be so easy. I want to
>> explore if there is something else before doing that as we will need to
>> spin off new instances of K8 to check that.
>>
>> Thank you for the time taken
>>
>>
>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi Stelios,
>>>
>>> I've never seen this error before, but a couple of things caught
>>> my attention that I would look at closer to chase the root cause of the
>>> issue.
>>>
>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>> and "21/08/31 07:28:42 ERROR org.springframework.boot.SpringApplication:
>>> Application run failed" seem to indicate that you're using Spring Boot
>>> (that I know almost nothing about so take the following with a pinch of
>>> salt :))
>>>
>>> Spring Boot manages the classpath by itself and together with another
>>> interesting option in your
>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>> much this exception:
>>>
>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>> subtype
>>>
>>> could be due to casting compatible types from two different classloaders?
>>>
>>> Just a thought but wanted to share as I think it's worth investigating.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>>
>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <stevo...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have been facing the current issue for some time now and I was
>>>> wondering if someone might have some inside on how I can resolve the
>>>> following.
>>>>
>>>> The code (java 11) is working correctly on my local machine but
>>>> whenever I try to launch the following on K8 I am getting the following
>>>> error.
>>>>
>>>> 21/08/31 07:28:42 ERROR org.apache.spark.SparkContext: Error
>>>> initializing SparkContext.
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>> I have a spark that will monitor some directories and handle the data
>>>> accordingly.
>>>>
>>>> That part is working correctly on K8 and the SparkContext has no issue
>>>> being initialized there.
>>>>
>>>> This is the spark-submit for that:
>>>>
>>>> spark-submit \
>>>>   --master=k8s://https://url:port \
>>>>   --deploy-mode cluster \
>>>>   --name a-name \
>>>>   --conf spark.driver.userClassPathFirst=true \
>>>>   --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>   --files "application-dev.properties,keystore.jks,truststore.jks" \
>>>>   --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>   --conf spark.kubernetes.namespace=spark \
>>>>   --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>   --conf spark.dynamicAllocation.enabled=false \
>>>>   --driver-memory 525m --executor-memory 525m \
>>>>   --num-executors 1 --executor-cores 1 \
>>>>   target/SparkStream.jar continuous-merge
>>>>
>>>> My issue comes when I try to launch the service in order to listen to
>>>> kafka events and store them in HDFS.
>>>>
>>>> spark-submit \
>>>>   --master=k8s://https://url:port \
>>>>   --deploy-mode cluster \
>>>>   --name consume-data \
>>>>   --conf spark.driver.userClassPathFirst=true \
>>>>   --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>   --files "application-dev.properties,keystore.jks,truststore.jks" \
>>>>   --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>>>   --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>   --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>>   --conf spark.kubernetes.namespace=spark \
>>>>   --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>   --conf spark.dynamicAllocation.enabled=false \
>>>>   --driver-memory 1g --executor-memory 1g \
>>>>   --num-executors 1 --executor-cores 1 \
>>>>   target/SparkStream.jar consume
>>>>
>>>> It could be that I am launching the application wrongly or perhaps that
>>>> my K8 is not configured correctly ?
>>>>
>>>> I have stripped down my code and left it barebone and will end up with
>>>> the following issue :
>>>>
>>>> 21/08/31 07:28:42 ERROR org.apache.spark.SparkContext: Error
>>>> initializing SparkContext.
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>         at java.base/java.util.ServiceLoader.fail(Unknown Source)
>>>>         at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown Source)
>>>>         at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown Source)
>>>>         at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)
>>>>         at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)
>>>>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:
>>>>
>>>> 21/08/31 07:28:42 WARN
>>>> org.springframework.context.annotation.AnnotationConfigApplicationContext:
>>>> Exception encountered during context initialization - cancelling refresh
>>>> attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>> expressed through field 'javaSparkContext'; nested exception is
>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>> bean with name 'javaSparkContext' defined in class path resource
>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>> factory method failed; nested exception is
>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>> 'javaSparkContext' threw exception; nested exception is
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>> 21/08/31 07:28:42 ERROR org.springframework.boot.SpringApplication:
>>>> Application run failed
>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>> creating bean with name 'mainApplication': Unsatisfied dependency expressed
>>>> through field 'streamAllKafkaData'; nested exception is
>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>> expressed through field 'javaSparkContext'; nested exception is
>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>> bean with name 'javaSparkContext' defined in class path resource
>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>> factory method failed; nested exception is
>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>> 'javaSparkContext' threw exception; nested exception is
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>> It could be that i am launching the application for Kafka wrongly with
>>>> all the extra jars added ?
>>>> Just that those seem to be needed or i am getting other errors when not
>>>> including those.
>>>>
>>>> Any help will be greatly appreciated.
>>>>
>>>> Cheers,
>>>> Stelios