I have the Kubeflow spark-operator installed on Kubernetes (GKE), and I'm running a Structured Streaming job that reads data from Kafka. The job is triggered every 10 minutes.
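For context, the SparkSession is built with the standard Kafka package and GCS-connector settings; a minimal sketch of that setup (package versions and config values here are illustrative, not my exact configuration):
```python
from pyspark.sql import SparkSession

# Minimal sketch of the session setup -- versions and values are illustrative,
# not the exact ones baked into the job image.
spark = (
    SparkSession.builder
    .appName("kafka-structured-streaming")
    # Kafka source for Structured Streaming (matching the Spark 3.1.1 image)
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
    # GCS connector so gs:// paths resolve on GKE
    .config("spark.hadoop.fs.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    # Authenticate with the pod's service-account credentials
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .getOrCreate()
)
```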
The job fails with the error shown below:
```
Traceback (most recent call last):
  File "/opt/spark/custom-dir/main.py", line 356, in <module>
    sys.exit(main())
  File "/opt/spark/custom-dir/main.py", line 352, in main
    ss.readData()
  File "/opt/spark/custom-dir/main.py", line 327, in readData
    query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
    return self._sq(self._jwrite.start())
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "[email protected] does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "[email protected] does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
}
```
Code that reads the data from Kafka (running on GKE):
```
df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribePattern", "topic") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()

logger.info(f"df_stream, calling convertToDictForEachBatch -> {df_stream}")

# micro-batch every 10 minutes; each batch is handed to convertToDictForEachBatch
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.convertToDictForEachBatch) \
    .start()
```
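As far as I can tell, the only place this job touches Google Cloud Storage directly is the checkpointLocation; self.checkpoint points at a GCS path along these lines (bucket and prefix below are placeholders, not the real names):
```python
# Placeholder values -- shown only to make clear that the streaming checkpoint
# lives on GCS, which is the job's only direct GCS dependency.
self.checkpoint = "gs://<checkpoint-bucket>/checkpoints/kafka-streaming"
```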
I don't understand why the error is about a missing storage.buckets.create permission when the code is only reading data from Kafka; the failing call (POST https://storage.googleapis.com/storage/v1/b) looks like a bucket-create request rather than an object read or write.
Any ideas on how to debug or fix this?
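One thing I was planning to try, to take Spark out of the picture, is checking with the same service-account credentials whether the checkpoint bucket exists and which storage permissions the account actually holds on it; a minimal sketch with the google-cloud-storage client (the bucket name is a placeholder):
```python
from google.cloud import storage

# Project comes from the error URL; the bucket name is a placeholder.
PROJECT_ID = "versa-kafka-poc"
BUCKET_NAME = "<checkpoint-bucket>"

client = storage.Client(project=PROJECT_ID)

# lookup_bucket() returns None (instead of raising) when the bucket does not
# exist; a missing bucket would explain the connector trying to create one.
bucket = client.lookup_bucket(BUCKET_NAME)
print("bucket exists:", bucket is not None)

if bucket is not None:
    # Ask GCS which of these permissions the caller actually has on the bucket.
    granted = bucket.test_iam_permissions([
        "storage.buckets.get",
        "storage.objects.get",
        "storage.objects.list",
        "storage.objects.create",
    ])
    print("granted on bucket:", granted)
```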
Has anyone used the Kubeflow spark-operator (v3.1.1) for streaming jobs on Kubernetes?
Thanks in advance!
Here is the Stack Overflow ticket: https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea