[ https://issues.apache.org/jira/browse/SPARK-33482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-33482:
-----------------------------------

    Assignee: Peter Toth

> V2 Datasources that extend FileScan preclude exchange reuse
> -----------------------------------------------------------
>
>                 Key: SPARK-33482
>                 URL: https://issues.apache.org/jira/browse/SPARK-33482
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>            Reporter: Bruce Robbins
>            Assignee: Peter Toth
>            Priority: Major
>             Fix For: 3.2.0, 3.1.2, 3.0.3
>
>
> Sample query:
> {noformat}
> spark.read.parquet("tbl").createOrReplaceTempView("tbl")
> spark.read.parquet("lookup").createOrReplaceTempView("lookup")
> sql("""
>   select tbl.col1, fk1, fk2
>   from tbl, lookup l1, lookup l2
>   where fk1 = l1.key
>   and fk2 = l2.key
> """).explain
> {noformat}
> Test files can be created as follows:
> {noformat}
> import scala.util.Random
> val rand = Random
> val tbl = spark.range(1, 10000).map { x =>
>   (rand.nextLong.abs % 20,
>    rand.nextLong.abs % 20,
>    x)
> }.toDF("fk1", "fk2", "col1")
> tbl.write.mode("overwrite").parquet("tbl")
> val lookup = spark.range(0, 20).map { x =>
>   (x + 1, x * 10000, (x + 1) * 10000)
> }.toDF("key", "col1", "col2")
> lookup.write.mode("overwrite").parquet("lookup")
> {noformat}
> Output with V1 Parquet reader:
> {noformat}
> == Physical Plan ==
> *(3) Project [col1#2L, fk1#0L, fk2#1L]
> +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
>    :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
>    :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
>    :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
>    :     :  +- *(3) ColumnarToRow
>    :     :     +- FileScan parquet [fk1#0L,fk2#1L,col1#2L] Batched: true, DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>
>    :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
>    :        +- *(1) Filter isnotnull(key#6L)
>    :           +- *(1) ColumnarToRow
>    :              +- FileScan parquet [key#6L] Batched: true, DataFilters: [isnotnull(key#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
>    +- ReusedExchange [key#12L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
> {noformat}
> With the V1 Parquet reader, the exchange for lookup is reused (see the last line).
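> Reuse can also be checked programmatically rather than by eyeballing the plan. A minimal sketch (assuming adaptive execution is disabled, as in the plans shown here; ReusedExchangeExec is the physical node the planner substitutes for a duplicate exchange):
> {code:java}
> import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
>
> // Count ReusedExchangeExec nodes in the compiled physical plan.
> // With the V1 Parquet reader the broadcast of "lookup" is reused (count 1);
> // with the V2 reader shown below, it is not (count 0).
> val plan = sql("""
>   select tbl.col1, fk1, fk2
>   from tbl, lookup l1, lookup l2
>   where fk1 = l1.key
>   and fk2 = l2.key
> """).queryExecution.executedPlan
>
> val reused = plan.collect { case r: ReusedExchangeExec => r }
> println(s"reused exchanges: ${reused.size}")
> {code}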
> Output with V2 Parquet reader (spark.sql.sources.useV1SourceList=""):
> {noformat}
> == Physical Plan ==
> *(3) Project [col1#2L, fk1#0L, fk2#1L]
> +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
>    :- *(3) Project [fk1#0L, fk2#1L, col1#2L]
>    :  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
>    :     :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
>    :     :  +- *(3) ColumnarToRow
>    :     :     +- BatchScan[fk1#0L, fk2#1L, col1#2L] ParquetScan DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], PartitionFilters: [], PushedFilers: [IsNotNull(fk1), IsNotNull(fk2)], ReadSchema: struct<fk1:bigint,fk2:bigint,col1:bigint>, PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)]
>    :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
>    :        +- *(1) Filter isnotnull(key#6L)
>    :           +- *(1) ColumnarToRow
>    :              +- BatchScan[key#6L] ParquetScan DataFilters: [isnotnull(key#6L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#83]
>       +- *(2) Filter isnotnull(key#12L)
>          +- *(2) ColumnarToRow
>             +- BatchScan[key#12L] ParquetScan DataFilters: [isnotnull(key#12L)], Format: parquet, Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)]
> {noformat}
> With the V2 Parquet reader, the exchange for lookup is not reused (see the last 4 lines).
> The same issue occurs with the Orc reader (and presumably with any other datasource that extends FileScan).
> The issue appears to be this check in FileScan#equals:
> {code:java}
> ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
>   ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
> {code}
> partitionFilters and dataFilters are not normalized, so the exprIds of their attribute references differ between the two scans of lookup. The two FileScan objects therefore don't compare equal, even though they describe the same scan, and the exchange cannot be reused.
> As a side note, FileScan#equals also contains a dangling boolean expression:
> {code:java}
> fileIndex == f.fileIndex && readSchema == f.readSchema
> {code}
> The result of that expression is not used anywhere. It should probably be folded into the final decision, even though it is not the cause of this issue.
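> Since the comparison only fails because of the exprIds, one possible direction is to normalize the attributes referenced by the filters before comparing, and to fold the dangling fileIndex/readSchema check into the result. This is a rough sketch only, meant to sit inside the FileScan trait; the normalize helper is hypothetical and this is not necessarily how an actual fix would be written:
> {code:java}
> import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, Expression, ExpressionSet}
>
> // Sketch: give each referenced column a deterministic exprId derived from its
> // position in the scan's read schema, so per-query exprIds no longer influence equality.
> private def normalize(filters: Seq[Expression]): Seq[Expression] = {
>   val positions = readSchema.fieldNames.zipWithIndex.toMap
>   filters.map(_.transform {
>     case a: AttributeReference if positions.contains(a.name) =>
>       a.withExprId(ExprId(positions(a.name)))
>   })
> }
>
> override def equals(obj: Any): Boolean = obj match {
>   case f: FileScan =>
>     fileIndex == f.fileIndex && readSchema == f.readSchema &&
>       ExpressionSet(normalize(partitionFilters)) == ExpressionSet(normalize(f.partitionFilters)) &&
>       ExpressionSet(normalize(dataFilters)) == ExpressionSet(normalize(f.dataFilters))
>   case _ => false
> }
> // hashCode would need the same normalization to stay consistent with equals.
> {code}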