Hello All,

I'm running a simple Structured Streaming job on GCP Dataproc that reads data
from Kafka and prints it to the console.

Command:

    gcloud dataproc jobs submit pyspark \
        /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb1.py \
        --cluster dataproc-ss-poc \
        --jars gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar gs://spark-jars-karan/spark-core_2.12-3.1.2.jar \
        --region us-central1
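If the submission is scripted, the sketch below shows one way to build the gcloud argument list. It relies on the documented behaviour that gcloud's --jars flag takes a single comma-separated list. It also shows the alternative of passing a Maven coordinate through the Spark property spark.jars.packages, which asks Spark to resolve the connector's transitive dependencies (such as kafka-clients) itself. This assumes the cluster can reach Maven Central and that the 3.1.2 coordinate, copied from the jar version in the post, matches the cluster's Spark version. build_submit_cmd is a hypothetical helper, not part of any library.

```python
import shlex


def build_submit_cmd(script, cluster, region, jars=(), packages=()):
    """Return an argv list for `gcloud dataproc jobs submit pyspark`.

    jars:     gs:// URIs; gcloud expects them as ONE comma-separated value.
    packages: Maven coordinates for the Spark property spark.jars.packages,
              which makes Spark resolve their transitive dependencies.
    """
    cmd = ["gcloud", "dataproc", "jobs", "submit", "pyspark", script,
           f"--cluster={cluster}", f"--region={region}"]
    if jars:
        cmd.append("--jars=" + ",".join(jars))
    if packages:
        value = "spark.jars.packages=" + ",".join(packages)
        if len(packages) > 1:
            # --properties is itself comma-separated, so switch its delimiter
            # to '#' using gcloud's ^DELIM^ escaping (see `gcloud topic escaping`).
            value = "^#^" + value
        cmd.append("--properties=" + value)
    return cmd


if __name__ == "__main__":
    cmd = build_submit_cmd(
        "/Users/karanalang/Documents/Technology/gcp/DataProc/"
        "StructuredStreaming_Kafka_GCP-Batch-feb1.py",
        cluster="dataproc-ss-poc",
        region="us-central1",
        # Assumption: the cluster has outbound access to Maven Central.
        packages=["org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2"],
    )
    print(shlex.join(cmd))  # or: subprocess.run(cmd, check=True)
```

Returning a list rather than a single string avoids shell-quoting problems when it is handed to subprocess.run.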

I'm getting the following error:

  File "/tmp/01c16a55009a42a0a29da6dde9aae4d5/StructuredStreaming_Kafka_GCP-Batch-feb1.py", line 49, in <module>
    df = spark.read.format('kafka')\
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:599)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:348)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:128)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
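The missing class is named in the "Caused by:" line, so one way to debug this is to check which of the jars passed to the job actually contain it. A .jar is an ordinary zip archive, so Python's zipfile module can list its entries. The sketch below is minimal and assumes the jars were first copied into a local jars/ directory (for example with gsutil cp). class_entry and find_class_in_jars are hypothetical helper names. For reference, ByteArraySerializer normally ships in the kafka-clients artifact, not in spark-sql-kafka-0-10 itself.

```python
import zipfile
from pathlib import Path


def class_entry(class_name):
    """Map 'a.b.C' or the slash form 'a/b/C' (as printed by
    NoClassDefFoundError) to the entry name stored inside a jar."""
    return class_name.replace(".", "/") + ".class"


def find_class_in_jars(class_name, jar_paths):
    """Return the paths of the local jar files that contain class_name."""
    entry = class_entry(class_name)
    hits = []
    for path in jar_paths:
        # A .jar is an ordinary zip archive, so zipfile can list its entries.
        with zipfile.ZipFile(path) as jar:
            if entry in jar.namelist():
                hits.append(str(path))
    return hits


if __name__ == "__main__":
    # Assumption: the job's jars were copied into a local jars/ directory
    # beforehand (e.g. with gsutil cp).
    jar_dir = Path("jars")
    jars = sorted(jar_dir.glob("*.jar")) if jar_dir.is_dir() else []
    missing = "org/apache/kafka/common/serialization/ByteArraySerializer"
    print(find_class_in_jars(missing, jars) or "class not found in any jar")
```

An empty result for the jars in the command would be consistent with the trace. It would also suggest that whichever jar provides the class has to be added to the classpath.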

Additional details are in this Stack Overflow question:

https://stackoverflow.com/questions/70951195/gcp-dataproc-java-lang-noclassdeffounderror-org-apache-kafka-common-serializa

Do I need to pass any other jars?
What needs to be done to debug or fix this?

Thanks in advance!
