[jira] [Updated] (SPARK-48628) Add task peak on/off heap execution memory metrics

2024-07-02 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-48628:
-
Description: 
Currently there are no task-level on/off-heap execution memory metrics. There is a 
[peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114]
 metric, but its semantics are confusing: it only covers the execution 
memory used by shuffle/join/aggregate/sort, which is accumulated in specific 
operators.

 

We can easily maintain the task-level peak memory in TaskMemoryManager, 
assuming *acquireExecutionMemory* is the only narrow waist for acquiring 
execution memory.
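A minimal, self-contained sketch of the tracking logic (the class and method names below are illustrative, not the actual TaskMemoryManager code):
{code:java}
// Sketch only: a tracker that would be updated from acquireExecutionMemory /
// releaseExecutionMemory (the single "narrow waist" mentioned above).
class PeakExecutionMemoryTracker {
  private var currentOnHeap = 0L
  private var currentOffHeap = 0L
  private var peakOnHeap = 0L
  private var peakOffHeap = 0L

  // called with the amount actually granted by acquireExecutionMemory
  def onAcquire(granted: Long, onHeap: Boolean): Unit = synchronized {
    if (onHeap) {
      currentOnHeap += granted
      peakOnHeap = math.max(peakOnHeap, currentOnHeap)
    } else {
      currentOffHeap += granted
      peakOffHeap = math.max(peakOffHeap, currentOffHeap)
    }
  }

  // called from releaseExecutionMemory
  def onRelease(released: Long, onHeap: Boolean): Unit = synchronized {
    if (onHeap) currentOnHeap -= released else currentOffHeap -= released
  }

  // (peakOnHeap, peakOffHeap) would be reported as the new task metrics
  def peaks: (Long, Long) = synchronized { (peakOnHeap, peakOffHeap) }
}
{code}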

 

It would also be nice to clean up/deprecate the poorly named `peakExecutionMemory`.

 

Creating two follow-up sub-tickets:
 * https://issues.apache.org/jira/browse/SPARK-48788: accumulate task metrics 
per stage and display them in the Spark UI
 * https://issues.apache.org/jira/browse/SPARK-48789: deprecate 
`peakExecutionMemory` once we have a replacement for it.

  was:
Currently there is no task on/off heap execution memory metrics. There is a 
[peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114]
  metrics, however, the semantic is a bit confusing: it only cover the 
execution memory used by shuffle/join/aggregate/sort, which is accumulated in 
specific operators.

 

We can easily maintain the whole task-level peak memory in TaskMemoryManager, 
assuming *acquireExecutionMemory* is the only one narrow waist for acquiring 
execution memory.


> Add task peak on/off heap execution memory metrics
> --
>
> Key: SPARK-48628
> URL: https://issues.apache.org/jira/browse/SPARK-48628
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Ziqi Liu
>Priority: Major
>
> Currently there are no task-level on/off-heap execution memory metrics. There is a 
> [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114]
> metric, but its semantics are confusing: it only covers the execution 
> memory used by shuffle/join/aggregate/sort, which is accumulated in specific 
> operators.
>  
> We can easily maintain the task-level peak memory in TaskMemoryManager, 
> assuming *acquireExecutionMemory* is the only narrow waist for acquiring 
> execution memory.
>  
> It would also be nice to clean up/deprecate the poorly named `peakExecutionMemory`.
>  
> Creating two follow-up sub-tickets:
>  * https://issues.apache.org/jira/browse/SPARK-48788: accumulate task metrics 
> per stage and display them in the Spark UI
>  * https://issues.apache.org/jira/browse/SPARK-48789: deprecate 
> `peakExecutionMemory` once we have a replacement for it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48788) Accumulate task-level execution memory in stage and display in Spark UI

2024-07-02 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48788:


 Summary: Accumulate task-level execution memory in stage and 
display in Spark UI
 Key: SPARK-48788
 URL: https://issues.apache.org/jira/browse/SPARK-48788
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Ziqi Liu


As a follow-up to https://issues.apache.org/jira/browse/SPARK-48628: once we 
have task-level on-heap/off-heap execution memory metrics, we should accumulate 
them per stage and make them available in the Spark UI.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48628) Add task peak on/off heap execution memory metrics

2024-06-14 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48628:


 Summary: Add task peak on/off heap execution memory metrics
 Key: SPARK-48628
 URL: https://issues.apache.org/jira/browse/SPARK-48628
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Ziqi Liu


Currently there are no task-level on/off-heap execution memory metrics. There is a 
[peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114]
 metric, but its semantics are a bit confusing: it only covers the 
execution memory used by shuffle/join/aggregate/sort, which is accumulated in 
specific operators.

 

We can easily maintain the task-level peak memory in TaskMemoryManager, 
assuming *acquireExecutionMemory* is the only narrow waist for acquiring 
execution memory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48610) Remove ExplainUtils.processPlan synchronize

2024-06-12 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48610:


 Summary: Remove ExplainUtils.processPlan synchronize
 Key: SPARK-48610
 URL: https://issues.apache.org/jira/browse/SPARK-48610
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Ziqi Liu


[https://github.com/apache/spark/pull/45282] introduced synchronization in 
`ExplainUtils.processPlan` to avoid a race condition when multiple queries 
refer to the same cached plan.

The granularity of that lock is too coarse. We can instead fix the root cause of this 
concurrency issue by refactoring away the mutable OP_ID_TAG, which conflicts with the 
immutable nature of SparkPlan, and using an auxiliary id map with object identity as 
the key.
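A rough sketch of the auxiliary-map idea (illustrative only, not the actual patch; the key point is that ids are keyed by object identity instead of being written into a mutable tag on a possibly shared plan):
{code:java}
import java.util.IdentityHashMap

// Sketch only: assign display ids per explain call, keyed by object identity,
// so explain never mutates the (possibly cached and shared) SparkPlan via OP_ID_TAG
// and needs no global lock. The traversal here is generic for illustration.
def collectOperatorIds[T <: AnyRef](root: T, children: T => Seq[T]): IdentityHashMap[T, Integer] = {
  val ids = new IdentityHashMap[T, Integer]()
  var nextId = 0
  def visit(node: T): Unit = {
    if (!ids.containsKey(node)) {
      ids.put(node, nextId)
      nextId += 1
      children(node).foreach(visit)
    }
  }
  visit(root)
  ids
}
{code}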



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48468) Add LogicalQueryStage interface in catalyst

2024-05-29 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48468:


 Summary: Add LogicalQueryStage interface in catalyst
 Key: SPARK-48468
 URL: https://issues.apache.org/jira/browse/SPARK-48468
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Ziqi Liu


Add `LogicalQueryStage` interface in catalyst so that it's visible in logical 
rules



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48465) Avoid no-op empty relation propagation in AQE

2024-05-29 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48465:


 Summary: Avoid no-op empty relation propagation in AQE
 Key: SPARK-48465
 URL: https://issues.apache.org/jira/browse/SPARK-48465
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 4.0.0
Reporter: Ziqi Liu


We should avoid no-op empty relation propagation in AQE: if we convert an empty 
QueryStageExec into an empty relation, it will be wrapped into a new query 
stage and executed again -> produce an empty result -> trigger empty relation 
propagation again. This issue is currently not exposed because AQE will try to 
reuse the shuffle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
An AQE plan is expected to display the final plan after execution. This is not true 
for a cached SQL plan: it will show the initial plan instead. This behavior 
change was introduced in [https://github.com/apache/spark/pull/40812], which tried 
to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the 
same instance, which causes the inconsistency.*

 

I don't have a clear idea how to fix it yet:
 * maybe we just use a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every access check 
whether the original AQE plan is finalized (after making the final flag atomic 
first, of course); if not, return the cloned initial plan; if it is finalized, 
clone the final plan and return that one. This still won't be able to 
reflect the AQE plan in real time in a concurrent situation, but at least we 
have an initial version and a final version (see the sketch below).
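A toy model of the second option (all names below are illustrative, not Spark's actual AdaptiveSparkPlanExec fields):
{code:java}
import java.util.concurrent.atomic.AtomicReference

// Sketch of option 2: keep a cloned snapshot of the initial plan, and once the plan
// is finalized, publish a cloned final plan through an atomic reference. Explain reads
// whichever snapshot is available and never touches the live, mutating plan.
final class ExplainSnapshot[P](initial: P, clonePlan: P => P) {
  private val finalSnapshot = new AtomicReference[Option[P]](None)
  private val initialSnapshot: P = clonePlan(initial)

  // called once the AQE plan is finalized (after the final flag is made atomic/volatile)
  def markFinalized(finalPlan: P): Unit = finalSnapshot.set(Some(clonePlan(finalPlan)))

  // what innerChildren would return for explain purposes
  def planForExplain: P = finalSnapshot.get().getOrElse(initialSnapshot)
}
{code}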

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.filter("key > 10")
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), 
(key#4L > 10)]
            +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, 
memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
                     +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                        +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=24]
                           +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                              +- Project [(id#2L % 100) AS key#4L]
                                 +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 
10)]
         +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, 
memory, deserialized, 1 replicas)
               +- AdaptiveSparkPlan isFinalPlan=false
                  +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                     +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=24]
                        +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                           +- Project [(id#2L % 100) AS key#4L]
                              +- Range (0, 1000, step=1, splits=10){code}

  was:
AQE plan is expected to display final plan after execution. This is not true 
for cached SQL plan: it will show the initial plan instead. This behavior 
change is introduced in [https://github.com/apache/spark/pull/40812] it tried 
to fix the concurrency issue with cached plan. 

*In short, the plan used to executed and the plan used to explain is not the 
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how yet
 * maybe we just a coarse granularity lock in explain?
 * make innerChildren a function: clone the initial plan, every time checked 
for whether the original AQE plan is finalized (making the final flag atomic 
first, of course), if no return the cloned initial plan, if it's finalized, 
clone the final plan and return that one. But still this won't be able to 
reflect the AQE plan in real time, in a concurrent situation, but at least we 
have initial version and final version.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
             

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
An AQE plan is expected to display the final plan after execution. This is not true 
for a cached SQL plan: it will show the initial plan instead. This behavior 
change was introduced in [https://github.com/apache/spark/pull/40812], which tried 
to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the 
same instance, which causes the inconsistency.*

 

I don't have a clear idea how to fix it yet:
 * maybe we just use a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every access check 
whether the original AQE plan is finalized (after making the final flag atomic 
first, of course); if not, return the cloned initial plan; if it is finalized, 
clone the final plan and return that one. This still won't be able to 
reflect the AQE plan in real time in a concurrent situation, but at least we 
have an initial version and a final version.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}

  was:
AQE plan is expected to display final plan after execution. This is not true 
for cached SQL plan: it will show the initial plan instead. This behavior 
change is introduced in [https://github.com/apache/spark/pull/40812] it tried 
to fix the concurrency issue with cached plan. 

*In short, the plan used to executed and the plan used to explain is not the 
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how yet, maybe we just a coarse granularity lock in 
explain?

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- 

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
An AQE plan is expected to display the final plan after execution. This is not true 
for a cached SQL plan: it will show the initial plan instead. This behavior 
change was introduced in [https://github.com/apache/spark/pull/40812], which tried 
to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the 
same instance, which causes the inconsistency.*

 

I don't have a clear idea how to fix it yet; maybe we just use a coarse-granularity 
lock in explain?

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}

  was:
AQE plan is expected to display final plan after execution. This is not true 
for cached SQL plan: it will show the initial plan instead. This behavior 
change is introduced in [https://github.com/apache/spark/pull/40812] it tried 
to fix the concurrency issue with cached plan.  I don't have a clear idea how 
yet, maybe we can check whether the AQE plan is finalized(make the final flag 
atomic first, of course), if not we can return the cloned one, otherwise it's 
thread-safe to return the final one, since it's immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                 

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-26 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
An AQE plan is expected to display the final plan after execution. This is not true 
for a cached SQL plan: it will show the initial plan instead. This behavior 
change was introduced in [https://github.com/apache/spark/pull/40812], which tried 
to fix a concurrency issue with cached plans. I don't have a clear idea how to fix 
it yet; maybe we can check whether the AQE plan is finalized (after making the final 
flag atomic first, of course): if not, we can return the cloned plan, otherwise it's 
thread-safe to return the final one, since it's immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}

  was:
AQE plan is expected to display final plan after execution. This is not true 
for cached SQL plan: it will show the initial plan instead. This behavior 
change is introduced in [https://github.com/apache/spark/pull/40812] it tried 
to fix the concurrency issue with cached plan.  I don't have a clear idea how 
yet, maybe we can check whether the AQE plan is finalized(make the final flag 
atomic first, of course), if not we can return the cloned one, otherwise it's 
thread-safe to return the final one, since it's immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, 
count(key2)=10), Row(key2=8, count(key2)=10)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 

[jira] [Created] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-26 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-47177:


 Summary: Cached SQL plan do not display final AQE plan in explain 
string
 Key: SPARK-47177
 URL: https://issues.apache.org/jira/browse/SPARK-47177
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.5.1, 3.5.0, 3.4.2, 4.0.0, 3.5.2
Reporter: Ziqi Liu


An AQE plan is expected to display the final plan after execution. This is not true 
for a cached SQL plan: it will show the initial plan instead. This behavior 
change was introduced in [https://github.com/apache/spark/pull/40812], which tried 
to fix a concurrency issue with cached plans. I don't have a clear idea how to fix 
it yet; maybe we can check whether the AQE plan is finalized (after making the final 
flag atomic first, of course): if not, we can return the cloned plan, otherwise it's 
thread-safe to return the final one, since it's immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, 
count(key2)=10), Row(key2=8, count(key2)=10)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan

2024-02-06 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-46995:
-
Component/s: SQL

> Allow AQE coalesce final stage in SQL cached plan
> -
>
> Key: SPARK-46995
> URL: https://issues.apache.org/jira/browse/SPARK-46995
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 4.0.0
>Reporter: Ziqi Liu
>Priority: Major
>
> [https://github.com/apache/spark/pull/43435] and 
> [https://github.com/apache/spark/pull/43760] are fixing a correctness issue 
> which will be triggered when AQE applied on cached query plan, specifically, 
> when AQE coalescing the final result stage of the cached plan.
>  
> The current semantic of 
> {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}
> ([source 
> code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]):
>  * when true, we enable AQE, but disable coalescing final stage 
> ({*}default{*})
>  * when false, we disable AQE
>  
> But let’s revisit the semantic of this config: actually for caller the only 
> thing that matters is whether we change the output partitioning of the cached 
> plan. And we should only try to apply AQE if possible.  Thus we want to 
> modify the semantic of 
> {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}
>  * when true, we enable AQE and allow coalescing final: this might lead to 
> perf regression, because it introduce extra shuffle
>  * when false, we enable AQE, but disable coalescing final stage. *(this is 
> actually the `true` semantic of old behavior)*
> Also, to keep the default behavior unchanged, we might want to flip the 
> default value of 
> {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to `false`
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan

2024-02-06 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-46995:


 Summary: Allow AQE coalesce final stage in SQL cached plan
 Key: SPARK-46995
 URL: https://issues.apache.org/jira/browse/SPARK-46995
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Ziqi Liu


[https://github.com/apache/spark/pull/43435] and 
[https://github.com/apache/spark/pull/43760] are fixing a correctness issue 
that is triggered when AQE is applied to a cached query plan, specifically 
when AQE coalesces the final result stage of the cached plan.

 

The current semantics of 
{{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}

([source 
code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]) are:
 * when true, we enable AQE but disable coalescing of the final stage ({*}default{*})

 * when false, we disable AQE

 

But let’s revisit the semantics of this config: for the caller, the only 
thing that matters is whether we change the output partitioning of the cached 
plan, and we should try to apply AQE whenever possible. Thus we want to modify 
the semantics of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} 
as follows (illustrated in the snippet below):
 * when true, we enable AQE and allow coalescing the final stage: this might lead to 
a perf regression, because it can introduce an extra shuffle

 * when false, we enable AQE but disable coalescing of the final stage. *(this is 
actually the `true` semantics of the old behavior)*

Also, to keep the default behavior unchanged, we might want to flip the default 
value of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to 
`false`.
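Under the proposed semantics, the two settings would behave roughly as sketched below (the config key is the existing one, the comments describe the proposal rather than current behavior, and `spark` is assumed to be an active SparkSession):
{code:java}
// Illustration of the *proposed* semantics, not what Spark does today.
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
// proposed: AQE on the cached plan AND coalescing of its final stage allowed;
// the output partitioning of the cache may change (possible extra shuffle for consumers).

spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
// proposed: AQE still enabled, but final-stage coalescing disabled
// (i.e. today's `true` behavior; suggested as the new default).
{code}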

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44485) optimize generateTreeString code path

2023-07-19 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-44485:


 Summary: optimize generateTreeString code path
 Key: SPARK-44485
 URL: https://issues.apache.org/jira/browse/SPARK-44485
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.4.1
Reporter: Ziqi Liu


In `TreeNode.generateTreeString`, we observed inefficiency from Scala collection 
operations and virtual function calls.

This inefficiency becomes significant in large plans (we hit an example with more 
than 1000 nodes), so {*}it’s worth optimizing this super-hot code path{*}. By 
rewriting it in a more Java-like style (not as sweet as Scala syntax sugar, though), 
we should be able to get rid of most of the overhead.
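To illustrate the kind of rewrite being proposed (a generic example, not the actual generateTreeString patch): replace per-node collection combinators and closures on the hot path with plain index-based loops.
{code:java}
// Before: idiomatic Scala, but allocates a tuple per child plus a closure per call.
def visitChildren[T](children: Seq[T])(f: (T, Boolean) => Unit): Unit =
  children.zipWithIndex.foreach { case (c, i) => f(c, i == children.length - 1) }

// After: a plain while loop over an indexed collection, no per-element allocation.
def visitChildrenFast[T](children: IndexedSeq[T])(f: (T, Boolean) => Unit): Unit = {
  var i = 0
  val n = children.length
  while (i < n) {
    f(children(i), i == n - 1) // second argument: "is this the last child?"
    i += 1
  }
}
{code}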

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43300) Cascade failure in Guava cache due to fate-sharing

2023-04-26 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-43300:


 Summary: Cascade failure in Guava cache due to fate-sharing
 Key: SPARK-43300
 URL: https://issues.apache.org/jira/browse/SPARK-43300
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.4.0
Reporter: Ziqi Liu


Guava cache is widely used in Spark; however, it suffers from fate-sharing 
behavior: if multiple requests try to access the same key in the 
{{cache}} at the same time while the key is not yet in the cache, Guava cache will 
block all of them and create the object only once. If that creation fails, all 
requests fail immediately without retry, so we might see a task fail due 
to an unrelated failure in another query purely because of fate sharing.

This fate-sharing behavior can lead to unexpected results in some situations.

We can wrap the Guava cache with a KeyLock that synchronizes all requests for 
the same key, so that they run individually and fail as if they had arrived one 
at a time.
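A simplified sketch of such a wrapper (names are illustrative; the actual fix, or a wrapper built on Spark's KeyLock utility, may differ in details):
{code:java}
import java.util.concurrent.ConcurrentHashMap
import com.google.common.cache.Cache

// Sketch only: serialize loads per key ourselves and only put successfully created
// values into the Guava cache, so one failed load does not fail every concurrent
// caller of the same key; each caller runs its own load instead.
// (Lock entries are never removed here; a real implementation would handle cleanup.)
class PerKeyLoadingCache[K <: AnyRef, V <: AnyRef](underlying: Cache[K, V]) {
  private val keyLocks = new ConcurrentHashMap[K, AnyRef]()

  def get(key: K)(load: => V): V = {
    val lock = keyLocks.computeIfAbsent(key, (_: K) => new AnyRef)
    lock.synchronized {
      val cached = underlying.getIfPresent(key)
      if (cached != null) {
        cached
      } else {
        val value = load // if this throws, only the current caller fails
        underlying.put(key, value)
        value
      }
    }
  }
}
{code}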



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42980) SmallBroadcast

2023-03-30 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-42980:


 Summary: SmallBroadcast
 Key: SPARK-42980
 URL: https://issues.apache.org/jira/browse/SPARK-42980
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 3.4.0
Reporter: Ziqi Liu


The current TorrentBroadcast implementation was originally designed for large 
data transmission, where the driver might become the bottleneck and memory might 
also be a concern. Therefore it's fairly heavyweight and comes with some fixed 
overhead:
 * torrent protocol: more round trips
 * disk-level persistence via BlockManager: extra overhead

which makes it inefficient for small data transmission.

We could add a lightweight broadcast implementation, e.g. SmallBroadcast, which 
implements a star-topology broadcast (avoiding the unnecessary round trips) 
and is perhaps in-memory only.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42980) lightweight Broadcast implementation

2023-03-30 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-42980:
-
Summary: lightweight Broadcast implementation  (was: SmallBroadcast)

> lightweight Broadcast implementation
> 
>
> Key: SPARK-42980
> URL: https://issues.apache.org/jira/browse/SPARK-42980
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Ziqi Liu
>Priority: Major
>
> The current TorrentBroadcast implementation is originally designed for large 
> data transmission where driver might become the bottleneck and memory might 
> also be the concern. Therefore it's kind of heavy and comes with some fixed 
> overhead:
>  * torrent protocol: more round traffic
>  * disk level persistency, BlockManager, extra overhead
> which makes it inefficient for some small data transmission.
> We can have a lightweight broadcast implementation, e.g, SmallBroadcast, 
> which implement star-topology broadcast(which avoids the unnecessary round 
> traffic), and maybe in-memory only.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40622) Result of a single task in collect() must fit in 2GB

2022-09-30 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-40622:
-
Description: 
When collecting results, the data from a single partition/task is serialized into 
a byte array or a ByteBuffer (which is backed by a byte array as well), so it is 
subject to the Java array max size limit (for a byte array, 2GB).

 

Constructing a single partition larger than 2GB and collecting it easily 
reproduces the issue:
{code:java}
// create data of size ~3GB in single partition, which exceeds the byte array 
limit
// random gen to make sure it's poorly compressed
val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) as data")

withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
  withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
df.queryExecution.executedPlan.executeCollect()
  }
} {code}
This will get an OOM error from 
[https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]

 

Consider using ChunkedByteBuffer instead of a byte array to bypass 
this limit.
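The direction would look roughly like the sketch below; ChunkedByteBufferOutputStream is an existing Spark-internal utility, the wiring into the collect path is omitted, and the 4 MB chunk size is an arbitrary illustrative choice.
{code:java}
import java.nio.ByteBuffer
import org.apache.spark.util.io.ChunkedByteBufferOutputStream

// Sketch: serialize the partition's result into fixed-size chunks instead of one
// contiguous byte array, so no single allocation can hit the ~2GB array limit.
val out = new ChunkedByteBufferOutputStream(4 * 1024 * 1024, ByteBuffer.allocate _)
// ... write the serialized rows to `out` through the usual serialization stream ...
val chunked = out.toChunkedByteBuffer // backed by multiple ByteBuffers, not one byte[]
{code}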

  was:
when collecting results, data from single partition/task is serialized through 
byte array or ByteBuffer(which is backed by byte array as well), therefore it's 
subject to java array max size limit(in terms of byte array, it's 2GB).

 

Construct a single partition larger than 2GB and collect it can easily 
reproduce the issue
{code:java}
val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) as 
data")

withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
  withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
df.queryExecution.executedPlan.executeCollect()
  }
} {code}
 will get a OOM error from 
[https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]

 

Consider using ChunkedByteBuffer to replace byte array in order to bypassing 
this limit


> Result of a single task in collect() must fit in 2GB
> 
>
> Key: SPARK-40622
> URL: https://issues.apache.org/jira/browse/SPARK-40622
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 3.3.0
>Reporter: Ziqi Liu
>Priority: Major
>
> when collecting results, data from single partition/task is serialized 
> through byte array or ByteBuffer(which is backed by byte array as well), 
> therefore it's subject to java array max size limit(in terms of byte array, 
> it's 2GB).
>  
> Construct a single partition larger than 2GB and collect it can easily 
> reproduce the issue
> {code:java}
> // create data of size ~3GB in single partition, which exceeds the byte array 
> limit
> // random gen to make sure it's poorly compressed
> val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) 
> as data")
> withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
>   withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
> df.queryExecution.executedPlan.executeCollect()
>   }
> } {code}
>  will get a OOM error from 
> [https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]
>  
> Consider using ChunkedByteBuffer to replace byte array in order to bypassing 
> this limit



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40622) Result of a single task in collect() must fit in 2GB

2022-09-30 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-40622:


 Summary: Result of a single task in collect() must fit in 2GB
 Key: SPARK-40622
 URL: https://issues.apache.org/jira/browse/SPARK-40622
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 3.3.0
Reporter: Ziqi Liu


when collecting results, data from single partition/task is serialized through 
byte array or ByteBuffer(which is backed by byte array as well), therefore it's 
subject to java array max size limit(in terms of byte array, it's 2GB).

 

Construct a single partition larger than 2GB and collect it can easily 
reproduce the issue
{code:java}
val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) as 
data")

withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
  withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
df.queryExecution.executedPlan.executeCollect()
  }
} {code}
 will get a OOM error from 
[https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]

 

Consider using ChunkedByteBuffer to replace byte array in order to bypassing 
this limit



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40418) Increase default initialNumPartitions to 10

2022-09-13 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-40418:


 Summary: Increase default initialNumPartitions to 10
 Key: SPARK-40418
 URL: https://issues.apache.org/jira/browse/SPARK-40418
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.4.0
Reporter: Ziqi Liu


This is actually a follow-up to SPARK-40211.

The previous default value for initialNumPartitions is 1, which is way too 
small. Changing it to 10 would be a reasonable middle-ground trade-off and 
beneficial in most cases (unless the partition count is very small, but in that 
case neither initialNumPartitions nor scaleUpFactor has a significant effect).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40261) DirectTaskResult meta should not be counted into result size

2022-08-29 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-40261:
-
Summary: DirectTaskResult meta should not be counted into result size  
(was: TaskResult meta should not be counted into result size)

> DirectTaskResult meta should not be counted into result size
> 
>
> Key: SPARK-40261
> URL: https://issues.apache.org/jira/browse/SPARK-40261
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Ziqi Liu
>Priority: Major
>
> This issue exists for a long time (since 
> [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)]
> when calculating whether driver fetching result exceed 
> `spark.driver.maxResultSize` limit, the whole serialized result task size is 
> taken into account, including task metadata overhead 
> size([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41])
>  as well. However, the metadata should not be counted because they will be 
> discarded by the driver immediately after being processed.
> This will lead to exception when running jobs with tons of task but actually 
> return small results.
> Therefore we should only count 
> `[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]`
>  when calculating result size limit.
> cc [~joshrosen] 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40261) TaskResult meta should not be counted into result size

2022-08-29 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-40261:
-
Description: 
This issue has existed for a long time (since 
[https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686]).

When calculating whether the result fetched by the driver exceeds the 
`spark.driver.maxResultSize` limit, the whole serialized task result size is 
taken into account, including the task metadata overhead 
([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41]) 
as well. However, the metadata should not be counted, because it is 
discarded by the driver immediately after being processed.

This can lead to an exception when running jobs with tons of tasks that actually 
return small results.

Therefore we should only count 
[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40] 
when checking the result size limit.
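In other words, the size check would look at the payload only, roughly as in this sketch (not the actual patch):
{code:java}
// Sketch of the proposed accounting: compare only the payload bytes of a
// DirectTaskResult against spark.driver.maxResultSize, excluding the serialized
// accumulator updates that the driver discards right after processing them.
def exceedsResultSizeLimit(valueBytesSize: Long, accumUpdatesSize: Long, maxResultSize: Long): Boolean = {
  // old behavior, for contrast: (valueBytesSize + accumUpdatesSize) > maxResultSize
  valueBytesSize > maxResultSize
}
{code}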

cc [~joshrosen] 

 

 

  was:
This issue exists for a long time (since 
[https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)]

when calculating whether driver fetching result exceed 
`spark.driver.maxResultSize` limit, the whole serialized result task size is 
taken into account, including task 
metadata([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41])
 as well. However, the metadata should not be counted because they will be 
discarded by the driver immediately after being processed.

This will lead to exception when running jobs with tons of task but actually 
return small results.

Therefore we should only count 
`[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]`
 when calculating result size limit.

 

 


> TaskResult meta should not be counted into result size
> --
>
> Key: SPARK-40261
> URL: https://issues.apache.org/jira/browse/SPARK-40261
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Ziqi Liu
>Priority: Major
>
> This issue exists for a long time (since 
> [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)]
> when calculating whether driver fetching result exceed 
> `spark.driver.maxResultSize` limit, the whole serialized result task size is 
> taken into account, including task metadata overhead 
> size([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41])
>  as well. However, the metadata should not be counted because they will be 
> discarded by the driver immediately after being processed.
> This will lead to exception when running jobs with tons of task but actually 
> return small results.
> Therefore we should only count 
> `[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]`
>  when calculating result size limit.
> cc [~joshrosen] 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40261) TaskResult meta should not be counted into result size

2022-08-29 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-40261:


 Summary: TaskResult meta should not be counted into result size
 Key: SPARK-40261
 URL: https://issues.apache.org/jira/browse/SPARK-40261
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.4.0
Reporter: Ziqi Liu


This issue exists for a long time (since 
[https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)]

when calculating whether driver fetching result exceed 
`spark.driver.maxResultSize` limit, the whole serialized result task size is 
taken into account, including task 
metadata([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41])
 as well. However, the metadata should not be counted because they will be 
discarded by the driver immediately after being processed.

This will lead to exception when running jobs with tons of task but actually 
return small results.

Therefore we should only count 
`[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]`
 when calculating result size limit.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40238) support scaleUpFactor and initialNumPartition in pyspark rdd API

2022-08-26 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-40238:


 Summary: support scaleUpFactor and initialNumPartition in pyspark 
rdd API
 Key: SPARK-40238
 URL: https://issues.apache.org/jira/browse/SPARK-40238
 Project: Spark
  Issue Type: Story
  Components: PySpark
Affects Versions: 3.4.0
Reporter: Ziqi Liu


This is a follow-up to https://issues.apache.org/jira/browse/SPARK-40211.

The `scaleUpFactor` and `initialNumPartition` configs are not yet supported in the 
PySpark RDD take API

 

(see [https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L2799]).

Basically it hardcodes `scaleUpFactor` as 1 and `initialNumPartition` as 4; 
therefore the PySpark RDD take API is inconsistent with the Scala API.

 

Could anyone familiar with PySpark help support this (referring to the [scala 
implementation|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1448])?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40221) Not able to format using scalafmt

2022-08-26 Thread Ziqi Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17585191#comment-17585191
 ] 

Ziqi Liu commented on SPARK-40221:
--

[~hyukjin.kwon] I think running it on master should always be fine, since it only 
formats the git diff from master, so running on master doesn't actually format 
anything, if I understand the [dev tools 
guidance|https://spark.apache.org/developer-tools.html] correctly...

 

The issue happened on another branch, but a very new 
branch (https://github.com/apache/spark/pull/37661), and I believe I didn't 
modify anything related to scalafmt.

It tried to format the files that changed since master, and then the error 
happened:

 
{code:java}
[INFO] Checking for files changed from master
[INFO] Changed from master:
/Users/ziqi.liu/code/spark/core/src/main/scala/org/apache/spark/internal/config/package.scala
/Users/ziqi.liu/code/spark/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
/Users/ziqi.liu/code/spark/core/src/main/scala/org/apache/spark/rdd/RDD.scala
/Users/ziqi.liu/code/spark/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
/Users/ziqi.liu/code/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
/Users/ziqi.liu/code/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
/Users/ziqi.liu/code/spark/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
[ERROR]
org.scalafmt.dynamic.exceptions.ScalafmtException: missing setting 'version'. 
To fix this problem, add the following line to .scalafmt.conf: 'version=3.2.1'. 
{code}
 

 

> Not able to format using scalafmt
> -
>
> Key: SPARK-40221
> URL: https://issues.apache.org/jira/browse/SPARK-40221
> Project: Spark
>  Issue Type: Question
>  Components: Build
>Affects Versions: 3.4.0
>Reporter: Ziqi Liu
>Priority: Major
>
> I'm following the guidance in [https://spark.apache.org/developer-tools.html] 
> using 
> {code:java}
> ./dev/scalafmt{code}
> to format the code, but getting this error:
> {code:java}
> [ERROR] Failed to execute goal 
> org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) 
> on project spark-parent_2.12: Error formatting Scala files: missing setting 
> 'version'. To fix this problem, add the following line to .scalafmt.conf: 
> 'version=3.2.1'. -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please 
> read the following articles:
> [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40221) Not able to format using scalafmt

2022-08-25 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-40221:
-
Description: 
I'm following the guidance in [https://spark.apache.org/developer-tools.html] 
using 
{code:java}
./dev/scalafmt{code}

to format the code, but am getting this error:
{code:java}
[ERROR] Failed to execute goal 
org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) on 
project spark-parent_2.12: Error formatting Scala files: missing setting 
'version'. To fix this problem, add the following line to .scalafmt.conf: 
'version=3.2.1'. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code}
 

  was:
I'm following the guidance in [https://spark.apache.org/developer-tools.html] 
using 
./dev/scalafmt
to format the code, but getting this error:
{code:java}
[ERROR] Failed to execute goal 
org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) on 
project spark-parent_2.12: Error formatting Scala files: missing setting 
'version'. To fix this problem, add the following line to .scalafmt.conf: 
'version=3.2.1'. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code}
 


> Not able to format using scalafmt
> -
>
> Key: SPARK-40221
> URL: https://issues.apache.org/jira/browse/SPARK-40221
> Project: Spark
>  Issue Type: Question
>  Components: Build
>Affects Versions: 3.4.0
>Reporter: Ziqi Liu
>Priority: Major
>
> I'm following the guidance in [https://spark.apache.org/developer-tools.html] 
> using 
> {code:java}
> ./dev/scalafmt{code}
> to format the code, but getting this error:
> {code:java}
> [ERROR] Failed to execute goal 
> org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) 
> on project spark-parent_2.12: Error formatting Scala files: missing setting 
> 'version'. To fix this problem, add the following line to .scalafmt.conf: 
> 'version=3.2.1'. -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please 
> read the following articles:
> [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40221) Not able to format using scalafmt

2022-08-25 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-40221:


 Summary: Not able to format using scalafmt
 Key: SPARK-40221
 URL: https://issues.apache.org/jira/browse/SPARK-40221
 Project: Spark
  Issue Type: Question
  Components: Build
Affects Versions: 3.4.0
Reporter: Ziqi Liu


I'm following the guidance in [https://spark.apache.org/developer-tools.html] 
using 
./dev/scalafmt
to format the code, but am getting this error:
{code:java}
[ERROR] Failed to execute goal 
org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) on 
project spark-parent_2.12: Error formatting Scala files: missing setting 
'version'. To fix this problem, add the following line to .scalafmt.conf: 
'version=3.2.1'. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40211) Allow executeTake() / collectLimit's number of starting partitions to be customized

2022-08-24 Thread Ziqi Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17584458#comment-17584458
 ] 

Ziqi Liu commented on SPARK-40211:
--

I'm actively working on this

> Allow executeTake() / collectLimit's number of starting partitions to be 
> customized
> ---
>
> Key: SPARK-40211
> URL: https://issues.apache.org/jira/browse/SPARK-40211
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core, SQL
>Affects Versions: 3.4.0
>Reporter: Ziqi Liu
>Priority: Major
>
> Today, Spark’s executeTake() code allow for the limitScaleUpFactor to be 
> customized but does not allow for the initial number of partitions to be 
> customized: it’s currently hardcoded to {{{}1{}}}.
> We should add a configuration so that the initial partition count can be 
> customized. By setting this new configuration to a high value we could 
> effectively mitigate the “run multiple jobs” overhead in {{take}} behavior. 
> We could also set it to higher-than-1-but-still-small values (like, say, 
> {{{}10{}}}) to achieve a middle-ground trade-off.
>  
> Essentially, we need to make {{numPartsToTry = 1L}} 
> ([code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L481])
>  customizable. We should do this via a new SQL conf, similar to the 
> {{limitScaleUpFactor}} conf.
>  
> Spark has several near-duplicate versions of this code ([see code 
> search|https://github.com/apache/spark/search?q=numPartsToTry+%3D+1]) in:
>  * SparkPlan
>  * RDD
>  * pyspark rdd
> Also, in pyspark  {{limitScaleUpFactor}}  is not supported either. So for 
> now, I will focus on scala side first, leaving python side untouched and 
> meanwhile sync with pyspark members. Depending on the progress we can do them 
> all in one PR or make scala side change first and leave pyspark change as a 
> follow-up.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40211) Allow executeTake() / collectLimit's number of starting partitions to be customized

2022-08-24 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-40211:


 Summary: Allow executeTake() / collectLimit's number of starting 
partitions to be customized
 Key: SPARK-40211
 URL: https://issues.apache.org/jira/browse/SPARK-40211
 Project: Spark
  Issue Type: Story
  Components: Spark Core, SQL
Affects Versions: 3.4.0
Reporter: Ziqi Liu


Today, Spark’s executeTake() code allows the limitScaleUpFactor to be 
customized but does not allow the initial number of partitions to be 
customized: it’s currently hardcoded to {{1}}.

We should add a configuration so that the initial partition count can be 
customized. By setting this new configuration to a high value we could 
effectively mitigate the “run multiple jobs” overhead of {{take}} behavior. We 
could also set it to values that are higher than 1 but still small (say, 
{{10}}) to achieve a middle-ground trade-off.

 

Essentially, we need to make {{numPartsToTry = 1L}} 
([code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L481])
 customizable. We should do this via a new SQL conf, similar to the 
{{limitScaleUpFactor}} conf.
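As a sketch of how this could look from the user side (the initial-partitions conf name below is just a placeholder for the proposed conf; only {{spark.sql.limit.scaleUpFactor}} exists today):
{code:scala}
import org.apache.spark.sql.SparkSession

object InitialPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("limit initial partitions sketch")
      // Placeholder name for the proposed conf, shown next to the existing
      // scale-up conf for comparison.
      .config("spark.sql.limit.initialNumPartitions", "10")
      .config("spark.sql.limit.scaleUpFactor", "4")
      .getOrCreate()

    // With a higher initial partition count, the first executeTake() job would
    // scan more partitions up front instead of launching several retry jobs.
    val rows = spark.range(0, 1000000, 1, numPartitions = 200).take(100)
    println(rows.length)
    spark.stop()
  }
}
{code}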

 

Spark has several near-duplicate versions of this code ([see code 
search|https://github.com/apache/spark/search?q=numPartsToTry+%3D+1]) in:
 * SparkPlan
 * RDD
 * pyspark rdd

Also, {{limitScaleUpFactor}} is not supported in pyspark either. So for now I 
will focus on the scala side first, leaving the python side untouched, and 
meanwhile sync with the pyspark members. Depending on the progress, we can do 
it all in one PR or make the scala-side change first and leave the pyspark 
change as a follow-up.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org