[ https://issues.apache.org/jira/browse/SPARK-34985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lianjunzhi updated SPARK-34985:
-------------------------------
Description:

Hive has two non-partitioned tables, trade_order and trade_order_goods. trade_order contains four fields: trade_id, company_id, is_delete, and trade_status. trade_order_goods contains four fields: trade_id, cost, is_delete, and sell_total.
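For reproduction, a minimal sketch of the table layout might look like the following. The types of trade_id, company_id, cost, and is_delete are taken from the ReadSchema in the plans below; the types of trade_status and sell_total are assumed, and parquet storage matches the FileScan nodes.
{quote}
// Sketch only: trade_status and sell_total types are assumed (INT).
spark.sql("""
  CREATE TABLE oms.trade_order_goods (
    trade_id BIGINT, cost DECIMAL(18,2), is_delete INT, sell_total INT)
  STORED AS PARQUET""")
spark.sql("""
  CREATE TABLE oms.trade_order (
    trade_id BIGINT, company_id BIGINT, is_delete INT, trade_status INT)
  STORED AS PARQUET""")
{quote}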
Run the following code snippets:
{quote}
val df = spark.sql(
  """
    |select
    |b.company_id,
    |sum(a.cost) cost
    |FROM oms.trade_order_goods a
    |JOIN oms.trade_order b
    |ON a.trade_id = b.trade_id
    |WHERE a.is_delete = 0 AND b.is_delete = 0
    |GROUP BY
    |b.company_id
    |""".stripMargin)
{quote}
{quote}
df.explain() //Physical Plan 1
{quote}
{quote}
df.write.insertInto("oms.test") //Physical Plan 2
{quote}
{quote}
df.write
  .format("jdbc")
  .option("url", "")
  .option("dbtable", "test")
  .option("user", "")
  .option("password", "")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("truncate", value = true)
  .option("batchsize", 15000)
  .mode(SaveMode.Append)
  .save() //Physical Plan 3
{quote}
Physical Plan 1:
{quote}
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[company_id#6L], functions=[sum(cost#2)])
   +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#40]
      +- HashAggregate(keys=[company_id#6L], functions=[partial_sum(cost#2)])
         +- Project [cost#2, company_id#6L]
            +- SortMergeJoin [trade_id#1L], [trade_id#5L], Inner
               :- Sort [trade_id#1L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#32]
               :     +- Project [trade_id#1L, cost#2]
               :        +- Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
               :           +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
               +- Sort [trade_id#5L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#33]
                     +- Project [trade_id#5L, company_id#6L]
                        +- Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
                           +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{quote}
Physical Plan 2:
{quote}
+- AdaptiveSparkPlan isFinalPlan=true
   +- *(6) HashAggregate(keys=[company_id#6L], functions=[sum(cost#2)], output=[company_id#6L, cost#28])
      +- CustomShuffleReader coalesced
         +- ShuffleQueryStage 2
            +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#244]
               +- *(5) HashAggregate(keys=[company_id#6L], functions=[partial_sum(cost#2)], output=[company_id#6L, sum#21])
                  +- *(5) Project [cost#2, company_id#6L]
                     +- *(5) SortMergeJoin [trade_id#1L], [trade_id#5L], Inner
                        :- *(3) Sort [trade_id#1L ASC NULLS FIRST], false, 0
                        :  +- CustomShuffleReader coalesced
                        :     +- ShuffleQueryStage 0
                        :        +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#119]
                        :           +- *(1) Project [trade_id#1L, cost#2]
                        :              +- *(1) Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
                        :                 +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
                        +- *(4) Sort [trade_id#5L ASC NULLS FIRST], false, 0
                           +- CustomShuffleReader coalesced
                              +- ShuffleQueryStage 1
                                 +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#126]
                                    +- *(2) Project [trade_id#5L, company_id#6L]
                                       +- *(2) Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
                                          +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{quote}
Physical Plan 3:
{quote}
Execute SaveIntoDataSourceCommand
+- SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider@64ee110b, Map(url -> *********(redacted), truncate -> true, batchsize -> 15000, driver -> com.mysql.jdbc.Driver, dbtable -> test, user -> jkyun, password -> *********(redacted)), Append
   +- Aggregate [company_id#6L], [company_id#6L, sum(cost#2) AS cost#0]
      +- Filter ((is_delete#3 = 0) AND (is_delete#7 = 0))
         +- Join Inner, (trade_id#1L = trade_id#5L)
            :- SubqueryAlias a
            :  +- SubqueryAlias spark_catalog.oms.trade_order_goods
            :     +- Relation[trade_id#1L,cost#2,is_delete#3,sell_total#4] parquet
            +- SubqueryAlias b
               +- SubqueryAlias spark_catalog.oms.trade_order
                  +- Relation[trade_id#5L,company_id#6L,is_delete#7,trade_status#8] parquet
{quote}
As you can see, Physical Plan 3 shows neither column pruning nor predicate pushdown.
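To compare what actually runs for the JDBC write with Physical Plans 1 and 2, one option is to log the executed plan of every action with a QueryExecutionListener. The sketch below is only illustrative (planLogger is a made-up name) and assumes the Spark 3.0.x listener API.
{quote}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative sketch: print the physical plan Spark executes for each action,
// so the plan used by the JDBC write path can be compared with Plans 1 and 2.
val planLogger = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"action=$funcName\n${qe.executedPlan}")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
spark.listenerManager.register(planLogger)
{quote}
Comparing that output with Physical Plans 1 and 2 would show whether pruning and pushdown are really missing from the JDBC path or only absent from the printed command plan.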
> Different execution plans under jdbc and hdfs
> ---------------------------------------------
>
> Key: SPARK-34985
> URL: https://issues.apache.org/jira/browse/SPARK-34985
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.1
> Environment: spark 3.0.1
> hive 2.1.1-cdh6.2.0
> hadoop 3.0.0-cdh6.2.0
> Reporter: lianjunzhi
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org