Manish Kumar Jagnani created SPARK-52965:
--------------------------------------------
Summary: [SPARK-CONNECT] [SPARK-4.0] Encountered end-of-stream mid-frame on Complex Schema Transformation
Key: SPARK-52965
URL: https://issues.apache.org/jira/browse/SPARK-52965
Project: Spark
Issue Type: Bug
Components: Connect, Spark Core
Affects Versions: 4.0.0
Environment:
* *Spark version*: 4.0.0 (from {{apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu}} image)
* *Java version*: 17
* *Scala version*: 2.13.x
* *Platform*: Originally observed on AWS EKS 1.30, but fully reproducible locally with Docker
* *Node Architecture*: {{amd64}} / {{x86_64}}
* *Node OS/Kernel (original)*: {{Linux 5.10.225-213.878.amzn2.x86_64}}
* *Client*: PySpark 4.0.0
Reporter: Manish Kumar Jagnani
A gRPC connection failure occurs when executing a PySpark DataFrame action that
involves a complex, dynamically generated schema transformation. The Spark
Connect server terminates the connection, leading to an {{Encountered
end-of-stream mid-frame}} error.
The issue is reproducible in a minimal Docker environment and persists even
after eliminating network factors and increasing gRPC message size limits,
indicating a potential core issue rather than a configuration problem.
*Steps to Reproduce*
This issue can be fully reproduced locally using the following Docker setup.
*1. Build the Docker Image*
Use the following {{Dockerfile}} to create the test environment.
{code:bash}
# Dockerfile
FROM apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu

USER root
# Ensure Python and pip are installed
RUN apt-get update && \
    apt-get install -y --no-install-recommends python3-pip && \
    rm -rf /var/lib/apt/lists/*

USER ${spark_uid}
{code}
Build the image:
{code:bash}
docker build -t spark-connect-repro .
{code}
*2. Run the Spark Connect Server*
Start the Spark Connect server in a container. Note that we are including the
{{maxInboundMessageSize}} configuration to demonstrate that it does not solve
the problem.
{code:bash}
docker run -d --name spark-connect-server -p 15002:15002 spark-connect-repro \
/opt/spark/bin/spark-submit \
--class org.apache.spark.sql.connect.service.SparkConnectServer \
--conf spark.driver.host=0.0.0.0 \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.connect.grpc.binding.port=15002 \
--conf spark.connect.grpc.maxInboundMessageSize=104857600 \
  spark-internal
{code}
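Before moving on, basic reachability of the gRPC endpoint can be sanity-checked with a plain {{grpcio}} probe (a sketch; it only verifies that the port accepts gRPC connections, not that Spark Connect queries succeed):
{code:python}
# Sketch: confirm the Spark Connect gRPC endpoint is reachable.
# This exercises only the channel, not Spark Connect itself.
import grpc

channel = grpc.insecure_channel(
    "localhost:15002",
    options=[("grpc.max_receive_message_length", 104857600)],
)
try:
    # Blocks until the channel is READY or the timeout expires.
    grpc.channel_ready_future(channel).result(timeout=10)
    print("gRPC endpoint is reachable")
finally:
    channel.close()
{code}
In the setup above this probe succeeds, which is consistent with the failure being query-specific rather than a connectivity problem.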
*3. Run the PySpark Client and Execute Failing Code*
Exec into the running container to start a client session that connects to the server via localhost. This replicates the "same-pod" test scenario.
{code:bash}
docker exec -it spark-connect-server /bin/bash
{code}
Inside the container's shell, start a {{pyspark}} shell connected to the local server (the client-side {{spark.connect.grpc.maxMessageSize}} increase described under Troubleshooting was also applied in this shell, and does not help):
{code:bash}
pyspark --remote sc://localhost:15002
{code}
Now, paste and run the following Python code in the {{pyspark}} shell.
{code:python}
df = spark.read.parquet('s3://prod-data-test/temp/manishjag/income_entropy/synthetic_salary_data.parquet')
df.show()

COLUMNS_TO_KEEP = [
    "user_id",
    "device_id",
    "external_id",
    "sender_address",
    "account_number",
    "credit_amount",
    "description",
    "transaction_date",
    "body",
    "sys_partition_date",
    "gt_model",
    "model_version",
]

import pyspark.sql.connect.functions as F

def transform_schema(df, table_name=None):
    """
    Transform the DataFrame schema by moving all columns that are not in
    the main column list into a single nested "metadata" struct column.
    """
    # List of columns to keep as-is at the top level
    if table_name == 'predicted_transactions':
        main_columns = COLUMNS_TO_KEEP
    elif table_name == 'aggregated':
        main_columns = [col for col in df.columns if col not in
                        ["is_odd_found", "month_count", "months_last_12",
                         "months_last_6", "months_last_3", "total_sal_old",
                         "total_sal_lower", "total_sal_upper",
                         "total_sal_bucket"]]
    else:
        raise ValueError(f"Unsupported table_name: {table_name}")

    # Get all columns that should go into metadata
    metadata_columns = [col for col in df.columns if col not in main_columns]

    # Create a struct preserving the original names and data types
    metadata_struct = F.struct(*[
        F.col(col_name).alias(col_name)
        for col_name in metadata_columns
    ])

    # Select main columns and add the metadata column
    result_df = df.select(
        *main_columns,
        metadata_struct.alias("metadata")
    )
    return result_df

final_agg = transform_schema(df, table_name='aggregated')
final_agg.show()
{code}
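The parquet file above lives in a private bucket, so the snippet is not runnable as-is elsewhere. As a self-contained stand-in (a sketch; it assumes the failure depends on the shape of the transformation rather than on the specific data), a synthetic DataFrame with the same column names can be built in the same {{pyspark}} shell and fed through {{transform_schema}}:
{code:python}
# Sketch: synthetic substitute for the private S3 dataset, using the
# same column names with arbitrary values. Whether it reproduces the
# identical failure is an assumption to be verified.
import pyspark.sql.connect.functions as F

agg_only_columns = [
    "is_odd_found", "month_count", "months_last_12", "months_last_6",
    "months_last_3", "total_sal_old", "total_sal_lower", "total_sal_upper",
    "total_sal_bucket",
]
synthetic_df = spark.range(1000).select(
    "id",
    *[F.lit(f"value_{name}").alias(name) for name in COLUMNS_TO_KEEP],
    *[F.rand().alias(name) for name in agg_only_columns],
)

final_agg = transform_schema(synthetic_df, table_name='aggregated')
final_agg.show()
{code}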
*Observed Behavior*
The {{final_agg.show()}} command fails with the following errors.
*Client-Side Error (in the {{pyspark}} shell):*
{code}
pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "Encountered end-of-stream mid-frame"
    debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Encountered end-of-stream mid-frame", grpc_status:13}"
>
{code}
*Server-Side Log (via {{docker logs spark-connect-server}}):*
{code:java}
WARN NettyServerStream: Exception processing message
org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame
    at org.sparkproject.connect.grpc.Status.asRuntimeException(Status.java:524)
    at org.sparkproject.connect.grpc.internal.AbstractServerStream$TransportState.deframerClosed(AbstractServerStream.java:247)
    at org.sparkproject.connect.grpc.netty.NettyServerStream$TransportState.deframerClosed(NettyServerStream.java:139)
    at org.sparkproject.connect.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
    at org.sparkproject.connect.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:301)
    at org.sparkproject.connect.grpc.internal.MessageDeframer.request(MessageDeframer.java:162)
{code}
*Expected Behavior*
The {{final_agg.show()}} command should execute successfully, printing the transformed DataFrame to the console without any gRPC or network-related errors.
*Troubleshooting Performed:*
The issue persists despite the following troubleshooting steps, which rule out common configuration errors (a further diagnostic sketch follows the list):
# *Isolated Environment*: The client ({{pyspark}} shell) was run inside the same container as the Spark Connect server, connecting via {{localhost}}. This confirms the issue is not caused by external network components (load balancers, Ingress, etc.).
# *Increased Message Size Limits*: Both the client ({{spark.connect.grpc.maxMessageSize}}) and server ({{spark.connect.grpc.maxInboundMessageSize}}) gRPC message size limits were increased to 100MB, with no change in behavior.
# *Varied Java Versions*: The server was tested with both Java 17 and Java 21.
# *Varied grpcio Versions*: Multiple versions of the {{grpcio}} library were tested on the client side.
# *Adjusted Timeout Configurations*: Several Spark Connect timeout settings were increased with no effect, including {{spark.connect.session.manager.defaultSessionTimeout}} and {{spark.connect.execute.manager.detachedTimeout}}.
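One further narrowing step (a sketch only, not yet run here; it assumes the failure correlates with the width of the generated struct rather than with the source data) is to bisect the struct width directly in the {{pyspark}} shell:
{code:python}
# Sketch: fold an increasing number of synthetic columns into a struct
# and note the width, if any, at which show() starts to fail.
import pyspark.sql.connect.functions as F

# Wide synthetic DataFrame: one id column plus 200 literal columns.
wide = spark.range(10).select(
    "id", *[F.lit(i).alias(f"c{i}") for i in range(200)]
)

for n in (10, 50, 100, 200):
    probe = wide.select(
        "id",
        F.struct(*[F.col(f"c{i}") for i in range(n)]).alias("metadata"),
    )
    print(f"struct width {n}:")
    probe.show(1)  # if width is the trigger, the gRPC error should appear here
{code}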