Hi Mitch, All -

Thanks, I was able to resolve this using the command below:

---
gcloud dataproc jobs submit pyspark \
    /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2.py \
    --cluster dataproc-ss-poc \
    --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
    --region us-central1
---
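For anyone hitting the same issue: the package coordinate passed to spark.jars.packages has to match the cluster's Scala build (2.12) and Spark version (3.1.2 here), and unlike a bare --jars list it also pulls transitive dependencies such as kafka-clients, which is the jar that actually provides ByteArraySerializer. A small hypothetical helper that assembles the coordinate, just to make the pattern explicit:

```python
def kafka_connector_package(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for the Spark SQL Kafka connector.

    spark.jars.packages resolves this coordinate plus its transitive
    dependencies (e.g. kafka-clients), which is why it works where
    listing only spark-sql-kafka and spark-core under --jars did not.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# Coordinate for the Dataproc cluster's Spark 3.1.2 / Scala 2.12:
print(kafka_connector_package("3.1.2"))
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
```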


On Wed, Feb 2, 2022 at 1:25 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> The current Spark version on GCP is 3.1.2.
>
> Try using this jar file instead
>
> spark-sql-kafka-0-10_2.12-3.0.1.jar
>
>
> HTH
>
> On Wed, 2 Feb 2022 at 06:51, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello All,
>>
>> I'm running a simple Structured Streaming on GCP, which reads data from
>> Kafka and prints onto console.
>>
>> Command :
>>
>> gcloud dataproc jobs submit pyspark \
>>     /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb1.py \
>>     --cluster dataproc-ss-poc \
>>     --jars gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar,gs://spark-jars-karan/spark-core_2.12-3.1.2.jar \
>>     --region us-central1
>>
>> I'm getting this error:
>>
>> File "/tmp/01c16a55009a42a0a29da6dde9aae4d5/StructuredStreaming_Kafka_GCP-Batch-feb1.py", line 49, in <module>
>>     df = spark.read.format('kafka')\
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
>> : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:599)
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:348)
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:128)
>>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
>>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
>>   at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
>>   at scala.Option.getOrElse(Option.scala:189)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>   at py4j.Gateway.invoke(Gateway.java:282)
>>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>   at py4j.GatewayConnection.run(GatewayConnection.java:238)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>> Additional details are on Stack Overflow:
>>
>> https://stackoverflow.com/questions/70951195/gcp-dataproc-java-lang-noclassdeffounderror-org-apache-kafka-common-serializa
>>
>> Do we need to pass any other jar?
>> What needs to be done to debug/fix this?
>>
>> Thanks in advance!
>>
>>
>>
