I have a question about Spark and how it delegates (pushes down) filters to a 
Parquet-based table. I have two tables in Hive, both in Parquet format: table1 
has four columns of type double and table2 has two columns of type double. I am 
running the following INNER JOIN:

SELECT table1.name FROM table1 INNER JOIN table2 ON table2.x BETWEEN 
table1.xmin AND table1.xmax AND table2.y BETWEEN table1.ymin AND table1.ymax
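
For reference, the plan below can be reproduced with something along these lines 
(a minimal sketch, assuming a Hive-enabled session such as spark-shell):

import org.apache.spark.sql.SparkSession

// Hive support is needed so spark.sql can resolve the warehouse tables
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

val joined = spark.sql(
  """SELECT table1.name FROM table1 INNER JOIN table2
    |ON table2.x BETWEEN table1.xmin AND table1.xmax
    |AND table2.y BETWEEN table1.ymin AND table1.ymax""".stripMargin)

// prints the physical plan, including the PushedFilters of each FileScan
joined.explain()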

I noticed that the execution plan reported by Spark only delegates the 
IsNotNull filters to the tables and not any of the other predicates from the 
join condition:

== Physical Plan ==
*Project [name#0]
+- BroadcastNestedLoopJoin BuildRight, Inner, ((((x#36 >= xmin#13) && (x#36 <= 
xmax#15)) && (y#37 >= ymin#14)) && (y#37 <= ymax#16))
   :- *Project [name#0, xmin#13, ymin#14, xmax#15, ymax#16]
   :  +- *Filter (((isnotnull(xmin#13) && isnotnull(ymin#14)) && 
isnotnull(ymax#16)) && isnotnull(xmax#15))
   :     +- *FileScan parquet [name#0,xmin#13,ymin#14,xmax#15,ymax#16] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[hdfs://xxxx.xxxx.xxxx.xxxx:8020/apps/hive/warehouse/table1, 
PartitionFilters: [], PushedFilters: [IsNotNull(xmin), IsNotNull(ymin), 
IsNotNull(ymax), IsNotNull(xmax)], ReadSchema: 
struct<name:string,xmin:double,ymin:double,xmax:double,ymax:double>
   +- BroadcastExchange IdentityBroadcastMode
      +- *Project [x#36, y#37]
         +- *Filter (isnotnull(y#37) && isnotnull(x#36))
            +- *FileScan parquet [x#36,y#37] Batched: true, Format: Parquet, 
Location: 
InMemoryFileIndex[hdfs://xxxx.xxxx.xxxx.xxxx:8020/apps/hive/warehouse/table2], 
PartitionFilters: [], PushedFilters: [IsNotNull(y), IsNotNull(x)], ReadSchema: 
struct<x:double,y:double>
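
For what it is worth, the join condition can also be written with the DataFrame 
API to check that this is not specific to the SQL path (a minimal sketch, 
continuing with the same Hive-enabled session):

import org.apache.spark.sql.functions.col

val t1 = spark.table("table1")
val t2 = spark.table("table2")

val cond = col("x").between(col("xmin"), col("xmax")) &&
           col("y").between(col("ymin"), col("ymax"))

// compare the PushedFilters of each FileScan in this plan with the SQL plan above
t1.join(t2, cond, "inner").select("name").explain()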

However, when filtering a single table, the filter is delegated to the table:

SELECT name FROM table1 where table1.xmin > -73.4454183678

== Physical Plan ==
CollectLimit 21
+- *Project [pbkey#150]
   +- *Filter (isnotnull(xmin#163) && (xmin#163 > -73.4454183678))
      +- *FileScan parquet [pbkey#150,xmin#163] Batched: true, Format: Parquet, 
Location: 
InMemoryFileIndex[hdfs://xxxx.xxxx.xxxx.xxxx:8020/apps/hive/warehouse/table1, 
PartitionFilters: [], PushedFilters: [IsNotNull(xmin), 
GreaterThan(xmin,-73.4454183678)], ReadSchema: struct<pbkey:string,xmin:double>
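
For completeness, whether the Parquet reader actually applies pushed filters at 
scan time is controlled by spark.sql.parquet.filterPushdown (true by default); 
it can be checked from the same session:

// check whether Parquet filter pushdown is enabled for this session;
// the property defaults to "true" unless it has been overridden
println(spark.conf.get("spark.sql.parquet.filterPushdown"))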

So the question is: does Spark delegate filters that appear in a join condition 
to a Parquet table, or is this an error in the "explain plan" output?

Thanks.
