[ 
https://issues.apache.org/jira/browse/SPARK-53917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Khakhlyuk updated SPARK-53917:
-----------------------------------
    Description: 
h1. Problem description

LocalRelation is a Catalyst logical operator used to represent a dataset of 
rows inline as part of the LogicalPlan. LocalRelations represent DataFrames 
created directly from Python and Scala objects, e.g., Python and Scala lists, 
pandas DataFrames, CSV files loaded in memory, etc.

In Spark Connect, local relations are transferred over gRPC using LocalRelation 
messages (for relations under 64MB) and CachedLocalRelation messages (for 
relations over 64MB).

CachedLocalRelations currently have a hard size limit of 2GB, which means that 
Spark users cannot execute queries over local client data (pandas DataFrames, 
CSV files, etc.) larger than 2GB.
h1. Design

In Spark Connect, the client needs to serialize the local relation before 
transferring it to the server. It serializes the data as a single record batch 
in the Arrow IPC streaming format and the schema as a JSON string, then embeds 
both in a LocalRelation\{schema, data} proto message.
Small local relations (under 64MB) are sent directly as part of the 
ExecutePlanRequest.

!image-2025-10-15-13-50-04-179.png!
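
For illustration, this is roughly what the client-side serialization step looks 
like when done with the Arrow Java API directly (the vector name and values are 
invented for the example; the real logic lives in the Spark Connect client 
libraries):
{code:scala}
import java.io.ByteArrayOutputStream

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.ArrowStreamWriter

val allocator = new RootAllocator()
val ids = new IntVector("id", allocator)
ids.allocateNew(3)
(0 until 3).foreach(i => ids.set(i, i))
ids.setValueCount(3)

// Write the whole dataset as a single record batch in the Arrow IPC
// streaming format; these bytes become LocalRelation.data.
val root = VectorSchemaRoot.of(ids)
val out = new ByteArrayOutputStream()
val writer = new ArrowStreamWriter(root, null, out)
writer.start()
writer.writeBatch()
writer.end()
val dataBytes = out.toByteArray
// The schema travels separately as a JSON string (LocalRelation.schema),
// e.g. via StructType#json in Spark.
{code}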

Larger local relations are first sent to the server via addArtifact and stored 
in memory or on disk via the BlockManager. Then an ExecutePlanRequest is sent 
containing CachedLocalRelation\{hash}, where hash is the artifact hash. The 
server retrieves the cached LocalRelation from the BlockManager via the hash, 
deserializes it, adds it to the LogicalPlan, and executes it.

!image-2025-10-15-13-50-44-333.png!
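
The artifact key is derived from a hash of the serialized relation. As a 
minimal sketch (the helper is hypothetical; the exact naming scheme is defined 
by the artifact-transfer code and not shown here), a hex-encoded SHA-256 digest 
of the serialized bytes can be computed like this:
{code:scala}
import java.security.MessageDigest

// Hypothetical helper: hex-encoded SHA-256 digest of the serialized
// relation bytes, used as the key for looking the block up on the server.
def sha256Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("SHA-256")
    .digest(bytes)
    .map("%02x".format(_))
    .mkString
{code}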

 

The server reads the data from the BlockManager as a stream and tries to create 
a proto.LocalRelation via:
{code:scala}
proto.Relation
  .newBuilder()
  .getLocalRelation
  .getParserForType
  .parseFrom(blockData.toInputStream())
{code}
This fails because the Java protobuf library has a 2GB limit on message size: 
embedded message and string lengths are encoded as signed 32-bit integers, so 
anything larger reads back as a negative size.
{quote}org.sparkproject.connect.com.google.protobuf.InvalidProtocolBufferException: 
CodedInputStream encountered an embedded string or message which claimed to 
have negative size.
{quote}
!image-2025-10-15-13-53-40-306.png!
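
A minimal illustration of the overflow behind that error message:
{code:scala}
// A 3GB payload length cannot be represented as a signed 32-bit int:
val payloadSize: Long = 3L * 1024 * 1024 * 1024 // 3221225472 bytes
val asInt: Int = payloadSize.toInt              // wraps around
println(asInt)                                  // -1073741824, a "negative size"
{code}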

To fix this, I propose avoiding the protobuf layer during serialization on the 
client and deserialization on the server. Instead of caching the full protobuf 
LocalRelation message, we cache the data and schema as separate artifacts, send 
two hashes \{data_hash, schema_hash} to the server, load both directly from the 
BlockManager, and create a LocalRelation on the server from the unpacked data 
and schema.

!image-2025-10-15-13-56-46-840.png!
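
On the server side, the data artifact can then be consumed as a plain Arrow IPC 
stream, with no protobuf parsing in between. A sketch of that read path, where 
dataBytes stands in for the block loaded from the BlockManager via data_hash:
{code:scala}
import java.io.ByteArrayInputStream

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowStreamReader

val allocator = new RootAllocator()
val reader =
  new ArrowStreamReader(new ByteArrayInputStream(dataBytes), allocator)
val root = reader.getVectorSchemaRoot
while (reader.loadNextBatch()) {
  // Each batch is converted to rows for the server-side LocalRelation.
  println(s"read a batch of ${root.getRowCount} rows")
}
reader.close()
{code}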

After creating a prototype with the new proto message, I discovered additional 
limits for CachedLocalRelations. Both the Scala client and the server store the 
data in a single Java {{Array[Byte]}}, which is capped at 2GB. To avoid this 
limit, I propose transferring the data in chunks: the Python and Scala clients 
will split the data into multiple Arrow batches and upload them separately to 
the server, with each batch stored as a separate artifact. The server will then 
load and process each batch separately. We will keep batch sizes around 16MB 
(TBD), well below the 2GB limit, which avoids the 2GB limits on both the 
clients and the server. A hedged sketch of the chunking is shown below.

!image-2025-10-15-13-59-08-081.png!
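
A hedged sketch of the chunking itself (the helper and the pre-serialized row 
representation are invented for illustration; the real implementation splits at 
Arrow batch boundaries):
{code:scala}
// Hypothetical helper: group serialized rows into chunks of roughly
// targetBatchBytes each; every chunk is then written as its own Arrow IPC
// stream and uploaded to the server as a separate artifact.
val targetBatchBytes: Long = 16L * 1024 * 1024 // ~16MB, TBD per the design

def chunkRows(rows: Iterator[Array[Byte]]): Iterator[Seq[Array[Byte]]] =
  new Iterator[Seq[Array[Byte]]] {
    def hasNext: Boolean = rows.hasNext
    def next(): Seq[Array[Byte]] = {
      val chunk = Seq.newBuilder[Array[Byte]]
      var size = 0L
      while (rows.hasNext && size < targetBatchBytes) {
        val row = rows.next()
        chunk += row
        size += row.length
      }
      chunk.result()
    }
  }
{code}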

The final proto message looks like this:
{code}
// A local relation that has been cached already.
message ChunkedCachedLocalRelation {
  // (Required) A list of SHA-256 hashes representing LocalRelation.data.
  // Data is serialized in the Arrow IPC streaming format; each batch is
  // cached on the server as a separate artifact, and each hash identifies
  // one batch stored on the server.
  // Hashes are hex-encoded strings (e.g., "a3b2c1d4...").
  repeated string dataHashes = 1;

  // (Optional) A SHA-256 hash of the serialized LocalRelation.schema.
  // Scala clients always provide the schema; Python clients can omit it.
  // The hash is a hex-encoded string (e.g., "a3b2c1d4...").
  optional string schemaHash = 2;
}
{code}
Implementation details are discussed in the PR 
[https://github.com/apache/spark/pull/52613].



> [CONNECT] Supporting large LocalRelations
> -----------------------------------------
>
>                 Key: SPARK-53917
>                 URL: https://issues.apache.org/jira/browse/SPARK-53917
>             Project: Spark
>          Issue Type: Improvement
>          Components: Connect, PySpark
>    Affects Versions: 4.1.0
>            Reporter: Alex Khakhlyuk
>            Priority: Major
>             Fix For: 4.1.0
>
>         Attachments: image-2025-10-15-13-50-04-179.png, 
> image-2025-10-15-13-50-44-333.png, image-2025-10-15-13-53-40-306.png, 
> image-2025-10-15-13-56-46-840.png, image-2025-10-15-13-59-08-081.png
>


