[jira] [Updated] (SPARK-39710) Support push topK through outer join
[ https://issues.apache.org/jira/browse/SPARK-39710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39710:
------------------------------
Description:
Supports pushing down the local limit and local sort of a TopK through an outer join:
- for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side
- for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side

was:
Supports pushing down the local limit and local sort of a TopK through an outer join:
- for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side's max rows
- for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side's max rows

> Support push topK through outer join
> ------------------------------------
>
> Key: SPARK-39710
> URL: https://issues.apache.org/jira/browse/SPARK-39710
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> Supports pushing down the local limit and local sort of a TopK through an outer join:
> - for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side
> - for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39710) Support push topK through outer join
[ https://issues.apache.org/jira/browse/SPARK-39710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39710:
------------------------------
Description:
Supports pushing down the local limit and local sort of a TopK through an outer join:
- for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side's max rows
- for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side's max rows

was:
Supports pushing down the local limit and local sort of a TopK through an outer join:
- for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side
- for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side

> Support push topK through outer join
> ------------------------------------
>
> Key: SPARK-39710
> URL: https://issues.apache.org/jira/browse/SPARK-39710
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> Supports pushing down the local limit and local sort of a TopK through an outer join:
> - for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side's max rows
> - for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side's max rows
[jira] [Updated] (SPARK-39710) Support push topK through outer join
[ https://issues.apache.org/jira/browse/SPARK-39710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39710:
------------------------------
Environment: (was: Supports pushing down the local limit and local sort of a TopK through an outer join:
- for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side's max rows
- for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side's max rows)

> Support push topK through outer join
> ------------------------------------
>
> Key: SPARK-39710
> URL: https://issues.apache.org/jira/browse/SPARK-39710
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
[jira] [Created] (SPARK-39710) Support push topK through outer join
XiDuo You created SPARK-39710:
------------------------------

Summary: Support push topK through outer join
Key: SPARK-39710
URL: https://issues.apache.org/jira/browse/SPARK-39710
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Environment: Supports pushing down the local limit and local sort of a TopK through an outer join:
- for a left outer join, the ordering of the TopK references only the left side and the limit of the TopK is smaller than the left side's max rows
- for a right outer join, the ordering of the TopK references only the right side and the limit of the TopK is smaller than the right side's max rows
Reporter: XiDuo You
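The two eligibility conditions in SPARK-39710 can be sketched as a predicate. This is a simplified illustration, not Spark's actual optimizer API; `SideStats`, `canPushTopK`, and the string join types are hypothetical names, and real plans carry expressions rather than column-name sets:

```java
import java.util.Set;

public class TopKPushdownSketch {
    // Simplified stand-in for one join side: its output columns and its max row count.
    record SideStats(Set<String> outputs, long maxRows) {}

    // A TopK (ORDER BY ... LIMIT n) may be pushed into the preserved side of an
    // outer join only when its ordering references that side alone and n is
    // smaller than that side's max rows (outer joins keep every preserved-side row).
    static boolean canPushTopK(Set<String> orderingRefs, long limit,
                               String joinType, SideStats left, SideStats right) {
        return switch (joinType) {
            case "LeftOuter"  -> left.outputs().containsAll(orderingRefs) && limit < left.maxRows();
            case "RightOuter" -> right.outputs().containsAll(orderingRefs) && limit < right.maxRows();
            default           -> false; // other join types are out of scope here
        };
    }

    public static void main(String[] args) {
        SideStats left = new SideStats(Set.of("l1", "l2"), 1000);
        SideStats right = new SideStats(Set.of("r1"), 10);
        System.out.println(canPushTopK(Set.of("l1"), 100, "LeftOuter", left, right));  // true
        System.out.println(canPushTopK(Set.of("r1"), 100, "LeftOuter", left, right));  // false: wrong side
        System.out.println(canPushTopK(Set.of("r1"), 100, "RightOuter", left, right)); // false: limit >= max rows
    }
}
```

When the predicate holds, a local limit and local sort can be placed on that side below the join without changing the query result.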
[jira] [Created] (SPARK-39679) TakeOrderedAndProjectExec should respect child output ordering
XiDuo You created SPARK-39679:
------------------------------

Summary: TakeOrderedAndProjectExec should respect child output ordering
Key: SPARK-39679
URL: https://issues.apache.org/jira/browse/SPARK-39679
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

TakeOrderedAndProjectExec should respect the child's output ordering, to avoid an unnecessary sort. For example, a TakeOrderedAndProjectExec on top of a SortMergeJoin:

{code:java}
SELECT * FROM t1 JOIN t2 ON t1.c1 = t2.c2 ORDER BY t1.c1 LIMIT 100;
{code}
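The check boils down to a prefix match: the child's output ordering satisfies the required ordering when the required sort keys form a prefix of the child's ordering. A rough sketch, simplified to column names (Spark's real `SortOrder` comparison also accounts for sort direction and null ordering):

```java
import java.util.List;

public class OrderingSketch {
    // The required ordering is satisfied when it is a prefix of the child's
    // output ordering; an empty requirement is trivially satisfied.
    static boolean orderingSatisfies(List<String> childOrdering, List<String> required) {
        if (required.size() > childOrdering.size()) {
            return false;
        }
        return childOrdering.subList(0, required.size()).equals(required);
    }

    public static void main(String[] args) {
        // A sort-merge join on t1.c1 = t2.c2 already emits rows sorted by c1,
        // so a TopK ordered by c1 needs no extra sort.
        System.out.println(orderingSatisfies(List.of("c1"), List.of("c1"))); // true
        System.out.println(orderingSatisfies(List.of("c2"), List.of("c1"))); // false
    }
}
```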
[jira] [Updated] (SPARK-39656) Fix wrong namespace in DescribeNamespaceExec
[ https://issues.apache.org/jira/browse/SPARK-39656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39656:
------------------------------
Description:
DescribeNamespaceExec should show the whole namespace rather than just the last level

(was: DescribeNamespaceExec should show all namespace levels rather than just the last)

> Fix wrong namespace in DescribeNamespaceExec
> --------------------------------------------
>
> Key: SPARK-39656
> URL: https://issues.apache.org/jira/browse/SPARK-39656
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Minor
>
> DescribeNamespaceExec should show the whole namespace rather than just the last level
[jira] [Updated] (SPARK-39656) Fix wrong namespace in DescribeNamespaceExec
[ https://issues.apache.org/jira/browse/SPARK-39656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39656:
------------------------------
Priority: Minor (was: Major)

> Fix wrong namespace in DescribeNamespaceExec
> --------------------------------------------
>
> Key: SPARK-39656
> URL: https://issues.apache.org/jira/browse/SPARK-39656
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Minor
>
> DescribeNamespaceExec should show all namespace levels rather than just the last
[jira] [Created] (SPARK-39656) Fix wrong namespace in DescribeNamespaceExec
XiDuo You created SPARK-39656:
------------------------------

Summary: Fix wrong namespace in DescribeNamespaceExec
Key: SPARK-39656
URL: https://issues.apache.org/jira/browse/SPARK-39656
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

DescribeNamespaceExec should show all namespace levels rather than just the last
[jira] [Updated] (SPARK-39503) Add session catalog name for v1 database table and function
[ https://issues.apache.org/jira/browse/SPARK-39503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39503:
------------------------------
Parent: SPARK-39235
Issue Type: Sub-task (was: Improvement)

> Add session catalog name for v1 database table and function
> -----------------------------------------------------------
>
> Key: SPARK-39503
> URL: https://issues.apache.org/jira/browse/SPARK-39503
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> To make it clearer which catalog a table or function comes from. It affects:
> * the scanned table (view) in the query explain output
> * the target table (view) of the data writing
> * desc database
> * desc formatted table (view)
> * show create table (view)
> * desc function
[jira] [Updated] (SPARK-39503) Add session catalog name for v1 database table and function
[ https://issues.apache.org/jira/browse/SPARK-39503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39503:
------------------------------
Description:
To make it clearer which catalog a table or function comes from. It affects:
* the scanned table (view) in the query explain output
* the target table (view) of the data writing
* desc database
* desc formatted table (view)
* show create table (view)
* desc function

was:
To make it clearer which catalog a table or function comes from. It affects:
* the scanned table/view in the query explain output
* the target table/view of the data writing
* desc database
* desc formatted table
* show create table
* desc function

> Add session catalog name for v1 database table and function
> -----------------------------------------------------------
>
> Key: SPARK-39503
> URL: https://issues.apache.org/jira/browse/SPARK-39503
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> To make it clearer which catalog a table or function comes from. It affects:
> * the scanned table (view) in the query explain output
> * the target table (view) of the data writing
> * desc database
> * desc formatted table (view)
> * show create table (view)
> * desc function
[jira] [Updated] (SPARK-39503) Add session catalog name for v1 database table and function
[ https://issues.apache.org/jira/browse/SPARK-39503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39503:
------------------------------
Summary: Add session catalog name for v1 database table and function (was: Add session catalog name for v1 table and function)

> Add session catalog name for v1 database table and function
> -----------------------------------------------------------
>
> Key: SPARK-39503
> URL: https://issues.apache.org/jira/browse/SPARK-39503
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> To make it clearer which catalog a table or function comes from. It affects:
> * the scanned table/view in the query explain output
> * the target table/view of the data writing
> * desc database
> * desc formatted table
> * show create table
> * desc function
[jira] [Created] (SPARK-39503) Add session catalog name for v1 table and function
XiDuo You created SPARK-39503:
------------------------------

Summary: Add session catalog name for v1 table and function
Key: SPARK-39503
URL: https://issues.apache.org/jira/browse/SPARK-39503
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

To make it clearer which catalog a table or function comes from. It affects:
* the scanned table/view in the query explain output
* the target table/view of the data writing
* desc database
* desc formatted table
* show create table
* desc function
[jira] [Updated] (SPARK-39475) Pull out complex join keys for shuffled join
[ https://issues.apache.org/jira/browse/SPARK-39475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39475:
------------------------------
Summary: Pull out complex join keys for shuffled join (was: Pull out complex join keys)

> Pull out complex join keys for shuffled join
> --------------------------------------------
>
> Key: SPARK-39475
> URL: https://issues.apache.org/jira/browse/SPARK-39475
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> For a sort merge join, a complex join key may be evaluated up to three times:
> # exchange
> # sort
> # join
> We can pull it out into a project so it is evaluated only once.
[jira] [Created] (SPARK-39475) Pull out complex join keys
XiDuo You created SPARK-39475:
------------------------------

Summary: Pull out complex join keys
Key: SPARK-39475
URL: https://issues.apache.org/jira/browse/SPARK-39475
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

For a sort merge join, a complex join key may be evaluated up to three times:
# exchange
# sort
# join
We can pull it out into a project so it is evaluated only once.
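The effect of the pull-out can be illustrated outside of Spark with a toy evaluation counter: without it, the exchange, sort, and join stages each re-evaluate the key expression per row; with a project materializing the key, it is computed once per row and the downstream operators just reference the projected attribute. All names here are illustrative:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PullOutKeySketch {
    static final AtomicInteger evals = new AtomicInteger();

    // Stand-in for an expensive join key expression, e.g. substring(c1, 0, 2) || c3.
    static int complexKey(int row) {
        evals.incrementAndGet();
        return row * 31 + 7;
    }

    public static void main(String[] args) {
        List<Integer> rows = List.of(1, 2, 3);

        // Without pull-out: exchange, sort and join each evaluate the key.
        evals.set(0);
        for (int r : rows) {
            int forExchange = complexKey(r);
            int forSort = complexKey(r);
            int forJoin = complexKey(r);
        }
        System.out.println(evals.get()); // 9

        // With pull-out: a project computes the key once per row; downstream
        // operators reuse the materialized column.
        evals.set(0);
        int[] projected = rows.stream().mapToInt(PullOutKeySketch::complexKey).toArray();
        System.out.println(evals.get()); // 3
    }
}
```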
[jira] [Commented] (SPARK-39454) failed to convert LogicalPlan to SparkPlan when subquery exists after "IN" predicate
[ https://issues.apache.org/jira/browse/SPARK-39454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553874#comment-17553874 ]

XiDuo You commented on SPARK-39454:
-----------------------------------
[~allxu] this issue should be fixed by SPARK-37995

> failed to convert LogicalPlan to SparkPlan when subquery exists after "IN" predicate
> ------------------------------------------------------------------------------------
>
> Key: SPARK-39454
> URL: https://issues.apache.org/jira/browse/SPARK-39454
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.1
> Environment: Spark 3.2.1, Standalone mode.
>
> Spark shell start:
> {code:java}
> SPARK_HOME=/spark-3.2.1-bin-hadoop3.2
> $SPARK_HOME/bin/pyspark --master local[*] \
>   --conf spark.executor.cores=12 \
>   --driver-memory 40G \
>   --executor-memory 10G \
>   --conf spark.driver.maxResultSize=8G \
>   --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1 \
>   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
>   --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
>   --conf spark.sql.catalog.spark_catalog.type=hadoop \
>   --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
>   --conf spark.sql.catalog.local.type=hadoop \
>   --conf spark.sql.catalog.local.warehouse=$PWD/local-warehouse \
>   --conf spark.sql.catalog.spark_catalog.warehouse=$PWD/spark-warehouse
> {code}
> Reporter: Yanzhe Xu
> Priority: Major
> Attachments: catalog_returns_repro.tar.gz, catalog_sales_repro.tar.gz, date_dim_repro.tar.gz
>
> When running a query with Iceberg:
> {code:java}
> spark.sql("drop table if exists catalog_returns")
> spark.sql("drop table if exists catalog_sales")
> spark.sql("drop table if exists date_dim")
> spark.read.parquet("catalog_returns_repro").createOrReplaceTempView("temp_catalog_returns")
> spark.read.parquet("catalog_sales_repro").createOrReplaceTempView("temp_catalog_sales")
> spark.read.parquet("date_dim_repro").createOrReplaceTempView("temp_date_dim")
> spark.sql("create table if not exists catalog_returns using iceberg partitioned by (cr_returned_date_sk) tblproperties('write.parquet.compression-codec' = 'snappy') as select * from temp_catalog_returns")
> spark.sql("create table if not exists catalog_sales using iceberg partitioned by (cs_sold_date_sk) tblproperties('write.parquet.compression-codec' = 'snappy') as select * from temp_catalog_sales")
> spark.sql("create table if not exists date_dim using iceberg tblproperties('write.parquet.compression-codec' = 'snappy') as select * from temp_date_dim")
> spark.sql("delete from catalog_returns where cr_order_number in (select cs_order_number from catalog_sales, date_dim where cs_sold_date_sk=d_date_sk and d_date between '2000-05-20' and '2000-05-21');").explain(True)
> {code}
> Spark gives the following error:
> {code:java}
> : java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.execution.SparkPlan
>   at scala.collection.immutable.List.map(List.scala:293)
>   at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.sca
[jira] [Created] (SPARK-39455) Improve expression non-codegen code path performance by cache data type matching
XiDuo You created SPARK-39455:
------------------------------

Summary: Improve expression non-codegen code path performance by cache data type matching
Key: SPARK-39455
URL: https://issues.apache.org/jira/browse/SPARK-39455
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

Some expressions do data type matching inside `eval`, which is unfriendly to performance: the match is an overhead paid on every execution, per row.
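The fix pattern is to resolve the type match once, when the expression is constructed, into a cached function, instead of re-matching inside `eval` for every row. A hypothetical illustration of the idea (these are not Spark's actual classes):

```java
import java.util.function.BinaryOperator;

public class CachedDispatchSketch {
    enum DataType { INT, LONG }

    static final class Plus {
        // Resolved once per expression instance, not once per row.
        private final BinaryOperator<Number> add;

        Plus(DataType dt) {
            this.add = switch (dt) {
                case INT  -> (a, b) -> a.intValue() + b.intValue();
                case LONG -> (a, b) -> a.longValue() + b.longValue();
            };
        }

        Number eval(Number a, Number b) {
            return add.apply(a, b); // no type matching on the per-row path
        }
    }

    public static void main(String[] args) {
        Plus intPlus = new Plus(DataType.INT);
        System.out.println(intPlus.eval(1, 2)); // 3
    }
}
```

The per-row cost drops from "pattern match, then add" to a single virtual call on the cached operator.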
[jira] [Updated] (SPARK-39397) Relax AliasAwareOutputExpression to support alias with expression
[ https://issues.apache.org/jira/browse/SPARK-39397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39397:
------------------------------
Description:
We will pull out complex join keys from grouping expressions, so the project can hold an alias of an expression. Unfortunately we may lose the output partitioning, since the current AliasAwareOutputExpression only supports preserving an alias of an attribute.

For example, the following query will introduce three exchanges instead of two.

{code:java}
SELECT c1 + 1, count(*)
FROM t1
JOIN t2 ON c1 + 1 = c2
GROUP BY c1 + 1
{code}

was:
We will pull out complex join keys from grouping expressions, so the project can hold an alias of an expression. Unfortunately we may lose the output partitioning, since the current AliasAwareOutputExpression only supports preserving an alias of an attribute.

As a result, the following query will introduce three exchanges instead of two.

{code:java}
SELECT c1 + 1, count(*)
FROM t1
JOIN t2 ON c1 + 1 = c2
GROUP BY c1 + 1
{code}

> Relax AliasAwareOutputExpression to support alias with expression
> -----------------------------------------------------------------
>
> Key: SPARK-39397
> URL: https://issues.apache.org/jira/browse/SPARK-39397
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> We will pull out complex join keys from grouping expressions, so the project can hold an alias of an expression. Unfortunately we may lose the output partitioning, since the current AliasAwareOutputExpression only supports preserving an alias of an attribute.
>
> For example, the following query will introduce three exchanges instead of two.
>
> {code:java}
> SELECT c1 + 1, count(*)
> FROM t1
> JOIN t2 ON c1 + 1 = c2
> GROUP BY c1 + 1
> {code}
[jira] [Updated] (SPARK-39397) Relax AliasAwareOutputExpression to support alias with expression
[ https://issues.apache.org/jira/browse/SPARK-39397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39397:
------------------------------
Description:
We will pull out complex join keys from grouping expressions, so the project can hold an alias of an expression. Unfortunately we may lose the output partitioning, since the current AliasAwareOutputExpression only supports preserving an alias of an attribute.

As a result, the following query will introduce three exchanges instead of two.

{code:java}
SELECT c1 + 1, count(*)
FROM t1
JOIN t2 ON c1 + 1 = c2
GROUP BY c1 + 1
{code}

was:
We will pull out complex join keys from grouping expressions, so the project can hold an alias of an expression. Unfortunately we may lose the output partitioning, since the current AliasAwareOutputExpression only supports preserving an alias of an attribute.

As a result, the following query will introduce three exchanges.

{code:java}
SELECT c1 + 1, count(*)
FROM t1
JOIN t2 ON c1 + 1 = c2
GROUP BY c1 + 1
{code}

> Relax AliasAwareOutputExpression to support alias with expression
> -----------------------------------------------------------------
>
> Key: SPARK-39397
> URL: https://issues.apache.org/jira/browse/SPARK-39397
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> We will pull out complex join keys from grouping expressions, so the project can hold an alias of an expression. Unfortunately we may lose the output partitioning, since the current AliasAwareOutputExpression only supports preserving an alias of an attribute.
>
> As a result, the following query will introduce three exchanges instead of two.
>
> {code:java}
> SELECT c1 + 1, count(*)
> FROM t1
> JOIN t2 ON c1 + 1 = c2
> GROUP BY c1 + 1
> {code}
[jira] [Created] (SPARK-39397) Relax AliasAwareOutputExpression to support alias with expression
XiDuo You created SPARK-39397:
------------------------------

Summary: Relax AliasAwareOutputExpression to support alias with expression
Key: SPARK-39397
URL: https://issues.apache.org/jira/browse/SPARK-39397
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

We will pull out complex join keys from grouping expressions, so the project can hold an alias of an expression. Unfortunately we may lose the output partitioning, since the current AliasAwareOutputExpression only supports preserving an alias of an attribute.

As a result, the following query will introduce three exchanges.

{code:java}
SELECT c1 + 1, count(*)
FROM t1
JOIN t2 ON c1 + 1 = c2
GROUP BY c1 + 1
{code}
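The proposed relaxation can be modeled as widening the alias map from "attribute -> alias" to "expression -> alias", so that a partitioning expressed over `c1 + 1` can be rewritten to the aliased output attribute and recognized by the aggregate. A toy model where expressions are represented by canonical strings; the alias name `_pulledOutKey` is purely illustrative:

```java
import java.util.List;
import java.util.Map;

public class AliasAwareSketch {
    // Today the map effectively only holds attribute -> alias entries; the
    // relaxation also records whole expressions such as "c1 + 1".
    static List<String> normalizePartitioning(List<String> partitioningExprs,
                                              Map<String, String> aliasMap) {
        return partitioningExprs.stream()
                .map(e -> aliasMap.getOrDefault(e, e))
                .toList();
    }

    public static void main(String[] args) {
        // The project defines: _pulledOutKey := c1 + 1
        Map<String, String> aliasMap = Map.of("c1 + 1", "_pulledOutKey");
        // The join output is hash-partitioned by c1 + 1; after rewriting, the
        // aggregate sees partitioning by _pulledOutKey and can reuse it,
        // avoiding the third exchange.
        System.out.println(normalizePartitioning(List.of("c1 + 1"), aliasMap));
    }
}
```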
[jira] [Updated] (SPARK-39318) Remove tpch-plan-stability WithStats golden files
[ https://issues.apache.org/jira/browse/SPARK-39318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39318:
------------------------------
Description:
These are dead golden files, since we have no stats with TPCH and no check for them.

(was: These are dead golden files, since we have no stats and no stats check with TPCH.)

> Remove tpch-plan-stability WithStats golden files
> -------------------------------------------------
>
> Key: SPARK-39318
> URL: https://issues.apache.org/jira/browse/SPARK-39318
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> These are dead golden files, since we have no stats with TPCH and no check for them.
[jira] [Created] (SPARK-39318) Rmove tpch-plan-stability WithStats golden files
XiDuo You created SPARK-39318:
------------------------------

Summary: Rmove tpch-plan-stability WithStats golden files
Key: SPARK-39318
URL: https://issues.apache.org/jira/browse/SPARK-39318
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

These are dead golden files, since we have no stats and no stats check with TPCH.
[jira] [Updated] (SPARK-39318) Remove tpch-plan-stability WithStats golden files
[ https://issues.apache.org/jira/browse/SPARK-39318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39318:
------------------------------
Summary: Remove tpch-plan-stability WithStats golden files (was: Rmove tpch-plan-stability WithStats golden files)

> Remove tpch-plan-stability WithStats golden files
> -------------------------------------------------
>
> Key: SPARK-39318
> URL: https://issues.apache.org/jira/browse/SPARK-39318
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> These are dead golden files, since we have no stats and no stats check with TPCH.
[jira] [Updated] (SPARK-39316) Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
[ https://issues.apache.org/jira/browse/SPARK-39316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

XiDuo You updated SPARK-39316:
------------------------------
Description:
Merge {{PromotePrecision}} into {{dataType}}, for example, {{Add}}:

{code:java}
override def dataType: DataType = (left, right) match {
  case (DecimalType.Expression(p1, s1), DecimalType.Expression(p2, s2)) =>
    val resultScale = max(s1, s2)
    if (allowPrecisionLoss) {
      DecimalType.adjustPrecisionScale(max(p1 - s1, p2 - s2) + resultScale + 1,
        resultScale)
    } else {
      DecimalType.bounded(max(p1 - s1, p2 - s2) + resultScale + 1, resultScale)
    }
  case _ => super.dataType
}
{code}

Merge {{CheckOverflow}}, for example, {{Add}} eval:

{code:java}
dataType match {
  case decimalType: DecimalType =>
    val value = numeric.plus(input1, input2)
    checkOverflow(value.asInstanceOf[Decimal], decimalType)
  ...
}
{code}

was:
Merge `PromotePrecision` into `dataType`, so every arithmetic reports the accurate decimal type. For example, `Add`:

{code:java}
override def dataType: DataType = (left, right) match {
  case (DecimalType.Expression(p1, s1), DecimalType.Expression(p2, s2)) =>
    val resultScale = max(s1, s2)
    if (allowPrecisionLoss) {
      DecimalType.adjustPrecisionScale(max(p1 - s1, p2 - s2) + resultScale + 1,
        resultScale)
    } else {
      DecimalType.bounded(max(p1 - s1, p2 - s2) + resultScale + 1, resultScale)
    }
  case _ => super.dataType
}
{code}

Merge `CheckOverflow` into the eval and codegen code paths, so every arithmetic can handle the overflow case at runtime. For example, `Add`:

{code:java}
dataType match {
  case decimalType: DecimalType =>
    val value = numeric.plus(input1, input2)
    checkOverflow(value.asInstanceOf[Decimal], decimalType)
  ...
}
{code}

> Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
> -----------------------------------------------------------------------
>
> Key: SPARK-39316
> URL: https://issues.apache.org/jira/browse/SPARK-39316
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: XiDuo You
> Priority: Major
>
> Merge {{PromotePrecision}} into {{dataType}}, for example, {{Add}}:
>
> {code:java}
> override def dataType: DataType = (left, right) match {
>   case (DecimalType.Expression(p1, s1), DecimalType.Expression(p2, s2)) =>
>     val resultScale = max(s1, s2)
>     if (allowPrecisionLoss) {
>       DecimalType.adjustPrecisionScale(max(p1 - s1, p2 - s2) + resultScale + 1,
>         resultScale)
>     } else {
>       DecimalType.bounded(max(p1 - s1, p2 - s2) + resultScale + 1, resultScale)
>     }
>   case _ => super.dataType
> }
> {code}
>
> Merge {{CheckOverflow}}, for example, {{Add}} eval:
>
> {code:java}
> dataType match {
>   case decimalType: DecimalType =>
>     val value = numeric.plus(input1, input2)
>     checkOverflow(value.asInstanceOf[Decimal], decimalType)
>   ...
> }
> {code}
[jira] [Created] (SPARK-39316) Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
XiDuo You created SPARK-39316:
------------------------------

Summary: Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
Key: SPARK-39316
URL: https://issues.apache.org/jira/browse/SPARK-39316
Project: Spark
Issue Type: Sub-task
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

Merge `PromotePrecision` into `dataType`, so every arithmetic reports the accurate decimal type. For example, `Add`:

{code:java}
override def dataType: DataType = (left, right) match {
  case (DecimalType.Expression(p1, s1), DecimalType.Expression(p2, s2)) =>
    val resultScale = max(s1, s2)
    if (allowPrecisionLoss) {
      DecimalType.adjustPrecisionScale(max(p1 - s1, p2 - s2) + resultScale + 1,
        resultScale)
    } else {
      DecimalType.bounded(max(p1 - s1, p2 - s2) + resultScale + 1, resultScale)
    }
  case _ => super.dataType
}
{code}

Merge `CheckOverflow` into the eval and codegen code paths, so every arithmetic can handle the overflow case at runtime. For example, `Add`:

{code:java}
dataType match {
  case decimalType: DecimalType =>
    val value = numeric.plus(input1, input2)
    checkOverflow(value.asInstanceOf[Decimal], decimalType)
  ...
}
{code}
[jira] [Created] (SPARK-39315) Refactor PromotePrecision and CheckOverflow with decimal binary arithmetic
XiDuo You created SPARK-39315:
------------------------------

Summary: Refactor PromotePrecision and CheckOverflow with decimal binary arithmetic
Key: SPARK-39315
URL: https://issues.apache.org/jira/browse/SPARK-39315
Project: Spark
Issue Type: Umbrella
Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You

For a decimal binary arithmetic expression, we correct its runtime data type and result data type in DecimalPrecision. This has two drawbacks:
# it has bugs that depend on the ordering of the rules in TypeCoercion
# it is unnecessary to change the runtime data type for decimal, since decimal supports eval with different precisions; all we need to change is the precision of the result type
[jira] [Updated] (SPARK-39291) Fetch blocks and open stream should not respond a closed channel
[ https://issues.apache.org/jira/browse/SPARK-39291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-39291: -- Description: If a user cancels and interrupts a reduce task that is fetching shuffle blocks, the channel is closed. However, there may be some ChunkFetchRequests still in flight, so the server-side TransportRequestHandler will still try to respond to them. It gets worse if the reduce stage is big. {code:java} 22/05/24 21:29:30 ERROR ChunkFetchRequestHandler: Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=736493140719,chunkIndex=6],errorString=java.lang.IllegalStateException: Requested chunk not available since streamId 736493140719 is closed at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92) at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:103) at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82) at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61) at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:750) ] to /ip:port; closing connection java.nio.channels.ClosedChannelException at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:750) {code} was: If a user cancels and interrupts a reduce task that is fetching shuffle blocks, the channel is closed. However, there may be some ChunkFetchRequests still in flight, so the server-side TransportRequestHandler will still try to respond to them. It gets worse if the reduce stage is big. {code:java} 22/05/24 21:29:30 ERROR ChunkFetchRequestHandler: Error sending result ChunkFetc
[jira] [Created] (SPARK-39291) Fetch blocks and open stream should not respond a closed channel
XiDuo You created SPARK-39291: - Summary: Fetch blocks and open stream should not respond a closed channel Key: SPARK-39291 URL: https://issues.apache.org/jira/browse/SPARK-39291 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.4.0 Reporter: XiDuo You If a user cancels and interrupts a reduce task that is fetching shuffle blocks, the channel is closed. However, there may be some ChunkFetchRequests still in flight, so the server-side TransportRequestHandler will still try to respond to them. It gets worse if the reduce stage is big. {code:java} 22/05/24 21:29:30 ERROR ChunkFetchRequestHandler: Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=736493140719,chunkIndex=6],errorString=java.lang.IllegalStateException: Requested chunk not available since streamId 736493140719 is closed at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92) at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:103) at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82) at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61) at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:750) ] to /ip:port; closing connection java.nio.channels.ClosedChannelException at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:750) {code}
[jira] [Created] (SPARK-39267) Clean up dsl unnecessary symbol
XiDuo You created SPARK-39267: - Summary: Clean up dsl unnecessary symbol Key: SPARK-39267 URL: https://issues.apache.org/jira/browse/SPARK-39267 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You dsl is a test helper file which provides easy-to-use functions, but some of them are unnecessary. For example: {code:java} def subquery(alias: Symbol): LogicalPlan {code} For a subquery we only need the name, so a String parameter is enough.
[jira] [Commented] (SPARK-39220) codegen cause NullPointException
[ https://issues.apache.org/jira/browse/SPARK-39220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17539876#comment-17539876 ] XiDuo You commented on SPARK-39220: --- is it possible to also provide a stack trace? > codegen cause NullPointException > > > Key: SPARK-39220 > URL: https://issues.apache.org/jira/browse/SPARK-39220 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.6, 3.2.1 >Reporter: chenxusheng >Priority: Major > > The following code raises a NullPointerException > {code:sql} > SELECT > fk4c7a8cfc, > fka54f2a73, > fk37e266f7 > FROM > be2a04fad4a24848bee641825e5b3466 > WHERE > ( > fk4c7a8cfc is not null > and fk4c7a8cfc<> '' > ) > LIMIT > 1000 > {code} > However, written this way, it works fine > {code:sql} > SELECT > fk4c7a8cfc, > fka54f2a73, > fk37e266f7 > FROM > be2a04fad4a24848bee641825e5b3466 > WHERE > ( > fk4c7a8cfc is not null > and '' <> fk4c7a8cfc > ) > LIMIT > 1000 > {code} > I just moved the '' in the WHERE clause to the front. > The reason for this problem is that the data contains null values. 
> *_org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext#genEqual_* > {code:scala} > def genEqual(dataType: DataType, c1: String, c2: String): String = dataType > match { > case BinaryType => s"java.util.Arrays.equals($c1, $c2)" > case FloatType => > s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == > $c2)" > case DoubleType => > s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 > == $c2)" > case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2" > case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)" > case array: ArrayType => genComp(array, c1, c2) + " == 0" > case struct: StructType => genComp(struct, c1, c2) + " == 0" > case udt: UserDefinedType[_] => genEqual(udt.sqlType, c1, c2) > case NullType => "false" > case _ => > throw new IllegalArgumentException( > "cannot generate equality code for un-comparable type: " + > dataType.catalogString) > } > {code} > {code:scala} > case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)" > {code} > Is a null check missing?
[jira] [Updated] (SPARK-39172) Remove outer join if all output come from streamed side and buffered side keys exist unique key
[ https://issues.apache.org/jira/browse/SPARK-39172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-39172: -- Summary: Remove outer join if all output come from streamed side and buffered side keys exist unique key (was: Remove outer join if all output come from streamed side and buffered side keys exist unique) > Remove outer join if all output come from streamed side and buffered side > keys exist unique key > --- > > Key: SPARK-39172 > URL: https://issues.apache.org/jira/browse/SPARK-39172 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > Improve the optimization case using the distinct keys framework. > For example: > {code:java} > SELECT t1.* FROM t1 LEFT JOIN (SELECT distinct c1 as c1 FROM t)t2 ON t1.c1 = > t2.c1 > ==> > SELECT t1.* FROM t1 {code}
[jira] [Created] (SPARK-39172) Remove outer join if all output come from streamed side and buffered side keys exist unique
XiDuo You created SPARK-39172: - Summary: Remove outer join if all output come from streamed side and buffered side keys exist unique Key: SPARK-39172 URL: https://issues.apache.org/jira/browse/SPARK-39172 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You Improve the optimization case using the distinct keys framework. For example: {code:java} SELECT t1.* FROM t1 LEFT JOIN (SELECT distinct c1 as c1 FROM t)t2 ON t1.c1 = t2.c1 ==> SELECT t1.* FROM t1 {code}
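To make the condition behind this rewrite concrete, here is a hedged Python sketch of the check (the names are illustrative, not Spark's API): a left outer join can be dropped when the output only references the streamed (left) side and the buffered (right) side is unique on the join keys, so no left row can be dropped or duplicated.

```python
def can_remove_left_outer_join(output_attrs, left_attrs,
                               right_join_keys, right_distinct_key_sets):
    # 1) the output must come entirely from the streamed (left) side
    output_from_left = set(output_attrs) <= set(left_attrs)
    # 2) the buffered (right) side must be unique on the join keys:
    #    some known distinct-key set is covered by the join keys
    right_unique = any(set(keys) <= set(right_join_keys)
                       for keys in right_distinct_key_sets)
    return output_from_left and right_unique
```

In the example above, `t2` is `SELECT DISTINCT c1 FROM t`, so `{c1}` is a distinct key set and the join key `t1.c1 = t2.c1` covers it; together with `SELECT t1.*`, both conditions hold and the join is removable.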
[jira] [Commented] (SPARK-39104) Null Pointer Exeption on unpersist call
[ https://issues.apache.org/jira/browse/SPARK-39104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534136#comment-17534136 ] XiDuo You commented on SPARK-39104: --- it seems this bug also exists on the 3.3.0 branch > Null Pointer Exeption on unpersist call > --- > > Key: SPARK-39104 > URL: https://issues.apache.org/jira/browse/SPARK-39104 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.1 >Reporter: Denis >Priority: Major > > DataFrame.unpersist call fails with NPE > > {code:java} > java.lang.NullPointerException > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247) > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241) > at > org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189) > at > org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176) > at > scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303) > at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297) > at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) > at scala.collection.TraversableLike.filter(TraversableLike.scala:395) > at scala.collection.TraversableLike.filter$(TraversableLike.scala:395) > at scala.collection.AbstractTraversable.filter(Traversable.scala:108) > at > 
org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219) > at > org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176) > at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220) > at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231){code} > Looks like synchronization is required for > org.apache.spark.sql.execution.columnar.CachedRDDBuilder#isCachedColumnBuffersLoaded > > {code:java} > def isCachedColumnBuffersLoaded: Boolean = { > _cachedColumnBuffers != null && isCachedRDDLoaded > } > def isCachedRDDLoaded: Boolean = { > _cachedColumnBuffersAreLoaded || { > val bmMaster = SparkEnv.get.blockManager.master > val rddLoaded = _cachedColumnBuffers.partitions.forall { partition => > bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, > partition.index), false) > .exists { case(_, blockStatus) => blockStatus.isCached } > } > if (rddLoaded) { > _cachedColumnBuffersAreLoaded = rddLoaded > } > rddLoaded > } > } {code} > isCachedRDDLoaded relies on the _cachedColumnBuffers != null check while it can > be changed concurrently from another thread.
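The race described above is the classic check-then-act pattern: the field is read once for the null check and again for the dereference, and a concurrent uncache can null it in between. A common fix shape (a minimal Python sketch, not Spark's actual patch) is to snapshot the field into a local before both the check and the use:

```python
class CachedRDDBuilder:
    """Minimal stand-in for the Spark class; clear() models a concurrent uncache."""

    def __init__(self, buffers):
        self._cached_column_buffers = buffers

    def clear(self):
        # another thread may call this at any time
        self._cached_column_buffers = None

    def is_loaded_racy(self):
        # buggy shape: the field is read twice, so a concurrent clear()
        # between the two reads dereferences None (Spark's NPE)
        return (self._cached_column_buffers is not None
                and self._cached_column_buffers.loaded)

    def is_loaded_safe(self):
        # fixed shape: one read into a local; the local cannot change underneath us
        buffers = self._cached_column_buffers
        return buffers is not None and buffers.loaded
```

Synchronizing the whole method would also work, but the local-snapshot form avoids taking a lock on every status check.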
[jira] [Commented] (SPARK-39132) spark3.2.1 cache throw NPE
[ https://issues.apache.org/jira/browse/SPARK-39132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534134#comment-17534134 ] XiDuo You commented on SPARK-39132: --- same bug as SPARK-39104 > spark3.2.1 cache throw NPE > -- > > Key: SPARK-39132 > URL: https://issues.apache.org/jira/browse/SPARK-39132 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.2.1 > Environment: I set up a driver and 2 executors; each executor allocates 2g > memory and old generation usage is about 50%, which I think is healthy >Reporter: cxb >Priority: Major > Original Estimate: 72h > Remaining Estimate: 72h > > a job that has been running for about 1 day throws the exception after upgrading > the Spark version to 3.2.1 > gc log: > {code:java} > Heap > par new generation total 307840K, used 239453K [0x8000, > 0x94e0, 0x) > eden space 273664K, 81% used [0x8000, 0x8da4bdd0, > 0x90b4) > from space 34176K, 46% used [0x92ca, 0x93c2b6b8, > 0x94e0) > to space 34176K, 0% used [0x90b4, 0x90b4, > 0x92ca) > concurrent mark-sweep generation total 811300K, used 451940K > [0x, 0xdc2e9000, 0x0001) > Metaspace used 102593K, capacity 110232K, committed 121000K, reserved > 1155072K > class space used 12473K, capacity 13482K, committed 15584K, reserved > 1048576K {code} > code: > {code:java} > sparkSession > .readStream > .format('kafka') > .load > .repartition(4) > ...project > .watermark > .groupby(k1, k2) > .agg(size(collect_set('xxx'))) > .writeStream > .foreachBatch(function test) > .start > def test:(Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => { > ds.persist(StorageLevel.MEMORY_AND_DISK_SER) > ds.write > .option("collection", s"col_1") > .option("maxBatchSize", "2048") > .mode("append") > .mongo() > ds.write > .option("collection", s"col_2") > .option("maxBatchSize", "2048") > .mode("append") > .mongo() > ds.unpersist() > }{code} > > > exception log > > {code:java} > {code} > 22/05/09 21:11:28 ERROR 
streaming.MicroBatchExecution: Query rydts_regist_gp > [id = 669c2031-71b2-422b-859d-336722d289e9, runId = > 049de32c-e6ff-48f1-8742-bb95122a36ea] terminated with error > java.lang.NullPointerException > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1(InMemoryRelation.scala:248) > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1$adapted(InMemoryRelation.scala:247) > at > scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41) > at > scala.collection.IndexedSeqOptimized.forall(IndexedSeqOptimized.scala:46) > at > scala.collection.IndexedSeqOptimized.forall$(IndexedSeqOptimized.scala:46) > at scala.collection.mutable.ArrayOps$ofRef.forall(ArrayOps.scala:198) > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247) > at > org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241) > at > org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189) > at > org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176) > at > scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303) > at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297) > at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) > at scala.collection.TraversableLike.filter(TraversableLike.scala:395) > at 
scala.collection.TraversableLike.filter$(TraversableLike.scala:395) > at scala.collection.AbstractTraversable.filter(Traversable.scala:108) > at > org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219) > at > org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176) > at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220) > at org.apache.spark.sql.
[jira] [Created] (SPARK-39122) Python UDF does not follow the conditional expression evaluation order
XiDuo You created SPARK-39122: - Summary: Python UDF does not follow the conditional expression evaluation order Key: SPARK-39122 URL: https://issues.apache.org/jira/browse/SPARK-39122 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You We pull out Python UDFs and run them eagerly whether or not their branch can be reached, so the query below will fail: {code:java} SELECT if(true, 1, python_udf(1/0)) {code}
[jira] [Created] (SPARK-39106) Correct conditional expression constant folding
XiDuo You created SPARK-39106: - Summary: Correct conditional expression constant folding Key: SPARK-39106 URL: https://issues.apache.org/jira/browse/SPARK-39106 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You For a conditional expression, we cannot partially fold the constants inside its children. For example, if c1 or c2 is not null, the last branch should never be hit: {code:java} SELECT COALESCE(c1, c2, 1/0); {code} Besides, for CaseWhen and If, we should mark them as foldable if their children are foldable. This is safe since both the non-codegen and codegen code paths already respect the evaluation order.
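To illustrate what "cannot partially fold" means here, below is a toy Python model of COALESCE folding (illustrative only, not the optimizer rule itself): constants are folded left to right only while they are guaranteed to be evaluated, and everything after the first non-constant branch is left untouched, so a failing expression such as 1/0 in a possibly unreachable branch is never evaluated at optimization time.

```python
def fold_coalesce(args):
    """args: list of ('const', value) or ('expr', description) branches."""
    out = []
    for i, (kind, value) in enumerate(args):
        if kind != 'const':
            # a non-constant branch: later branches may never run, keep them as-is
            out.append((kind, value))
            out.extend(args[i + 1:])
            return out
        if value is not None:
            # the first non-null constant short-circuits the whole COALESCE
            out.append((kind, value))
            return out
        # a constant NULL contributes nothing and can be dropped
    return out if out else [('const', None)]
```

For `COALESCE(c1, c2, 1/0)` this keeps all three branches intact (the division is never touched), while `COALESCE(NULL, 1, c)` folds to the constant `1`.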
[jira] [Updated] (SPARK-39105) Add ConditionalExpression trait
[ https://issues.apache.org/jira/browse/SPARK-39105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-39105: -- Description: For developers, if a custom conditional like expression contains common sub expression then the evaluation order may be changed since Spark will pull out and eval the common sub expressions first during execution. Add ConditionalExpression trait is friendly for developers. was: For develpers, if a custom conditional like expression contains common sub expression then the evaluation order may be changed since Spark will pull out and eval the common sub expressions first during execution. Add ConditionalExpression trait is friendly for developers. > Add ConditionalExpression trait > --- > > Key: SPARK-39105 > URL: https://issues.apache.org/jira/browse/SPARK-39105 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > For developers, if a custom conditional like expression contains common sub > expression then the evaluation order may be changed since Spark will pull out > and eval the common sub expressions first during execution. > Add ConditionalExpression trait is friendly for developers. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39105) Add ConditionalExpression trait
[ https://issues.apache.org/jira/browse/SPARK-39105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-39105: -- Description: For develpers, if a custom conditional like expression contains common sub expression then the evaluation order may be changed since Spark will pull out and eval the common sub expressions first during execution. Add ConditionalExpression trait is friendly for developers. was: For develpers, if a custom conditional like expression contains common sub expression then the evaluation order may be changed since Spark will pull out and evla the common sub expressions first during execution. Add ConditionalExpression trait is friendly for developers. > Add ConditionalExpression trait > --- > > Key: SPARK-39105 > URL: https://issues.apache.org/jira/browse/SPARK-39105 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > For develpers, if a custom conditional like expression contains common sub > expression then the evaluation order may be changed since Spark will pull out > and eval the common sub expressions first during execution. > Add ConditionalExpression trait is friendly for developers. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39105) Add ConditionalExpression trait
[ https://issues.apache.org/jira/browse/SPARK-39105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-39105: -- Description: For develpers, if a custom conditional like expression contains common sub expression then the evaluation order may be changed since Spark will pull out and evla the common sub expressions first during execution. Add ConditionalExpression trait is friendly for developers. was: For develpers, if a custom conditional like expression supports codegen then the evaluation order may be changed since Spark will pull out and evla the common sub expressions first during codegen. Add ConditionalExpression trait is friendly for developers. > Add ConditionalExpression trait > --- > > Key: SPARK-39105 > URL: https://issues.apache.org/jira/browse/SPARK-39105 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > For develpers, if a custom conditional like expression contains common sub > expression then the evaluation order may be changed since Spark will pull out > and evla the common sub expressions first during execution. > Add ConditionalExpression trait is friendly for developers. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39105) Add ConditionalExpression trait
XiDuo You created SPARK-39105: - Summary: Add ConditionalExpression trait Key: SPARK-39105 URL: https://issues.apache.org/jira/browse/SPARK-39105 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You For develpers, if a custom conditional like expression supports codegen then the evaluation order may be changed since Spark will pull out and evla the common sub expressions first during codegen. Add ConditionalExpression trait is friendly for developers. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39040) Respect NaNvl in EquivalentExpressions for expression elimination
[ https://issues.apache.org/jira/browse/SPARK-39040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-39040: -- Description: For example, this query will fail: {code:java} set spark.sql.ansi.enabled=true; set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConstantFolding; SELECT nanvl(1, 1/0 + 1/0); {code} {code:java} org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (10.221.98.68 executor driver): org.apache.spark.SparkArithmeticException: divide by zero. To return NULL instead, use 'try_divide'. If necessary set spark.sql.ansi.enabled to false (except for ANSI interval type) to bypass this error. == SQL(line 1, position 17) == select nanvl(1 , 1/0 + 1/0) ^^^ at org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:151) {code} We should respect the ordering of conditional expressions, which always evaluate the predicate branch first, so the query above should not fail. was: For example: {code:java} set spark.sql.ansi.enabled=true; set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConstantFolding; SELECT nanvl(1, 1/0 + 1/0); {code} We should respect the ordering of conditional expressions, which always evaluate the predicate branch first, so the query above should not fail. 
> Respect NaNvl in EquivalentExpressions for expression elimination > - > > Key: SPARK-39040 > URL: https://issues.apache.org/jira/browse/SPARK-39040 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > For example, this query will fail: > {code:java} > set spark.sql.ansi.enabled=true; > set > spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConstantFolding; > SELECT nanvl(1, 1/0 + 1/0); {code} > {code:java} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 > (TID 4) (10.221.98.68 executor driver): > org.apache.spark.SparkArithmeticException: divide by zero. To return NULL > instead, use 'try_divide'. If necessary set spark.sql.ansi.enabled to false > (except for ANSI interval type) to bypass this error. > == SQL(line 1, position 17) == > select nanvl(1 , 1/0 + 1/0) > ^^^ at > org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:151) > {code} > We should respect the ordering of conditional expressions, which always evaluate > the predicate branch first, so the query above should not fail.
[jira] [Created] (SPARK-39040) Respect NaNvl in EquivalentExpressions for expression elimination
XiDuo You created SPARK-39040: - Summary: Respect NaNvl in EquivalentExpressions for expression elimination Key: SPARK-39040 URL: https://issues.apache.org/jira/browse/SPARK-39040 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You For example: {code:java} set spark.sql.ansi.enabled=true; set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConstantFolding; SELECT nanvl(1, 1/0 + 1/0); {code} We should respect the evaluation ordering of conditional expressions, which always evaluate the predicate branch first, so the query above should not fail. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39039) Conditional expression evaluation ordering
XiDuo You created SPARK-39039: - Summary: Conditional expression evaluation ordering Key: SPARK-39039 URL: https://issues.apache.org/jira/browse/SPARK-39039 Project: Spark Issue Type: Umbrella Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You Make sure the conditional expression evaluation ordering is always the same, since a different expression ordering can cause a different result. It gets worse with ANSI mode, which can fail at runtime. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-37528) Schedule Tasks By Input Size
[ https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37528: -- Description: In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time. [design doc|https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing] was: In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time. [design doc|[https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing]] > Schedule Tasks By Input Size > > > Key: SPARK-37528 > URL: https://issues.apache.org/jira/browse/SPARK-37528 > Project: Spark > Issue Type: New Feature > Components: Spark Core, SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > In general, the larger input data size means longer running time. So ideally, > we can let DAGScheduler submit bigger input size task first. It can reduce > the whole stage running time. > [design > doc|https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing] -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-37528) Schedule Tasks By Input Size
[ https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37528: -- Description: In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time. [design doc](https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing) was: In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time. For example, we have one stage with 4 tasks and the defaultParallelism is 2 and the 4 tasks have different running time [1s, 3s, 2s, 4s]. - in normal, the running time of the stage is: 7s - if big task first, the running time of the stage is: 5s > Schedule Tasks By Input Size > > > Key: SPARK-37528 > URL: https://issues.apache.org/jira/browse/SPARK-37528 > Project: Spark > Issue Type: New Feature > Components: Spark Core, SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > In general, the larger input data size means longer running time. So ideally, > we can let DAGScheduler submit bigger input size task first. It can reduce > the whole stage running time. > [design > doc](https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing) -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-37528) Schedule Tasks By Input Size
[ https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37528: -- Description: In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time. [design doc|[https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing]] was: In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time. [design doc](https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing) > Schedule Tasks By Input Size > > > Key: SPARK-37528 > URL: https://issues.apache.org/jira/browse/SPARK-37528 > Project: Spark > Issue Type: New Feature > Components: Spark Core, SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > In general, the larger input data size means longer running time. So ideally, > we can let DAGScheduler submit bigger input size task first. It can reduce > the whole stage running time. > [design > doc|[https://docs.google.com/document/d/1vPcuEADUokO4XpqBV1rFH90Zi4rKdsgZtZMYX80c2gw/edit?usp=sharing]] -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38962) Fix wrong computeStats at DataSourceV2Relation
XiDuo You created SPARK-38962: - Summary: Fix wrong computeStats at DataSourceV2Relation Key: SPARK-38962 URL: https://issues.apache.org/jira/browse/SPARK-38962 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You The interface `SupportsReportStatistics` should be mixed in `Scan` rather than `ScanBuilder` -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38962) Fix wrong computeStats at DataSourceV2Relation
[ https://issues.apache.org/jira/browse/SPARK-38962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38962: -- Issue Type: Bug (was: Improvement) > Fix wrong computeStats at DataSourceV2Relation > -- > > Key: SPARK-38962 > URL: https://issues.apache.org/jira/browse/SPARK-38962 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Minor > > The interface `SupportsReportStatistics` should be mixed in `Scan` rather > than `ScanBuilder` -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-38667) Optimizer generates error when using inner join along with sequence
[ https://issues.apache.org/jira/browse/SPARK-38667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524692#comment-17524692 ] XiDuo You commented on SPARK-38667: --- So you can add a config to avoid this issue set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate > Optimizer generates error when using inner join along with sequence > --- > > Key: SPARK-38667 > URL: https://issues.apache.org/jira/browse/SPARK-38667 > Project: Spark > Issue Type: Bug > Components: Optimizer >Affects Versions: 3.2.1 >Reporter: Lars >Priority: Major > > This issue occurred in a more complex scenario, so I've broken it down into a > simple case. > {*}Steps to reproduce{*}: Execute the following example. The code should run > without errors, but instead a *java.lang.IllegalArgumentException: Illegal > sequence boundaries: 4 to 2 by 1* is thrown. > {code:java} > package com.example > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.functions._ > object SparkIssue { > def main(args: Array[String]): Unit = { > val spark = SparkSession > .builder() > .master("local[*]") > .getOrCreate() > val dfA = spark > .createDataFrame(Seq((1, 1), (2, 4))) > .toDF("a1", "a2") > val dfB = spark > .createDataFrame(Seq((1, 5), (2, 2))) > .toDF("b1", "b2") > dfA.join(dfB, dfA("a1") === dfB("b1"), "inner") > .where(col("a2") < col("b2")) > .withColumn("x", explode(sequence(col("a2"), col("b2"), lit(1 > .show() > spark.stop() > } > } > {code} > When I look at the Optimized Logical Plan I can see that the Inner Join and > the Filter are brought together, with an additional check for an empty > Sequence. The exception is thrown because the Sequence check is executed > before the Filter. 
> {code:java} > == Parsed Logical Plan == > 'Project [a1#4, a2#5, b1#12, b2#13, explode(sequence('a2, 'b2, Some(1), > None)) AS x#24] > +- Filter (a2#5 < b2#13) > +- Join Inner, (a1#4 = b1#12) > :- Project [_1#0 AS a1#4, _2#1 AS a2#5] > : +- LocalRelation [_1#0, _2#1] > +- Project [_1#8 AS b1#12, _2#9 AS b2#13] > +- LocalRelation [_1#8, _2#9] > == Analyzed Logical Plan == > a1: int, a2: int, b1: int, b2: int, x: int > Project [a1#4, a2#5, b1#12, b2#13, x#25] > +- Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), > false, [x#25] > +- Filter (a2#5 < b2#13) > +- Join Inner, (a1#4 = b1#12) > :- Project [_1#0 AS a1#4, _2#1 AS a2#5] > : +- LocalRelation [_1#0, _2#1] > +- Project [_1#8 AS b1#12, _2#9 AS b2#13] > +- LocalRelation [_1#8, _2#9] > == Optimized Logical Plan == > Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, > [x#25] > +- Join Inner, (((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), > true) > 0) AND (a2#5 < b2#13)) AND (a1#4 = b1#12)) > :- LocalRelation [a1#4, a2#5] > +- LocalRelation [b1#12, b2#13] > == Physical Plan == > Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), [a1#4, > a2#5, b1#12, b2#13], false, [x#25] > +- *(1) BroadcastHashJoin [a1#4], [b1#12], Inner, BuildRight, > ((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND > (a2#5 < b2#13)), false > :- *(1) LocalTableScan [a1#4, a2#5] > +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, > false] as bigint)),false), [id=#15] > +- LocalTableScan [b1#12, b2#13] > {code} > > > -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
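The plans above boil down to a predicate-ordering hazard that can be modeled in a few lines of Python (a toy model, not Spark internals): once the inferred sequence-size check is folded into the join condition, it can run on rows the original filter would have discarded.

```python
def sequence(start, stop, step=1):
    # Toy stand-in for Spark's sequence(): reject illegal boundaries the
    # way Spark does ("Illegal sequence boundaries: 4 to 2 by 1").
    if step > 0 and start > stop:
        raise ValueError(f"Illegal sequence boundaries: {start} to {stop} by {step}")
    return list(range(start, stop + 1, step))

joined = [(1, 5), (4, 2)]  # (a2, b2) pairs coming out of the inner join

# Analyzed plan: Filter(a2 < b2) runs first, so sequence() never sees (4, 2).
survivors = [(a2, b2) for a2, b2 in joined if a2 < b2]
exploded = [x for a2, b2 in survivors for x in sequence(a2, b2)]
assert exploded == [1, 2, 3, 4, 5]

# Optimized plan: the inferred size(sequence(...)) > 0 check is evaluated
# alongside (and here before) the filter, so the bad row raises.
def optimized(rows):
    return [r for r in rows if len(sequence(r[0], r[1])) > 0 and r[0] < r[1]]

failed = False
try:
    optimized(joined)
except ValueError:
    failed = True
assert failed
```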
[jira] [Commented] (SPARK-38667) Optimizer generates error when using inner join along with sequence
[ https://issues.apache.org/jira/browse/SPARK-38667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524690#comment-17524690 ] XiDuo You commented on SPARK-38667: --- it was introduced by SPARK-32295 and fixed by SPARK-37392 > Optimizer generates error when using inner join along with sequence > --- > > Key: SPARK-38667 > URL: https://issues.apache.org/jira/browse/SPARK-38667 > Project: Spark > Issue Type: Bug > Components: Optimizer >Affects Versions: 3.2.1 >Reporter: Lars >Priority: Major > > This issue occurred in a more complex scenario, so I've broken it down into a > simple case. > {*}Steps to reproduce{*}: Execute the following example. The code should run > without errors, but instead a *java.lang.IllegalArgumentException: Illegal > sequence boundaries: 4 to 2 by 1* is thrown. > {code:java} > package com.example > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.functions._ > object SparkIssue { > def main(args: Array[String]): Unit = { > val spark = SparkSession > .builder() > .master("local[*]") > .getOrCreate() > val dfA = spark > .createDataFrame(Seq((1, 1), (2, 4))) > .toDF("a1", "a2") > val dfB = spark > .createDataFrame(Seq((1, 5), (2, 2))) > .toDF("b1", "b2") > dfA.join(dfB, dfA("a1") === dfB("b1"), "inner") > .where(col("a2") < col("b2")) > .withColumn("x", explode(sequence(col("a2"), col("b2"), lit(1 > .show() > spark.stop() > } > } > {code} > When I look at the Optimized Logical Plan I can see that the Inner Join and > the Filter are brought together, with an additional check for an empty > Sequence. The exception is thrown because the Sequence check is executed > before the Filter. 
> {code:java} > == Parsed Logical Plan == > 'Project [a1#4, a2#5, b1#12, b2#13, explode(sequence('a2, 'b2, Some(1), > None)) AS x#24] > +- Filter (a2#5 < b2#13) > +- Join Inner, (a1#4 = b1#12) > :- Project [_1#0 AS a1#4, _2#1 AS a2#5] > : +- LocalRelation [_1#0, _2#1] > +- Project [_1#8 AS b1#12, _2#9 AS b2#13] > +- LocalRelation [_1#8, _2#9] > == Analyzed Logical Plan == > a1: int, a2: int, b1: int, b2: int, x: int > Project [a1#4, a2#5, b1#12, b2#13, x#25] > +- Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), > false, [x#25] > +- Filter (a2#5 < b2#13) > +- Join Inner, (a1#4 = b1#12) > :- Project [_1#0 AS a1#4, _2#1 AS a2#5] > : +- LocalRelation [_1#0, _2#1] > +- Project [_1#8 AS b1#12, _2#9 AS b2#13] > +- LocalRelation [_1#8, _2#9] > == Optimized Logical Plan == > Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), false, > [x#25] > +- Join Inner, (((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), > true) > 0) AND (a2#5 < b2#13)) AND (a1#4 = b1#12)) > :- LocalRelation [a1#4, a2#5] > +- LocalRelation [b1#12, b2#13] > == Physical Plan == > Generate explode(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin))), [a1#4, > a2#5, b1#12, b2#13], false, [x#25] > +- *(1) BroadcastHashJoin [a1#4], [b1#12], Inner, BuildRight, > ((size(sequence(a2#5, b2#13, Some(1), Some(Europe/Berlin)), true) > 0) AND > (a2#5 < b2#13)), false > :- *(1) LocalTableScan [a1#4, a2#5] > +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, > false] as bigint)),false), [id=#15] > +- LocalTableScan [b1#12, b2#13] > {code} > > > -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38932) Datasource v2 support report unique keys
XiDuo You created SPARK-38932: - Summary: Datasource v2 support report unique keys Key: SPARK-38932 URL: https://issues.apache.org/jira/browse/SPARK-38932 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You Datasource v2 can be used to connect to databases that support [*unique keys*|https://en.wikipedia.org/wiki/Unique_key]. The Spark Catalyst optimizer can do further optimization through unique keys, so it can improve performance if the Scan reports its unique keys to Spark. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38895) Unify the AQE shuffle read canonicalized
XiDuo You created SPARK-38895: - Summary: Unify the AQE shuffle read canonicalized Key: SPARK-38895 URL: https://issues.apache.org/jira/browse/SPARK-38895 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You After canonicalization, the child of AQEShuffleReadExec will be an exchange instead of a shuffle query stage. For better maintenance, we can simply override isCanonicalizedPlan and let the framework check if the plan can be executed. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38887) Support switch inner join side for sort merge join
[ https://issues.apache.org/jira/browse/SPARK-38887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38887: -- Summary: Support switch inner join side for sort merge join (was: Support swtich inner join side for sort merge join) > Support switch inner join side for sort merge join > -- > > Key: SPARK-38887 > URL: https://issues.apache.org/jira/browse/SPARK-38887 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > For an inner join type SortMergeJoin, it always uses the left side as the > streamed side and the right side as the buffered side. > According to the implementation of SortMergeJoin, we expect the buffered side > to be: > * smaller than the streamed side > * less duplicated > We do not know whether the join will be SortMergeJoin at the logical phase, so > this selection should be done at the physical phase. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38887) Support swtich inner join side for sort merge join
XiDuo You created SPARK-38887: - Summary: Support swtich inner join side for sort merge join Key: SPARK-38887 URL: https://issues.apache.org/jira/browse/SPARK-38887 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You For an inner join type SortMergeJoin, it always uses the left side as the streamed side and the right side as the buffered side. According to the implementation of SortMergeJoin, we expect the buffered side to be: * smaller than the streamed side * less duplicated We do not know whether the join will be SortMergeJoin at the logical phase, so this selection should be done at the physical phase. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
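To make the streamed/buffered asymmetry concrete, here is a minimal sort-merge inner join sketch in Python (a simplification of the real operator, assuming both inputs are sorted by the join key). Rows from the buffered side with a matching key are held and replayed for every streamed row with that key, which is why a smaller, less duplicated buffered side is cheaper:

```python
def sort_merge_inner_join(streamed, buffered):
    # streamed, buffered: lists of (key, value) sorted by key.
    out = []
    buffered_rows_replayed = 0
    j = 0
    for skey, sval in streamed:
        # Advance past buffered keys smaller than the streamed key; never
        # advance past equal keys, so duplicate streamed keys can re-scan
        # the same buffered group.
        while j < len(buffered) and buffered[j][0] < skey:
            j += 1
        k = j
        while k < len(buffered) and buffered[k][0] == skey:
            out.append((skey, sval, buffered[k][1]))
            buffered_rows_replayed += 1
            k += 1
    return out, buffered_rows_replayed

streamed = [(1, "s1"), (1, "s2"), (2, "s3")]
buffered = [(1, "b1"), (1, "b2"), (3, "b3")]
rows, replays = sort_merge_inner_join(streamed, buffered)
# Each duplicate streamed key re-scans the buffered group: 2 x 2 + 0 = 4.
assert replays == 4
```

Swapping which side is buffered changes only the replay cost, not the join result, which is exactly the choice the issue proposes to make at the physical phase.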
[jira] [Updated] (SPARK-38886) Remove outer join if aggregate functions are duplicate agnostic on streamed side
[ https://issues.apache.org/jira/browse/SPARK-38886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38886: -- Description: If the aggregate's child is an outer join, and the aggregate references all come from the streamed side and the aggregate functions are all duplicate agnostic, we can remove the outer join. For example: {code:java} SELECT t1.c1, min(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1 ==> SELECT t1.c1, min(t1.c2) FROM t1 GROUP BY t1.c1 {code} was: If the aggregate's child is an outer join, and the aggregate references all come from the streamed side and the aggregate functions are all duplicate agnostic, we can remove the outer join. For example: {code:java} SELECT t1.c1, max(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1 ==> SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 {code} > Remove outer join if aggregate functions are duplicate agnostic on streamed > side > > > Key: SPARK-38886 > URL: https://issues.apache.org/jira/browse/SPARK-38886 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > If the aggregate's child is an outer join, and the aggregate references all come > from the streamed side and the aggregate functions are all duplicate > agnostic, we can remove the outer join. > For example: > {code:java} > SELECT t1.c1, min(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1 > ==> > SELECT t1.c1, min(t1.c2) FROM t1 GROUP BY t1.c1 > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38886) Remove outer join if aggregate functions are duplicate agnostic on streamed side
XiDuo You created SPARK-38886: - Summary: Remove outer join if aggregate functions are duplicate agnostic on streamed side Key: SPARK-38886 URL: https://issues.apache.org/jira/browse/SPARK-38886 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You If the aggregate's child is an outer join, and the aggregate references all come from the streamed side and the aggregate functions are all duplicate agnostic, we can remove the outer join. For example: {code:java} SELECT t1.c1, max(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1 ==> SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
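The rewrite can be sanity-checked with a small Python model (illustrative only; the tables and join below are hypothetical data). A left outer join can only duplicate or null-extend rows of the streamed side, and a duplicate-agnostic function such as max is unaffected by either:

```python
from collections import defaultdict

t1 = [(1, 10), (1, 20), (2, 30)]        # (c1, c2)
t2 = [(1, "x"), (1, "y"), (3, "z")]     # (c1, payload); key 3 never matches

def left_outer_join(left, right):
    # Each left row is duplicated once per match, or null-extended.
    out = []
    for c1, c2 in left:
        matches = [v for k, v in right if k == c1]
        if matches:
            out.extend((c1, c2, m) for m in matches)
        else:
            out.append((c1, c2, None))
    return out

def max_c2_by_c1(rows):
    # GROUP BY c1 with max(c2); tolerates the extra join column if present.
    groups = defaultdict(list)
    for c1, c2, *_ in rows:
        groups[c1].append(c2)
    return {c1: max(c2s) for c1, c2s in groups.items()}

# The join only duplicates (key 1) or null-extends (key 2) streamed rows,
# so the duplicate-agnostic max() sees the same answer without the join.
assert max_c2_by_c1(left_outer_join(t1, t2)) == max_c2_by_c1(t1)
```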
[jira] [Commented] (SPARK-38853) optimizeSkewsInRebalancePartitions has performance issue
[ https://issues.apache.org/jira/browse/SPARK-38853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520368#comment-17520368 ] XiDuo You commented on SPARK-38853: --- Some issues might cause driver hang during optimizing skew : SPARK-38406 SPARK-38401 > optimizeSkewsInRebalancePartitions has performance issue > > > Key: SPARK-38853 > URL: https://issues.apache.org/jira/browse/SPARK-38853 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Yuming Wang >Priority: Major > Attachments: Disable.png, enable.png > > > How to reproduce this issue: > {code:sql} > CREATE TABLE t USING PARQUET > AS > SELECT > /*+ REBALANCE */ > A.SESSION_START_DT > , COALESCE(A.SITE_ID,0) AS SITE_ID > , A.GUID > , COALESCE(CAST(A.SESSION_SKEY AS BIGINT),0) AS SESSION_SKEY > , COALESCE(CAST(A.SEQNUM AS INT),0) AS SEQNUM > > , COALESCE(A.IMP_PAGE_ID,0) AS IMP_PAGE_ID > , COALESCE(A.PLACEMENT_ID,0) AS PLACEMENT_ID > , A.PRODUCT_LINE_CODE > , A.ALGORITHM_ID > , A.MEID > , A.ALGO_OUTPUT_ITEMS > , A.CLICKS > , A.GMV_7D > FROM big_partition_table A > WHERE > DT BETWEEN DATE_FORMAT(DATE_SUB(CURRENT_DATE,11), 'MMdd') AND > DATE_FORMAT(DATE_ADD(DATE_SUB(CURRENT_DATE,11),0), 'MMdd') > AND TO_DATE(from_unixtime(unix_timestamp(A.SESSION_START_DT, > '/MM/dd'))) = DATE_SUB(CURRENT_DATE,11) > AND ICFBOT = '00'; > {code} > Enabling optimizeSkewsInRebalancePartitions takes more than 2 hours and the > driver hangs: > !enable.png! > Disabling optimizeSkewsInRebalancePartitions takes only 29 minutes: > !Disable.png! -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38832) Remove unnecessary distinct in aggregate expression by distinctKeys
[ https://issues.apache.org/jira/browse/SPARK-38832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38832: -- Description: We can remove the distinct in aggregate expression if the child distinct semantics is guaranteed. For example: {code:java} SELECT count(distinct c) FROM ( SELECT c FROM t GROUP BY c ){code} was: We can remove the distinct in aggregate expression if the child distinct semantics is guaranteed. For example: {code:java} SELECT count(distinct c) FROM ( SELECT c FROM t GROUP BY c ){code} > Remove unnecessary distinct in aggregate expression by distinctKeys > --- > > Key: SPARK-38832 > URL: https://issues.apache.org/jira/browse/SPARK-38832 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > We can remove the distinct in aggregate expression if the child distinct > semantics is guaranteed. > For example: > {code:java} > SELECT count(distinct c) FROM ( > SELECT c FROM t GROUP BY c > ){code} > > > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38832) Remove unnecessary distinct in aggregate expression by distinctKeys
XiDuo You created SPARK-38832: - Summary: Remove unnecessary distinct in aggregate expression by distinctKeys Key: SPARK-38832 URL: https://issues.apache.org/jira/browse/SPARK-38832 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You We can remove the distinct in aggregate expression if the child distinct semantics is guaranteed. For example: {code:java} SELECT count(distinct c) FROM ( SELECT c FROM t GROUP BY c ){code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
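A tiny Python analogue of the example (illustrative, not the optimizer code): once the inner GROUP BY guarantees that c is a distinct key of its output, the outer count(distinct c) and a plain count(c) agree, so the deduplication work can be dropped.

```python
t = ["a", "b", "a", "c", "b"]  # column c of table t

# Inner query: SELECT c FROM t GROUP BY c -- c becomes a distinct key.
grouped_child = sorted(set(t))

# Outer query: count(distinct c) versus the cheaper plain count(c).
count_distinct = len(set(grouped_child))
plain_count = len(grouped_child)

# Equal whenever the child guarantees distinctness, so DISTINCT is a no-op.
assert count_distinct == plain_count == 3
```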
[jira] [Updated] (SPARK-38162) Optimize one row plan in normal and AQE Optimizer
[ https://issues.apache.org/jira/browse/SPARK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38162: -- Parent: SPARK-37063 Issue Type: Sub-task (was: Improvement) > Optimize one row plan in normal and AQE Optimizer > - > > Key: SPARK-38162 > URL: https://issues.apache.org/jira/browse/SPARK-38162 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > Fix For: 3.3.0 > > > Optimize the plan if its max rows is equal to or less than 1, in these cases: > - if the sort's child has max rows less than or equal to 1, remove the sort > - if the local sort's child has max rows per partition less than or equal to 1, > remove the local sort > - if the aggregate's child has max rows less than or equal to 1 and it's > grouping only (including the rewritten distinct plan), remove the aggregate > - if the aggregate's child has max rows less than or equal to 1, set distinct to > false in all aggregate expressions -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38773) Correct the Union output partitioning and ordering
XiDuo You created SPARK-38773: - Summary: Correct the Union output partitioning and ordering Key: SPARK-38773 URL: https://issues.apache.org/jira/browse/SPARK-38773 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You If the output partitionings of all children are semantically equal, the Union should respect their output partitioning. Else, if the output orderings of all children are semantically equal, the Union should respect their output ordering. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-37528) Schedule Tasks By Input Size
[ https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37528: -- Affects Version/s: 3.4.0 (was: 3.3.0) > Schedule Tasks By Input Size > > > Key: SPARK-37528 > URL: https://issues.apache.org/jira/browse/SPARK-37528 > Project: Spark > Issue Type: New Feature > Components: Spark Core, SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > In general, the larger input data size means longer running time. So ideally, > we can let DAGScheduler submit bigger input size task first. It can reduce > the whole stage running time. For example, we have one stage with 4 tasks and > the defaultParallelism is 2 and the 4 tasks have different running time [1s, > 3s, 2s, 4s]. > - in normal, the running time of the stage is: 7s > - if big task first, the running time of the stage is: 5s -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-37528) Schedule Tasks By Input Size
[ https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37528: -- Summary: Schedule Tasks By Input Size (was: Support reorder tasks during scheduling by shuffle partition size in AQE) > Schedule Tasks By Input Size > > > Key: SPARK-37528 > URL: https://issues.apache.org/jira/browse/SPARK-37528 > Project: Spark > Issue Type: New Feature > Components: Spark Core, SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > In general, the larger input data size means longer running time. So ideally, > we can let DAGScheduler submit bigger input size task first. It can reduce > the whole stage running time. For example, we have one stage with 4 tasks and > the defaultParallelism is 2 and the 4 tasks have different running time [1s, > 3s, 2s, 4s]. > - in normal, the running time of the stage is: 7s > - if big task first, the running time of the stage is: 5s -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-37528) Support reorder tasks during scheduling by shuffle partition size in AQE
[ https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37528: -- Description: In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time. For example, we have one stage with 4 tasks and the defaultParallelism is 2 and the 4 tasks have different running time [1s, 3s, 2s, 4s]. - in normal, the running time of the stage is: 7s - if big task first, the running time of the stage is: 5s was: Reorder tasks by input size can save the whole stage execution time. Assume the larger amount of input data takes longer to execute. Let's say we have one stage with 4 tasks and the `defaultParallelism` is 2 and the 4 tasks have different execution time with [1s, 3s, 2s, 4s]. * in normal the execution time of the stage is: 7s * after reorder the tasks, the execution time of the stage is: 5s a new config `spark.scheduler.reorderTasks.enabled` to decide if we allow to reorder tasks. > Support reorder tasks during scheduling by shuffle partition size in AQE > > > Key: SPARK-37528 > URL: https://issues.apache.org/jira/browse/SPARK-37528 > Project: Spark > Issue Type: New Feature > Components: Spark Core, SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > In general, the larger input data size means longer running time. So ideally, > we can let DAGScheduler submit bigger input size task first. It can reduce > the whole stage running time. For example, we have one stage with 4 tasks and > the defaultParallelism is 2 and the 4 tasks have different running time [1s, > 3s, 2s, 4s]. 
> - in normal, the running time of the stage is: 7s > - if big task first, the running time of the stage is: 5s -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
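The 7s-versus-5s arithmetic in the example can be reproduced with a small greedy simulation (plain Python, not the DAGScheduler; it assumes each task runs on whichever of the `defaultParallelism` slots frees up first):

```python
import heapq

def makespan(task_times, parallelism):
    # Greedy list scheduling: pop the worker that becomes free earliest
    # and hand it the next task in submission order.
    workers = [0] * parallelism
    heapq.heapify(workers)
    for t in task_times:
        free_at = heapq.heappop(workers)
        heapq.heappush(workers, free_at + t)
    return max(workers)

tasks = [1, 3, 2, 4]  # running times from the description, defaultParallelism = 2

assert makespan(tasks, parallelism=2) == 7                        # submission order
assert makespan(sorted(tasks, reverse=True), parallelism=2) == 5  # biggest task first
```

Sorting the queue largest-first is the classic longest-processing-time heuristic, which is what scheduling by input size approximates when input size correlates with running time.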
[jira] [Updated] (SPARK-38697) Extend SparkSessionExtensions to inject rules into AQE Optimizer
[ https://issues.apache.org/jira/browse/SPARK-38697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38697: -- Description: Provide an entry point for developers to optimize their logical plans with the runtime optimizer in the adaptive query execution framework. We should follow the existing Spark session extensions to allow developers to inject rules. was: Provide an entry point for users to optimize their logical plans with the runtime optimizer in the adaptive query execution framework. We should follow the existing Spark session extensions to allow users to inject rules. > Extend SparkSessionExtensions to inject rules into AQE Optimizer > > > Key: SPARK-38697 > URL: https://issues.apache.org/jira/browse/SPARK-38697 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > Provide an entry point for developers to optimize their logical plans with the > runtime optimizer in the adaptive query execution framework. > We should follow the existing Spark session extensions to allow developers > to inject rules. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38697) Extend SparkSessionExtensions to inject rules into AQE Optimizer
XiDuo You created SPARK-38697: - Summary: Extend SparkSessionExtensions to inject rules into AQE Optimizer Key: SPARK-38697 URL: https://issues.apache.org/jira/browse/SPARK-38697 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You Provide an entry point for users to apply their own logical plan rules in the runtime optimizer of the adaptive query execution framework. We should follow the existing Spark session extension mechanism to allow users to inject rules. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38578) Avoid unnecessary sort in FileFormatWriter if user has specified sort in AQE
[ https://issues.apache.org/jira/browse/SPARK-38578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38578: -- Summary: Avoid unnecessary sort in FileFormatWriter if user has specified sort in AQE (was: Avoid unnecessary sort in FileFormatWriter if user has specified sort) > Avoid unnecessary sort in FileFormatWriter if user has specified sort in AQE > > > Key: SPARK-38578 > URL: https://issues.apache.org/jira/browse/SPARK-38578 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > FileFormatWriter will check and add an implicit sort for dynamic partition > columns or bucket columns according to the input physical plan. The check > always fails since the AQE AdaptiveSparkPlanExec has no outputOrdering. > That causes a redundant sort even if the user has specified a sort that satisfies > the required ordering (dynamic partition and bucket columns). -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38578) Avoid unnecessary sort in FileFormatWriter if user has specified sort
[ https://issues.apache.org/jira/browse/SPARK-38578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38578: -- Parent: SPARK-37063 Issue Type: Sub-task (was: Improvement) > Avoid unnecessary sort in FileFormatWriter if user has specified sort > - > > Key: SPARK-38578 > URL: https://issues.apache.org/jira/browse/SPARK-38578 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > FileFormatWriter will check and add an implicit sort for dynamic partition > columns or bucket columns according to the input physical plan. The check > always fails since the AQE AdaptiveSparkPlanExec has no outputOrdering. > That causes a redundant sort even if the user has specified a sort that satisfies > the required ordering (dynamic partition and bucket columns). -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38578) Avoid unnecessary sort in FileFormatWriter if user has specified sort
XiDuo You created SPARK-38578: - Summary: Avoid unnecessary sort in FileFormatWriter if user has specified sort Key: SPARK-38578 URL: https://issues.apache.org/jira/browse/SPARK-38578 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You FileFormatWriter will check and add an implicit sort for dynamic partition columns or bucket columns according to the input physical plan. The check always fails since the AQE AdaptiveSparkPlanExec has no outputOrdering. That causes a redundant sort even if the user has specified a sort that satisfies the required ordering (dynamic partition and bucket columns). -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
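The failure mode can be sketched with a toy version of the ordering check (illustrative Python; the column names are hypothetical and the real logic lives in FileFormatWriter in Scala). A wrapper node that reports an empty output ordering makes the prefix check fail even though its child is already sorted, so a redundant sort gets added:

```python
def needs_sort(required_ordering, actual_ordering):
    """An explicit sort is required unless the actual output ordering
    starts with all of the required ordering columns."""
    prefix = actual_ordering[:len(required_ordering)]
    return prefix != required_ordering

required = ["part_col", "bucket_col"]        # hypothetical write requirement
child = ["part_col", "bucket_col", "c"]      # user already sorted these

print(needs_sort(required, child))  # False: no extra sort needed
# An adaptive wrapper that exposes no output ordering hides the child's sort:
print(needs_sort(required, []))     # True: a redundant sort is inserted
```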
[jira] [Updated] (SPARK-37796) ByteArrayMethods arrayEquals should fast skip the check of aligning with unaligned platform
[ https://issues.apache.org/jira/browse/SPARK-37796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37796: -- Priority: Major (was: Minor) > ByteArrayMethods arrayEquals should fast skip the check of aligning with > unaligned platform > --- > > Key: SPARK-37796 > URL: https://issues.apache.org/jira/browse/SPARK-37796 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Assignee: XiDuo You >Priority: Major > Fix For: 3.3.0 > > > The method `arrayEquals` in `ByteArrayMethods` is a critical function which is > used in `UTF8String` `equals`, `indexOf`, `find`, etc. > After SPARK-16962, it added the complexity of alignment handling. It would be > better to skip the alignment check entirely if the platform supports unaligned access. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
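The fast path can be sketched as follows (illustrative Python; Spark's actual code reads raw memory via Platform.getLong in Java). The loop compares 8-byte words while at least a full word remains, then finishes byte by byte; on a platform that supports unaligned reads, no alignment prologue is needed before the word loop:

```python
def array_equals(a: bytes, b: bytes) -> bool:
    """Word-at-a-time equality, modeling the unaligned fast path."""
    if len(a) != len(b):
        return False
    i, n = 0, len(a)
    while n - i >= 8:
        # One 64-bit read per side instead of 8 single-byte reads.
        if int.from_bytes(a[i:i + 8], "little") != int.from_bytes(b[i:i + 8], "little"):
            return False
        i += 8
    while i < n:                 # tail: fewer than 8 bytes left
        if a[i] != b[i]:
            return False
        i += 1
    return True
```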
[jira] [Updated] (SPARK-36992) Improve byte array sort perf by unify getPrefix function of UTF8String and ByteArray
[ https://issues.apache.org/jira/browse/SPARK-36992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-36992: -- Priority: Major (was: Minor) > Improve byte array sort perf by unify getPrefix function of UTF8String and > ByteArray > > > Key: SPARK-36992 > URL: https://issues.apache.org/jira/browse/SPARK-36992 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Assignee: XiDuo You >Priority: Major > Fix For: 3.3.0 > > > When executing the sort operator, we first compare the prefixes. However, the > getPrefix function for byte arrays is slow. We use the first 8 bytes as the prefix, > so in the worst case we call `Platform.getByte` 8 times, which is slower than > calling `Platform.getInt` or `Platform.getLong` once. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
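The idea can be sketched like this (illustrative Python): read the first 8 bytes as one big-endian word instead of assembling the prefix byte by byte. Big-endian makes unsigned integer order agree with lexicographic byte order, and short arrays are zero-padded:

```python
def get_prefix(data: bytes) -> int:
    """8-byte sort prefix as a single unsigned big-endian word."""
    return int.from_bytes(data[:8].ljust(8, b"\x00"), "big")

# Prefix comparison agrees with byte-wise comparison:
print(get_prefix(b"abc") < get_prefix(b"abd"))   # True
print(get_prefix(b"ab") < get_prefix(b"abc"))    # True: shorter sorts first
```

Note the prefix only covers the first 8 bytes, so values sharing that prefix still need a full comparison to break the tie.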
[jira] [Updated] (SPARK-37037) Improve byte array sort by unify compareTo function of UTF8String and ByteArray
[ https://issues.apache.org/jira/browse/SPARK-37037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-37037: -- Priority: Major (was: Minor) > Improve byte array sort by unify compareTo function of UTF8String and > ByteArray > > > Key: SPARK-37037 > URL: https://issues.apache.org/jira/browse/SPARK-37037 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Assignee: XiDuo You >Priority: Major > Fix For: 3.3.0 > > > BinaryType uses `TypeUtils.compareBinary` to compare two byte arrays; however, > it's slow since it compares the byte arrays using unsigned int comparison byte > by byte. > We can compare them using `Platform.getLong` with unsigned long comparison if > they have more than 8 bytes. And here is some history about this `TODO` > [https://github.com/apache/spark/pull/6755/files#r32197461 > .|https://github.com/apache/spark/pull/6755/files#r32197461] -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
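A sketch of the unified comparison (illustrative Python; the real code compares unsigned longs read via Platform.getLong). It compares big-endian 8-byte words first, falls back to single bytes for the tail, and finally breaks ties by length:

```python
def compare_binary(a: bytes, b: bytes) -> int:
    """Compare byte arrays 8 bytes at a time using unsigned comparison
    of big-endian words, falling back to single bytes for the tail."""
    i, n = 0, min(len(a), len(b))
    while n - i >= 8:
        wa = int.from_bytes(a[i:i + 8], "big")   # unsigned 64-bit word
        wb = int.from_bytes(b[i:i + 8], "big")
        if wa != wb:
            return -1 if wa < wb else 1
        i += 8
    while i < n:
        if a[i] != b[i]:
            return -1 if a[i] < b[i] else 1
        i += 1
    # Equal prefix: the shorter array sorts first.
    return (len(a) > len(b)) - (len(a) < len(b))
```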
[jira] [Commented] (SPARK-38536) Spark 3 can not read mixed format partitions
[ https://issues.apache.org/jira/browse/SPARK-38536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506789#comment-17506789 ] XiDuo You commented on SPARK-38536: --- it should be fixed by SPARK-36197? > Spark 3 can not read mixed format partitions > > > Key: SPARK-38536 > URL: https://issues.apache.org/jira/browse/SPARK-38536 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0, 3.2.1 >Reporter: Huicheng Song >Priority: Major > > Spark 3.x reads partitions with the table's input format, which fails when a > partition has a different input format than the table. > This is a regression introduced by SPARK-26630. Before that fix, Spark would > use the partition's InputFormat when creating a HadoopRDD. With that fix, Spark uses > only the table's InputFormat when creating a HadoopRDD, causing failures. > Reading mixed-format partitions is an important scenario, especially for format > migration. It is also well supported in query engines like Hive and Presto. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38519) AQE throw exception should respect SparkFatalException
[ https://issues.apache.org/jira/browse/SPARK-38519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38519: -- Description: BroadcastExchangeExec will wrap a fatal exception inside SparkFatalException and unwrap it before throwing. AQE should also respect SparkFatalException and throw the original error. {code:java} Caused by: org.apache.spark.util.SparkFatalException at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} was: BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and unwrap it before throwing. AQE should also respect SparkFatalException and throw the original error. 
{code:java} Caused by: org.apache.spark.util.SparkFatalException at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} > AQE throw exception should respect SparkFatalException > -- > > Key: SPARK-38519 > URL: https://issues.apache.org/jira/browse/SPARK-38519 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > BroadcastExchangeExec will wrap a fatal exception inside SparkFatalException > and unwrap it before throwing. > AQE should also respect SparkFatalException and throw the original error. > {code:java} > Caused by: org.apache.spark.util.SparkFatalException > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38519) AQE throw exception should respect SparkFatalException
[ https://issues.apache.org/jira/browse/SPARK-38519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38519: -- Description: BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and unwrap it before throwing. AQE should also respect SparkFatalException and throw the original error. {code:java} Caused by: org.apache.spark.util.SparkFatalException at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} was: BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and unwrap it in some places. AQE should also respect SparkFatalException and throw the original error. 
{code:java} Caused by: org.apache.spark.util.SparkFatalException at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} > AQE throw exception should respect SparkFatalException > -- > > Key: SPARK-38519 > URL: https://issues.apache.org/jira/browse/SPARK-38519 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and > unwrap it before throwing. > AQE should also respect SparkFatalException and throw the original error. > {code:java} > Caused by: org.apache.spark.util.SparkFatalException > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38519) AQE throw exception should respect SparkFatalException
[ https://issues.apache.org/jira/browse/SPARK-38519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38519: -- Description: BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and unwrap it in some places. AQE should also respect SparkFatalException and throw the original error. {code:java} Caused by: org.apache.spark.util.SparkFatalException at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} was: BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and unwrap it when catching SparkFatalException. AQE should also respect SparkFatalException and throw the original error. 
{code:java} Caused by: org.apache.spark.util.SparkFatalException at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} > AQE throw exception should respect SparkFatalException > -- > > Key: SPARK-38519 > URL: https://issues.apache.org/jira/browse/SPARK-38519 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and > unwrap it in some places. > AQE should also respect SparkFatalException and throw the original error. > {code:java} > Caused by: org.apache.spark.util.SparkFatalException > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38519) AQE throw exception should respect SparkFatalException
XiDuo You created SPARK-38519: - Summary: AQE throw exception should respect SparkFatalException Key: SPARK-38519 URL: https://issues.apache.org/jira/browse/SPARK-38519 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You BroadcastExchangeExec will wrap a fatal exception in SparkFatalException and unwrap it when catching SparkFatalException. AQE should also respect SparkFatalException and throw the original error. {code:java} Caused by: org.apache.spark.util.SparkFatalException at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:168) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
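The wrap/unwrap pattern can be sketched as follows (illustrative Python standing in for the Scala code; the class and function names here are stand-ins, not Spark's actual API):

```python
class SparkFatalException(Exception):
    """Stand-in wrapper: lets a fatal error cross a thread-pool
    boundary without being treated as fatal by the pool itself."""
    def __init__(self, cause):
        super().__init__(str(cause))
        self.cause = cause

def relation_future():
    """Models a broadcast-build future that wraps a fatal error."""
    try:
        raise MemoryError("cannot build broadcast relation")
    except MemoryError as oom:
        raise SparkFatalException(oom) from oom

def await_stage(future):
    """What AQE should do: unwrap and rethrow the original error,
    instead of surfacing the opaque SparkFatalException."""
    try:
        future()
    except SparkFatalException as e:
        raise e.cause

try:
    await_stage(relation_future)
except MemoryError as e:
    print("caught original error:", e)
```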
[jira] [Created] (SPARK-38410) Support specify initial partition number for rebalance
XiDuo You created SPARK-38410: - Summary: Support specify initial partition number for rebalance Key: SPARK-38410 URL: https://issues.apache.org/jira/browse/SPARK-38410 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You Rebalancing partitions resolves the skew issue when shuffling a dataset. It always returns an indeterminate partition number, so at the beginning we did not pass the partition number as a parameter. However, we found that the initial partition number can affect the data compression ratio, so it would be better to support specifying it. Note that it only affects the initial partition number on the map side during the shuffle. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38406) Improve performance of ShufflePartitionsUtil createSkewPartitionSpecs
[ https://issues.apache.org/jira/browse/SPARK-38406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38406: -- Parent: SPARK-37063 Issue Type: Sub-task (was: Improvement) > Improve performance of ShufflePartitionsUtil createSkewPartitionSpecs > - > > Key: SPARK-38406 > URL: https://issues.apache.org/jira/browse/SPARK-38406 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > If the shuffle is skewed with tens of thousands of map partitions and reduce > partitions in AQE, the method > `ShufflePartitionsUtil#createSkewPartitionSpecs` will be very slow. Worse, > it runs on the driver side. > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38406) Improve performance of ShufflePartitionsUtil createSkewPartitionSpecs
[ https://issues.apache.org/jira/browse/SPARK-38406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38406: -- Description: If the shuffle is skewed with tens of thousands of map partitions and reduce partitions in AQE, the method `ShufflePartitionsUtil#createSkewPartitionSpecs` will be very slow. Worse, it runs on the driver side. was: If the shuffle is skewed with tens of thousands of map partitions and reduce partitions in AQE, the method `ShufflePartitionsUtil#createSkewPartitionSpecs` will be very slow. Worse, it runs on the driver side. See the Flame Graph of my local test env: !image-2022-03-03-20-24-48-244.png! We can see that a lot of CPU time is spent building sequences. > Improve performance of ShufflePartitionsUtil createSkewPartitionSpecs > - > > Key: SPARK-38406 > URL: https://issues.apache.org/jira/browse/SPARK-38406 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > If the shuffle is skewed with tens of thousands of map partitions and reduce > partitions in AQE, the method > `ShufflePartitionsUtil#createSkewPartitionSpecs` will be very slow. Worse, > it runs on the driver side. > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38406) Improve performance of ShufflePartitionsUtil createSkewPartitionSpecs
XiDuo You created SPARK-38406: - Summary: Improve performance of ShufflePartitionsUtil createSkewPartitionSpecs Key: SPARK-38406 URL: https://issues.apache.org/jira/browse/SPARK-38406 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You If the shuffle is skewed with tens of thousands of map partitions and reduce partitions in AQE, the method `ShufflePartitionsUtil#createSkewPartitionSpecs` will be very slow. Worse, it runs on the driver side. See the Flame Graph of my local test env: !image-2022-03-03-20-24-48-244.png! We can see that a lot of CPU time is spent building sequences. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
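What the method computes can be sketched as a greedy split of one skewed reduce partition into consecutive map-output ranges (illustrative Python; the real implementation builds PartialReducerPartitionSpec objects in Scala, and the target size comes from the advisory partition size config):

```python
def split_skewed_partition(map_sizes, target_size):
    """Greedily group consecutive map outputs feeding one skewed reduce
    partition into ranges whose total size stays near target_size.
    Returns (start_map_index, end_map_index_exclusive) ranges."""
    specs, start, acc = [], 0, 0
    for i, size in enumerate(map_sizes):
        if acc + size > target_size and acc > 0:
            specs.append((start, i))   # close the current range
            start, acc = i, 0
        acc += size
    specs.append((start, len(map_sizes)))
    return specs

# One reduce partition fed by 6 map tasks, target ~100 bytes per split:
print(split_skewed_partition([60, 50, 40, 70, 30, 20], 100))
# [(0, 1), (1, 3), (3, 5), (5, 6)]
```

With tens of thousands of map and reduce partitions, this loop runs once per skewed partition on the driver, which is why building intermediate sequences inefficiently becomes a bottleneck.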
[jira] [Updated] (SPARK-38401) Unify get preferred locations for shuffle in AQE
[ https://issues.apache.org/jira/browse/SPARK-38401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38401: -- Description: There are several issues with the method `ShuffledRowRDD#getPreferredLocations`. * it does not respect the config `spark.shuffle.reduceLocality.enabled`, so we cannot disable it. * it does not respect `REDUCER_PREF_LOCS_FRACTION`, so it has no effect if the DAG scheduler schedules a task to an executor that holds less data. Worse, the driver will take more memory to store the useless locations. was: There are several issues in the method `getPreferredLocations` of `ShuffledRowRDD`. * it does not respect the config `spark.shuffle.reduceLocality.enabled`, so we cannot disable it. * it does not respect `REDUCER_PREF_LOCS_FRACTION`, so it has no effect if the DAG scheduler schedules a task to an executor that holds less data. Worse, the driver will take more memory to store the useless locations. > Unify get preferred locations for shuffle in AQE > > > Key: SPARK-38401 > URL: https://issues.apache.org/jira/browse/SPARK-38401 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > There are several issues with the method `ShuffledRowRDD#getPreferredLocations`. > * it does not respect the config `spark.shuffle.reduceLocality.enabled`, so > we cannot disable it. > * it does not respect `REDUCER_PREF_LOCS_FRACTION`, so it has no effect if > the DAG scheduler schedules a task to an executor that holds less data. Worse, > the driver will take more memory to store the useless locations. > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38401) Unify get preferred locations for shuffle in AQE
XiDuo You created SPARK-38401: - Summary: Unify get preferred locations for shuffle in AQE Key: SPARK-38401 URL: https://issues.apache.org/jira/browse/SPARK-38401 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You There are several issues in the method `getPreferredLocations` of `ShuffledRowRDD`. * it does not respect the config `spark.shuffle.reduceLocality.enabled`, so we cannot disable it. * it does not respect `REDUCER_PREF_LOCS_FRACTION`, so it has no effect if the DAG scheduler schedules a task to an executor that holds less data. Worse, the driver will take more memory to store the useless locations. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
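The two checks the issue asks for can be sketched together (illustrative Python; the 0.2 threshold mirrors the fraction constant referenced above, but treat the exact value and names as assumptions, not Spark's API):

```python
def preferred_locations(bytes_by_executor, fraction=0.2, locality_enabled=True):
    """Report an executor as a preferred location only when locality is
    enabled and it holds at least `fraction` of this reduce partition's
    total map output."""
    if not locality_enabled:
        return []
    total = sum(bytes_by_executor.values())
    if total == 0:
        return []
    return [e for e, b in bytes_by_executor.items() if b / total >= fraction]

sizes = {"exec-1": 800, "exec-2": 150, "exec-3": 50}
print(preferred_locations(sizes))  # ['exec-1']: only exec-1 holds >= 20%
```

Without the fraction check, exec-2 and exec-3 would also be stored as preferred locations, costing driver memory for locality hints that bring almost no benefit.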
[jira] [Updated] (SPARK-38322) Support query stage show runtime statistics in formatted explain mode
[ https://issues.apache.org/jira/browse/SPARK-38322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38322: -- Description: The formatted explain mode is a powerful explain mode that shows the details of the query plan. In AQE, a query stage knows its statistics once it has materialized. So this can help to quickly check plan conversions, e.g. join selection. A simple example: {code:java} SELECT * FROM t JOIN t2 ON t.c = t2.c;{code} {code:java} == Physical Plan == AdaptiveSparkPlan (21) +- == Final Plan == * SortMergeJoin Inner (13) :- * Sort (6) : +- AQEShuffleRead (5) : +- ShuffleQueryStage (4), Statistics(sizeInBytes=16.0 B, rowCount=1) :+- Exchange (3) : +- * Filter (2) : +- Scan hive default.t (1) +- * Sort (12) +- AQEShuffleRead (11) +- ShuffleQueryStage (10), Statistics(sizeInBytes=16.0 B, rowCount=1) +- Exchange (9) +- * Filter (8) +- Scan hive default.t2 (7) +- == Initial Plan == SortMergeJoin Inner (20) :- Sort (16) : +- Exchange (15) : +- Filter (14) :+- Scan hive default.t (1) +- Sort (19) +- Exchange (18) +- Filter (17) +- Scan hive default.t2 (7){code} was: The formatted explain mode is a powerful explain mode that shows the details of the query plan. In AQE, a query stage knows its statistics once it has materialized. So this can help to quickly check plan conversions, e.g. join selection. 
A simple example: {code:java} SELECT * FROM t JOIN t2 ON t.c = t2.c;{code} {code:java} == Physical Plan == AdaptiveSparkPlan (21) +- == Final Plan == * SortMergeJoin Inner (13) :- * Sort (6) : +- AQEShuffleRead (5) : +- ShuffleQueryStage (4), Statistics(sizeInBytes=16.0 B, rowCount=1) :+- Exchange (3) : +- * Filter (2) : +- Scan hive default.t (1) +- * Sort (12) +- AQEShuffleRead (11) +- ShuffleQueryStage (10), Statistics(sizeInBytes=16.0 B, rowCount=1) +- Exchange (9) +- * Filter (8) +- Scan hive default.t2 (7) +- == Initial Plan == SortMergeJoin Inner (20) :- Sort (16) : +- Exchange (15) : +- Filter (14) :+- Scan hive default.t (1) +- Sort (19) +- Exchange (18) +- Filter (17) +- Scan hive default.t2 (7){code} > Support query stage show runtime statistics in formatted explain mode > - > > Key: SPARK-38322 > URL: https://issues.apache.org/jira/browse/SPARK-38322 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > The formatted explain mode is a powerful explain mode that shows the details > of the query plan. In AQE, a query stage knows its statistics once it has > materialized. So this can help to quickly check plan conversions, e.g. join > selection. 
> A simple example: > {code:java} > SELECT * FROM t JOIN t2 ON t.c = t2.c;{code} > > {code:java} > == Physical Plan == > AdaptiveSparkPlan (21) > +- == Final Plan == >* SortMergeJoin Inner (13) >:- * Sort (6) >: +- AQEShuffleRead (5) >: +- ShuffleQueryStage (4), Statistics(sizeInBytes=16.0 B, rowCount=1) >:+- Exchange (3) >: +- * Filter (2) >: +- Scan hive default.t (1) >+- * Sort (12) > +- AQEShuffleRead (11) > +- ShuffleQueryStage (10), Statistics(sizeInBytes=16.0 B, rowCount=1) > +- Exchange (9) >+- * Filter (8) > +- Scan hive default.t2 (7) > +- == Initial Plan == >SortMergeJoin Inner (20) >:- Sort (16) >: +- Exchange (15) >: +- Filter (14) >:+- Scan hive default.t (1) >+- Sort (19) > +- Exchange (18) > +- Filter (17) > +- Scan hive default.t2 (7){code} > > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38322) Support query stage show runtime statistics in formatted explain mode
XiDuo You created SPARK-38322: - Summary: Support query stage show runtime statistics in formatted explain mode Key: SPARK-38322 URL: https://issues.apache.org/jira/browse/SPARK-38322 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You The formatted explain mode is a powerful explain mode that shows the details of the query plan. In AQE, a query stage knows its statistics once it has materialized. So this can help to quickly check plan conversions, e.g. join selection. A simple example: {code:java} SELECT * FROM t JOIN t2 ON t.c = t2.c;{code} {code:java} == Physical Plan == AdaptiveSparkPlan (21) +- == Final Plan == * SortMergeJoin Inner (13) :- * Sort (6) : +- AQEShuffleRead (5) : +- ShuffleQueryStage (4), Statistics(sizeInBytes=16.0 B, rowCount=1) :+- Exchange (3) : +- * Filter (2) : +- Scan hive default.t (1) +- * Sort (12) +- AQEShuffleRead (11) +- ShuffleQueryStage (10), Statistics(sizeInBytes=16.0 B, rowCount=1) +- Exchange (9) +- * Filter (8) +- Scan hive default.t2 (7) +- == Initial Plan == SortMergeJoin Inner (20) :- Sort (16) : +- Exchange (15) : +- Filter (14) :+- Scan hive default.t (1) +- Sort (19) +- Exchange (18) +- Filter (17) +- Scan hive default.t2 (7){code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-38172) Adaptive coalesce not working with df persist
[ https://issues.apache.org/jira/browse/SPARK-38172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497870#comment-17497870 ] XiDuo You commented on SPARK-38172: --- thanks [~Naveenmts] for confirming! > Adaptive coalesce not working with df persist > - > > Key: SPARK-38172 > URL: https://issues.apache.org/jira/browse/SPARK-38172 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.1 > Environment: OS: Linux > Spark Version: 3.2.3 >Reporter: Naveen Nagaraj >Priority: Major > Attachments: image-2022-02-10-15-32-30-355.png, > image-2022-02-10-15-33-08-018.png, image-2022-02-10-15-33-32-607.png > > > {code:java} > // code placeholder > val spark = SparkSession.builder().master("local[4]").appName("Test") > .config("spark.sql.adaptive.enabled", "true") > > .config("spark.sql.adaptive.coalescePartitions.enabled", "true") > > .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m") > > .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1") > > .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024") > .getOrCreate() > val df = spark.read.csv("") > val df1 = df.distinct() > df1.persist() // On removing this line, the code works as expected > df1.write.csv("") {code} > Without df1.persist, df1.write.csv writes 4 partition files of 50 MB each, > which is expected > [https://i.stack.imgur.com/tDxpV.png] > If I include df1.persist, Spark writes 200 partitions (adaptive coalesce > not working) with persist > [https://i.stack.imgur.com/W13hA.png] > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-38172) Adaptive coalesce not working with df persist
[ https://issues.apache.org/jira/browse/SPARK-38172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You resolved SPARK-38172. --- Resolution: Won't Fix > Adaptive coalesce not working with df persist > - > > Key: SPARK-38172 > URL: https://issues.apache.org/jira/browse/SPARK-38172 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.1 > Environment: OS: Linux > Spark Version: 3.2.3 >Reporter: Naveen Nagaraj >Priority: Major > Attachments: image-2022-02-10-15-32-30-355.png, > image-2022-02-10-15-33-08-018.png, image-2022-02-10-15-33-32-607.png > > > {code:java} > // code placeholder > val spark = SparkSession.builder().master("local[4]").appName("Test") > .config("spark.sql.adaptive.enabled", "true") > > .config("spark.sql.adaptive.coalescePartitions.enabled", "true") > > .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m") > > .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1") > > .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024") > .getOrCreate() > val df = spark.read.csv("") > val df1 = df.distinct() > df1.persist() // On removing this line. Code works as expected > df1.write.csv("") {code} > Without df1.persist, df1.write.csv writes 4 partition files of 50 MB each > which is expected > [https://i.stack.imgur.com/tDxpV.png] > If I include df1.persist, Spark is writing 200 partitions(adaptive coalesce > not working) With persist > [https://i.stack.imgur.com/W13hA.png] > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38232) Explain formatted does not collect subqueries under query stage in AQE
[ https://issues.apache.org/jira/browse/SPARK-38232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38232: -- Parent: SPARK-37063 Issue Type: Sub-task (was: Bug) > Explain formatted does not collect subqueries under query stage in AQE > -- > > Key: SPARK-38232 > URL: https://issues.apache.org/jira/browse/SPARK-38232 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.1.2, 3.2.1, 3.3.0 >Reporter: XiDuo You >Priority: Major > > ExplainUtils does not catch QueryStageExec when collecting subqueries, so > we cannot get the formatted explain of subqueries that are under a > QueryStageExec. > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38232) Explain formatted does not collect subqueries under query stage in AQE
[ https://issues.apache.org/jira/browse/SPARK-38232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38232: -- Description: ExplainUtils does not catch QueryStageExec when collecting subqueries, so we cannot get the formatted explain of subqueries that are under a QueryStageExec. Note that it also affects DPP subqueries. was: ExplainUtils does not catch QueryStageExec when collecting subqueries, so we cannot get the formatted explain of subqueries that are under a QueryStageExec. > Explain formatted does not collect subqueries under query stage in AQE > -- > > Key: SPARK-38232 > URL: https://issues.apache.org/jira/browse/SPARK-38232 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.1.2, 3.2.1, 3.3.0 >Reporter: XiDuo You >Priority: Major > > ExplainUtils does not catch QueryStageExec when collecting subqueries, so > we cannot get the formatted explain of subqueries that are under a > QueryStageExec. > Note that it also affects DPP subqueries. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38232) Explain formatted does not collect subqueries under query stage in AQE
XiDuo You created SPARK-38232: - Summary: Explain formatted does not collect subqueries under query stage in AQE Key: SPARK-38232 URL: https://issues.apache.org/jira/browse/SPARK-38232 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.1, 3.1.2, 3.3.0 Reporter: XiDuo You ExplainUtils does not catch QueryStageExec when collecting subqueries, so we cannot get the formatted explain of subqueries that are under a QueryStageExec. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-38172) Adaptive coalesce not working with df persist
[ https://issues.apache.org/jira/browse/SPARK-38172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490773#comment-17490773 ] XiDuo You commented on SPARK-38172: --- hi [~Naveenmts] have you tried enable this config ? {code:java} spark.sql.optimizer.canChangeCachedPlanOutputPartitioning {code} > Adaptive coalesce not working with df persist > - > > Key: SPARK-38172 > URL: https://issues.apache.org/jira/browse/SPARK-38172 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.1 > Environment: OS: Linux > Spark Version: 3.2.3 >Reporter: Naveen Nagaraj >Priority: Major > Attachments: image-2022-02-10-15-32-30-355.png, > image-2022-02-10-15-33-08-018.png, image-2022-02-10-15-33-32-607.png > > > {code:java} > // code placeholder > val spark = SparkSession.builder().master("local[4]").appName("Test") > .config("spark.sql.adaptive.enabled", "true") > > .config("spark.sql.adaptive.coalescePartitions.enabled", "true") > > .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m") > > .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1") > > .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024") > .getOrCreate() > val df = spark.read.csv("") > val df1 = df.distinct() > df1.persist() // On removing this line. Code works as expected > df1.write.csv("") {code} > Without df1.persist, df1.write.csv writes 4 partition files of 50 MB each > which is expected > [https://i.stack.imgur.com/tDxpV.png] > If I include df1.persist, Spark is writing 200 partitions(adaptive coalesce > not working) With persist > [https://i.stack.imgur.com/W13hA.png] > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
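A minimal sketch of the suggested workaround (the input/output paths from the report are placeholders): enabling `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` (added in Spark 3.2) allows AQE, including partition coalescing, to change the output partitioning of a persisted plan:

```scala
import org.apache.spark.sql.SparkSession

// Sketch under the assumption that this config is what unblocks adaptive
// coalescing for cached plans, as suggested in the comment above.
val spark = SparkSession.builder().master("local[4]").appName("Test")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m")
  .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
  .getOrCreate()

val df1 = spark.read.csv("/path/to/input").distinct()  // placeholder path
df1.persist()                       // coalescing can still apply despite persist
df1.write.csv("/path/to/output")    // placeholder path
```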
[jira] [Updated] (SPARK-38185) Fix data incorrect if aggregate function is empty
[ https://issues.apache.org/jira/browse/SPARK-38185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38185: -- Summary: Fix data incorrect if aggregate function is empty (was: Fix data incorrect if aggregate is group only with empty function) > Fix data incorrect if aggregate function is empty > - > > Key: SPARK-38185 > URL: https://issues.apache.org/jira/browse/SPARK-38185 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.1, 3.3.0 >Reporter: XiDuo You >Priority: Major > > The group-only condition should check whether the aggregate expressions are empty. > The DataFrame API allows empty aggregations, so the following query should > return 1 rather than 0 because it is a global aggregate. > {code:java} > val emptyAgg = Map.empty[String, String] > spark.range(2).where("id > 2").agg(emptyAgg).limit(1).count > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38185) Fix data incorrect if aggregate is group only with empty function
[ https://issues.apache.org/jira/browse/SPARK-38185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38185: -- Description: The group-only condition should check whether the aggregate expressions are empty. The DataFrame API allows empty aggregations, so the following query should return 1 rather than 0 because it is a global aggregate. {code:java} val emptyAgg = Map.empty[String, String] spark.range(2).where("id > 2").agg(emptyAgg).limit(1).count {code} was: The group-only condition should check whether the aggregate expressions are empty. The DataFrame API allows empty aggregations, so the following query should return 1 rather than 0 because it is a global aggregate. {code:java} spark.range(2).where("id > 2").agg(emptyAgg).limit(1).count {code} > Fix data incorrect if aggregate is group only with empty function > - > > Key: SPARK-38185 > URL: https://issues.apache.org/jira/browse/SPARK-38185 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.1, 3.3.0 >Reporter: XiDuo You >Priority: Major > > The group-only condition should check whether the aggregate expressions are empty. > The DataFrame API allows empty aggregations, so the following query should > return 1 rather than 0 because it is a global aggregate. > {code:java} > val emptyAgg = Map.empty[String, String] > spark.range(2).where("id > 2").agg(emptyAgg).limit(1).count > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38185) Fix data incorrect if aggregate is group only with empty function
XiDuo You created SPARK-38185: - Summary: Fix data incorrect if aggregate is group only with empty function Key: SPARK-38185 URL: https://issues.apache.org/jira/browse/SPARK-38185 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.1, 3.3.0 Reporter: XiDuo You The group-only condition should check whether the aggregate expressions are empty. The DataFrame API allows empty aggregations, so the following query should return 1 rather than 0 because it is a global aggregate. {code:java} spark.range(2).where("id > 2").agg(emptyAgg).limit(1).count {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38182) Fix NoSuchElementException if pushed filter does not contain any references
[ https://issues.apache.org/jira/browse/SPARK-38182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38182: -- Description: reproduce: {code:java} CREATE TABLE t (c1 int) USING PARQUET; SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.BooleanSimplification; SELECT * FROM t WHERE c1 = 1 AND 2 > 1; {code} and the error msg: {code:java} java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:41) at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.mutable.LinkedHashSet$$anon$1.next(LinkedHashSet.scala:89) at scala.collection.IterableLike.head(IterableLike.scala:109) at scala.collection.IterableLike.head$(IterableLike.scala:108) at org.apache.spark.sql.catalyst.expressions.AttributeSet.head(AttributeSet.scala:69) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.$anonfun$listFiles$3(PartitioningAwareFileIndex.scala:85) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:84) at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:249) {code} was: reproduce: {code:java} CREATE TABLE pt (c1 int) USING PARQUET PARTITIONED BY (p string); set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.BooleanSimplification; SELECT * FROM pt WHERE p = 'a' AND 2 > 1; {code} and the error msg: {code:java} java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:41) at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.mutable.LinkedHashSet$$anon$1.next(LinkedHashSet.scala:89) at scala.collection.IterableLike.head(IterableLike.scala:109) at scala.collection.IterableLike.head$(IterableLike.scala:108) at 
org.apache.spark.sql.catalyst.expressions.AttributeSet.head(AttributeSet.scala:69) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.$anonfun$listFiles$3(PartitioningAwareFileIndex.scala:85) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:84) at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:249) {code} > Fix NoSuchElementException if pushed filter does not contain any references > --- > > Key: SPARK-38182 > URL: https://issues.apache.org/jira/browse/SPARK-38182 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > reproduce: > {code:java} > CREATE TABLE t (c1 int) USING PARQUET; > SET > spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.BooleanSimplification; > SELECT * FROM t WHERE c1 = 1 AND 2 > 1; > {code} > and the error msg: > {code:java} > java.util.NoSuchElementException: next on empty iterator > at scala.collection.Iterator$$anon$2.next(Iterator.scala:41) > at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) > at > scala.collection.mutable.LinkedHashSet$$anon$1.next(LinkedHashSet.scala:89) > at scala.collection.IterableLike.head(IterableLike.scala:109) > at scala.collection.IterableLike.head$(IterableLike.scala:108) > at > org.apache.spark.sql.catalyst.expressions.AttributeSet.head(AttributeSet.scala:69) > at > org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.$anonfun$listFiles$3(PartitioningAwareFileIndex.scala:85) > at scala.Option.map(Option.scala:230) > at > org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:84) > at > org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:249) > {code} -- This message was sent by Atlassian Jira 
(v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38182) Fix NoSuchElementException if pushed filter does not contain any references
XiDuo You created SPARK-38182: - Summary: Fix NoSuchElementException if pushed filter does not contain any references Key: SPARK-38182 URL: https://issues.apache.org/jira/browse/SPARK-38182 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You reproduce: {code:java} CREATE TABLE pt (c1 int) USING PARQUET PARTITIONED BY (p string); set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.BooleanSimplification; SELECT * FROM pt WHERE p = 'a' AND 2 > 1; {code} and the error msg: {code:java} java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:41) at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.mutable.LinkedHashSet$$anon$1.next(LinkedHashSet.scala:89) at scala.collection.IterableLike.head(IterableLike.scala:109) at scala.collection.IterableLike.head$(IterableLike.scala:108) at org.apache.spark.sql.catalyst.expressions.AttributeSet.head(AttributeSet.scala:69) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.$anonfun$listFiles$3(PartitioningAwareFileIndex.scala:85) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:84) at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:249) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38177) Fix wrong transformExpressions in Optimizer
[ https://issues.apache.org/jira/browse/SPARK-38177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38177: -- Description: `transformExpressions` only traverses the expressions of the current query plan node, so the rules `EliminateDistinct` and `EliminateAggregateFilter` cannot optimize non-root nodes. We should use `transformAllExpressions` rather than `transformExpressions`. (was: `transformExpressions` only traverses the expressions of the current query plan node, so the rules `EliminateDistinct` and `EliminateAggregateFilter` should use `transformAllExpressions` rather than `transformExpressions`. ) > Fix wrong transformExpressions in Optimizer > --- > > Key: SPARK-38177 > URL: https://issues.apache.org/jira/browse/SPARK-38177 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > `transformExpressions` only traverses the expressions of the current > query plan node, so the rules `EliminateDistinct` and `EliminateAggregateFilter` > cannot optimize non-root nodes. We should use `transformAllExpressions` > rather than `transformExpressions`. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38177) Fix wrong transformExpressions in Optimizer
XiDuo You created SPARK-38177: - Summary: Fix wrong transformExpressions in Optimizer Key: SPARK-38177 URL: https://issues.apache.org/jira/browse/SPARK-38177 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: XiDuo You `transformExpressions` only traverses the expressions of the current query plan node, so the rules `EliminateDistinct` and `EliminateAggregateFilter` should use `transformAllExpressions` rather than `transformExpressions`. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
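A sketch of the distinction (method names are from Catalyst's `QueryPlan`/`TreeNode` API; the `rewrite` partial function stands in for the logic of rules like `EliminateDistinct`):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative only: `rewrite` represents whatever expression rewrite the
// optimizer rule performs.
def applyRule(plan: LogicalPlan,
              rewrite: PartialFunction[Expression, Expression]): LogicalPlan = {
  // Visits expressions of the node it is called on only, so an Aggregate
  // nested below the root is never reached:
  //   plan.transformExpressions(rewrite)

  // Walks every operator in the plan tree, so the rewrite reaches
  // aggregates anywhere in the plan:
  plan.transformAllExpressions(rewrite)
}
```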
[jira] [Updated] (SPARK-38162) Optimize one row plan in normal and AQE Optimizer
[ https://issues.apache.org/jira/browse/SPARK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38162: -- Description: Optimize the plan if its max row count is equal to or less than 1 in these cases: - if the sort's child has at most 1 row, remove the sort - if the local sort's child has at most 1 row per partition, remove the local sort - if the aggregate's child has at most 1 row and the aggregate is grouping-only (including the rewritten distinct plan), remove the aggregate - if the aggregate's child has at most 1 row, set distinct to false in all aggregate expressions was: Optimize the plan if its max row count is equal to or less than 1 in these cases: * if the sort's max rows is at most 1, remove the sort * if the local sort's max rows per partition is at most 1, remove the local sort * if the aggregate's max rows is at most 1 and it is grouping-only, remove the aggregate * if the aggregate's max rows is at most 1, set distinct to false in all aggregate expressions > Optimize one row plan in normal and AQE Optimizer > - > > Key: SPARK-38162 > URL: https://issues.apache.org/jira/browse/SPARK-38162 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > Optimize the plan if its max row count is equal to or less than 1 in these cases: > - if the sort's child has at most 1 row, remove the sort > - if the local sort's child has at most 1 row per partition, > remove the local sort > - if the aggregate's child has at most 1 row and the aggregate is > grouping-only (including the rewritten distinct plan), remove the aggregate > - if the aggregate's child has at most 1 row, set distinct to > false in all aggregate expressions -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For
additional commands, e-mail: issues-h...@spark.apache.org
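A sketch of the first case (the column alias `m` is illustrative, and an active `SparkSession` named `spark` is assumed): a global aggregate produces at most one row, so sorting its output is a no-op and the Sort can be dropped:

```scala
import org.apache.spark.sql.functions.max

// A global aggregate has maxRows <= 1; ordering a single row changes
// nothing, so the optimizer can remove the Sort introduced by orderBy.
val df = spark.range(100).agg(max("id").as("m")).orderBy("m")
df.explain(true)  // the optimized logical plan should contain no Sort node
```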
[jira] [Updated] (SPARK-38162) Optimize one row plan in normal and AQE Optimizer
[ https://issues.apache.org/jira/browse/SPARK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38162: -- Summary: Optimize one row plan in normal and AQE Optimizer (was: Optimize one max row plan in normal and AQE Optimizer) > Optimize one row plan in normal and AQE Optimizer > - > > Key: SPARK-38162 > URL: https://issues.apache.org/jira/browse/SPARK-38162 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > Optimize the plan if its max row count is equal to or less than 1 in these cases: > * if the sort's max rows is at most 1, remove the sort > * if the local sort's max rows per partition is at most 1, remove the > local sort > * if the aggregate's max rows is at most 1 and it is grouping-only, > remove the aggregate > * if the aggregate's max rows is at most 1, set distinct to false in > all aggregate expressions -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-38162) Optimize one max row plan in normal and AQE Optimizer
[ https://issues.apache.org/jira/browse/SPARK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] XiDuo You updated SPARK-38162: -- Description: Optimize the plan if its max row count is equal to or less than 1 in these cases: * if the sort's max rows is at most 1, remove the sort * if the local sort's max rows per partition is at most 1, remove the local sort * if the aggregate's max rows is at most 1 and it is grouping-only, remove the aggregate * if the aggregate's max rows is at most 1, set distinct to false in all aggregate expressions was: We cannot propagate empty through an aggregate if it does not contain grouping expressions. But for an aggregate that contains distinct aggregate expressions, we can remove distinct if its child is empty. > Optimize one max row plan in normal and AQE Optimizer > - > > Key: SPARK-38162 > URL: https://issues.apache.org/jira/browse/SPARK-38162 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > > Optimize the plan if its max row count is equal to or less than 1 in these cases: > * if the sort's max rows is at most 1, remove the sort > * if the local sort's max rows per partition is at most 1, remove the > local sort > * if the aggregate's max rows is at most 1 and it is grouping-only, > remove the aggregate > * if the aggregate's max rows is at most 1, set distinct to false in > all aggregate expressions -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org