[ https://issues.apache.org/jira/browse/SPARK-34985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318639#comment-17318639 ]
Hyukjin Kwon commented on SPARK-34985: -------------------------------------- [~ljzh], it sounds more like a question than an issue. I would encourage you to ask it on the mailing list for better visibility before filing it as an issue. > Different execution plans under jdbc and hdfs > --------------------------------------------- > > Key: SPARK-34985 > URL: https://issues.apache.org/jira/browse/SPARK-34985 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.0.1 > Environment: spark 3.0.1 > hive 2.1.1-cdh6.2.0 > hadoop 3.0.0-cdh6.2.0 > > Reporter: lianjunzhi > Priority: Major > > Hive has two non-partitioned tables, trade_order and trade_order_goods. These > two tables are created by hive. The trade_order contains four fields: > trade_id, company_id, is_delete, and trade_status and the trade_order_goods > contains four fields: trade_id, cost, is_delete, and sell_total. Run the > following code snippets: > {quote}create table oms.trade_order > ( > trade_id bigint, > company_id bigint comment '公司id', > is_delete int comment '删除标记', > trade_status int > ) > comment 'trade_order' stored as parquet; > create table oms.trade_order_goods > ( > trade_id bigint, > cost decimal(18, 2) comment '货品成本', > is_delete int comment '删除标记', > sell_total decimal(16, 2) > ) > comment 'trade_order_goods' stored as parquet;{quote} > |val df = spark.sql(| > |"""| > |select| > |b.company_id,| > |sum(a.cost) cost| > |FROM oms.trade_order_goods a| > |JOIN oms.trade_order b| > |ON a.trade_id = b.trade_id| > |WHERE a.is_delete = 0 AND b.is_delete = 0| > |GROUP BY| > |b.company_id| > |""".stripMargin)| > > {quote}df.explain() //Physical Plan 1 > {quote} > {quote}df.write.insertInto("oms.test") //Physical Plan 2 > {quote} > {quote}df.write > .format("jdbc") > .option("url", "") > .option("dbtable", "test") > .option("user", "") > .option("password", "") > .option("driver", "com.mysql.jdbc.Driver") > .option("truncate", value = true) > .option("batchsize", 15000) > 
.mode(SaveMode.Append) > .save() //Physical Plan 3 > {quote} > Physical Plan 1: > {quote}AdaptiveSparkPlan isFinalPlan=false > +- HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)]) > +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#40|#40] > +- HashAggregate(keys=[company_id#6L|#6L], > functions=[partial_sum(cost#2)|#2)]) > +- Project [cost#2, company_id#6L|#2, company_id#6L] > +- SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner > :- Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#32|#32] > : +- Project [trade_id#1L, cost#2|#1L, cost#2] > : +- Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND > isnotnull(trade_id#1L)) > : +- FileScan parquet > oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] > Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), > isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], > Format: Parquet, Location: > InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], > PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), > EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: > struct<trade_id:bigint,cost:decimal(18,2),is_delete:int> > +- Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#33|#33] > +- Project [trade_id#5L, company_id#6L|#5L, company_id#6L] > +- Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND > isnotnull(trade_id#5L)) > +- FileScan parquet > oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] > Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), > isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], > Format: Parquet, Location: > InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], > PartitionFilters: [], PushedFilters: 
[IsNotNull(is_delete), > EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: > struct<trade_id:bigint,company_id:bigint,is_delete:int> > {quote} > Physical Plan 2: > {quote}+- AdaptiveSparkPlan isFinalPlan=true > +- *(6) HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)], > output=[company_id#6L, cost#28|#6L, cost#28]) > +- CustomShuffleReader coalesced > +- ShuffleQueryStage 2 > +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#244|#244] > +- *(5) HashAggregate(keys=[company_id#6L|#6L], > functions=[partial_sum(cost#2)|#2)], output=[company_id#6L, sum#21|#6L, > sum#21]) > +- *(5) Project [cost#2, company_id#6L|#2, company_id#6L] > +- *(5) SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner > :- *(3) Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0 > : +- CustomShuffleReader coalesced > : +- ShuffleQueryStage 0 > : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#119|#119] > : +- *(1) Project [trade_id#1L, cost#2|#1L, cost#2] > : +- *(1) Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND > isnotnull(trade_id#1L)) > : +- FileScan parquet > oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] > Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), > isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], > Format: Parquet, Location: > InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], > PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), > EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: > struct<trade_id:bigint,cost:decimal(18,2),is_delete:int> > +- *(4) Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0 > +- CustomShuffleReader coalesced > +- ShuffleQueryStage 1 > +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#126|#126] > +- *(2) Project [trade_id#5L, company_id#6L|#5L, company_id#6L] > +- *(2) Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND > 
isnotnull(trade_id#5L)) > +- FileScan parquet > oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] > Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), > isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], > Format: Parquet, Location: > InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], > PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), > EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: > struct<trade_id:bigint,company_id:bigint,is_delete:int> > {quote} > Physical Plan 3: > {quote}Execute SaveIntoDataSourceCommand > +- SaveIntoDataSourceCommand > org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider@64ee110b, > Map(url -> *********(redacted), truncate -> true, batchsize -> 15000, driver > -> com.mysql.jdbc.Driver, dbtable -> test, user -> jkyun, password -> > *********(redacted)), Append > +- Aggregate [company_id#6L|#6L], [company_id#6L, sum(cost#2) AS cost#0|#6L, > sum(cost#2) AS cost#0] > +- Filter ((is_delete#3 = 0) AND (is_delete#7 = 0)) > +- Join Inner, (trade_id#1L = trade_id#5L) > :- SubqueryAlias a > : +- SubqueryAlias spark_catalog.oms.trade_order_goods > : +- > Relation[trade_id#1L,cost#2,is_delete#3,sell_total#4|#1L,cost#2,is_delete#3,sell_total#4] > parquet > +- SubqueryAlias b > +- SubqueryAlias spark_catalog.oms.trade_order > +- > Relation[trade_id#5L,company_id#6L,is_delete#7,trade_status#8|#5L,company_id#6L,is_delete#7,trade_status#8] > parquet > {quote} > As you can see, Physical Plan 3 does not have column pruning and predicate > pushdown. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org