Steven Ottens created SPARK-45769:
-------------------------------------

             Summary: data retrieval fails on executors with spark connect
                 Key: SPARK-45769
                 URL: https://issues.apache.org/jira/browse/SPARK-45769
             Project: Spark
          Issue Type: Bug
          Components: Connect
    Affects Versions: 3.5.0
            Reporter: Steven Ottens
We have an OpenShift cluster with Spark and JupyterHub, and we use Spark Connect to access Spark from within Jupyter. This worked fine with Spark 3.4.1; however, after upgrading to Spark 3.5.0 we were no longer able to access any data in our Delta tables through Spark. Initially I assumed it was a bug in Delta: [https://github.com/delta-io/delta/issues/2235]

The actual error is
{code:java}
SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 13) (172.31.15.72 executor 4): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
{code}
After further investigation, however, I discovered that this is a regression in Spark 3.5.0. The issue is similar to SPARK-36917, but I am not using any custom functions, nor any classes other than spark-connect, and this setup used to work in 3.4.1. The issue only occurs when remote executors are used in a Kubernetes environment. Running a plain Spark Connect server, e.g.
{code:java}
./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0
{code}
does not produce the error. The issue occurs both in a full OpenShift cluster and in a tiny minikube setup; the steps to reproduce below are based on the minikube setup. You need a minimal Spark 3.5.0 setup with one driver and at least one executor, and Python to access data through Spark. The query I used to test this is
{code:java}
from pyspark.sql import SparkSession

logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count()
{code}
It does not matter whether the data is local or remote on S3 storage, nor whether the data is plain text, CSV or a Delta table.
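For reference, the expected result of the {{df.count()}} call above can be derived without a cluster: the echo commands in the reproduction steps below write a two-line file, and since {{spark.read.csv}} is called without {{header=True}}, the header line is counted as an ordinary row. A quick local sanity check (plain Python, no Spark needed; the file contents are taken verbatim from those echo commands):
{code:python}
# Rebuild data.csv exactly as the echo commands in the repro steps do,
# and compute the row count that df.count() should return on a healthy
# cluster. spark.read.csv() is called without header=True, so the
# header line counts as an ordinary row.
import csv
import io

data = "id,name\n1,2\n"  # produced by: echo id,name > data.csv; echo 1,2 >> data.csv
rows = list(csv.reader(io.StringIO(data)))
print(len(rows))  # expected df.count() result: 2
{code}
So a successful run should return 2; on 3.5.0 with remote Kubernetes executors the action instead fails with the {{ClassCastException}} above before producing any result.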
h3. Steps to reproduce
# Install minikube.
# Create a service account 'spark':
{code:java}
kubectl create sa spark
{code}
# Bind the 'edit' role to the service account:
{code:java}
kubectl create rolebinding spark-edit \
  --clusterrole=edit \
  --serviceaccount=default:spark \
  --namespace=default
{code}
# Create a service for Spark Connect:
{code:java}
kubectl create -f service.yml
{code}
# Create a Spark Connect deployment with the default Spark Docker image: [https://hub.docker.com/_/spark] (do change deployment.yml to point to your Kubernetes API endpoint):
{code:java}
kubectl create -f deployment.yml
{code}
# Add data to both the executor and the driver pods, e.g. log in on the terminal of each pod and run:
{code:java}
touch data.csv
echo id,name > data.csv
echo 1,2 >> data.csv
{code}
# Start a spark-remote session to access the newly created data. I logged in on the driver pod and installed the necessary Python packages:
{code:java}
python3 -m pip install pandas pyspark grpcio-tools grpcio-status pyarrow
{code}
then started a Python shell and executed:
{code:java}
from pyspark.sql import SparkSession

logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count()
{code}

h3. Necessary files
service.yml:
{code:java}
apiVersion: v1
kind: Service
metadata:
  labels:
    app: spark-connect
  name: spark-connect
  namespace: default
spec:
  ipFamilies:
    - IPv4
  ports:
    - name: connect-grpc
      protocol: TCP
      port: 15002       # Port the service listens on.
      targetPort: 15002 # Port on the backing pods to which the service forwards connections
    - name: sparkui
      protocol: TCP
      port: 4040        # Port the service listens on.
      targetPort: 4040  # Port on the backing pods to which the service forwards connections
    - name: spark-rpc
      protocol: TCP
      port: 7078        # Port the service listens on.
      targetPort: 7078  # Port on the backing pods to which the service forwards connections
    - name: blockmanager
      protocol: TCP
      port: 7079        # Port the service listens on.
      targetPort: 7079  # Port on the backing pods to which the service forwards connections
  internalTrafficPolicy: Cluster
  type: ClusterIP
  ipFamilyPolicy: SingleStack
  sessionAffinity: None
  selector:
    app: spark-connect
{code}
deployment.yml (do replace the spark.master URL with the correct one for your setup):
{code:java}
kind: Deployment
apiVersion: apps/v1
metadata:
  name: spark-connect
  namespace: default
  uid: 3a1b448e-4594-47a9-95f6-a82ea4ac9341
  resourceVersion: '6107'
  generation: 23
  creationTimestamp: '2023-10-31T13:35:46Z'
  labels:
    k8s-app: spark-connect
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: spark-connect
  template:
    metadata:
      name: spark-connect
      creationTimestamp: null
      labels:
        k8s-app: spark-connect
    spec:
      serviceAccount: spark
      containers:
        - name: spark-connect
          image: spark
          command:
            - /opt/entrypoint.sh
            - driver
          args:
            - '--class'
            - org.apache.spark.sql.connect.service.SparkConnectServer
            - '--name'
            - spark-connect
            - '--conf'
            - spark.driver.blockManager.port=7079
            - '--conf'
            - spark.driver.port=7078
            - '--conf'
            - spark.driver.host=spark-connect
            - '--conf'
            - spark.master=k8s://https://<Kubernetes API Address eg https://192.168.49.2:8443>
            - '--conf'
            - spark.kubernetes.namespace=default
            - '--conf'
            - spark.kubernetes.container.image=spark:latest
            - '--conf'
            - spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp
            - '--conf'
            - spark.driver.extraJavaOptions=-Divy.home=/tmp
            - '--conf'
            - spark.kubernetes.driver.label.app.kubernetes.io/part-of=spark-connect
            - '--conf'
            - spark.kubernetes.executor.label.app=spark-connect
            - '--conf'
            - spark.executor.memory=1g
            - '--conf'
            - spark.executor.cores=1
            - '--conf'
            - spark.executor.instances=1
            - '--conf'
            - spark.kubernetes.executor.podNamePrefix=spark-connect
            - '--packages'
            - org.apache.spark:spark-connect_2.12:3.5.0
          env:
            - name: SPARK_DRIVER_BIND_ADDRESS
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: Always
          securityContext:
            privileged: true
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org