Manish Kumar Jagnani created SPARK-52965:
--------------------------------------------

             Summary: [SPARK-CONNECT] [SPARK-4.0] Encountered end-of-stream 
mid-frame on Complex Schema Transformation
                 Key: SPARK-52965
                 URL: https://issues.apache.org/jira/browse/SPARK-52965
             Project: Spark
          Issue Type: Bug
          Components: Connect, Spark Core
    Affects Versions: 4.0.0
         Environment: * {*}Spark version{*}: 4.0.0 (from {{apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu}} image)
 * {*}Java version{*}: 17
 * {*}Scala version{*}: 2.13.x
 * {*}Platform{*}: Originally observed on AWS EKS 1.30, but fully reproducible 
locally with Docker.
 * {*}Node Architecture{*}: {{amd64}} / {{x86_64}}
 * {*}Node OS/Kernel (original){*}: {{Linux 5.10.225-213.878.amzn2.x86_64}}
 * {*}Client{*}: PySpark 4.0.0
            Reporter: Manish Kumar Jagnani


A gRPC connection failure occurs when executing a PySpark DataFrame action that 
involves a complex, dynamically generated schema transformation. The Spark 
Connect server terminates the connection, leading to an {{Encountered 
end-of-stream mid-frame}} error.
The issue is reproducible in a minimal Docker environment and persists even 
after eliminating network factors and increasing gRPC message size limits, 
indicating a potential core issue rather than a configuration problem.

*Steps to Reproduce*
This issue can be fully reproduced locally using the following Docker setup.
*1. Build the Docker Image*
Use the following {{Dockerfile}} to create the test environment.
{code:java}
# Dockerfile
FROM apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu

USER root

# Ensure Python and pip are installed
RUN apt-get update && \
    apt-get install -y --no-install-recommends python3-pip && \
    rm -rf /var/lib/apt/lists/*

USER ${spark_uid} {code}
Build the image:
{code:java}
docker build -t spark-connect-repro . {code}
*2. Run the Spark Connect Server*
Start the Spark Connect server in a container. Note that we are including the 
{{maxInboundMessageSize}} configuration to demonstrate that it does not solve 
the problem.
{code:java}
docker run -d --name spark-connect-server -p 15002:15002 spark-connect-repro \
/opt/spark/bin/spark-submit \
  --class org.apache.spark.sql.connect.service.SparkConnectServer \
  --conf spark.driver.host=0.0.0.0 \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.connect.grpc.binding.port=15002 \
  --conf spark.connect.grpc.maxInboundMessageSize=104857600 \
  spark-internal {code}
*3. Run the PySpark Client and Execute the Failing Code*
Exec into the running container to start a client session that connects to the server via localhost. This replicates the "same-pod" test scenario.
{code:java}
docker exec -it spark-connect-server /bin/bash{code}
Inside the container's shell, start a {{pyspark}} shell. (Increasing the client-side {{maxMessageSize}} was also tried and does not help; see Troubleshooting below.)
{code:java}
pyspark --remote sc://localhost:15002 {code}
Now, paste and run the following Python code in the {{pyspark}} shell.
{code:java}
df = spark.read.parquet('s3://prod-data-test/temp/manishjag/income_entropy/synthetic_salary_data.parquet')
df.show()

COLUMNS_TO_KEEP = [
    "user_id",
    "device_id",
    "external_id",
    "sender_address",
    "account_number",
    "credit_amount",
    "description",
    "transaction_date",
    "body",
    "sys_partition_date",
    "gt_model",
    "model_version",
]

import pyspark.sql.connect.functions as F

def transform_schema(df, table_name=None):
    """
    Transform DataFrame schema by moving specified columns into a metadata struct column.
    """
    # Columns to keep at the top level
    if table_name == 'predicted_transactions':
        main_columns = COLUMNS_TO_KEEP
    elif table_name == 'aggregated':
        main_columns = [
            col for col in df.columns
            if col not in [
                "is_odd_found", "month_count", "months_last_12", "months_last_6",
                "months_last_3", "total_sal_old", "total_sal_lower",
                "total_sal_upper", "total_sal_bucket",
            ]
        ]
    # All remaining columns go into the metadata struct
    metadata_columns = [col for col in df.columns if col not in main_columns]
    # Build a struct preserving the original data types
    metadata_struct = F.struct(*[
        F.col(col_name).alias(col_name)
        for col_name in metadata_columns
    ])
    # Select main columns and append the metadata column
    result_df = df.select(
        *main_columns,
        metadata_struct.alias("metadata")
    )
    return result_df

final_agg = transform_schema(df, table_name='aggregated')
final_agg.show() {code}
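For reviewers without access to the S3 path above, the main/metadata column split that {{transform_schema}} performs for {{table_name='aggregated'}} can be sanity-checked in isolation (a minimal sketch with hypothetical column names; no Spark session required):

```python
# Standalone mirror of the column split done by transform_schema for
# table_name='aggregated'. The column names below are hypothetical
# stand-ins for the real parquet schema.
EXCLUDED = [
    "is_odd_found", "month_count", "months_last_12", "months_last_6",
    "months_last_3", "total_sal_old", "total_sal_lower", "total_sal_upper",
    "total_sal_bucket",
]

def split_columns(all_columns):
    """Return (main_columns, metadata_columns) exactly as the repro computes them."""
    main_columns = [c for c in all_columns if c not in EXCLUDED]
    metadata_columns = [c for c in all_columns if c not in main_columns]
    return main_columns, metadata_columns

cols = ["user_id", "credit_amount", "month_count", "total_sal_bucket"]
main, meta = split_columns(cols)
print(main)  # ['user_id', 'credit_amount']
print(meta)  # ['month_count', 'total_sal_bucket']
```

This only illustrates which columns end up inside the {{metadata}} struct; triggering the gRPC failure still requires running the actual DataFrame action against the server.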
*Observed Behavior*
The {{final_agg.show()}} command fails with the following errors.
*Client-Side Error (in the {{pyspark}} shell):*

{code:java}
pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "Encountered end-of-stream mid-frame"
    debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Encountered end-of-stream mid-frame", grpc_status:13}"
> {code}
*Server-Side Log (viewable via docker logs spark-connect-server):*
{code:java}
WARN NettyServerStream: Exception processing message
org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered end-of-stream mid-frame
    at org.sparkproject.connect.grpc.Status.asRuntimeException(Status.java:524)
    at org.sparkproject.connect.grpc.internal.AbstractServerStream$TransportState.deframerClosed(AbstractServerStream.java:247)
    at org.sparkproject.connect.grpc.netty.NettyServerStream$TransportState.deframerClosed(NettyServerStream.java:139)
    at org.sparkproject.connect.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
    at org.sparkproject.connect.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:301)
    at org.sparkproject.connect.grpc.internal.MessageDeframer.request(MessageDeframer.java:162)
{code}
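For context on the error text: gRPC length-prefixes every message on the wire with a 1-byte compressed flag and a 4-byte big-endian length, and the deframer reports {{INTERNAL: Encountered end-of-stream mid-frame}} when the stream closes before a frame's declared payload has fully arrived. An illustrative sketch of that framing (plain Python, not Spark or grpc-java code):

```python
import struct

def frame(payload: bytes, compressed: bool = False) -> bytes:
    """Build a gRPC length-prefixed frame: flag byte + 4-byte big-endian length + payload."""
    return struct.pack(">BI", int(compressed), len(payload)) + payload

def deframe(stream: bytes) -> bytes:
    """Parse one frame; fail if the stream ends before the declared payload does."""
    flag, length = struct.unpack(">BI", stream[:5])
    body = stream[5:5 + length]
    if len(body) < length:
        # This truncation is the condition that surfaces as
        # "Encountered end-of-stream mid-frame" in the server log above.
        raise EOFError("end-of-stream mid-frame")
    return body

msg = frame(b"hello")
assert deframe(msg) == b"hello"
# Truncating the stream mid-payload triggers the failure mode:
try:
    deframe(msg[:-2])
except EOFError as e:
    print(e)  # end-of-stream mid-frame
```

This suggests the server-side response stream is being cut off partway through a message, which is consistent with a plan/result serialization problem rather than a size limit.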
*Expected Behavior*
The {{final_agg.show()}} command should execute successfully without any gRPC or network-related errors, printing the transformed DataFrame to the console.
*Troubleshooting Performed:*
The issue persists despite the following troubleshooting steps, which rule out common configuration errors:
 # {*}Isolated Environment{*}: The client ({{pyspark}} shell) was run inside the same container as the Spark Connect server, connecting via {{localhost}}. This confirms the issue is not caused by external network components (Load Balancers, Ingress, etc.).
 # {*}Increased Message Size Limits{*}: Both client ({{spark.connect.grpc.maxMessageSize}}) and server ({{spark.connect.grpc.maxInboundMessageSize}}) gRPC message size limits were increased to 100 MB, with no change in behavior.
 # {*}Varied Java Versions{*}: The server was tested with both Java 17 and Java 21.
 # {*}Varied grpcio Versions{*}: Multiple versions of the {{grpcio}} library were tested on the client side.
 # {*}Adjusted Timeout Configurations{*}: Several Spark Connect timeout settings were increased with no effect, including {{spark.connect.session.manager.defaultSessionTimeout}} and {{spark.connect.execute.manager.detachedTimeout}}.
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
