Onur Satici created SPARK-24850:
-----------------------------------

             Summary: Query plan string representation grows exponentially on queries with recursive cached datasets
                 Key: SPARK-24850
                 URL: https://issues.apache.org/jira/browse/SPARK-24850
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: Onur Satici


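As of [https://github.com/apache/spark/pull/21018], InMemoryRelation includes 
its cacheBuilder when logging query plans. This CachedRDDBuilder includes the 
cachedPlan, so calling treeString on InMemoryRelation prints the cachedPlan 
twice: once inside the cacheBuilder argument and once as the child subtree. 
When cached datasets are nested, this duplication compounds at every level.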
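
To see why embedding the cachedPlan in the node description compounds, here is a toy model in plain Scala. It does not use Spark's classes: `Node`, `Leaf`, `Cached`, and `leafCount` are hypothetical names for illustration only, and the model assumes a simple linear chain of cached nodes rather than the joins in the reproduction.

```scala
object PlanStringGrowth {
  // Toy stand-in for a plan node; not Spark's TreeNode.
  sealed trait Node { def render(embedChild: Boolean): String }

  final case class Leaf(name: String) extends Node {
    def render(embedChild: Boolean): String = name
  }

  // Toy stand-in for a cached relation wrapping one child plan.
  final case class Cached(child: Node) extends Node {
    def render(embedChild: Boolean): String = {
      val childText = child.render(embedChild)
      // With embedChild = true the child is printed twice: once inside the
      // node's own description and once as the subtree below it.
      val header = if (embedChild) s"Cached($childText)" else "Cached"
      s"$header\n+- $childText"
    }
  }

  // Wrap a single leaf in `depth` nested cached nodes.
  def nested(depth: Int): Node =
    (1 to depth).foldLeft(Leaf("scan"): Node)((plan, _) => Cached(plan))

  // Count how often the leaf appears in the rendered plan.
  def leafCount(depth: Int, embedChild: Boolean): Int =
    "scan".r.findAllIn(nested(depth).render(embedChild)).size

  def main(args: Array[String]): Unit =
    (1 to 5).foreach { depth =>
      val embedded = leafCount(depth, embedChild = true)
      val plain = leafCount(depth, embedChild = false)
      println(s"depth=$depth embedded=$embedded plain=$plain")
    }
}
```

In this model the leaf appears 2, 4, 8, 16, 32 times for depths 1 to 5 when the child is embedded, and exactly once otherwise, so the rendered string doubles with every additional level of caching.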

Given the sample dataset:
{code:java}
$ cat test.csv
A,B
0,0{code}
If the query plan has multiple cached datasets that depend on each other:
{code:java}
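// Start from a cached dataset read from test.csv.
var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
// Join twice against the same file on column A, caching each intermediate result.
(0 to 1).foreach { _ =>
  df_cached = df_cached.join(
    spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
}
// Print the physical plan.
df_cached.explain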
{code}
results in:
{code:java}
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], 
CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 
replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>

{code}
Previously, the same query yielded:
{code:java}
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#37, B#89]
+- InMemoryRelation [A#10, B#11, B#37, B#89], true, 10000, StorageLevel(disk, 
memory, deserialized, 1 replicas)
+- *(2) Project [A#10, B#11, B#37, B#89]
+- *(2) BroadcastHashJoin [A#10], [A#88], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#37], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#37], true, 10000, StorageLevel(disk, 
memory, deserialized, 1 replicas)
: +- *(2) Project [A#10, B#11, B#37]
: +- *(2) BroadcastHashJoin [A#10], [A#36], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], true, 10000, StorageLevel(disk, memory, 
deserialized, 1 replicas)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
false]))
: +- *(1) Filter isnotnull(A#36)
: +- InMemoryTableScan [A#36, B#37], [isnotnull(A#36)]
: +- InMemoryRelation [A#36, B#37], true, 10000, StorageLevel(disk, memory, 
deserialized, 1 replicas)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#88)
+- InMemoryTableScan [A#88, B#89], [isnotnull(A#88)]
+- InMemoryRelation [A#88, B#89], true, 10000, StorageLevel(disk, memory, 
deserialized, 1 replicas)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<A:string,B:string>

{code}
This exponential growth can cause the driver to run out of memory on large 
query plans with cached datasets.
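
One way to quantify the growth is to measure the length of the rendered physical plan as the number of cached joins increases. The following is a rough sketch, not part of the original report: the object name `PlanLengthProbe`, the helpers `readCsv` and `chained`, the local master setting, and the range of join counts are all assumptions, and it expects the `test.csv` file from above in the working directory. It relies on `queryExecution.executedPlan.treeString`, which is a developer-facing API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PlanLengthProbe {
  // Read the sample file the same way the reproduction does.
  def readCsv(spark: SparkSession): DataFrame =
    spark.read.format("csv").option("header", "true").load("test.csv")

  // Build a chain of `joins` cached joins on column A.
  def chained(spark: SparkSession, joins: Int): DataFrame =
    (1 to joins).foldLeft(readCsv(spark).cache()) { (df, _) =>
      df.join(readCsv(spark), "A").cache()
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("plan-length-probe")
      .getOrCreate()
    // Keep the join count small: on an affected build the plan string
    // itself is what exhausts driver memory.
    (0 to 4).foreach { joins =>
      val chars = chained(spark, joins).queryExecution.executedPlan.treeString.length
      println(s"joins=$joins planChars=$chars")
    }
    spark.stop()
  }
}
```

Comparing the printed lengths between a build with and without the change should show whether the plan string grows roughly linearly or multiplies with each additional cached join.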


