[
https://issues.apache.org/jira/browse/SPARK-52965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18059932#comment-18059932
]
Giambattista Bloisi commented on SPARK-52965:
---------------------------------------------
I came across the same problem with Spark Connect 4.0.2 deployed on
Kubernetes.
It looks like the same problem reported in [INTERNAL: Encountered
end-of-stream mid-frame #12412|https://github.com/grpc/grpc-java/issues/12412],
and I can confirm that in my case it was fixed by upgrading to 4.1.1, which
ships updated grpc and netty libraries.
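Since the fix here amounts to picking up newer grpc/netty builds, a useful first check on the client side is which library versions are actually installed. A minimal sketch using only the standard library (the package names are the usual PyPI distribution names; adjust for your environment):

```python
from importlib.metadata import version, PackageNotFoundError

def pkg_version(name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(name)
    except PackageNotFoundError:
        return None

# grpcio and pyspark are the client-side packages relevant to this report
for pkg in ("grpcio", "pyspark"):
    print(pkg, pkg_version(pkg))
```

On the server side, the bundled (shaded) grpc/netty jars under the Spark distribution's jars directory are what matter, so the same check does not apply there.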
> [SPARK-CONNECT] [SPARK-4.0] Encountered end-of-stream mid-frame on Complex
> Schema Transformation
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-52965
> URL: https://issues.apache.org/jira/browse/SPARK-52965
> Project: Spark
> Issue Type: Bug
> Components: Connect, Spark Core
> Affects Versions: 4.0.0
> Environment:
> * *Spark version*: 4.0.0 (from
> {{apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu}} image)
> * *Java version*: 17
> * *Scala version*: 2.13.x
> * *Platform*: Originally observed on AWS EKS 1.30, but fully
> reproducible locally with Docker.
> * *Node architecture*: {{amd64}} / {{x86_64}}
> * *Node OS/kernel (original)*: {{Linux 5.10.225-213.878.amzn2.x86_64}}
> * *Client*: PySpark 4.0.0
> Reporter: Manish Kumar Jagnani
> Priority: Major
>
> A gRPC connection failure occurs when executing a PySpark DataFrame action
> that involves a complex, dynamically generated schema transformation. The
> Spark Connect server terminates the connection, leading to an {{Encountered
> end-of-stream mid-frame}} error.
> The issue is reproducible in a minimal Docker environment and persists even
> after eliminating network factors and increasing gRPC message size limits,
> indicating a potential core issue rather than a configuration problem.
> *Steps to Reproduce*
> This issue can be fully reproduced locally using the following Docker setup.
> *1. Build the Docker Image*
> Use the following {{Dockerfile}} to create the test environment.
> {code:none}
> # Dockerfile
> FROM apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu
>
> USER root
>
> # Ensure Python and pip are installed
> RUN apt-get update && \
>     apt-get install -y --no-install-recommends python3-pip && \
>     rm -rf /var/lib/apt/lists/*
>
> USER ${spark_uid}
> {code}
> Build the image:
>
> {code:none}
> docker build -t spark-connect-repro .
> {code}
> *2. Run the Spark Connect Server*
> Start the Spark Connect server in a container. Note that we are including the
> {{maxInboundMessageSize}} configuration to demonstrate that it does not solve
> the problem.
> {code:none}
> docker run -d --name spark-connect-server -p 15002:15002 spark-connect-repro \
>   /opt/spark/bin/spark-submit \
>   --class org.apache.spark.sql.connect.service.SparkConnectServer \
>   --conf spark.driver.host=0.0.0.0 \
>   --conf spark.driver.bindAddress=0.0.0.0 \
>   --conf spark.connect.grpc.binding.port=15002 \
>   --conf spark.connect.grpc.maxInboundMessageSize=104857600 \
>   spark-internal
> {code}
> *3. Run the PySpark Client and Execute Failing Code*
> Exec into the running container to start a client session that connects to
> the server via {{localhost}}. This replicates the "same-pod" test scenario.
> {code:none}
> docker exec -it spark-connect-server /bin/bash
> {code}
> Inside the container's shell, start a {{pyspark}} shell, again including the
> client-side {{maxMessageSize}} to show it doesn't help.
> {code:none}
> pyspark --remote sc://localhost:15002 --conf spark.connect.grpc.maxMessageSize=104857600
> {code}
> Now, paste and run the following Python code in the {{pyspark}} shell.
> {code:python}
> import pyspark.sql.connect.functions as F
>
> df = spark.read.parquet(
>     's3://prod-data-test/temp/manishjag/income_entropy/synthetic_salary_data.parquet'
> )
> df.show()
>
> COLUMNS_TO_KEEP = [
>     "user_id",
>     "device_id",
>     "external_id",
>     "sender_address",
>     "account_number",
>     "credit_amount",
>     "description",
>     "transaction_date",
>     "body",
>     "sys_partition_date",
>     "gt_model",
>     "model_version",
> ]
>
> def transform_schema(df, table_name=None):
>     """
>     Transform the DataFrame schema by moving specified columns into a
>     metadata struct column.
>     """
>     # List of columns to keep as is
>     if table_name == 'predicted_transactions':
>         main_columns = COLUMNS_TO_KEEP
>     elif table_name == 'aggregated':
>         main_columns = [
>             col for col in df.columns
>             if col not in [
>                 "is_odd_found", "month_count", "months_last_12",
>                 "months_last_6", "months_last_3", "total_sal_old",
>                 "total_sal_lower", "total_sal_upper", "total_sal_bucket",
>             ]
>         ]
>     # Get all columns that should go into metadata
>     metadata_columns = [col for col in df.columns if col not in main_columns]
>     # Create a struct preserving the original data types
>     metadata_struct = F.struct(*[
>         F.col(col_name).alias(col_name)
>         for col_name in metadata_columns
>     ])
>     # Select main columns and add the metadata column
>     result_df = df.select(
>         *main_columns,
>         metadata_struct.alias("metadata")
>     )
>     return result_df
>
> final_agg = transform_schema(df, table_name='aggregated')
> final_agg.show()
> {code}
> *Observed Behavior*
> The {{final_agg.show()}} command fails with the following errors.
> *Client-Side Error (in the {{pyspark}} shell):*
> {code:none}
> pyspark.errors.exceptions.connect.SparkConnectGrpcException:
> <_InactiveRpcError of RPC that terminated with:
>     status = StatusCode.INTERNAL
>     details = "Encountered end-of-stream mid-frame"
>     debug_error_string = "UNKNOWN:Error received from peer
> {grpc_message:"Encountered end-of-stream mid-frame", grpc_status:13}"
> >
> {code}
> *Server-Side Log (viewable via {{docker logs spark-connect-server}}):*
> {code:none}
> WARN NettyServerStream: Exception processing message
> org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame
>     at org.sparkproject.connect.grpc.Status.asRuntimeException(Status.java:524)
>     at org.sparkproject.connect.grpc.internal.AbstractServerStream$TransportState.deframerClosed(AbstractServerStream.java:247)
>     at org.sparkproject.connect.grpc.netty.NettyServerStream$TransportState.deframerClosed(NettyServerStream.java:139)
>     at org.sparkproject.connect.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
>     at org.sparkproject.connect.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:301)
>     at org.sparkproject.connect.grpc.internal.MessageDeframer.request(MessageDeframer.java:162)
> {code}
> *Expected Behavior*
> The {{final_agg.show()}} command should execute successfully without any gRPC
> or network-related errors, printing the transformed DataFrame to the console.
> *Troubleshooting Performed:*
> The issue persists despite the following steps, which rule out common
> configuration errors:
> # *Isolated environment*: The client ({{pyspark}} shell) was run inside the
> same container as the Spark Connect server, connecting via {{localhost}}.
> This confirms the issue is not caused by external network components (load
> balancers, ingress, etc.).
> # *Increased message size limits*: Both the client
> ({{spark.connect.grpc.maxMessageSize}}) and server
> ({{spark.connect.grpc.maxInboundMessageSize}}) gRPC message size limits were
> increased to 100 MB, with no change in behavior.
> # *Varied Java versions*: The server was tested with both Java 17 and
> Java 21.
> # *Varied grpcio versions*: Multiple versions of the {{grpcio}} library were
> tested on the client side.
> # *Adjusted timeout configurations*: Several Spark Connect timeout settings
> were increased with no effect, including
> {{spark.connect.session.manager.defaultSessionTimeout}} and
> {{spark.connect.execute.manager.detachedTimeout}}.
>
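Stripped of Spark, the column selection in the quoted {{transform_schema}} repro (the 'aggregated' branch) is a plain set split: the excluded aggregate columns move under a {{metadata}} struct and everything else stays top-level. A minimal sketch of just that logic, runnable without a Spark cluster (the {{split_columns}} helper and the sample column names are illustrative, not from the reporter's data):

```python
# Columns excluded from the top level for the 'aggregated' table,
# as listed in the quoted transform_schema repro.
EXCLUDED = [
    "is_odd_found", "month_count", "months_last_12", "months_last_6",
    "months_last_3", "total_sal_old", "total_sal_lower", "total_sal_upper",
    "total_sal_bucket",
]

def split_columns(all_columns):
    """Partition column names: excluded ones go into the metadata struct."""
    main = [c for c in all_columns if c not in EXCLUDED]
    metadata = [c for c in all_columns if c not in main]
    return main, metadata

# Illustrative schema: the excluded aggregates end up in metadata
main, metadata = split_columns(["user_id", "credit_amount", "month_count"])
print(main)      # ['user_id', 'credit_amount']
print(metadata)  # ['month_count']
```

The split itself is trivial; what makes the quoted repro stress the gRPC layer is that the resulting plan carries a wide, dynamically built struct, which is where the mid-frame stream termination surfaces.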
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]