Hi Mich, All - thanks, I was able to resolve this using the command below:
gcloud dataproc jobs submit pyspark \
  /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2.py \
  --cluster dataproc-ss-poc \
  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  --region us-central1

On Wed, Feb 2, 2022 at 1:25 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> The current Spark version on GCP is 3.1.2.
>
> Try using this jar file instead:
>
> spark-sql-kafka-0-10_2.12-3.0.1.jar
>
> HTH
>
> On Wed, 2 Feb 2022 at 06:51, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello All,
>>
>> I'm running a simple Structured Streaming job on GCP Dataproc, which reads
>> data from Kafka and prints it to the console.
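For context, a minimal sketch of the kind of batch Kafka read the script performs. The broker address and topic name below are placeholders, not values from this thread, and the snippet needs a reachable Kafka broker plus the spark-sql-kafka connector (e.g. via `spark.jars.packages`, as in the resolved command) to actually run:

```python
from pyspark.sql import SparkSession

# Sketch of a batch read from Kafka, printed to the console.
# "localhost:9092" and "test-topic" are placeholders.
spark = SparkSession.builder.appName("kafka-batch-read-sketch").getOrCreate()

df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test-topic")
    .load()
)

# Kafka key/value columns arrive as binary; cast to strings before showing.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(truncate=False)
```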
>>
>> Command:
>>
>> gcloud dataproc jobs submit pyspark \
>>   /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb1.py \
>>   --cluster dataproc-ss-poc \
>>   --jars gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar,gs://spark-jars-karan/spark-core_2.12-3.1.2.jar \
>>   --region us-central1
>>
>> I'm getting this error:
>>
>> File "/tmp/01c16a55009a42a0a29da6dde9aae4d5/StructuredStreaming_Kafka_GCP-Batch-feb1.py", line 49, in <module>
>>   df = spark.read.format('kafka')\
>> File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
>> File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
>> File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
>> File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
>>
>> : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:599)
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:348)
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:128)
>>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
>>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
>>   at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
>>   at scala.Option.getOrElse(Option.scala:189)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>   at py4j.Gateway.invoke(Gateway.java:282)
>>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>   at py4j.GatewayConnection.run(GatewayConnection.java:238)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> Additional details are on Stack Overflow:
>> https://stackoverflow.com/questions/70951195/gcp-dataproc-java-lang-noclassdeffounderror-org-apache-kafka-common-serializa
>>
>> Do we need to pass any other jar?
>> What needs to be done to debug/fix this?
>>
>> TIA!
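For anyone hitting the same NoClassDefFoundError: `ByteArraySerializer` lives in the kafka-clients jar, which the standalone spark-sql-kafka connector jar does not bundle, and `--jars` ships only the jars you list, with no transitive resolution. The sketch below contrasts the two approaches; cluster name, bucket paths, and the dependency versions in option 2 are illustrative and should be matched to your Spark build:

```shell
# Option 1 (the fix that worked above): give Spark a Maven coordinate via
# spark.jars.packages, so the connector AND its transitive dependencies
# (kafka-clients, etc.) are resolved automatically.
gcloud dataproc jobs submit pyspark my_job.py \
  --cluster dataproc-ss-poc \
  --region us-central1 \
  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

# Option 2 (sketch): keep --jars, but then every dependency must be listed
# explicitly (comma-separated), including kafka-clients. Paths and versions
# here are placeholders.
gcloud dataproc jobs submit pyspark my_job.py \
  --cluster dataproc-ss-poc \
  --region us-central1 \
  --jars gs://my-bucket/spark-sql-kafka-0-10_2.12-3.1.2.jar,gs://my-bucket/kafka-clients-2.6.0.jar,gs://my-bucket/spark-token-provider-kafka-0-10_2.12-3.1.2.jar,gs://my-bucket/commons-pool2-2.6.2.jar
```

Note also that `--jars` expects a comma-separated list; space-separated jars (as in the original command) are parsed as separate arguments, which is why the second jar was silently ignored.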