[jira] [Updated] (SPARK-38162) Optimize one max row plan in normal and AQE Optimizer

2022-02-09 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38162:
--
Summary: Optimize one max row plan in normal and AQE Optimizer  (was: 
Remove distinct in aggregate if its child is empty)

> Optimize one max row plan in normal and AQE Optimizer
> -
>
> Key: SPARK-38162
> URL: https://issues.apache.org/jira/browse/SPARK-38162
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> We cannot propagate an empty relation through an aggregate if it does not
> contain a grouping expression. But for an aggregate that contains a distinct
> aggregate expression, we can remove the distinct if its child is empty.






[jira] [Created] (SPARK-38162) Remove distinct in aggregate if its child is empty

2022-02-09 Thread XiDuo You (Jira)
XiDuo You created SPARK-38162:
-

 Summary: Remove distinct in aggregate if its child is empty
 Key: SPARK-38162
 URL: https://issues.apache.org/jira/browse/SPARK-38162
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


We cannot propagate an empty relation through an aggregate if it does not
contain a grouping expression. But for an aggregate that contains a distinct
aggregate expression, we can remove the distinct if its child is empty.
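
A minimal sketch of the idea in a spark-shell session (the table name and setup
are illustrative, not from the issue): over an empty child, a distinct
aggregate and its non-distinct form agree, so the distinct can be dropped.

{code:java}
// Illustrative only: an empty child makes DISTINCT redundant.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
spark.range(0).createOrReplaceTempView("t") // empty relation

// Both return a single row containing 0: over an empty child the distinct
// adds nothing, so an optimizer rule can safely remove it.
spark.sql("SELECT count(DISTINCT id) FROM t").show()
spark.sql("SELECT count(id) FROM t").show()
{code}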






[jira] [Created] (SPARK-38148) Do not add dynamic partition pruning if there exists static partition pruning

2022-02-08 Thread XiDuo You (Jira)
XiDuo You created SPARK-38148:
-

 Summary: Do not add dynamic partition pruning if there exists 
static partition pruning
 Key: SPARK-38148
 URL: https://issues.apache.org/jira/browse/SPARK-38148
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


Dynamic partition pruning adds a filter as long as the join condition contains
partition columns. But if there is another condition that already provides
static partition pruning, it is unnecessary to add an extra dynamic partition
pruning filter.

For example:
{code:java}
CREATE TABLE t1 (c1 int) USING PARQUET PARTITIONED BY (p1 string);
CREATE TABLE t2 (c2 int) USING PARQUET PARTITIONED BY (p2 string);

SELECT * FROM t1 JOIN t2 ON t1.p1 = t2.p2 and t1.p1 = 'a' AND t2.c2 > 0;
{code}
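
A hedged way to see why, in a spark-shell session (assumes `spark` is in scope
and dynamic partition pruning is enabled, its default):

{code:java}
// In the query above, t1.p1 = 'a' together with t1.p1 = t2.p2 already implies
// the static filter t2.p2 = 'a', so a runtime DPP subquery on t2 cannot prune
// anything further.
spark.sql("CREATE TABLE t1 (c1 int) USING PARQUET PARTITIONED BY (p1 string)")
spark.sql("CREATE TABLE t2 (c2 int) USING PARQUET PARTITIONED BY (p2 string)")
spark.sql(
  "SELECT * FROM t1 JOIN t2 ON t1.p1 = t2.p2 AND t1.p1 = 'a' AND t2.c2 > 0"
).explain() // compare static PartitionFilters with any dynamicpruning subquery
{code}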
 






[jira] [Commented] (SPARK-33832) Add an option in AQE to mitigate skew even if it causes a new shuffle

2022-01-27 Thread XiDuo You (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483604#comment-17483604
 ] 

XiDuo You commented on SPARK-33832:
---

thank you [~dongjoon] !

> Add an option in AQE to mitigate skew even if it causes a new shuffle
> --
>
> Key: SPARK-33832
> URL: https://issues.apache.org/jira/browse/SPARK-33832
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Eugene Koifman
>Assignee: XiDuo You
>Priority: Major
> Fix For: 3.3.0
>
>
> Currently {{OptimizeSkewedJoin}} will not apply if skew mitigation causes a 
> new shuffle.
> There are situations where it's better to mitigate skew even if it means a
> new shuffle is added, for example if the join outputs a small amount of data.
> As a first step I propose adding a SQLConf option to enable this.  
> I'll open a PR shortly to get feedback on the approach.
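
A hedged usage sketch in a spark-shell session. The exact SQLConf name below is
an assumption about what this issue added; check your Spark version before
relying on it.

{code:java}
// Assumed config name: spark.sql.adaptive.forceOptimizeSkewedJoin.
// Intent: apply OptimizeSkewedJoin even when it introduces an extra shuffle.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.forceOptimizeSkewedJoin", "true") // assumption
{code}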






[jira] [Updated] (SPARK-38013) AQE can change bhj to smj if no extra shuffle is introduced

2022-01-27 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38013:
--
Parent: SPARK-37063
Issue Type: Sub-task  (was: Task)

> AQE can change bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle, and AQE does not think the join can be planned
> as bhj.
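
A hedged sketch for inspecting the repro in a spark-shell session (assumes
`spark` is in scope and the three tables above exist):

{code:java}
// Run the repro and look at the final adaptive plan: the hinted join shows up
// as a sort-merge join even though it was planned as a broadcast hash join.
spark.sql("SET spark.sql.adaptive.autoBroadcastJoinThreshold=-1")
val df = spark.sql("""
  SELECT /*+ merge(t3) */ * FROM t1
  LEFT JOIN (SELECT c1 AS c FROM t3) t3 ON t1.c1 = t3.c
  LEFT JOIN (SELECT /*+ repartition(c1) */ c1 FROM t2) t2 ON t1.c1 = t2.c1""")
df.collect() // materialize so AQE finalizes the plan
println(df.queryExecution.executedPlan) // SortMergeJoin vs BroadcastHashJoin
{code}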






[jira] [Commented] (SPARK-38013) AQE can change bhj to smj if no extra shuffle is introduced

2022-01-27 Thread XiDuo You (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483598#comment-17483598
 ] 

XiDuo You commented on SPARK-38013:
---

Add a test to cover this behavior

> AQE can change bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle, and AQE does not think the join can be planned
> as bhj.






[jira] [Resolved] (SPARK-38013) AQE can change bhj to smj if no extra shuffle is introduced

2022-01-25 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You resolved SPARK-38013.
---
Resolution: Won't Fix

> AQE can change bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle, and AQE does not think the join can be planned
> as bhj.






[jira] [Commented] (SPARK-38013) AQE can change bhj to smj if no extra shuffle is introduced

2022-01-25 Thread XiDuo You (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482209#comment-17482209
 ] 

XiDuo You commented on SPARK-38013:
---

Seems it is allowed in AQE, so this is not a bug.

> AQE can change bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle, and AQE does not think the join can be planned
> as bhj.






[jira] [Updated] (SPARK-38013) AQE can change bhj to smj if no extra shuffle is introduced

2022-01-25 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38013:
--
Issue Type: Task  (was: Bug)

> AQE can change bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle, and AQE does not think the join can be planned
> as bhj.






[jira] [Updated] (SPARK-38013) AQE can change bhj to smj if no extra shuffle is introduced

2022-01-25 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38013:
--
Summary: AQE can change bhj to smj if no extra shuffle is introduced  (was:
Fix AQE changing bhj to smj if no extra shuffle is introduced)

> AQE can change bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle, and AQE does not think the join can be planned
> as bhj.






[jira] [Updated] (SPARK-38013) Fix AQE changing bhj to smj if no extra shuffle is introduced

2022-01-25 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38013:
--
Description: 
An example to reproduce the bug.

{code:java}
create table t1 as select 1 c1, 2 c2;
create table t2 as select 1 c1, 2 c2;
create table t3 as select 1 c1, 2 c2;

set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;

select /*+ merge(t3) */ * from t1
left join (
select c1 as c from t3
) t3 on t1.c1 = t3.c
left join (
select /*+ repartition(c1) */ c1 from t2
) t2 on t1.c1 = t2.c1;

{code}

The key to reproducing this bug is that a bhj is converted to smj/shj without
introducing an extra shuffle, and AQE does not think the join can be planned as
bhj.

  was:
An example to reproduce the bug.

{code:java}
create table t1 as select 1 c1, 2 c2;
create table t2 as select 1 c1, 2 c2;
create table t3 as select 1 c1, 2 c2;

set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;

select /*+ merge(t3) */ * from t1
left join (
select c1 as c from t3
) t3 on t1.c1 = t3.c
left join (
select /*+ repartition(c1) */ c1 from t2
) t2 on t1.c1 = t2.c1;

{code}

The key to reproducing this bug is that a bhj is converted to smj/shj without
introducing an extra shuffle.


> Fix AQE changing bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle, and AQE does not think the join can be planned
> as bhj.






[jira] [Updated] (SPARK-38013) Fix AQE changing bhj to smj if no extra shuffle is introduced

2022-01-25 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38013:
--
Description: 
An example to reproduce the bug.

{code:java}
create table t1 as select 1 c1, 2 c2;
create table t2 as select 1 c1, 2 c2;
create table t3 as select 1 c1, 2 c2;

set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;

select /*+ merge(t3) */ * from t1
left join (
select c1 as c from t3
) t3 on t1.c1 = t3.c
left join (
select /*+ repartition(c1) */ c1 from t2
) t2 on t1.c1 = t2.c1;

{code}

The key to reproducing this bug is that a bhj is converted to smj/shj without
introducing an extra shuffle.

> Fix AQE changing bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> An example to reproduce the bug.
> {code:java}
> create table t1 as select 1 c1, 2 c2;
> create table t2 as select 1 c1, 2 c2;
> create table t3 as select 1 c1, 2 c2;
> set spark.sql.adaptive.autoBroadcastJoinThreshold=-1;
> select /*+ merge(t3) */ * from t1
> left join (
> select c1 as c from t3
> ) t3 on t1.c1 = t3.c
> left join (
> select /*+ repartition(c1) */ c1 from t2
> ) t2 on t1.c1 = t2.c1;
> {code}
> The key to reproducing this bug is that a bhj is converted to smj/shj without
> introducing an extra shuffle.






[jira] [Updated] (SPARK-38013) Fix AQE changing bhj to smj if no extra shuffle is introduced

2022-01-24 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38013:
--
Summary: Fix AQE changing bhj to smj if no extra shuffle is introduced  (was:
Fix AQE changing bhj to smj if the build side is an aggregate)

> Fix AQE changing bhj to smj if no extra shuffle is introduced
> ---
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>







[jira] [Updated] (SPARK-38013) Fix AQE changing bhj to smj if the build side is an aggregate

2022-01-24 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-38013:
--
Description: (was: The condition of isBroadcastStage is not complete. It does
not catch a plan whose build side is an Aggregate.

{code:java}
  private def isBroadcastStage(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, _: BroadcastQueryStageExec) => true
    case _ => false
  }
{code}

We should add a pattern for the case where the BroadcastQueryStageExec is
inside an Aggregate.)

> Fix AQE changing bhj to smj if the build side is an aggregate
> 
>
> Key: SPARK-38013
> URL: https://issues.apache.org/jira/browse/SPARK-38013
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>







[jira] [Created] (SPARK-38013) Fix AQE changing bhj to smj if the build side is an aggregate

2022-01-24 Thread XiDuo You (Jira)
XiDuo You created SPARK-38013:
-

 Summary: Fix AQE changing bhj to smj if the build side is an
aggregate
 Key: SPARK-38013
 URL: https://issues.apache.org/jira/browse/SPARK-38013
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


The condition of isBroadcastStage is not complete. It does not catch a plan
whose build side is an Aggregate.

{code:java}
  private def isBroadcastStage(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, _: BroadcastQueryStageExec) => true
    case _ => false
  }
{code}


We should add a pattern for the case where the BroadcastQueryStageExec is
inside an Aggregate.
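
A hedged sketch of what that could look like, written against Spark's internal
Catalyst/AQE types (imports omitted; the Aggregate arm is an assumption about
the fix, not the merged change):

{code:java}
  // Sketch only: also treat an Aggregate sitting directly on top of a
  // broadcast query stage as a broadcast stage.
  private def isBroadcastStage(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, _: BroadcastQueryStageExec) => true
    case Aggregate(_, _, LogicalQueryStage(_, _: BroadcastQueryStageExec)) => true
    case _ => false
  }
{code}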






[jira] [Created] (SPARK-37949) Improve Rebalance statistics estimation

2022-01-17 Thread XiDuo You (Jira)
XiDuo You created SPARK-37949:
-

 Summary: Improve Rebalance statistics estimation
 Key: SPARK-37949
 URL: https://issues.apache.org/jira/browse/SPARK-37949
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


The default statistics estimation only considers the size in bytes, which may
lose the row count and column statistics.

`RebalancePartitions` does not actually change the statistics of the plan, so
we can use the statistics of its child for a more accurate estimation.
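
A toy model of the idea (not Spark's actual statistics API): the rebalance node
simply passes its child's statistics through.

{code:java}
// Illustrative only: rebalancing changes partitioning, not the data, so size
// in bytes and row count can be inherited from the child unchanged.
case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

sealed trait Plan { def stats: Stats }
case class Scan(stats: Stats) extends Plan
case class Rebalance(child: Plan) extends Plan {
  def stats: Stats = child.stats
}

val scan = Scan(Stats(BigInt(1024), Some(BigInt(100))))
assert(Rebalance(scan).stats == scan.stats)
{code}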






[jira] [Created] (SPARK-37904) Improve RebalancePartitions in rules of Optimizer

2022-01-13 Thread XiDuo You (Jira)
XiDuo You created SPARK-37904:
-

 Summary: Improve RebalancePartitions in rules of Optimizer
 Key: SPARK-37904
 URL: https://issues.apache.org/jira/browse/SPARK-37904
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


After SPARK-37267, we support optimizing rebalance partitions anywhere in the
plan rather than limiting it to the root node. So it makes sense to also let
`RebalancePartitions` work in all rules of the Optimizer, as `Repartition` and
`RepartitionByExpression` do.







[jira] [Commented] (SPARK-37855) IllegalStateException when transforming an array inside a nested struct

2022-01-11 Thread XiDuo You (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472938#comment-17472938
 ] 

XiDuo You commented on SPARK-37855:
---

The regression seems to come from SPARK-35636. As a quick workaround, you can
set the config:

{code:java}
set spark.sql.optimizer.nestedSchemaPruning.enabled=false;
{code}


> IllegalStateException when transforming an array inside a nested struct
> ---
>
> Key: SPARK-37855
> URL: https://issues.apache.org/jira/browse/SPARK-37855
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
> Environment: OS: Ubuntu 20.04.3 LTS
> Scala version: 2.12.12
>  
>Reporter: G Muciaccia
>Priority: Major
>
> *NOTE*: this bug is only present in version {{3.2.0}}. Downgrading to 
> {{3.1.2}} solves the problem.
> h3. Prerequisites to reproduce the bug
> # use Spark version 3.2.0
> # create a DataFrame with an array field, which contains a struct field with 
> a nested array field
> # *apply a limit* to the DataFrame
> # transform the outer array, renaming one of its fields
> # transform the inner array too, which requires two {{getField}} in sequence
> h3. Example that reproduces the bug
> This is a minimal example (as minimal as I could make it) to reproduce the 
> bug:
> {code}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{DataFrame, Row}
> def makeInput(): DataFrame = {
>   val innerElement1 = Row(3, 3.12)
>   val innerElement2 = Row(4, 2.1)
>   val innerElement3 = Row(1, 985.2)
>   val innerElement4 = Row(10, 757548.0)
>   val innerElement5 = Row(1223, 0.665)
>   val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
>   val outerElement2 = Row(2, Row(List(innerElement3)))
>   val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
>   val data = Seq(
>     Row("row1", List(outerElement1)),
>     Row("row2", List(outerElement2, outerElement3))
>   )
>   val schema = new StructType()
>     .add("name", StringType)
>     .add("outer_array", ArrayType(new StructType()
>       .add("id", IntegerType)
>       .add("inner_array_struct", new StructType()
>         .add("inner_array", ArrayType(new StructType()
>           .add("id", IntegerType)
>           .add("value", DoubleType)
>         ))
>       )
>     ))
>   spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
> }
> // val df = makeInput()
> val df = makeInput().limit(2)
> // val df = makeInput().limit(2).cache()
> val res = df.withColumn("extracted", transform(
>   col("outer_array"),
>   c1 => {
>     struct(
>       c1.getField("id").alias("outer_id"),
>       transform(
>         c1.getField("inner_array_struct").getField("inner_array"),
>         c2 => {
>           struct(
>             c2.getField("value").alias("inner_value")
>           )
>         }
>       )
>     )
>   }
> ))
> res.printSchema()
> res.show(false)
> {code}
> h4. Executing the example code
> When executing it as-is, the execution will fail on the {{show}} statement, 
> with
> {code}
> java.lang.IllegalStateException Couldn't find _extract_inner_array#23 in 
> [name#2,outer_array#3]
> {code}
> However, *if the limit is not applied, or if the DataFrame is cached after 
> the limit, everything works* (you can uncomment the corresponding lines in 
> the example to try it).






[jira] [Created] (SPARK-37862) RecordBinaryComparator should fast skip the check of aligning with unaligned platform

2022-01-10 Thread XiDuo You (Jira)
XiDuo You created SPARK-37862:
-

 Summary: RecordBinaryComparator should fast skip the check of 
aligning with unaligned platform
 Key: SPARK-37862
 URL: https://issues.apache.org/jira/browse/SPARK-37862
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


Same as SPARK-37796.

It would be better to fast-skip the alignment check if the platform supports
unaligned access.






[jira] [Updated] (SPARK-35442) Support propagate empty relation through aggregate

2022-01-07 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35442:
--
Summary: Support propagate empty relation through aggregate  (was: 
Eliminate unnecessary join through Aggregate)

> Support propagate empty relation through aggregate
> --
>
> Key: SPARK-35442
> URL: https://issues.apache.org/jira/browse/SPARK-35442
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Minor
>
> The Aggregate in AQE is different from others: the `LogicalQueryStage` looks
> like `LogicalQueryStage(Aggregate, BaseAggregate)`. We should handle this
> case specially.
> Logically, if the Aggregate grouping expression is not empty, we can
> eliminate it safely.
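
A small spark-shell illustration of why the grouping expressions matter (names
are illustrative):

{code:java}
// With a non-empty GROUP BY, an empty child yields an empty result, so
// empty-relation propagation is safe; a global aggregate still returns a row.
spark.range(0).createOrReplaceTempView("t")
spark.sql("SELECT id, count(*) FROM t GROUP BY id").show() // 0 rows
spark.sql("SELECT count(*) FROM t").show()                 // 1 row containing 0
{code}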






[jira] [Updated] (SPARK-35442) Eliminate unnecessary join through Aggregate

2022-01-07 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35442:
--
Description: 
The Aggregate in AQE is different from others: the `LogicalQueryStage` looks
like `LogicalQueryStage(Aggregate, BaseAggregate)`. We should handle this case
specially.

Logically, if the Aggregate grouping expression is not empty, we can eliminate
it safely.


  was:
If Aggregate and Join have the same output partitioning, the plan will look
like:
{code:java}
SortMergeJoin
  Sort
    HashAggregate
      Shuffle
  Sort
    xxx
{code}

Currently `EliminateUnnecessaryJoin` doesn't support optimizing this case.
Logically, if the Aggregate grouping expression is not empty, we can eliminate
it safely.

 


> Eliminate unnecessary join through Aggregate
> 
>
> Key: SPARK-35442
> URL: https://issues.apache.org/jira/browse/SPARK-35442
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Minor
>
> The Aggregate in AQE is different from others: the `LogicalQueryStage` looks
> like `LogicalQueryStage(Aggregate, BaseAggregate)`. We should handle this
> case specially.
> Logically, if the Aggregate grouping expression is not empty, we can
> eliminate it safely.






[jira] [Updated] (SPARK-37796) ByteArrayMethods arrayEquals should fast skip the check of aligning with unaligned platform

2021-12-30 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37796:
--
Description: 
The method `arrayEquals` in `ByteArrayMethods` is a critical function which is
used in `UTF8String` methods such as `equals`, `indexOf`, and `find`.

After SPARK-16962, it carries the added complexity of alignment handling. It
would be better to fast-skip the alignment check if the platform supports
unaligned access.

  was:
The method `arrayEquals` in `ByteArrayMethods` is a critical function which is
used in `UTF8String` methods such as `equals`, `indexOf`, and `find`.

After SPARK-16962, it carries the added complexity of alignment handling. It
would be better to fast-skip the alignment check if the platform supports
unaligned access.


> ByteArrayMethods arrayEquals should fast skip the check of aligning with 
> unaligned platform
> ---
>
> Key: SPARK-37796
> URL: https://issues.apache.org/jira/browse/SPARK-37796
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> The method `arrayEquals` in `ByteArrayMethods` is a critical function which
> is used in `UTF8String` methods such as `equals`, `indexOf`, and `find`.
> After SPARK-16962, it carries the added complexity of alignment handling. It
> would be better to fast-skip the alignment check if the platform supports
> unaligned access.






[jira] [Updated] (SPARK-37796) ByteArrayMethods arrayEquals should fast skip the check of aligning with unaligned platform

2021-12-30 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37796:
--
Summary: ByteArrayMethods arrayEquals should fast skip the check of 
aligning with unaligned platform  (was: ByteArrayMethods arrayEquals should 
fast skip the checking of aligned in unaligned platform)

> ByteArrayMethods arrayEquals should fast skip the check of aligning with 
> unaligned platform
> ---
>
> Key: SPARK-37796
> URL: https://issues.apache.org/jira/browse/SPARK-37796
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> The method `arrayEquals` in `ByteArrayMethods` is a critical function which
> is used in `UTF8String` methods such as `equals`, `indexOf`, and `find`.
> After SPARK-16962, it carries the added complexity of alignment handling. It
> would be better to fast-skip the alignment check if the platform supports
> unaligned access.






[jira] [Created] (SPARK-37796) ByteArrayMethods arrayEquals should fast skip the checking of aligned in unaligned platform

2021-12-30 Thread XiDuo You (Jira)
XiDuo You created SPARK-37796:
-

 Summary: ByteArrayMethods arrayEquals should fast skip the 
checking of aligned in unaligned platform
 Key: SPARK-37796
 URL: https://issues.apache.org/jira/browse/SPARK-37796
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


The method `arrayEquals` in `ByteArrayMethods` is a critical function which is
used in `UTF8String` methods such as `equals`, `indexOf`, and `find`.

After SPARK-16962, it carries the added complexity of alignment handling. It
would be better to fast-skip the alignment check if the platform supports
unaligned access.
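
A toy sketch of the optimization (not Spark's actual code; the real capability
check it stands in for is `Platform.unaligned()` in the spark-unsafe module):
compute the capability once and short-circuit the per-call alignment test.

{code:java}
// Illustrative only: when the platform supports unaligned access, skip the
// per-call offset-alignment test and always compare a word at a time.
object AlignmentFastPath {
  val unalignedPlatform: Boolean = true // stands in for Platform.unaligned()

  def canCompareWordAtATime(offsetA: Long, offsetB: Long): Boolean =
    unalignedPlatform || ((offsetA % 8 == 0) && (offsetB % 8 == 0))
}
{code}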






[jira] [Created] (SPARK-37659) Fix FsHistoryProvider race condition between list and delete log info

2021-12-15 Thread XiDuo You (Jira)
XiDuo You created SPARK-37659:
-

 Summary: Fix FsHistoryProvider race condition between list and
delete log info
 Key: SPARK-37659
 URL: https://issues.apache.org/jira/browse/SPARK-37659
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 3.1.2, 3.2.1, 3.3.0
Reporter: XiDuo You


After SPARK-29043, FsHistoryProvider will list the log info without waiting
for all `mergeApplicationListing` tasks to finish.

However, the `LevelDBIterator` over the listed log info is not thread safe if
other threads delete the related log info at the same time.

The error message:
{code:java}
21/12/15 14:12:02 ERROR FsHistoryProvider: Exception in checking for event log 
updates
java.util.NoSuchElementException: 
1^@__main__^@+hdfs://xxx/application_xxx.inprogress
at org.apache.spark.util.kvstore.LevelDB.get(LevelDB.java:132)
at 
org.apache.spark.util.kvstore.LevelDBIterator.next(LevelDBIterator.java:137)
at 
scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at 
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184)
at 
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
at scala.collection.TraversableLike.to(TraversableLike.scala:678)
at scala.collection.TraversableLike.to$(TraversableLike.scala:675)
at scala.collection.AbstractTraversable.to(Traversable.scala:108)
at scala.collection.TraversableOnce.toList(TraversableOnce.scala:299)
at scala.collection.TraversableOnce.toList$(TraversableOnce.scala:299)
at scala.collection.AbstractTraversable.toList(Traversable.scala:108)
at 
org.apache.spark.deploy.history.FsHistoryProvider.checkForLogs(FsHistoryProvider.scala:588)
at 
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$startPolling$3(FsHistoryProvider.scala:299)
{code}
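
A toy illustration of the race (not Spark's code): one thread walks the listing
while another deletes entries, so a lookup by key can come up empty
mid-iteration.

{code:java}
import scala.collection.concurrent.TrieMap

val logInfos = TrieMap("app-1" -> "info", "app-2" -> "info")
val lister = new Thread(() => logInfos.keys.foreach { key =>
  logInfos.get(key) // may be None if the cleaner removed the key in between
})
val cleaner = new Thread(() => logInfos.remove("app-2"))
lister.start(); cleaner.start()
lister.join(); cleaner.join()
{code}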







[jira] [Created] (SPARK-37559) ShuffledRowRDD get preferred locations order by reduce size

2021-12-06 Thread XiDuo You (Jira)
XiDuo You created SPARK-37559:
-

 Summary: ShuffledRowRDD get preferred locations order by reduce 
size
 Key: SPARK-37559
 URL: https://issues.apache.org/jira/browse/SPARK-37559
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


A coalesced partition can contain several reduce partitions. The preferred
locations of the RDD partition should be those of the biggest reduce partition
before coalescing, so it can get better data locality and reduce network
traffic.
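
A toy sketch of the heuristic (names are illustrative):

{code:java}
// Illustrative only: prefer the host that holds the largest of the reduce
// partitions merged into this coalesced partition.
case class ReduceBlock(host: String, bytes: Long)

def preferredLocation(blocks: Seq[ReduceBlock]): Option[String] =
  if (blocks.isEmpty) None else Some(blocks.maxBy(_.bytes).host)

assert(preferredLocation(Seq(ReduceBlock("a", 10), ReduceBlock("b", 90))) == Some("b"))
{code}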






[jira] [Updated] (SPARK-37528) Support reorder tasks during scheduling by shuffle partition size in AQE

2021-12-02 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37528:
--
Description: 
Reordering tasks by input size can save whole-stage execution time. Assume a
larger amount of input data takes longer to execute. Let's say we have one
stage with 4 tasks, the `defaultParallelism` is 2, and the 4 tasks have
different execution times of [1s, 3s, 2s, 4s].
 * normally, the execution time of the stage is 7s
 * after reordering the tasks, the execution time of the stage is 5s

A new config `spark.scheduler.reorderTasks.enabled` decides whether we allow
reordering tasks.

 

  was:
Reordering tasks by input size can save whole-stage execution time. Assume a
larger amount of data takes longer to execute. Let's say we have one stage
with 4 tasks, the `defaultParallelism` is 2, and the 4 tasks have different
execution times of [1s, 3s, 2s, 4s].
 * normally, the execution time of the stage is 7s
 * after reordering the tasks, the execution time of the stage is 5s

A new config `spark.scheduler.reorderTasks.enabled` decides whether we allow
reordering tasks.

 


> Support reorder tasks during scheduling by shuffle partition size in AQE
> 
>
> Key: SPARK-37528
> URL: https://issues.apache.org/jira/browse/SPARK-37528
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core, SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> Reordering tasks by input size can save whole-stage execution time. Assume
> a larger amount of input data takes longer to execute. Let's say we have one
> stage with 4 tasks, the `defaultParallelism` is 2, and the 4 tasks have
> different execution times of [1s, 3s, 2s, 4s].
>  * normally, the execution time of the stage is 7s
>  * after reordering the tasks, the execution time of the stage is 5s
> A new config `spark.scheduler.reorderTasks.enabled` decides whether we allow
> reordering tasks.
>  
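
A tiny simulation of the arithmetic above (illustrative, not Spark's
scheduler): greedily assign tasks, in order, to 2 slots.

{code:java}
// Greedy: each task goes to the slot that frees up first.
def stageTime(taskSeconds: Seq[Int], slots: Int): Int = {
  val finish = Array.fill(slots)(0)
  taskSeconds.foreach { t =>
    val i = finish.indexOf(finish.min)
    finish(i) += t
  }
  finish.max
}

stageTime(Seq(1, 3, 2, 4), 2)                               // 7
stageTime(Seq(1, 3, 2, 4).sorted(Ordering[Int].reverse), 2) // 5
{code}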






[jira] [Updated] (SPARK-37528) Support reorder tasks during scheduling by shuffle partition size in AQE

2021-12-02 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37528:
--
Description: 
Reordering tasks by input size can save whole-stage execution time. Assume a
larger amount of data takes longer to execute. Let's say we have one stage
with 4 tasks, the `defaultParallelism` is 2, and the 4 tasks have different
execution times of [1s, 3s, 2s, 4s].
 * normally, the execution time of the stage is 7s
 * after reordering the tasks, the execution time of the stage is 5s

A new config `spark.scheduler.reorderTasks.enabled` decides whether we allow
reordering tasks.

 

  was:
Reordering tasks by input size can save whole-stage execution time. Let's say
we have one stage with 4 tasks, the `defaultParallelism` is 2, and the 4 tasks
have different execution times of [1s, 3s, 2s, 4s].
 * normally, the execution time of the stage is 7s
 * after reordering the tasks, the execution time of the stage is 5s

A new config `spark.scheduler.reorderTasks.enabled` decides whether we allow
reordering tasks.

 


> Support reorder tasks during scheduling by shuffle partition size in AQE
> 
>
> Key: SPARK-37528
> URL: https://issues.apache.org/jira/browse/SPARK-37528
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core, SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> Reordering tasks by input size can save whole-stage execution time. Assume
> a larger amount of data takes longer to execute. Let's say we have one stage
> with 4 tasks, the `defaultParallelism` is 2, and the 4 tasks have different
> execution times of [1s, 3s, 2s, 4s].
>  * normally, the execution time of the stage is 7s
>  * after reordering the tasks, the execution time of the stage is 5s
> A new config `spark.scheduler.reorderTasks.enabled` decides whether we allow
> reordering tasks.
>  






[jira] [Created] (SPARK-37528) Support reorder tasks during scheduling by shuffle partition size in AQE

2021-12-02 Thread XiDuo You (Jira)
XiDuo You created SPARK-37528:
-

 Summary: Support reorder tasks during scheduling by shuffle 
partition size in AQE
 Key: SPARK-37528
 URL: https://issues.apache.org/jira/browse/SPARK-37528
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core, SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


Reordering tasks by input size can save whole-stage execution time. Let's say
we have one stage with 4 tasks, the `defaultParallelism` is 2, and the 4 tasks
have different execution times of [1s, 3s, 2s, 4s].
 * normally, the execution time of the stage is 7s
 * after reordering the tasks, the execution time of the stage is 5s

A new config `spark.scheduler.reorderTasks.enabled` decides whether we allow
reordering tasks.

 






[jira] [Updated] (SPARK-37502) Support cast aware output partitioning and required if it can up cast

2021-11-30 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37502:
--
Description: 
If a `Cast` is an up cast, then it involves no truncation, precision loss, or
possible runtime failures. So the output partitioning should be the same with
or without the `Cast` if the `Cast` is an up cast.

Let's say we have a query:
{code:java}
-- v1: c1 int
-- v2: c2 long

SELECT * FROM v2 JOIN (SELECT c1, count(*) FROM v1 GROUP BY c1) v1 ON v1.c1 = 
v2.c2
{code}
The executed plan contains three shuffle nodes which looks like:
{code:java}
SortMergeJoin
  Exchange(cast(c1 as bigint))
    HashAggregate
      Exchange(c1)
        Scan v1
  Exchange(c2)
    Scan v2
{code}
We can simplify the plan using two shuffle nodes:
{code:java}
SortMergeJoin
  HashAggregate
    Exchange(c1)
      Scan v1
  Exchange(c2)
    Scan v2
{code}

  was:
If a `Cast` is an up cast, then it involves no truncation, precision loss, or
possible runtime failures. So the output partitioning should be the same with
or without the `Cast` if the `Cast` is an up cast.

Let's say we have a query:
{code:java}
-- v1: c1 int
-- v2: c2 long

SELECT * FROM v2 JOIN (SELECT c1, count(*) FROM v1 GROUP BY c1) v1 ON v1.c1 = 
v2.c2
{code}
The executed plan contains three shuffle nodes which looks like:
{code:java}
SortMergeJoin
  Exchange(cast(c1 as bigint))
    HashAggregate
      Exchange(c1)
        Scan v1
  Exchange(c2)
    Scan v2
{code}

We can simplify the plan using two shuffle nodes:
{code:java}
SortMergeJoin
  HashAggregate
    Exchange(c1)
      Scan v1
  Exchange(c2)
    Scan v2
{code}


> Support cast aware output partitioning and required if it can up cast
> -
>
> Key: SPARK-37502
> URL: https://issues.apache.org/jira/browse/SPARK-37502
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> If a `Cast` is an up cast, then it involves no truncation, precision loss,
> or possible runtime failures. So the output partitioning should be the same
> with or without the `Cast` if the `Cast` is an up cast.
> Let's say we have a query:
> {code:java}
> -- v1: c1 int
> -- v2: c2 long
> SELECT * FROM v2 JOIN (SELECT c1, count(*) FROM v1 GROUP BY c1) v1 ON v1.c1 = 
> v2.c2
> {code}
> The executed plan contains three shuffle nodes which looks like:
> {code:java}
> SortMergeJoin
>   Exchange(cast(c1 as bigint))
>     HashAggregate
>       Exchange(c1)
>         Scan v1
>   Exchange(c2)
>     Scan v2
> {code}
> We can simplify the plan using two shuffle nodes:
> {code:java}
> SortMergeJoin
>   HashAggregate
>     Exchange(c1)
>       Scan v1
>   Exchange(c2)
>     Scan v2
> {code}






[jira] [Created] (SPARK-37502) Support cast aware output partitioning and required if it can up cast

2021-11-30 Thread XiDuo You (Jira)
XiDuo You created SPARK-37502:
-

 Summary: Support cast aware output partitioning and required if it 
can up cast
 Key: SPARK-37502
 URL: https://issues.apache.org/jira/browse/SPARK-37502
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


If a `Cast` is an up cast, then it involves no truncation, precision loss, or
possible runtime failures. So the output partitioning should be the same with
or without the `Cast` if the `Cast` is an up cast.

Let's say we have a query:
{code:java}
-- v1: c1 int
-- v2: c2 long

SELECT * FROM v2 JOIN (SELECT c1, count(*) FROM v1 GROUP BY c1) v1 ON v1.c1 = 
v2.c2
{code}
The executed plan contains three shuffle nodes which looks like:
{code:java}
SortMergeJoin
  Exchange(cast(c1 as bigint))
    HashAggregate
      Exchange(c1)
        Scan v1
  Exchange(c2)
    Scan v2
{code}

We can simplify the plan using two shuffle nodes:
{code:java}
SortMergeJoin
  HashAggregate
    Exchange(c1)
      Scan v1
  Exchange(c2)
    Scan v2
{code}
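
A toy model of the compatibility rule (not Catalyst's API): an up cast is
injective, so rows that agree on cast(c1 as bigint) are exactly the rows that
agree on c1, and a shuffle by c1 already satisfies clustering by the cast.

{code:java}
// Illustrative only: partitioning by an up cast of a column clusters rows the
// same way as partitioning by the column itself.
sealed trait Expr
case class Col(name: String) extends Expr
case class UpCast(child: Col, to: String) extends Expr

def clusteringKey(e: Expr): String = e match {
  case Col(n)       => n
  case UpCast(c, _) => c.name
}

assert(clusteringKey(UpCast(Col("c1"), "bigint")) == clusteringKey(Col("c1")))
{code}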






[jira] [Updated] (SPARK-37357) Add small partition factor for rebalance partitions

2021-11-24 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Summary: Add small partition factor for rebalance partitions  (was: Create 
skew partition specs should respect min partition size)

> Add small partition factor for rebalance partitions
> ---
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> For example, `Rebalance` provides functionality to split a large reduce
> partition into small ones. However, we have seen many SQL queries produce
> small files due to the last partition.
> Let's say we have one reduce partition and six map partitions, and the
> blocks are:
> [10, 10, 10, 10, 10, 10]
> If the target size is 50, we will get two files of 50 and 10. And it will
> get worse if there are thousands of reduce partitions.
> It would be helpful if we could control the min partition size.
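
A toy sketch of the small-partition factor (illustrative, not Spark's
implementation): merge a trailing group into its predecessor when it falls
below factor * targetSize.

{code:java}
import scala.collection.mutable.ArrayBuffer

// Greedily pack blocks up to the target size; optionally fold a small trailing
// group into the previous one instead of emitting a tiny file.
def coalesce(blocks: Seq[Int], target: Int, smallFactor: Double): Seq[Int] = {
  val groups = ArrayBuffer[Int]()
  var current = 0
  blocks.foreach { b =>
    if (current + b > target && current > 0) { groups += current; current = 0 }
    current += b
  }
  if (current > 0) {
    if (groups.nonEmpty && current < smallFactor * target)
      groups(groups.size - 1) += current
    else groups += current
  }
  groups.toSeq
}

assert(coalesce(Seq(10, 10, 10, 10, 10, 10), 50, 0.0) == Seq(50, 10)) // tiny file
assert(coalesce(Seq(10, 10, 10, 10, 10, 10), 50, 0.5) == Seq(60))     // merged
{code}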






[jira] [Updated] (SPARK-37357) Create skew partition specs should respect min partition size

2021-11-18 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Description: 
For example, `Rebalance` provides functionality to split a large reduce
partition into small ones. However, we have seen many SQL queries produce
small files due to the last partition.

Let's say we have one reduce partition and six map partitions, and the blocks
are:
[10, 10, 10, 10, 10, 10]
If the target size is 50, we will get two files of 50 and 10. And it will get
worse if there are thousands of reduce partitions.

It would be helpful if we could control the min partition size.

  was:
`Rebalance` provides functionality to split a large reduce partition into
small ones. However, we have seen many SQL queries produce small files due to
the last partition.

Let's say we have one reduce partition and six map partitions, and the blocks
are: [10, 10, 10, 10, 10, 10]. If the target size is 50, we will get two files
of 50 and 10. And it will get worse if there are thousands of reduce
partitions.

It would be helpful if we could merge the last small partition into the
previous one.


> Create skew partition specs should respect min partition size
> -
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> For example, `Rebalance` provides functionality to split a large reduce
> partition into small ones. However, we have seen many SQL queries produce
> small files due to the last partition.
> Let's say we have one reduce partition and six map partitions, and the
> blocks are:
> [10, 10, 10, 10, 10, 10]
> If the target size is 50, we will get two files of 50 and 10. And it will
> get worse if there are thousands of reduce partitions.
> It would be helpful if we could control the min partition size.






[jira] [Updated] (SPARK-37357) Create skew partition specs should respect min partition size

2021-11-18 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Summary: Create skew partition specs should respect min partition size  
(was: Add merged last partition factor for rebalance)

> Create skew partition specs should respect min partition size
> -
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `Rebalance` provides functionality to split a large reduce partition into
> small ones. However, we have seen many SQL queries produce small files due
> to the last partition.
> Let's say we have one reduce partition and six map partitions, and the
> blocks are: [10, 10, 10, 10, 10, 10]. If the target size is 50, we will get
> two files of 50 and 10. And it will get worse if there are thousands of
> reduce partitions.
> It would be helpful if we could merge the last small partition into the
> previous one.






[jira] [Updated] (SPARK-37357) Add merged last partition factor for rebalance

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Description: 
`Rebalance` provides functionality to split a large reduce partition into
small ones. However, we have seen many SQL queries produce small files due to
the last partition.

Let's say we have one reduce partition and six map partitions, and the blocks
are: [10, 10, 10, 10, 10, 10]. If the target size is 50, we will get two files
of 50 and 10. And it will get worse if there are thousands of reduce
partitions.

It would be helpful if we could merge the last small partition into the
previous one.

  was:
`Rebalance` provides functionality to split a large reduce partition into
small ones. However, we have seen many SQL queries produce small files due to
the last partition.

Let's say we have one reduce partition and three map partitions, and the
blocks are: [10, 10, 10, 10, 10, 10]. If the target size is 50, we will get
two files of 50 and 10. And it will get worse if there are thousands of reduce
partitions.

It would be helpful if we could merge the last small partition into the
previous one.


> Add merged last partition factor for rebalance
> --
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `Rebalance` provides functionality to split a large reduce partition into
> small ones. However, we have seen many SQL queries produce small files due
> to the last partition.
> Let's say we have one reduce partition and six map partitions, and the
> blocks are: [10, 10, 10, 10, 10, 10]. If the target size is 50, we will get
> two files of 50 and 10. And it will get worse if there are thousands of
> reduce partitions.
> It would be helpful if we could merge the last small partition into the
> previous one.






[jira] [Updated] (SPARK-37357) Add merged last partition factor for rebalance

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Description: 
`Rebalance` provides functionality to split a large reduce partition into
small ones. However, we have seen many SQL queries produce small files due to
the last partition.

Let's say we have one reduce partition and three map partitions, and the
blocks are: [10, 10, 10, 10, 10, 10]. If the target size is 50, we will get
two files of 50 and 10. And it will get worse if there are thousands of reduce
partitions.

It would be helpful if we could merge the last small partition into the
previous one.

  was:
`Rebalance` provides functionality to split a large reduce partition into
small ones. However, we have seen many SQL queries produce small files due to
the last partition.

Let's say we have one reduce partition and three map partitions, the blocks
are: [10, 10, 10, 10, 10, 10], and the target size is 50. We will get two
files of 50 and 10. And it will get worse if there are thousands of reduce
partitions.

It would be helpful if we could merge the last small partition into the
previous one.


> Add merged last partition factor for rebalance
> --
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `Rebalance` provides functionality to split a large reduce partition into
> small ones. However, we have seen many SQL queries produce small files due
> to the last partition.
> Let's say we have one reduce partition and three map partitions, and the
> blocks are: [10, 10, 10, 10, 10, 10]. If the target size is 50, we will get
> two files of 50 and 10. And it will get worse if there are thousands of
> reduce partitions.
> It would be helpful if we could merge the last small partition into the
> previous one.






[jira] [Updated] (SPARK-37357) Add merged last partition factor for rebalance

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Description: 
`Rebalance` provides functionality to split a large reduce partition into
small ones. However, we have seen many SQL queries produce small files due to
the last partition.

Let's say we have one reduce partition and three map partitions, the blocks
are: [10, 10, 10, 10, 10, 10], and the target size is 50. We will get two
files of 50 and 10. And it will get worse if there are thousands of reduce
partitions.

It would be helpful if we could merge the last small partition into the
previous one.

  was:
For example, `Rebalance` provides functionality to split a large reduce
partition into small ones. However, we have seen many SQL queries produce
small files due to the last partition.

Let's say we have one reduce partition and three map partitions, the blocks
are: [10, 10, 10, 10, 10, 10], and the target size is 50. We will get two
files of 50 and 10. And it will get worse if there are thousands of reduce
partitions.

It would be helpful if we could merge the last small partition into the
previous one.


> Add merged last partition factor for rebalance
> --
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `Rebalance` provides functionality to split a large reduce partition into
> small ones. However, we have seen many SQL queries produce small files due
> to the last partition.
> Let's say we have one reduce partition and three map partitions, the blocks
> are: [10, 10, 10, 10, 10, 10], and the target size is 50. We will get two
> files of 50 and 10. And it will get worse if there are thousands of reduce
> partitions.
> It would be helpful if we could merge the last small partition into the
> previous one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37357) Add merged last partition factor for rebalance

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Summary: Add merged last partition factor for rebalance  (was: Add merged 
last partition factor for split skew partition)

> Add merged last partition factor for rebalance
> --
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> For example, `Rebalance` provides functionality that splits large reduce 
> partitions into smaller ones. However, we have seen many SQL queries produce 
> small files due to the last partition.
> Let's say one reduce partition fetches map-side blocks of sizes 
> [10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
> sizes 50 and 10. And it gets worse when there are thousands of reduce 
> partitions.
> It would be helpful if we could merge the last small partition into the 
> previous one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37357) Add merged last partition factor for split skew partition

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Summary: Add merged last partition factor for split skew partition  (was: 
Add merged last partition factor for rebalance)

> Add merged last partition factor for split skew partition
> -
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `Rebalance` provides functionality that splits large reduce partitions into 
> smaller ones. However, we have seen many SQL queries produce small files due 
> to the last partition.
> Let's say one reduce partition fetches map-side blocks of sizes 
> [10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
> sizes 50 and 10. And it gets worse when there are thousands of reduce 
> partitions.
> It would be helpful if we could merge the last small partition into the 
> previous one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37357) Add merged last partition factor for split skew partition

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Description: 
For example, `Rebalance` provides functionality that splits large reduce 
partitions into smaller ones. However, we have seen many SQL queries produce 
small files due to the last partition.

Let's say one reduce partition fetches map-side blocks of sizes 
[10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
sizes 50 and 10. And it gets worse when there are thousands of reduce 
partitions.

It would be helpful if we could merge the last small partition into the 
previous one.

  was:
`Rebalance` provides functionality that splits large reduce partitions into 
smaller ones. However, we have seen many SQL queries produce small files due 
to the last partition.

Let's say one reduce partition fetches map-side blocks of sizes 
[10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
sizes 50 and 10. And it gets worse when there are thousands of reduce 
partitions.

It would be helpful if we could merge the last small partition into the 
previous one.


> Add merged last partition factor for split skew partition
> -
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> For example, `Rebalance` provides functionality that splits large reduce 
> partitions into smaller ones. However, we have seen many SQL queries produce 
> small files due to the last partition.
> Let's say one reduce partition fetches map-side blocks of sizes 
> [10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
> sizes 50 and 10. And it gets worse when there are thousands of reduce 
> partitions.
> It would be helpful if we could merge the last small partition into the 
> previous one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37357) Add merged last partition factor for rebalance

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Description: 
`Rebalance` provides functionality that splits large reduce partitions into 
smaller ones. However, we have seen many SQL queries produce small files due 
to the last partition.

Let's say one reduce partition fetches map-side blocks of sizes 
[10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
sizes 50 and 10. And it gets worse when there are thousands of reduce 
partitions.

It would be helpful if we could merge the last small partition into the 
previous one.

  was:
`Rebalance` provides functionality that splits large reduce partitions into 
smaller ones. However, we have seen many SQL queries produce small files due 
to the last partition.

Let's say one reduce partition fetches map-side blocks of sizes 
[10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
sizes 50 and 10. And it gets worse when there are thousands of reduce 
partitions.

It would be helpful if we could merge the last small partition into the 
previous one.


> Add merged last partition factor for rebalance
> --
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `Rebalance` provides functionality that splits large reduce partitions into 
> smaller ones. However, we have seen many SQL queries produce small files due 
> to the last partition.
> Let's say one reduce partition fetches map-side blocks of sizes 
> [10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
> sizes 50 and 10. And it gets worse when there are thousands of reduce 
> partitions.
> It would be helpful if we could merge the last small partition into the 
> previous one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37357) Add merged last partition factor for rebalance

2021-11-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37357:
--
Description: 
`Rebalance` provides functionality that splits large reduce partitions into 
smaller ones. However, we have seen many SQL queries produce small files due 
to the last partition.

Let's say one reduce partition fetches map-side blocks of sizes 
[10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
sizes 50 and 10. And it gets worse when there are thousands of reduce 
partitions.

It would be helpful if we could merge the last small partition into the 
previous one.

  was:
`Rebalance` provides functionality that splits large reduce partitions into 
smaller ones. However, we have seen many SQL queries produce small files due 
to the last partition.

Let's say one reduce partition fetches map-side blocks of sizes 
[40, 60, 10, 10] and the target size is 100. We will get two files of sizes 
110 and 10. And it gets worse when there are thousands of reduce partitions.

It would be helpful if we could merge the last small partition into the 
previous one.


> Add merged last partition factor for rebalance
> --
>
> Key: SPARK-37357
> URL: https://issues.apache.org/jira/browse/SPARK-37357
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `Rebalance` provides functionality that splits large reduce partitions into 
> smaller ones. However, we have seen many SQL queries produce small files due 
> to the last partition.
> Let's say one reduce partition fetches map-side blocks of sizes 
> [10, 10, 10, 10, 10, 10] and the target size is 50. We will get two files of 
> sizes 50 and 10. And it gets worse when there are thousands of reduce 
> partitions.
> It would be helpful if we could merge the last small partition into the 
> previous one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37357) Add merged last partition factor for rebalance

2021-11-17 Thread XiDuo You (Jira)
XiDuo You created SPARK-37357:
-

 Summary: Add merged last partition factor for rebalance
 Key: SPARK-37357
 URL: https://issues.apache.org/jira/browse/SPARK-37357
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


`Rebalance` provides functionality that splits large reduce partitions into 
smaller ones. However, we have seen many SQL queries produce small files due 
to the last partition.

Let's say one reduce partition fetches map-side blocks of sizes 
[40, 60, 10, 10] and the target size is 100. We will get two files of sizes 
110 and 10. And it gets worse when there are thousands of reduce partitions.

It would be helpful if we could merge the last small partition into the 
previous one.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37333) Specify the required distribution at V1Write

2021-11-15 Thread XiDuo You (Jira)
XiDuo You created SPARK-37333:
-

 Summary: Specify the required distribution at V1Write
 Key: SPARK-37333
 URL: https://issues.apache.org/jira/browse/SPARK-37333
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


An improvement on SPARK-37287.

We can specify the required distribution at V1Write; e.g. for a dynamic 
partition write, we may expect an output partitioning based on the dynamic 
partition columns.
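A user-level illustration of the expected distribution (public DataFrame APIs, 
but only a sketch of what the planner rule would insert automatically; the 
path and data are illustrative):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1L, "a"), (2L, "b"), (3L, "a")).toDF("id", "p")

// Clustering by the dynamic partition column before the write means each
// task holds rows for few partitions, so fewer small files are produced.
df.repartition($"p")
  .write
  .partitionBy("p")
  .parquet("/tmp/spark-37333-demo")
{code}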



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37287) Pull out dynamic partition and bucket sort from FileFormatWriter

2021-11-11 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37287:
--
Description: 
`FileFormatWriter.write` is now used by all V1 writes, including datasource 
and Hive tables. However, it contains a sort based on the dynamic partition 
and bucket columns that cannot be seen in the plan directly.

V2 writes take a better approach: they satisfy the required ordering, or even 
distribution, via the rule `V2Writes`.

V1 writes should do the same thing as V2 writes.

 

  was:
`FileFormatWriter.write` is now used by all V1 writes, including datasource 
and Hive tables. However, it contains a sort based on the dynamic partition 
and bucket columns that cannot be seen in the plan directly.

V2 writes take a better approach: they satisfy the required ordering, or even 
distribution, via the rule `V2Writes`.

V1 writes should do the same thing as V2 writes.

 


> Pull out dynamic partition and bucket sort from FileFormatWriter
> 
>
> Key: SPARK-37287
> URL: https://issues.apache.org/jira/browse/SPARK-37287
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `FileFormatWriter.write` is now used by all V1 writes, including datasource 
> and Hive tables. However, it contains a sort based on the dynamic partition 
> and bucket columns that cannot be seen in the plan directly.
> V2 writes take a better approach: they satisfy the required ordering, or 
> even distribution, via the rule `V2Writes`.
> V1 writes should do the same thing as V2 writes.
>  
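What that hidden sort does, made explicit at the user level (a sketch with 
public DataFrame APIs; the real sort lives inside `FileFormatWriter.write`, 
and the proposal is to surface it as a plan node instead; the path and data 
are illustrative):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1L, "a"), (2L, "b"), (3L, "a")).toDF("id", "p")

// Writing with dynamic partitions behaves as if each task's rows were first
// sorted by the partition (and bucket) columns, so a task writes one file at
// a time instead of keeping many files open.
df.sortWithinPartitions("p")   // the implicit sort, written out explicitly
  .write
  .partitionBy("p")
  .parquet("/tmp/spark-37287-demo")
{code}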



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37287) Pull out dynamic partition and bucket sort from FileFormatWriter

2021-11-11 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37287:
--
Description: 
`FileFormatWriter.write` is now used by all V1 writes, including datasource 
and Hive tables. However, it contains a sort based on the dynamic partition 
and bucket columns that cannot be seen in the plan directly.

V2 writes take a better approach: they satisfy the required ordering, or even 
distribution, via the rule `V2Writes`.

V1 writes should do the same thing as V2 writes.

 

  was:
`FileFormatWriter.write` is now used by all V1 writes, including datasource 
and Hive tables. However, it contains a sort based on the dynamic partition 
and bucket columns that cannot be seen in the plan directly.

V2 writes take a better approach: they satisfy the required ordering, or even 
distribution, via the rule `V2Writes`.

V1 writes should do the same thing as V2 writes.

 


> Pull out dynamic partition and bucket sort from FileFormatWriter
> 
>
> Key: SPARK-37287
> URL: https://issues.apache.org/jira/browse/SPARK-37287
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `FileFormatWriter.write` is now used by all V1 writes, including datasource 
> and Hive tables. However, it contains a sort based on the dynamic partition 
> and bucket columns that cannot be seen in the plan directly.
> V2 writes take a better approach: they satisfy the required ordering, or 
> even distribution, via the rule `V2Writes`.
> V1 writes should do the same thing as V2 writes.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37287) Pull out dynamic partition and bucket sort from FileFormatWriter

2021-11-11 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37287:
--
Description: 
`FileFormatWriter.write` is now used by all V1 writes, including datasource 
and Hive tables. However, it contains a sort based on the dynamic partition 
and bucket columns that cannot be seen in the plan directly.

V2 writes take a better approach: they satisfy the required ordering, or even 
distribution, via the rule `V2Writes`.

V1 writes should do the same thing as V2 writes.

 

  was:
`FileFormatWriter.write` is now used by all V1 writes, including datasource 
and Hive tables. However, it contains a sort based on the dynamic partition 
and bucket columns that cannot be seen in the plan directly.

V2 writes take a better approach: they satisfy the required ordering, or even 
distribution, via the rule `V2Writes`.

V1 writes should do the same thing as V2 writes.

 


> Pull out dynamic partition and bucket sort from FileFormatWriter
> 
>
> Key: SPARK-37287
> URL: https://issues.apache.org/jira/browse/SPARK-37287
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `FileFormatWriter.write` is now used by all V1 writes, including datasource 
> and Hive tables. However, it contains a sort based on the dynamic partition 
> and bucket columns that cannot be seen in the plan directly.
> V2 writes take a better approach: they satisfy the required ordering, or 
> even distribution, via the rule `V2Writes`.
> V1 writes should do the same thing as V2 writes.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37287) Pull out dynamic partition and bucket sort from FileFormatWriter

2021-11-11 Thread XiDuo You (Jira)
XiDuo You created SPARK-37287:
-

 Summary: Pull out dynamic partition and bucket sort from 
FileFormatWriter
 Key: SPARK-37287
 URL: https://issues.apache.org/jira/browse/SPARK-37287
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


`FileFormatWriter.write` is now used by all V1 writes, including datasource 
and Hive tables. However, it contains a sort based on the dynamic partition 
and bucket columns that cannot be seen in the plan directly.

V2 writes take a better approach: they satisfy the required ordering, or even 
distribution, via the rule `V2Writes`.

V1 writes should do the same thing as V2 writes.

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37267) OptimizeSkewInRebalancePartitions support optimize non-root node

2021-11-10 Thread XiDuo You (Jira)
XiDuo You created SPARK-37267:
-

 Summary: OptimizeSkewInRebalancePartitions support optimize 
non-root node
 Key: SPARK-37267
 URL: https://issues.apache.org/jira/browse/SPARK-37267
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


`OptimizeSkewInRebalancePartitions` is currently applied only if 
`RebalancePartitions` is the root node, but sometimes we expect a local sort 
after the rebalance, which can improve the compression ratio.

After SPARK-36184, it is easy to validate whether the rule introduces an 
extra shuffle, and the output partitioning is ensured by 
`AQEShuffleReadExec.outputPartitioning`.

So it is safe to make `OptimizeSkewInRebalancePartitions` optimize non-root 
nodes.
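For example (assuming a `SparkSession` named `spark`, the `REBALANCE` hint 
available since Spark 3.2, and illustrative table names), a plan where 
`RebalancePartitions` is not the root node:

{code:scala}
// The local SORT BY sits on top of the rebalance, so RebalancePartitions is
// an inner node. Sorting within the rebalanced partitions tends to improve
// the compression ratio of the written files.
spark.sql("""
  INSERT OVERWRITE TABLE target
  SELECT /*+ REBALANCE(day) */ * FROM source
  SORT BY day
""")
{code}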



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37194) Avoid unnecessary sort in FileFormatWriter if it's not dynamic partition

2021-11-02 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37194:
--
Description: 
`FileFormatWriter.write` sorts by the partition and bucket columns before 
writing. I think this code path assumed the input `partitionColumns` are 
dynamic, but actually that is not always true. It is now used by three code 
paths:
 - `FileStreamSink`: its partitions should always be dynamic
 - `SaveAsHiveFile`: it relies on `InsertIntoHiveTable` having removed the 
static partitions, and `InsertIntoHiveDirCommand` having no partitions
 - `InsertIntoHadoopFsRelationCommand`: it passes `partitionColumns` into 
`FileFormatWriter.write` without removing the static partitions, because they 
are needed to generate the partition path in `DynamicPartitionDataWriter`

It shows that the unnecessary sort only affects 
`InsertIntoHadoopFsRelationCommand` when we write data with static partitions.

 

  was:
`FileFormatWriter.write` sorts by the partition and bucket columns before 
writing. I think this code path assumed the input `partitionColumns` are 
dynamic, but actually that is not always true. It is now used by three code 
paths:
 - `FileStreamSink`: its partitions should always be dynamic
 - `SaveAsHiveFile`: it relies on `InsertIntoHiveTable` having removed the 
static partitions, and `InsertIntoHiveDirCommand` having no partitions
 - `InsertIntoHadoopFsRelationCommand`: it passes `partitionColumns` into 
`FileFormatWriter.write` without removing the static partitions, because they 
are needed to generate the partition path in `DynamicPartitionDataWriter`

It shows that the unnecessary sort only affects 
`InsertIntoHadoopFsRelationCommand` when we write data with static partitions.

Do a simple benchmark:
{code:java}
CREATE TABLE test (id long) USING PARQUET PARTITIONED BY (d string);

-- before this PR, it took 1.82 seconds
-- after this PR, it took 1.072 seconds
INSERT OVERWRITE TABLE test PARTITION(d='a') SELECT id FROM range(1000);
{code}


> Avoid unnecessary sort in FileFormatWriter if it's not dynamic partition
> 
>
> Key: SPARK-37194
> URL: https://issues.apache.org/jira/browse/SPARK-37194
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `FileFormatWriter.write` sorts by the partition and bucket columns before 
> writing. I think this code path assumed the input `partitionColumns` are 
> dynamic, but actually that is not always true. It is now used by three code 
> paths:
>  - `FileStreamSink`: its partitions should always be dynamic
>  - `SaveAsHiveFile`: it relies on `InsertIntoHiveTable` having removed the 
> static partitions, and `InsertIntoHiveDirCommand` having no partitions
>  - `InsertIntoHadoopFsRelationCommand`: it passes `partitionColumns` into 
> `FileFormatWriter.write` without removing the static partitions, because 
> they are needed to generate the partition path in `DynamicPartitionDataWriter`
> It shows that the unnecessary sort only affects 
> `InsertIntoHadoopFsRelationCommand` when we write data with static partitions.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37194) Avoid unnecessary sort in FileFormatWriter if it's not dynamic partition

2021-11-02 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37194:
--
Description: 
`FileFormatWriter.write` sorts by the partition and bucket columns before 
writing. I think this code path assumed the input `partitionColumns` are 
dynamic, but actually that is not always true. It is now used by three code 
paths:
 - `FileStreamSink`: its partitions should always be dynamic
 - `SaveAsHiveFile`: it relies on `InsertIntoHiveTable` having removed the 
static partitions, and `InsertIntoHiveDirCommand` having no partitions
 - `InsertIntoHadoopFsRelationCommand`: it passes `partitionColumns` into 
`FileFormatWriter.write` without removing the static partitions, because they 
are needed to generate the partition path in `DynamicPartitionDataWriter`

It shows that the unnecessary sort only affects 
`InsertIntoHadoopFsRelationCommand` when we write data with static partitions.

Do a simple benchmark:
{code:java}
CREATE TABLE test (id long) USING PARQUET PARTITIONED BY (d string);

-- before this PR, it took 1.82 seconds
-- after this PR, it took 1.072 seconds
INSERT OVERWRITE TABLE test PARTITION(d='a') SELECT id FROM range(1000);
{code}

  was:
`FileFormatWriter.write` sorts by the partition and bucket columns before 
writing. I think this code path assumed the input `partitionColumns` are 
dynamic, but actually that is not always true. It is now used by three code 
paths:
 - `FileStreamSink`: its partitions should always be dynamic
 - `SaveAsHiveFile`: it relies on `InsertIntoHiveTable` having removed the 
static partitions, and `InsertIntoHiveDirCommand` having no partitions
 - `InsertIntoHadoopFsRelationCommand`: it passes `partitionColumns` into 
`FileFormatWriter.write` without removing the static partitions, because they 
are needed to generate the partition path in `DynamicPartitionDataWriter`

It shows that the unnecessary sort only affects 
`InsertIntoHadoopFsRelationCommand` when we write data with static partitions.

Do a simple benchmark:
{code:java}
CREATE TABLE test (id long) USING PARQUET PARTITIONED BY (d string);

-- before this PR, it took 1.82 seconds
-- after this PR, it took 1.072 seconds
INSERT OVERWRITE TABLE test PARTITION(d='a') SELECT id FROM range(1000);
{code}


> Avoid unnecessary sort in FileFormatWriter if it's not dynamic partition
> 
>
> Key: SPARK-37194
> URL: https://issues.apache.org/jira/browse/SPARK-37194
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `FileFormatWriter.write` sorts by the partition and bucket columns before 
> writing. I think this code path assumed the input `partitionColumns` are 
> dynamic, but actually that is not always true. It is now used by three code 
> paths:
>  - `FileStreamSink`: its partitions should always be dynamic
>  - `SaveAsHiveFile`: it relies on `InsertIntoHiveTable` having removed the 
> static partitions, and `InsertIntoHiveDirCommand` having no partitions
>  - `InsertIntoHadoopFsRelationCommand`: it passes `partitionColumns` into 
> `FileFormatWriter.write` without removing the static partitions, because 
> they are needed to generate the partition path in `DynamicPartitionDataWriter`
> It shows that the unnecessary sort only affects 
> `InsertIntoHadoopFsRelationCommand` when we write data with static partitions.
>  
> Do a simple benchmark:
> {code:java}
> CREATE TABLE test (id long) USING PARQUET PARTITIONED BY (d string);
> -- before this PR, it took 1.82 seconds
> -- after this PR, it took 1.072 seconds
> INSERT OVERWRITE TABLE test PARTITION(d='a') SELECT id FROM range(1000);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37194) Avoid unnecessary sort in FileFormatWriter if it's not dynamic partition

2021-11-02 Thread XiDuo You (Jira)
XiDuo You created SPARK-37194:
-

 Summary: Avoid unnecessary sort in FileFormatWriter if it's not 
dynamic partition
 Key: SPARK-37194
 URL: https://issues.apache.org/jira/browse/SPARK-37194
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


`FileFormatWriter.write` sorts by the partition and bucket columns before 
writing. I think this code path assumed the input `partitionColumns` are 
dynamic, but actually that is not always true. It is now used by three code 
paths:
 - `FileStreamSink`: its partitions should always be dynamic
 - `SaveAsHiveFile`: it relies on `InsertIntoHiveTable` having removed the 
static partitions, and `InsertIntoHiveDirCommand` having no partitions
 - `InsertIntoHadoopFsRelationCommand`: it passes `partitionColumns` into 
`FileFormatWriter.write` without removing the static partitions, because they 
are needed to generate the partition path in `DynamicPartitionDataWriter`

It shows that the unnecessary sort only affects 
`InsertIntoHadoopFsRelationCommand` when we write data with static partitions.

Do a simple benchmark:
{code:java}
CREATE TABLE test (id long) USING PARQUET PARTITIONED BY (d string);

-- before this PR, it took 1.82 seconds
-- after this PR, it took 1.072 seconds
INSERT OVERWRITE TABLE test PARTITION(d='a') SELECT id FROM range(1000);
{code}
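A schematic of the proposed check (illustrative names only, not the actual 
`FileFormatWriter` code):

{code:scala}
// With INSERT ... PARTITION(d='a') every partition column is static, so no
// dynamic partition columns remain and the sort can be skipped entirely.
val partitionColumns = Seq("d")
val staticPartitions = Map("d" -> "a")   // from PARTITION(d='a')
val dynamicPartitionColumns = partitionColumns.filterNot(staticPartitions.contains)
val needsSort = dynamicPartitionColumns.nonEmpty   // false here => skip the sort
{code}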



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37125) Support AnsiInterval radix sort

2021-10-26 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37125:
--
Parent: SPARK-27790
Issue Type: Sub-task  (was: Improvement)

> Support AnsiInterval radix sort
> ---
>
> Key: SPARK-37125
> URL: https://issues.apache.org/jira/browse/SPARK-37125
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> Radix sort is much faster than Timsort; the benchmark results can be seen in 
> `SortBenchmark`.
> Since the `AnsiInterval` data types are comparable:
> - `YearMonthIntervalType` -> int ordering
> - `DayTimeIntervalType` -> long ordering
> And we already support radix sort when the ordering column's data type is 
> int or long.
> So `AnsiInterval` radix sort can be supported.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37125) Support AnsiInterval radix sort

2021-10-26 Thread XiDuo You (Jira)
XiDuo You created SPARK-37125:
-

 Summary: Support AnsiInterval radix sort
 Key: SPARK-37125
 URL: https://issues.apache.org/jira/browse/SPARK-37125
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


Radix sort is much faster than Timsort; the benchmark results can be seen in 
`SortBenchmark`.

Since the `AnsiInterval` data types are comparable:
- `YearMonthIntervalType` -> int ordering
- `DayTimeIntervalType` -> long ordering

And we already support radix sort when the ordering column's data type is int 
or long.

So `AnsiInterval` radix sort can be supported.
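A sketch of why the sort prefix is already radix-sortable (Spark stores 
`YearMonthIntervalType` as an Int number of months and `DayTimeIntervalType` 
as a Long number of microseconds; the function name is illustrative):

{code:scala}
// The sort prefix of an ANSI interval is just its underlying integral value.
def intervalSortPrefix(value: Any): Long = value match {
  case months: Int  => months.toLong   // YearMonthIntervalType
  case micros: Long => micros          // DayTimeIntervalType
}
{code}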



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37043) Cancel all running job after AQE plan finished

2021-10-25 Thread XiDuo You (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434071#comment-17434071
 ] 

XiDuo You commented on SPARK-37043:
---

[~xkrogen] sorry for the late reply, I agree with marking this issue as a 
subtask of SPARK-37063.

> Cancel all running job after AQE plan finished
> --
>
> Key: SPARK-37043
> URL: https://issues.apache.org/jira/browse/SPARK-37043
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> We have seen stages still running after the AQE plan finished. This happens 
> because a plan containing an empty join was converted to 
> `LocalTableScanExec` by the `AQEOptimizer`, while the other side of that 
> join (a shuffle map stage) was still running.
>  
> There is no point in keeping such a stage running; it is better to cancel 
> running stages after the AQE plan finishes, to avoid wasting task resources.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37098) Alter table properties should invalidate cache

2021-10-22 Thread XiDuo You (Jira)
XiDuo You created SPARK-37098:
-

 Summary: Alter table properties should invalidate cache
 Key: SPARK-37098
 URL: https://issues.apache.org/jira/browse/SPARK-37098
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0, 3.1.2, 3.0.3, 3.3.0
Reporter: XiDuo You


Table properties can change the behavior of writing, e.g. a Parquet table 
with `parquet.compression`.

If you execute the following SQL, the file is written with snappy compression 
rather than zstd.
{code:java}
CREATE TABLE t (c int) STORED AS PARQUET;
-- cache table metadata
SELECT * FROM t;
ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd');
INSERT INTO TABLE t VALUES (1);
{code}
So we should invalidate the table metadata cache after altering table 
properties.
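Until the fix lands, a possible workaround is to drop the stale metadata 
manually (assuming a `SparkSession` named `spark`; `refreshTable` is a public 
Catalog API):

{code:scala}
spark.sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd')")
spark.catalog.refreshTable("t")   // invalidate the cached table metadata
spark.sql("INSERT INTO TABLE t VALUES (1)")   // now written with zstd
{code}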



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37080) Add benchmark tool guide in pull request template

2021-10-20 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37080:
--
Summary: Add benchmark tool guide in pull request template  (was: Add 
benchmark guide in pull request template)

> Add benchmark tool guide in pull request template
> -
>
> Key: SPARK-37080
> URL: https://issues.apache.org/jira/browse/SPARK-37080
> Project: Spark
>  Issue Type: Improvement
>  Components: Project Infra
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Minor
>
> Add a benchmark tool guide to the pull request template to help developers 
> find it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37080) Add benchmark guide in pull request template

2021-10-20 Thread XiDuo You (Jira)
XiDuo You created SPARK-37080:
-

 Summary: Add benchmark guide in pull request template
 Key: SPARK-37080
 URL: https://issues.apache.org/jira/browse/SPARK-37080
 Project: Spark
  Issue Type: Improvement
  Components: Project Infra
Affects Versions: 3.3.0
Reporter: XiDuo You


Add a benchmark tool guide to the pull request template to help developers 
find it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37064) Fix outer join return the wrong max rows if other side is empty

2021-10-19 Thread XiDuo You (Jira)
XiDuo You created SPARK-37064:
-

 Summary: Fix outer join return the wrong max rows if other side is 
empty
 Key: SPARK-37064
 URL: https://issues.apache.org/jira/browse/SPARK-37064
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0, 3.3.0
Reporter: XiDuo You


An outer join should return at least the number of rows of its outer side: a 
left outer join at least the rows of its left side, a right outer join at 
least the rows of its right side, and a full outer join at least the rows of 
both sides.
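A schematic of the corrected estimate (illustrative, not the actual Catalyst 
code):

{code:scala}
// An empty inner side must not drive the bound to zero, because every
// outer-side row is still emitted (with nulls on the non-matching side).
def outerJoinMaxRows(joinType: String, left: Long, right: Long): Long =
  joinType match {
    case "left_outer"  => math.max(left, left * right)
    case "right_outer" => math.max(right, left * right)
    case "full_outer"  => math.max(left + right, left * right)
  }

outerJoinMaxRows("left_outer", 5, 0)   // 5, not 0
{code}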



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37063) SQL Adaptive Query Execution QA: Phase 2

2021-10-19 Thread XiDuo You (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430838#comment-17430838
 ] 

XiDuo You commented on SPARK-37063:
---

thank you [~dongjoon] for creating this umbrella!

> SQL Adaptive Query Execution QA: Phase 2
> 
>
> Key: SPARK-37063
> URL: https://issues.apache.org/jira/browse/SPARK-37063
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37043) Cancel all running job after AQE plan finished

2021-10-18 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37043:
--
Description: 
We have seen stages still running after the AQE plan finished. This happens 
because a plan containing an empty join was converted to `LocalTableScanExec` 
by the `AQEOptimizer`, while the other side of that join (a shuffle map 
stage) was still running.

There is no point in keeping such a stage running; it is better to cancel 
running stages after the AQE plan finishes, to avoid wasting task resources.

  was:
We have seen stages still running after the AQE plan finished. This happens 
because a plan containing an empty join was converted to `LocalTableScanExec` 
by the `AQEOptimizer`, while the other side of that join (a shuffle map 
stage) was still running.

It's better to cancel the running stage after the AQE plan finished to avoid 
wasting task resources.


> Cancel all running job after AQE plan finished
> --
>
> Key: SPARK-37043
> URL: https://issues.apache.org/jira/browse/SPARK-37043
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> We have seen stages still running after the AQE plan finished. This happens 
> because a plan containing an empty join was converted to 
> `LocalTableScanExec` by the `AQEOptimizer`, while the other side of that 
> join (a shuffle map stage) was still running.
>  
> There is no point in keeping such a stage running; it is better to cancel 
> running stages after the AQE plan finishes, to avoid wasting task resources.
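A schematic of the cleanup (illustrative names; the real change would live 
inside adaptive execution):

{code:scala}
// Once the final plan no longer references a materializing stage, cancel it
// so its tasks stop occupying executors.
case class RunningStage(id: Int) {
  def cancel(): Unit = println(s"cancelling stage $id")
}

val runningStages = Seq(RunningStage(1), RunningStage(2))
val stagesInFinalPlan = Set(1)
runningStages.filterNot(s => stagesInFinalPlan(s.id)).foreach(_.cancel())
{code}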



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37043) Cancel all running job after AQE plan finished

2021-10-18 Thread XiDuo You (Jira)
XiDuo You created SPARK-37043:
-

 Summary: Cancel all running job after AQE plan finished
 Key: SPARK-37043
 URL: https://issues.apache.org/jira/browse/SPARK-37043
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


We have seen stages still running after the AQE plan finished. This happens 
because a plan containing an empty join was converted to `LocalTableScanExec` 
by the `AQEOptimizer`, while the other side of that join (a shuffle map 
stage) was still running.

It's better to cancel the running stage after the AQE plan finished to avoid 
wasting task resources.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37037) Improve byte array sort by unify compareTo function of UTF8String and ByteArray

2021-10-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37037:
--
Description: 
`BinaryType` uses `TypeUtils.compareBinary` to compare two byte arrays; 
however it is slow since it compares the arrays byte by byte using unsigned 
int comparison.

We can compare them using `Platform.getLong` with unsigned long comparison if 
they have more than 8 bytes. And here is some history about this `TODO`: 
[https://github.com/apache/spark/pull/6755/files#r32197461]

  was:
`BinaryType` uses `TypeUtils.compareBinary` to compare two byte arrays; 
however it is slow since it compares the arrays byte by byte using unsigned 
int comparison.

We can compare them using `Platform.getLong` with unsigned long comparison if 
they have more than 8 bytes. And here is some history about this: 
[https://github.com/apache/spark/pull/6755/files#r32197461]


> Improve byte array sort by unify compareTo function of UTF8String and 
> ByteArray 
> 
>
> Key: SPARK-37037
> URL: https://issues.apache.org/jira/browse/SPARK-37037
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `BinaryType` uses `TypeUtils.compareBinary` to compare two byte arrays; 
> however it is slow since it compares the arrays byte by byte using unsigned 
> int comparison.
> We can compare them using `Platform.getLong` with unsigned long comparison 
> if they have more than 8 bytes. And here is some history about this `TODO`: 
> [https://github.com/apache/spark/pull/6755/files#r32197461]
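A sketch of the 8-bytes-at-a-time idea (using `java.nio.ByteBuffer` for 
portability instead of `Platform`; unsigned comparison of big-endian longs is 
equivalent to lexicographic unsigned byte comparison):

{code:scala}
import java.nio.ByteBuffer

def compareBinary(a: Array[Byte], b: Array[Byte]): Int = {
  val minLen = math.min(a.length, b.length)
  val (ba, bb) = (ByteBuffer.wrap(a), ByteBuffer.wrap(b)) // big-endian by default
  var i = 0
  while (i + 8 <= minLen) {          // compare a whole word per iteration
    val cmp = java.lang.Long.compareUnsigned(ba.getLong(i), bb.getLong(i))
    if (cmp != 0) return cmp
    i += 8
  }
  while (i < minLen) {               // byte-at-a-time for the tail
    val cmp = (a(i) & 0xff) - (b(i) & 0xff)
    if (cmp != 0) return cmp
    i += 1
  }
  a.length - b.length
}
{code}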



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37037) Improve byte array sort by unify compareTo function of UTF8String and ByteArray

2021-10-17 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-37037:
--
Description: 
`BinaryType` uses `TypeUtils.compareBinary` to compare two byte arrays; 
however it is slow since it compares the arrays byte by byte using unsigned 
int comparison.

We can compare them using `Platform.getLong` with unsigned long comparison if 
they have more than 8 bytes. And here is some history about this: 
[https://github.com/apache/spark/pull/6755/files#r32197461]

  was:
`BinaryType` uses `TypeUtils.compareBinary` to compare two byte arrays; 
however it is slow since it compares the arrays byte by byte.

We can compare them using `Platform.getLong` if they have more than 8 bytes. 
And here is some history about this: 
[https://github.com/apache/spark/pull/6755/files#r32197461]


> Improve byte array sort by unify compareTo function of UTF8String and 
> ByteArray 
> 
>
> Key: SPARK-37037
> URL: https://issues.apache.org/jira/browse/SPARK-37037
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `BinaryType` uses `TypeUtils.compareBinary` to compare two byte arrays; 
> however it is slow since it compares the arrays byte by byte using unsigned 
> int comparison.
> We can compare them using `Platform.getLong` with unsigned long comparison 
> if they have more than 8 bytes. And here is some history about this: 
> [https://github.com/apache/spark/pull/6755/files#r32197461]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37037) Improve byte array sort by unify compareTo function of UTF8String and ByteArray

2021-10-17 Thread XiDuo You (Jira)
XiDuo You created SPARK-37037:
-

 Summary: Improve byte array sort by unify compareTo function of 
UTF8String and ByteArray 
 Key: SPARK-37037
 URL: https://issues.apache.org/jira/browse/SPARK-37037
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


`BinaryType` uses `TypeUtils.compareBinary` to compare two byte arrays; 
however it is slow since it compares the arrays byte by byte.

We can compare them using `Platform.getLong` if they have more than 8 bytes. 
And here is some history about this: 
[https://github.com/apache/spark/pull/6755/files#r32197461]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36993) Fix json_tuple throw NPE if fields exist no foldable null value

2021-10-12 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36993:
--
Affects Version/s: 3.0.3

> Fix json_tuple throw NPE if fields exist no foldable null value
> --
>
> Key: SPARK-36993
> URL: https://issues.apache.org/jira/browse/SPARK-36993
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.3, 3.1.2, 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> If json_tuple is given a non-foldable field expression that evaluates to 
> null, Spark throws an NPE when evaluating field.toString.
> e.g. the query below fails with:
> {code:java}
> SELECT json_tuple('{"a":"1"}', if(c1 < 1, null, 'a')) FROM ( SELECT rand() AS 
> c1 );
> {code}
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$parseRow$2(jsonExpressions.scala:435)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.parseRow(jsonExpressions.scala:435)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$eval$6(jsonExpressions.scala:413)
> {code}
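A sketch of a null-safe fix (illustrative, not the actual patch):

{code:scala}
// A field expression may evaluate to null at runtime, so guard the toString
// call and treat null as a missing field instead of dereferencing it.
def fieldName(evaluated: Any): Option[String] = Option(evaluated).map(_.toString)

fieldName(null)   // None  -> emit NULL for this field
fieldName("a")    // Some("a")
{code}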



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36993) Fix json_tuple throw NPE if fields exist no foldable null field

2021-10-12 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36993:
--
Summary: Fix json_tuple throw NPE if fields exist no foldable null field  
(was: Fix json_tuple throw NPE if fields exist no foldable null column)

> Fix json_tuple throw NPE if fields exist no foldable null field
> --
>
> Key: SPARK-36993
> URL: https://issues.apache.org/jira/browse/SPARK-36993
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> If json_tuple is given a non-foldable field expression that evaluates to 
> null, Spark throws an NPE when evaluating field.toString.
> e.g. the query below fails with:
> {code:java}
> SELECT json_tuple('{"a":"1"}', if(c1 < 1, null, 'a')) FROM ( SELECT rand() AS 
> c1 );
> {code}
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$parseRow$2(jsonExpressions.scala:435)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.parseRow(jsonExpressions.scala:435)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$eval$6(jsonExpressions.scala:413)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36993) Fix json_tuple throw NPE if fields exist no foldable null value

2021-10-12 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36993:
--
Summary: Fix json_tuple throw NPE if fields exist no foldable null value  
(was: Fix json_tuple throw NPE if fields exist no foldable null field)

> Fix json_tuple throw NPE if fields exist no foldable null value
> --
>
> Key: SPARK-36993
> URL: https://issues.apache.org/jira/browse/SPARK-36993
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> If json_tuple is given a non-foldable field expression that evaluates to 
> null, Spark throws an NPE when evaluating field.toString.
> e.g. the query below fails with:
> {code:java}
> SELECT json_tuple('{"a":"1"}', if(c1 < 1, null, 'a')) FROM ( SELECT rand() AS 
> c1 );
> {code}
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$parseRow$2(jsonExpressions.scala:435)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.parseRow(jsonExpressions.scala:435)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$eval$6(jsonExpressions.scala:413)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36993) Fix json_tuple throw NPE if fields exist no foldable null column

2021-10-12 Thread XiDuo You (Jira)
XiDuo You created SPARK-36993:
-

 Summary: Fix json_tuple throw NPE if fields exist no foldable null 
column
 Key: SPARK-36993
 URL: https://issues.apache.org/jira/browse/SPARK-36993
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.2, 3.2.0, 3.3.0
Reporter: XiDuo You


If json_tuple is given a non-foldable field expression that evaluates to 
null, Spark throws an NPE when evaluating field.toString.

e.g. the query `SELECT json_tuple('{"a":"1"}', if(c1 < 1, null, 'a')) FROM ( 
SELECT rand() AS c1 );` fails with:

{code:java}
Caused by: java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$parseRow$2(jsonExpressions.scala:435)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.parseRow(jsonExpressions.scala:435)
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$eval$6(jsonExpressions.scala:413)

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36993) Fix json_tuple throw NPE if fields exist no foldable null column

2021-10-12 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36993:
--
Description: 
If json_tuple is given a non-foldable field expression that evaluates to 
null, Spark throws an NPE when evaluating field.toString.

e.g. the query below fails with:

{code:java}
SELECT json_tuple('{"a":"1"}', if(c1 < 1, null, 'a')) FROM ( SELECT rand() AS 
c1 );
{code}

{code:java}
Caused by: java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$parseRow$2(jsonExpressions.scala:435)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.parseRow(jsonExpressions.scala:435)
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$eval$6(jsonExpressions.scala:413)

{code}


  was:
If json_tuple is given a non-foldable field expression that evaluates to 
null, Spark throws an NPE when evaluating field.toString.

e.g. the query `SELECT json_tuple('{"a":"1"}', if(c1 < 1, null, 'a')) FROM ( 
SELECT rand() AS c1 );` fails with:

{code:java}
Caused by: java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$parseRow$2(jsonExpressions.scala:435)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.parseRow(jsonExpressions.scala:435)
at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$eval$6(jsonExpressions.scala:413)

{code}



> Fix json_tuple throw NPE if fields exist no foldable null column
> ---
>
> Key: SPARK-36993
> URL: https://issues.apache.org/jira/browse/SPARK-36993
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> If json_tuple is given a non-foldable field expression that evaluates to 
> null, Spark throws an NPE when evaluating field.toString.
> e.g. the query below fails with:
> {code:java}
> SELECT json_tuple('{"a":"1"}', if(c1 < 1, null, 'a')) FROM ( SELECT rand() AS 
> c1 );
> {code}
> {code:java}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$parseRow$2(jsonExpressions.scala:435)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.parseRow(jsonExpressions.scala:435)
>   at 
> org.apache.spark.sql.catalyst.expressions.JsonTuple.$anonfun$eval$6(jsonExpressions.scala:413)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36992) Improve byte array sort perf by unify getPrefix function of UTF8String and ByteArray

2021-10-12 Thread XiDuo You (Jira)
XiDuo You created SPARK-36992:
-

 Summary: Improve byte array sort perf by unify getPrefix function 
of UTF8String and ByteArray
 Key: SPARK-36992
 URL: https://issues.apache.org/jira/browse/SPARK-36992
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


When executing the sort operator, we first compare the prefix. However, the 
getPrefix function for byte arrays is slow. We use the first 8 bytes as the 
prefix, so we may call `Platform.getByte` up to 8 times, which is slower than 
a single call to `Platform.getInt` or `Platform.getLong`.
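A sketch of the word-at-a-time prefix (using `java.nio.ByteBuffer` instead of 
`Platform`; the padding path is only needed for arrays shorter than 8 bytes):

{code:scala}
import java.nio.ByteBuffer

def getPrefix(bytes: Array[Byte]): Long =
  if (bytes.length >= 8) {
    ByteBuffer.wrap(bytes).getLong(0)          // one word read, not 8 byte reads
  } else {
    var prefix = 0L
    var i = 0
    while (i < bytes.length) {                 // zero-pad short arrays on the right
      prefix |= (bytes(i) & 0xffL) << (56 - 8 * i)
      i += 1
    }
    prefix
  }
{code}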



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36979) Add RewriteLateralSubquery rule into nonExcludableRules

2021-10-11 Thread XiDuo You (Jira)
XiDuo You created SPARK-36979:
-

 Summary: Add RewriteLateralSubquery rule into nonExcludableRules
 Key: SPARK-36979
 URL: https://issues.apache.org/jira/browse/SPARK-36979
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


A lateral join cannot be planned without the rule `RewriteLateralSubquery`. So 
if we set 
`spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.RewriteLateralSubquery`,
 the lateral join query will fail with:
{code:java}
java.lang.AssertionError: assertion failed: No plan for LateralJoin 
lateral-subquery#218
{code}
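
A hypothetical reproduction in a Spark 3.2+ shell (the inline values are made 
up):

{code:java}
// Excluding RewriteLateralSubquery leaves the LateralJoin node unplannable.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.RewriteLateralSubquery")
spark.sql(
  "SELECT * FROM VALUES (1), (2) AS t(c1), LATERAL (SELECT c1 + 1)"
).show() // fails with: No plan for LateralJoin
{code}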







[jira] [Updated] (SPARK-36424) Support eliminate limits in AQE Optimizer

2021-09-29 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36424:
--
Parent: SPARK-33828
Issue Type: Sub-task  (was: Improvement)

> Support eliminate limits in AQE Optimizer
> -
>
> Key: SPARK-36424
> URL: https://issues.apache.org/jira/browse/SPARK-36424
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
> Fix For: 3.3.0
>
>
> In ad-hoc scenarios, we always add a limit to the query if the user did not 
> specify one, but not every limit is necessary.
> With the power of AQE, we can eliminate limits using runtime statistics.






[jira] [Updated] (SPARK-36823) Support broadcast nested loop join hint for equi-join

2021-09-21 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36823:
--
Description: 
For the join if one side is small and other side is large, the shuffle overhead 
is also very big. Due to the 
bhj limitation, we can only broadcast right side for left join and left side 
for right join. So for the other case, we can try to use 
`BroadcastNestedLoopJoin` as the join strategy.


  was:
For a join where one side is small and the other side is large, the shuffle 
overhead is also very big. Due to the limitation, we can only broadcast the 
right side for a left join and the left side for a right join. So for the 
other cases, we can try `BroadcastNestedLoopJoin` as the join strategy.



> Support broadcast nested loop join hint for equi-join
> -
>
> Key: SPARK-36823
> URL: https://issues.apache.org/jira/browse/SPARK-36823
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> For a join where one side is small and the other side is large, the shuffle 
> overhead is also very big. Due to the 
> BHJ limitation, we can only broadcast the right side for a left join and the 
> left side for a right join. So for the other cases, we can try 
> `BroadcastNestedLoopJoin` as the join strategy.






[jira] [Created] (SPARK-36823) Support broadcast nested loop join hint for equi-join

2021-09-21 Thread XiDuo You (Jira)
XiDuo You created SPARK-36823:
-

 Summary: Support broadcast nested loop join hint for equi-join
 Key: SPARK-36823
 URL: https://issues.apache.org/jira/browse/SPARK-36823
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


For a join where one side is small and the other side is large, the shuffle 
overhead is also very big. Due to the limitation, we can only broadcast the 
right side for a left join and the left side for a right join. So for the 
other cases, we can try `BroadcastNestedLoopJoin` as the join strategy.
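
An illustration of the motivating case (the data and view names are made up):

{code:java}
// In a LEFT JOIN, BHJ can only build (broadcast) the right side, so a
// broadcast hint on the small left side cannot be honored by BHJ; falling
// back to BroadcastNestedLoopJoin would still allow broadcasting it.
spark.range(10).createOrReplaceTempView("small")
spark.range(10000000L).createOrReplaceTempView("large")
spark.sql(
  "SELECT /*+ BROADCAST(small) */ * " +
  "FROM small LEFT JOIN large ON small.id = large.id"
).explain()
{code}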







[jira] [Created] (SPARK-36822) BroadcastNestedLoopJoinExec should use all condition instead of non-equi condition

2021-09-21 Thread XiDuo You (Jira)
XiDuo You created SPARK-36822:
-

 Summary: BroadcastNestedLoopJoinExec should use all condition 
instead of non-equi condition
 Key: SPARK-36822
 URL: https://issues.apache.org/jira/browse/SPARK-36822
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: XiDuo You


At JoinSelection, with ExtractEquiJoinKeys, we use `nonEquiCond` as the join 
condition. This is wrong: ExtractEquiJoinKeys guarantees that some equi 
condition exists, and it is dropped here.

{code:java}
Seq(joins.BroadcastNestedLoopJoinExec(
  planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
{code}

That said, it should not manifest as a bug, since we always use SMJ as the 
default join strategy for ExtractEquiJoinKeys.






[jira] [Created] (SPARK-36424) Support eliminate limits in AQE Optimizer

2021-08-05 Thread XiDuo You (Jira)
XiDuo You created SPARK-36424:
-

 Summary: Support eliminate limits in AQE Optimizer
 Key: SPARK-36424
 URL: https://issues.apache.org/jira/browse/SPARK-36424
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: XiDuo You


In ad-hoc scenarios, we always add a limit to the query if the user did not 
specify one, but not every limit is necessary.

With the power of AQE, we can eliminate limits using runtime statistics.
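
A rough sketch of such a rule (an illustration of the idea, not necessarily 
the final implementation); `maxRows` is assumed to be populated from the 
finished query stages' runtime statistics:

{code:java}
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateLimitsSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    // If the child is already known to produce no more rows than the limit
    // value, the limit is a no-op and can be dropped.
    case Limit(IntegerLiteral(limit), child) if child.maxRows.exists(_ <= limit) =>
      child
  }
}
{code}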







[jira] [Created] (SPARK-36321) Do not fail application in kubernetes if name is too long

2021-07-27 Thread XiDuo You (Jira)
XiDuo You created SPARK-36321:
-

 Summary: Do not fail application in kubernetes if name is too long
 Key: SPARK-36321
 URL: https://issues.apache.org/jira/browse/SPARK-36321
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 3.3.0
Reporter: XiDuo You


If we have a long Spark app name and start with a k8s master, we will get the 
exception below.

{code:java}
java.lang.IllegalArgumentException: 
'a-89fe2f7ae71c3570' in 
spark.kubernetes.executor.podNamePrefix is invalid. must conform 
https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
 and the value length <= 47
at 
org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$checkValue$1(ConfigBuilder.scala:108)
at 
org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$transform$1(ConfigBuilder.scala:101)
at scala.Option.map(Option.scala:230)
at 
org.apache.spark.internal.config.OptionalConfigEntry.readFrom(ConfigEntry.scala:239)
at 
org.apache.spark.internal.config.OptionalConfigEntry.readFrom(ConfigEntry.scala:214)
at org.apache.spark.SparkConf.get(SparkConf.scala:261)
at 
org.apache.spark.deploy.k8s.KubernetesConf.get(KubernetesConf.scala:67)
at 
org.apache.spark.deploy.k8s.KubernetesExecutorConf.(KubernetesConf.scala:147)
at 
org.apache.spark.deploy.k8s.KubernetesConf$.createExecutorConf(KubernetesConf.scala:231)
at 
org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$2(ExecutorPodsAllocator.scala:367)
{code}

Using the app name in the executor pod name is Spark-internal behavior, so it 
should not cause the application to fail.
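
A hypothetical reproduction (the master URL is made up); any app name long 
enough that the derived pod name prefix exceeds 47 characters triggers it:

{code:java}
import org.apache.spark.sql.SparkSession

// The executor pod name prefix is derived from the app name, so a 60-char
// app name pushes it past the 47-character limit checked at config read.
val spark = SparkSession.builder()
  .master("k8s://https://example.com:6443")
  .appName("a" * 60)
  .getOrCreate()
{code}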






[jira] [Created] (SPARK-36221) Make sure CustomShuffleReaderExec has at least one partition

2021-07-19 Thread XiDuo You (Jira)
XiDuo You created SPARK-36221:
-

 Summary: Make sure CustomShuffleReaderExec has at least one 
partition
 Key: SPARK-36221
 URL: https://issues.apache.org/jira/browse/SPARK-36221
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


Since SPARK-32083, AQE coalescing always returns at least one partition, so it 
is more robust to add a non-empty check in `CustomShuffleReaderExec`.
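
A minimal sketch of the guard (the spec type is simplified; this is an 
assumption about its shape, not the actual patch):

{code:java}
// Validate at construction time that the reader has at least one partition
// spec, instead of failing obscurely at execution time.
final case class ShuffleReaderSketch(partitionSpecs: Seq[String]) {
  require(partitionSpecs.nonEmpty,
    "CustomShuffleReaderExec requires at least one partition")
}
{code}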







[jira] [Created] (SPARK-36174) Support explain final plan in AQE

2021-07-15 Thread XiDuo You (Jira)
XiDuo You created SPARK-36174:
-

 Summary: Support explain final plan in AQE
 Key: SPARK-36174
 URL: https://issues.apache.org/jira/browse/SPARK-36174
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: XiDuo You


With AQE, the executed plan changes while the query runs, but the current 
implementation of explain does not support this.

As AQE is enabled by default, users may want to get the final plan via a 
query, so it makes sense to add new grammar to support it.






[jira] [Resolved] (SPARK-36085) Make broadcast query stage executionContext isolation from AQE

2021-07-12 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You resolved SPARK-36085.
---
Resolution: Won't Fix

> Make broadcast query stage executionContext isolation from AQE
> --
>
> Key: SPARK-36085
> URL: https://issues.apache.org/jira/browse/SPARK-36085
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `AdaptiveSparkPlanExec` shares its execution context with 
> `BroadcastQueryStage`, but that context has a small, hard-coded thread count 
> (16). Unfortunately, this is not enough for some complex queries, and no 
> thread is available to cancel the broadcast stage.
>  
> In the normal code path (without AQE), broadcast has its own execution 
> context with a config `spark.sql.broadcastExchange.maxThreadThreshold`. It 
> makes sense to isolate the broadcast query stage's executionContext from AQE 
> and respect the existing thread threshold config.






[jira] [Updated] (SPARK-36085) Make broadcast query stage executionContext isolation from AQE

2021-07-12 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36085:
--
Parent: (was: SPARK-33828)
Issue Type: Improvement  (was: Sub-task)

> Make broadcast query stage executionContext isolation from AQE
> --
>
> Key: SPARK-36085
> URL: https://issues.apache.org/jira/browse/SPARK-36085
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> `AdaptiveSparkPlanExec` shares its execution context with 
> `BroadcastQueryStage`, but that context has a small, hard-coded thread count 
> (16). Unfortunately, this is not enough for some complex queries, and no 
> thread is available to cancel the broadcast stage.
>  
> In the normal code path (without AQE), broadcast has its own execution 
> context with a config `spark.sql.broadcastExchange.maxThreadThreshold`. It 
> makes sense to isolate the broadcast query stage's executionContext from AQE 
> and respect the existing thread threshold config.






[jira] [Created] (SPARK-36085) Make broadcast query stage executionContext isolation from AQE

2021-07-11 Thread XiDuo You (Jira)
XiDuo You created SPARK-36085:
-

 Summary: Make broadcast query stage executionContext isolation 
from AQE
 Key: SPARK-36085
 URL: https://issues.apache.org/jira/browse/SPARK-36085
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.1.2, 3.2.0, 3.3.0
Reporter: XiDuo You


`AdaptiveSparkPlanExec` shares its execution context with 
`BroadcastQueryStage`, but that context has a small, hard-coded thread count 
(16). Unfortunately, this is not enough for some complex queries, and no 
thread is available to cancel the broadcast stage.

In the normal code path (without AQE), broadcast has its own execution context 
with a config `spark.sql.broadcastExchange.maxThreadThreshold`. It makes sense 
to isolate the broadcast query stage's executionContext from AQE and respect 
the existing thread threshold config.
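
A minimal sketch of the isolation idea (pool sizing and naming here are 
illustrative; the threshold would come from 
`spark.sql.broadcastExchange.maxThreadThreshold`):

{code:java}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// A dedicated pool for broadcast query stages, sized by the existing
// threshold config, instead of sharing AQE's fixed 16-thread context.
val maxThreadThreshold = 128 // assumed to be read from the SQL conf
val broadcastStageExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(maxThreadThreshold))
{code}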






[jira] [Updated] (SPARK-36032) Use inputPlan instead of currentPhysicalPlan to initialize logical link

2021-07-08 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36032:
--
Description: 
At {{initialPlan}} we may remove some SparkPlan nodes with 
{{queryStagePreparationRules}}; if a removed SparkPlan is the top-level node, 
we lose the linked logical node.

Since we support the AQE-side broadcast join config, it is more common that a 
join is SMJ at the normal planner and changes to BHJ after AQE reOptimize. 
However, {{RemoveRedundantSorts}} is applied before reOptimize at 
{{initialPlan}}, so a local sort might be removed incorrectly if a join is SMJ 
at first but changes to BHJ during reOptimize.

  was:
Since we support the AQE-side broadcast join config, it is more common that a 
join is SMJ at the normal planner and changes to BHJ after AQE reOptimize.

However, `RemoveRedundantSorts` is applied before reOptimize, so a local sort 
might be removed incorrectly if a join is SMJ at first but changes to BHJ 
during reOptimize.

 


> Use inputPlan instead of currentPhysicalPlan to initialize logical link
> ---
>
> Key: SPARK-36032
> URL: https://issues.apache.org/jira/browse/SPARK-36032
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> At {{initialPlan}} we may remove some SparkPlan nodes with 
> {{queryStagePreparationRules}}; if a removed SparkPlan is the top-level 
> node, we lose the linked logical node.
> Since we support the AQE-side broadcast join config, it is more common that 
> a join is SMJ at the normal planner and changes to BHJ after AQE reOptimize. 
> However, {{RemoveRedundantSorts}} is applied before reOptimize at 
> {{initialPlan}}, so a local sort might be removed incorrectly if a join is 
> SMJ at first but changes to BHJ during reOptimize.






[jira] [Updated] (SPARK-36032) Use inputPlan instead of currentPhysicalPlan to initialize logical link

2021-07-08 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36032:
--
Summary: Use inputPlan instead of currentPhysicalPlan to initialize logical 
link  (was: RemoveRedundantSorts should be applied after reOptimize in AQE)

> Use inputPlan instead of currentPhysicalPlan to initialize logical link
> ---
>
> Key: SPARK-36032
> URL: https://issues.apache.org/jira/browse/SPARK-36032
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0
>Reporter: XiDuo You
>Priority: Major
>
> Since we support the AQE-side broadcast join config, it is more common that 
> a join is SMJ at the normal planner and changes to BHJ after AQE reOptimize.
> However, `RemoveRedundantSorts` is applied before reOptimize, so a local 
> sort might be removed incorrectly if a join is SMJ at first but changes to 
> BHJ during reOptimize.
>  






[jira] [Created] (SPARK-36032) RemoveRedundantSorts should be applied after reOptimize in AQE

2021-07-07 Thread XiDuo You (Jira)
XiDuo You created SPARK-36032:
-

 Summary: RemoveRedundantSorts should be applied after reOptimize 
in AQE
 Key: SPARK-36032
 URL: https://issues.apache.org/jira/browse/SPARK-36032
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0, 3.3.0
Reporter: XiDuo You


Since we support the AQE-side broadcast join config, it is more common that a 
join is SMJ at the normal planner and changes to BHJ after AQE reOptimize.

However, `RemoveRedundantSorts` is applied before reOptimize, so a local sort 
might be removed incorrectly if a join is SMJ at first but changes to BHJ 
during reOptimize.

 






[jira] [Updated] (SPARK-36014) Use uuid as app id in kubernetes client mode

2021-07-05 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-36014:
--
Description: 
Currently, Spark on Kubernetes in client mode uses `"spark-application-" + 
System.currentTimeMillis` as the app id by default. This can cause app id 
conflicts if several Spark applications are submitted to a Kubernetes cluster 
within a short time.

Unfortunately, the event log uses the app id as its file name. With a 
conflicting event log file, the exception below is thrown.
{code:java}
Caused by: java.io.FileNotFoundException: File does not exist: 
xxx/spark-application-1624766876324.lz4.inprogress (inode 5984170846) Holder 
does not have any open files.
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2697)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2579)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:846)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
{code}

  was:

Currently, Spark on Kubernetes in client mode uses `"spark-application-" + 
System.currentTimeMillis` as the app id by default. This can cause app id 
conflicts if several Spark applications are submitted to a Kubernetes cluster 
within a short time.

Unfortunately, the event log uses the app id as its file name. With a 
conflicting app id, the exception was thrown.

{code:java}
Caused by: java.io.FileNotFoundException: File does not exist: 
xxx/spark-application-1624766876324.lz4.inprogress (inode 5984170846) Holder 
does not have any open files.
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2697)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2579)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:846)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
{code}



> Use uuid as app id in kubernetes client mode
> 
>
> Key: SPARK-36014
> URL: https://issues.apache.org/jira/browse/SPARK-36014
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> Currently, Spark on Kubernetes in client mode uses 
> `"spark-application-" + System.currentTimeMillis` as the app id by default. 
> This can cause app id conflicts if several Spark applications are submitted 
> to a Kubernetes cluster within a short time.
> Unfortunately, the event log uses the app id as its file name. With a 
> conflicting event log file, the exception below is thrown.
> {code:java}
> Caused by: java.io.FileNotFoundException: File does not exist: 
> xxx/spark-application-1624766876324.lz4.inprogress (inode 5984170846) Holder 
> does not have any open files.
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2697)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAddi

[jira] [Created] (SPARK-36014) Use uuid as app id in kubernetes client mode

2021-07-05 Thread XiDuo You (Jira)
XiDuo You created SPARK-36014:
-

 Summary: Use uuid as app id in kubernetes client mode
 Key: SPARK-36014
 URL: https://issues.apache.org/jira/browse/SPARK-36014
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 3.2.0
Reporter: XiDuo You



Currently, Spark on Kubernetes in client mode uses `"spark-application-" + 
System.currentTimeMillis` as the app id by default. This can cause app id 
conflicts if several Spark applications are submitted to a Kubernetes cluster 
within a short time.

Unfortunately, the event log uses the app id as its file name. With a 
conflicting app id, the exception below is thrown.

{code:java}
Caused by: java.io.FileNotFoundException: File does not exist: 
xxx/spark-application-1624766876324.lz4.inprogress (inode 5984170846) Holder 
does not have any open files.
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2697)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2579)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:846)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
{code}
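
A sketch of the proposed direction (the exact id format is an assumption): 
derive the app id from a UUID so that two submissions in the same millisecond 
cannot collide.

{code:java}
import java.util.UUID

// Millis-based ids collide across near-simultaneous submissions; a UUID
// keeps the id unique per application.
def newAppId(): String =
  "spark-" + UUID.randomUUID().toString.replaceAll("-", "")
{code}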







[jira] [Updated] (SPARK-35989) Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled

2021-07-02 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35989:
--
Description: The shuffle origin is `REPARTITION_BY_NUM` if the user specifies 
an exact partition number with repartition; in that case we should not change 
the number at all. That is, the shuffle output partition number should always 
match what the user expects.
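
An illustration of the contract (the values are made up):

{code:java}
import org.apache.spark.sql.functions.col

// repartition with an explicit number yields a REPARTITION_BY_NUM shuffle;
// AQE must not coalesce or split it, so exactly 10 partitions must remain.
val df = spark.range(100).repartition(10, col("id"))
assert(df.rdd.getNumPartitions == 10)
{code}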

> Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled
> --
>
> Key: SPARK-35989
> URL: https://issues.apache.org/jira/browse/SPARK-35989
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> The shuffle origin is `REPARTITION_BY_NUM` if the user specifies an exact 
> partition number with repartition; in that case we should not change the 
> number at all. That is, the shuffle output partition number should always 
> match what the user expects.






[jira] [Updated] (SPARK-35989) Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled

2021-07-02 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35989:
--
Environment: (was: The shuffle origin is `REPARTITION_BY_NUM` if the user 
specifies an exact partition number with repartition; in that case we should 
not change the number at all. That is, the shuffle output partition number 
should always match what the user expects.)

> Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled
> --
>
> Key: SPARK-35989
> URL: https://issues.apache.org/jira/browse/SPARK-35989
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>







[jira] [Created] (SPARK-35989) Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled

2021-07-02 Thread XiDuo You (Jira)
XiDuo You created SPARK-35989:
-

 Summary: Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled
 Key: SPARK-35989
 URL: https://issues.apache.org/jira/browse/SPARK-35989
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
 Environment: The shuffle origin is `REPARTITION_BY_NUM` if the user 
specifies an exact partition number with repartition; in that case we should 
not change the number at all. That is, the shuffle output partition number 
should always match what the user expects.
Reporter: XiDuo You









[jira] [Updated] (SPARK-35961) Only use local shuffle reader when REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec

2021-06-30 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35961:
--
Summary: Only use local shuffle reader when REBALANCE_PARTITIONS_BY_NONE 
without CustomShuffleReaderExec  (was: Only use local shuffle reader for 
REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec)

> Only use local shuffle reader when REBALANCE_PARTITIONS_BY_NONE without 
> CustomShuffleReaderExec
> ---
>
> Key: SPARK-35961
> URL: https://issues.apache.org/jira/browse/SPARK-35961
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we 
> might expand a partition if that partition is skewed. So the partition 
> number check `bytesByPartitionId.length == partitionSpecs.size` would be 
> wrong if some partitions are coalesced and some partitions are split into 
> smaller ones. Note that this is unlikely to happen in the real world, since 
> it uses RoundRobin.
> On the other hand, after 
> [SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use the 
> origin plan if we cannot coalesce partitions. So the assumption that a 
> shuffle stage has a `CustomShuffleReaderExec` with no effect is always false 
> for the `REBALANCE_PARTITIONS_BY_NONE` shuffle origin. That said, if no rule 
> took effect, there would be no `CustomShuffleReaderExec`.






[jira] [Updated] (SPARK-35961) Only use local shuffle reader for REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec

2021-06-30 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35961:
--
Description: 
After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we 
might expand a partition if that partition is skewed. So the partition number 
check `bytesByPartitionId.length == partitionSpecs.size` would be wrong if 
some partitions are coalesced and some partitions are split into smaller ones.
Note that this is unlikely to happen in the real world, since it uses 
RoundRobin.

On the other hand, after 
[SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use the 
origin plan if we cannot coalesce partitions. So the assumption that a shuffle 
stage has a `CustomShuffleReaderExec` with no effect is always false for the 
`REBALANCE_PARTITIONS_BY_NONE` shuffle origin. That said, if no rule took 
effect, there would be no `CustomShuffleReaderExec`.

  was:
After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we 
might expand a partition if that partition is skewed. So the partition number 
check `bytesByPartitionId.length == partitionSpecs.size` would be wrong if 
some partitions are coalesced and some partitions are split into smaller ones 
(unlikely in the real world since it uses RoundRobin).

On the other hand, after 
[SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use the 
origin plan if we cannot coalesce partitions. So the assumption that a shuffle 
stage has a `CustomShuffleReaderExec` with no effect is always false. That 
said, if no rule takes effect, there would be no `CustomShuffleReaderExec`.



> Only use local shuffle reader for REBALANCE_PARTITIONS_BY_NONE without 
> CustomShuffleReaderExec
> --
>
> Key: SPARK-35961
> URL: https://issues.apache.org/jira/browse/SPARK-35961
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we 
> might expand a partition if that partition is skewed. So the partition 
> number check `bytesByPartitionId.length == partitionSpecs.size` would be 
> wrong if some partitions are coalesced and some partitions are split into 
> smaller ones. Note that this is unlikely to happen in the real world, since 
> it uses RoundRobin.
> On the other hand, after 
> [SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use the 
> origin plan if we cannot coalesce partitions. So the assumption that a 
> shuffle stage has a `CustomShuffleReaderExec` with no effect is always false 
> for the `REBALANCE_PARTITIONS_BY_NONE` shuffle origin. That said, if no rule 
> took effect, there would be no `CustomShuffleReaderExec`.






[jira] [Updated] (SPARK-35961) Only use local shuffle reader for REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec

2021-06-30 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35961:
--
Parent: SPARK-35793
Issue Type: Sub-task  (was: Improvement)

> Only use local shuffle reader for REBALANCE_PARTITIONS_BY_NONE without 
> CustomShuffleReaderExec
> --
>
> Key: SPARK-35961
> URL: https://issues.apache.org/jira/browse/SPARK-35961
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we 
> might expand a partition if that partition is skewed. So the partition 
> number check `bytesByPartitionId.length == partitionSpecs.size` would be 
> wrong if some partitions are coalesced and some partitions are split into 
> smaller ones (unlikely in the real world since it uses RoundRobin).
> On the other hand, after 
> [SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use the 
> origin plan if we cannot coalesce partitions. So the assumption that a 
> shuffle stage has a `CustomShuffleReaderExec` with no effect is always 
> false. That said, if no rule takes effect, there would be no 
> `CustomShuffleReaderExec`.






[jira] [Created] (SPARK-35961) Only use local shuffle reader for REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec

2021-06-30 Thread XiDuo You (Jira)
XiDuo You created SPARK-35961:
-

 Summary: Only use local shuffle reader for 
REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec
 Key: SPARK-35961
 URL: https://issues.apache.org/jira/browse/SPARK-35961
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: XiDuo You


After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we 
might expand a partition if that partition is skewed. So the partition number 
check `bytesByPartitionId.length == partitionSpecs.size` would be wrong if 
some partitions are coalesced and some partitions are split into smaller ones 
(unlikely in the real world since it uses RoundRobin).

On the other hand, after 
[SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use the 
origin plan if we cannot coalesce partitions. So the assumption that a shuffle 
stage has a `CustomShuffleReaderExec` with no effect is always false. That 
said, if no rule takes effect, there would be no `CustomShuffleReaderExec`.







[jira] [Created] (SPARK-35923) Coalesce empty partition with mixed CoalescedPartitionSpec and PartialReducerPartitionSpec

2021-06-28 Thread XiDuo You (Jira)
XiDuo You created SPARK-35923:
-

 Summary: Coalesce empty partition with mixed 
CoalescedPartitionSpec and PartialReducerPartitionSpec
 Key: SPARK-35923
 URL: https://issues.apache.org/jira/browse/SPARK-35923
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: XiDuo You


Since [SPARK-35447](https://issues.apache.org/jira/browse/SPARK-35447), we 
apply `OptimizeSkewedJoin` before `CoalesceShufflePartitions`. However, the 
order of these two rules changes the result.

Say we have skewed partitions [0, 128MB, 0, 128MB, 0]:
 # coalesce partitions first, then optimize skewed partitions: 
 [64MB, 64MB, 64MB, 64MB]

 # optimize skewed partitions first, then coalesce partitions: 
 [0, 64MB, 64MB, 0, 64MB, 64MB, 0]

So in ShufflePartitionsUtil.coalescePartitionsWithSkew we can coalesce across 
mixed CoalescedPartitionSpec and PartialReducerPartitionSpec entries when a 
CoalescedPartitionSpec is empty.






[jira] [Updated] (SPARK-35888) Add dataSize field in CoalescedPartitionSpec

2021-06-24 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35888:
--
Description: 
Currently, the test suites around `CoalescedPartitionSpec` do not check the 
data size because it does not contain a data size field.

We can add a data size to `CoalescedPartitionSpec` and then add test cases for 
better coverage.

  was:
Currently, the test suites around `CoalescedPartitionSpec` do not check the 
data size because it does not contain a data size field.

We can add a data size to `CoalescedPartitionSpec` and then add test cases for 
better coverage.


> Add dataSize field in CoalescedPartitionSpec
> 
>
> Key: SPARK-35888
> URL: https://issues.apache.org/jira/browse/SPARK-35888
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> Currently, the test suites around `CoalescedPartitionSpec` do not check the 
> data size because it does not contain a data size field.
> We can add a data size to `CoalescedPartitionSpec` and then add test cases 
> for better coverage.






[jira] [Updated] (SPARK-35888) Add dataSize field in CoalescedPartitionSpec

2021-06-24 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35888:
--
Description: 
Currently, the test suites around `CoalescedPartitionSpec` do not check the 
data size because it does not contain a data size field.

We can add a data size to `CoalescedPartitionSpec` and then add test cases for 
better coverage.

  was:
Currently, the test suites around `CoalescedPartitionSpec` do not check the 
data size. Checking the data size is needed for better coverage.

We can also reuse the data size in some other code (e.g. to get 
partitionDataSizes in `CustomShuffleReaderExec`).



> Add dataSize field in CoalescedPartitionSpec
> 
>
> Key: SPARK-35888
> URL: https://issues.apache.org/jira/browse/SPARK-35888
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> Currently, the test suites around `CoalescedPartitionSpec` do not check the 
> data size because it does not contain a data size field.
> We can add a data size to `CoalescedPartitionSpec` and then add test cases 
> for better coverage.






[jira] [Created] (SPARK-35888) Add dataSize field in CoalescedPartitionSpec

2021-06-24 Thread XiDuo You (Jira)
XiDuo You created SPARK-35888:
-

 Summary: Add dataSize field in CoalescedPartitionSpec
 Key: SPARK-35888
 URL: https://issues.apache.org/jira/browse/SPARK-35888
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: XiDuo You


Currently, the test suites around `CoalescedPartitionSpec` do not check the 
data size. Checking the data size is needed for better coverage.

We can also reuse the data size in some other code (e.g. to get 
partitionDataSizes in `CustomShuffleReaderExec`).
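
A sketch of one possible shape (the reducer-index fields exist today; making 
the new field an Option is an assumption):

{code:java}
// Carrying the coalesced data size on the spec lets tests assert it and
// lets callers reuse it (e.g. for partitionDataSizes).
case class CoalescedPartitionSpec(
    startReducerIndex: Int,
    endReducerIndex: Int,
    dataSize: Option[Long] = None)
{code}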







[jira] [Updated] (SPARK-35853) Remark the shuffle origin to ENSURE_REQUIREMENTS as far as possible

2021-06-22 Thread XiDuo You (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

XiDuo You updated SPARK-35853:
--
Description: 
In some queries, we might manually repartition by some columns with a large 
partition number to make the parallelism big enough. However, if its output 
partitioning satisfies some other node (e.g. join/aggregate), this shuffle 
cannot be optimized by AQE due to the shuffle origin.




  was:
In some queries, we might manually repartition by some column with a large 
partition number to make the parallelism big enough. However, if the output 
partitioning satisfies some other node (e.g. join/aggregate), this shuffle 
cannot be optimized by AQE due to the shuffle origin. 




> Remark the shuffle origin to ENSURE_REQUIREMENTS as far as possible
> ---
>
> Key: SPARK-35853
> URL: https://issues.apache.org/jira/browse/SPARK-35853
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Priority: Major
>
> In some queries, we might manually repartition by some columns with a large 
> partition number to make the parallelism big enough. However, if its output 
> partitioning satisfies some other node (e.g. join/aggregate), this shuffle 
> cannot be optimized by AQE due to the shuffle origin.





