[ https://issues.apache.org/jira/browse/SPARK-24859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Johannes Mayer updated SPARK-24859:
-----------------------------------
Description:
I have two Avro tables in Hive called FAct and DIm. Both are partitioned by a common column called part_col. I want to join the two tables on their id, but only for some of the partitions.

If I use an inner join, everything works well:
{code:java}
select *
from FAct f
join DIm d
  on (f.id = d.id and f.part_col = d.part_col)
where f.part_col = 'xyz'
{code}
In the SQL explain plan I can see that the predicate part_col = 'xyz' is also applied in the DIm HiveTableScan.

When I execute the same query using a left join, the full DIm table is scanned. There are some workarounds for this issue, but I want to report it as a bug, since the predicate is pushed down for an inner join and I think the behaviour should be the same for an outer join.

Here is a self-contained example (created in Zeppelin):
{code:java}
val fact = Seq((1, 100), (2, 200), (3, 100), (4, 200)).toDF("id", "part_col")
val dim = Seq((1, 100), (2, 200)).toDF("id", "part_col")

fact.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/fact")
dim.repartition($"part_col").write.mode("overwrite").partitionBy("part_col").format("com.databricks.spark.avro").save("/tmp/jira/dim")

spark.sqlContext.sql("create table if not exists fact(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/fact'")
spark.sqlContext.sql("msck repair table fact")
spark.sqlContext.sql("create table if not exists dim(id int) partitioned by (part_col int) stored as avro location '/tmp/jira/dim'")
spark.sqlContext.sql("msck repair table dim")
{code}

*Inner join example:*
{code:java}
select * from fact f
join dim d
  on (f.id = d.id
  and f.part_col = d.part_col)
where f.part_col = 100
{code}
Excerpt from the Spark SQL physical explain plan:
{code:java}
HiveTableScan [id#411, part_col#412], CatalogRelation `default`.`fact`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#411], [part_col#412], [isnotnull(part_col#412), (part_col#412 = 100)]
HiveTableScan [id#413, part_col#414], CatalogRelation `default`.`dim`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#413], [part_col#414], [isnotnull(part_col#414), (part_col#414 = 100)]
{code}

*Outer join example:*
{code:java}
select * from fact f
left join dim d
  on (f.id = d.id
  and f.part_col = d.part_col)
where f.part_col = 100
{code}
Excerpt from the Spark SQL physical explain plan:
{code:java}
HiveTableScan [id#426, part_col#427], CatalogRelation `default`.`fact`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#426], [part_col#427], [isnotnull(part_col#427), (part_col#427 = 100)]
HiveTableScan [id#428, part_col#429], CatalogRelation `default`.`dim`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [id#428], [part_col#429]
{code}
As you can see, the predicate is not pushed down to the HiveTableScan of the dim table for the outer join.
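One possible workaround sketch (an editorial addition, not from the original report): since every fact row surviving the WHERE clause has part_col = 100, and the ON clause requires f.part_col = d.part_col, any dim row that can match must also have part_col = 100. Repeating that predicate by hand on the dim side of the ON clause should let it be pushed into the dim HiveTableScan without changing the result of the left join:
{code:java}
-- Workaround sketch (assumption, not verified against the reporter's environment):
-- duplicate the partition predicate for the dim side inside the ON clause,
-- so the dim scan can prune partitions even under a left join.
select * from fact f
left join dim d
  on (f.id = d.id
  and f.part_col = d.part_col
  and d.part_col = 100)
where f.part_col = 100
{code}
This preserves left-join semantics here because a dim row with part_col <> 100 could never match a surviving fact row anyway; non-matching fact rows are still NULL-extended as before.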
> Predicates pushdown on outer joins
> ----------------------------------
>
> Key: SPARK-24859
> URL: https://issues.apache.org/jira/browse/SPARK-24859
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 2.2.0
> Environment: Cloudera CDH 5.13.1
> Reporter: Johannes Mayer
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org