Thanks, Nimrod!
Yes, the checkpoint is in GCS, and as you pointed out, the service account
being used did not have access to the GCS bucket. I was able to add
the permissions, and the GCS bucket is accessible now.
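(For anyone hitting the same thing, granting the bucket access boils down to something like the command below — the service-account email, bucket, and role shown are placeholders, not necessarily the exact ones used here:)
```
# grant the job's GCP service account access to the checkpoint/event-log bucket
# (service-account email, bucket name, and role are placeholders)
gcloud storage buckets add-iam-policy-binding gs://<BUCKET> \
  --member="serviceAccount:<SA_EMAIL>" \
  --role="roles/storage.objectAdmin"
```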
However, I've hit another issue: I've mounted a secret 'spark-gcs-creds',
which contains the key for the 'spark' service account used by the
application, but the secret is not getting mounted.
Here is the error:
```
24/10/05 20:10:36 INFO BasicExecutorFeatureStep: Decommissioning not
enabled, skipping shutdown script
24/10/05 20:10:38 WARN FileSystem: Failed to initialize fileystem
gs://vkp-spark-history-server/spark-events: java.io.FileNotFoundException:
/mnt/secrets/spark-gcs-key.json (No such file or directory)
24/10/05 20:10:38 ERROR SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: /mnt/secrets/spark-gcs-key.json (No such
file or directory)
at java.base/java.io.FileInputStream.open0(Native Method)
at java.base/java.io.FileInputStream.open(Unknown Source)
at java.base/java.io.FileInputStream.<init>(Unknown Source)
at java.base/java.io.FileInputStream.<init>(Unknown Source)
at
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromJsonKeyFile(CredentialFactory.java:297)
at
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:414)
at
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1479)
at
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1638)
at
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1620)
at
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:507)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
```
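To narrow down whether this is a mount problem or a path problem, a quick check along these lines should show whether the volume ever makes it onto the driver pod (the pod name below is illustrative):
```
# check whether the 'gcs-key' volume and the /mnt/secrets mount show up on the driver pod
# (pod name is illustrative -- substitute the actual driver pod name)
kubectl get pod structured-streaming-350-xxxxxxx-driver -n so350 -o yaml | grep -E -A5 "volumes|volumeMounts"

# if the pod is running, look inside the mount directly
kubectl exec -it structured-streaming-350-xxxxxxx-driver -n so350 -- ls -l /mnt/secrets

# kubelet events usually say why a mount failed (e.g. MountVolume.SetUp errors)
kubectl describe pod structured-streaming-350-xxxxxxx-driver -n so350 | tail -n 20
```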
Here is the job YAML ->
```
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: structured-streaming-350-{{ now | unixEpoch }}
  namespace: so350
spec:
  type: Python
  mode: cluster
  sparkVersion: "3.5.0"
  image: "us-east1-docker.pkg.dev/versa-kafka-poc/spark-job-repo/ss-main-so350:3.5.0"
  imagePullPolicy: Always
  imagePullSecrets:
    - gcr-json-key
  mainApplicationFile: "local:///opt/spark/custom-dir/main.py"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: {{ .Values.driver.cores }}
    coreLimit: "{{ .Values.driver.coreLimit }}"
    memory: "{{ .Values.driver.memory }}"
    labels:
      version: 3.5.0
    serviceAccount: spark
    securityContext:
      runAsUser: 185  # UID for spark user
    volumeMounts:
      - name: gcs-key
        mountPath: /mnt/secrets
        readOnly: true
    initContainers:
      - name: init1
        image: busybox:1.28
        command: ['sh', '-c', "until nslookup myservice.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
      - name: init2
        image: busybox
        command:
          - /bin/sh
          - "-c"
          - |
            echo "Checking mounted secrets";
            ls -l /mnt/secrets;
            cat /mnt/secrets/spark-gcs-key.json;
            sleep 180;  # keep the init container running for inspection
  executor:
    cores: {{ .Values.executor.cores }}
    instances: {{ .Values.executor.instances }}
    memory: "{{ .Values.executor.memory }}"
    labels:
      version: "3.5.0"
    securityContext:
      runAsUser: 185  # UID for spark user
    volumeMounts:
      - name: gcs-key
        mountPath: /mnt/secrets
        readOnly: true
  volumes:
    - name: gcs-key
      secret:
        secretName: spark-gcs-creds
  deps:
    jars:
      - local:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar
      - local:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar
      - local:///opt/spark/other-jars/bson-4.0.5.jar
      - local:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar
      - local:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar
      - local:///opt/spark/other-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
      - local:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar
      - local:///opt/spark/other-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
      - local:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar
      - local:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar
      - local:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar
      - local:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar
      - local:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar
      - local:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar
      - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar
      - local:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar
      - local:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar
    pyFiles:
      - local:///opt/spark/zips/streams.zip
      - local:///opt/spark/zips/utils.zip
    files:
      - local:///opt/spark/certs/syslog-vani-prefix.p12
      - local:///opt/spark/certs/versa-alarmblock-test-user.p12
      - local:///opt/spark/certs/versa-appstat-test-user.p12
      - local:///opt/spark/certs/versa-bandwidth-test-user.p12
      - local:///opt/spark/certs/intfutil-user-test.p12
      - local:///opt/spark/certs/alarm-compression-user-test.p12
      - local:///opt/spark/certs/alarmblock-user-test.p12
      - local:///opt/spark/certs/appstat-agg-user-test.p12
      - local:///opt/spark/certs/appstat-anomaly-user-test.p12
      - local:///opt/spark/certs/appstats-user-test.p12
      - local:///opt/spark/certs/insights-user-test.p12
      - local:///opt/spark/cfg/params.cfg
      - local:///opt/spark/cfg/params_password.cfg
  pythonVersion: "{{ .Values.pythonVersion }}"
  sparkConf:
    "spark.hadoop.google.cloud.project.id": "versa-kafka-poc"
    "spark.hadoop.fs.gs.project.id": "versa-kafka-poc"
    "spark.kafka.ssl.keystore.location": "/opt/spark/certs/syslog-vani-prefix.p12"
    "spark.kafka.ssl.truststore.location": "/opt/spark/certs/versa-kafka-poc-tf-ca.p12"
    "spark.kubernetes.namespace": "so350"
    "spark.kubernetes.authenticate.driver.serviceAccountName": "spark"
    "spark.kubernetes.container.image": "{{ .Values.image }}"
    "spark.kubernetes.driver.container.image": "{{ .Values.image }}"
    "spark.kubernetes.executor.container.image": "{{ .Values.image }}"
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
    "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
    "spark.hadoop.google.cloud.auth.service.account.enable": "true"
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/spark-gcs-key.json"
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "{{ .Values.sparkEventLogDir }}"
    "spark.hadoop.fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
    "spark.dynamicAllocation.enabled": "false"
    "spark.dynamicAllocation.executorIdleTimeout": "120s"
    "spark.shuffle.service.enabled": "false"
    "spark.kubernetes.executor.deleteOnTermination": "false"
    "spark.jars": "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
    "spark.driver.extraClassPath": "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
    "spark.executor.extraClassPath": "file:///opt/spark/other-jars/gcs-connector-hadoop3-latest.jar,file:///opt/spark/other-jars/mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/bson-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/mongodb-driver-core-4.0.5.jar,file:///opt/spark/other-jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.mongodb.spark_mongo-spark-connector_2.12-3.0.2.jar,file:///opt/spark/other-jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///opt/spark/other-jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///opt/spark/other-jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///opt/spark/other-jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///opt/spark/other-jars/org.lz4_lz4-java-1.7.1.jar,file:///opt/spark/other-jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///opt/spark/other-jars/org.slf4j_slf4j-api-1.7.30.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-sync-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_bson-4.0.5.jar,file:///opt/spark/other-jars/org.mongodb_mongodb-driver-core-4.0.5.jar"
    "spark.submit.pyFiles": "file:///opt/spark/zips/streams.zip,file:///opt/spark/zips/utils.zip"
  hadoopConf:
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
    "google.cloud.auth.service.account.enable": "true"
    "google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/spark-gcs-key.json"
  arguments:
    - "--isdebug={{ .Values.isdebug }}"
    - "--istest={{ .Values.isdebug }}"
```
Here is a snapshot of the secret:
```
(base) Karans-MacBook-Pro:spark-k8s-operator karanalang$ kc get secret
spark-gcs-creds -n so350 -o yaml
apiVersion: v1
data:
spark-gcs-key.json:
<--- KEY --->
kind: Secret
metadata:
creationTimestamp: "2024-10-04T21:54:06Z"
name: spark-gcs-creds
namespace: so350
resourceVersion: "180991552"
uid: ac30c575-9abf-4a77-ba90-15576607c97f
type: Opaque
```
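To rule out a bad key inside the secret, one quick (illustrative) check is to decode it straight from the secret and eyeball the JSON:
```
# decode the key stored under 'spark-gcs-key.json' and show the first few hundred bytes
kubectl get secret spark-gcs-creds -n so350 \
  -o jsonpath='{.data.spark-gcs-key\.json}' | base64 --decode | head -c 300
```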
Any ideas on how to debug/fix this?
tia!
On Thu, Oct 3, 2024 at 12:37 AM Nimrod Ofek <[email protected]> wrote:
> Where is the checkpoint location? Not in GCS?
> Probably the location of the checkpoint is there- and you don't have
> permissions for that...
>
> On Thu, Oct 3, 2024, 02:43, karan alang <[email protected]> wrote:
>
>> This seems to be the cause of this ->
>> github.com/kubeflow/spark-operator/issues/1619 .. the secret is not
>> getting mounted due to this error -> MountVolume.SetUp failed for volume
>> "spark-conf-volume-driver"
>>
>> I'm getting the same error in the event logs, and the mounted secret is not
>> getting read.
>>
>> If anyone is using spark-operator, and run into this issue .. pls let me
>> know.
>> Alternatively, is there another operator that can be used ?
>>
>> thanks!
>>
>>
>> On Tue, Oct 1, 2024 at 3:41 PM karan alang <[email protected]> wrote:
>>
>>> I have the kubeflow spark-operator installed on K8s (GKE), and I'm running a
>>> structured streaming job which reads data from Kafka .. the job runs
>>> every 10 mins.
>>>
>>> It is giving an error shown below:
>>>
>>> ```
>>>
>>> Traceback (most recent call last):
>>> File "/opt/spark/custom-dir/main.py", line 356, in <module>
>>> sys.exit(main())
>>> File "/opt/spark/custom-dir/main.py", line 352, in main
>>> ss.readData()
>>> File "/opt/spark/custom-dir/main.py", line 327, in readData
>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp",
>>> "topic").writeStream \
>>> File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py",
>>> line 1491, in start
>>> return self._sq(self._jwrite.start())
>>> File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line
>>> 1304, in __call__
>>> return_value = get_return_value(
>>> File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line
>>> 111, in deco
>>> return f(*a, **kw)
>>> File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326,
>>> in get_return_value
>>> raise Py4JJavaError(
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
>>> :
>>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
>>> 403 Forbidden
>>> POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
>>> {
>>> "code" : 403,
>>> "errors" : [ {
>>> "domain" : "global",
>>> "message" : "[email protected]
>>> does not have storage.buckets.create access to the Google Cloud project.
>>> Permission 'storage.buckets.create' denied on resource (or it may not
>>> exist).",
>>> "reason" : "forbidden"
>>> } ],
>>> "message" : "[email protected]
>>> does not have storage.buckets.create access to the Google Cloud project.
>>> Permission 'storage.buckets.create' denied on resource (or it may not
>>> exist)."
>>> }
>>>
>>>
>>> ```
>>>
>>> Code reading data from kafka on GKE :
>>>
>>>
>>> ```
>>>
>>> df_stream = self.spark.readStream.format('kafka') \
>>>     .option("kafka.security.protocol", "SSL") \
>>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
>>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
>>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>     .option("subscribePattern", "topic") \
>>>     .option("startingOffsets", "latest") \
>>>     .option("failOnDataLoss", "false") \
>>>     .option("kafka.metadata.max.age.ms", "1000") \
>>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>>     .load()
>>>
>>> logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")
>>> # trigger once
>>>
>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>     .outputMode("append") \
>>>     .trigger(processingTime='10 minutes') \
>>>     .option("truncate", "false") \
>>>     .option("checkpointLocation", self.checkpoint) \
>>>     .foreachBatch(self.convertToDictForEachBatch) \
>>>     .start()
>>>
>>> ```
>>>
>>> I'm unable to understand why the error states that the storage.buckets.create
>>> privilege is missing, when the code is actually reading data from Kafka.
>>>
>>> Any ideas on how to debug/fix ?
>>>
>>> Anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs on
>>> kubernetes ?
>>>
>>> tia!
>>>
>>> here is stack overflow ticket ->
>>>
>>>
>>> https://stackoverflow.com/questions/79044916/kubeflow-spark-operator-error-in-querying-strimzi-kafka-using-structured-strea
>>>