[jira] [Updated] (SPARK-48628) Add task peak on/off heap execution memory metrics
[ https://issues.apache.org/jira/browse/SPARK-48628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-48628: - Description: Currently there are no task-level on/off-heap execution memory metrics. There is an existing [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] metric; however, its semantics are confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators. We can easily maintain the whole task-level peak memory in TaskMemoryManager, assuming *acquireExecutionMemory* is the only narrow waist for acquiring execution memory. It would also be nice to clean up/deprecate the poorly named `peakExecutionMemory`. Creating two follow-up sub-tickets: * https://issues.apache.org/jira/browse/SPARK-48788 : accumulate task metrics per stage and display them in the Spark UI * https://issues.apache.org/jira/browse/SPARK-48789 : deprecate `peakExecutionMemory` once we have a replacement for it. was: Currently there is no task on/off heap execution memory metrics. There is a [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] metrics, however, the semantic is a bit confusing: it only cover the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators. We can easily maintain the whole task-level peak memory in TaskMemoryManager, assuming *acquireExecutionMemory* is the only one narrow waist for acquiring execution memory. > Add task peak on/off heap execution memory metrics > -- > > Key: SPARK-48628 > URL: https://issues.apache.org/jira/browse/SPARK-48628 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Ziqi Liu >Priority: Major > > Currently there are no task-level on/off-heap execution memory metrics. There is an existing > [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] > metric; however, its semantics are confusing: it only covers the execution > memory used by shuffle/join/aggregate/sort, which is accumulated in specific > operators. > > We can easily maintain the whole task-level peak memory in TaskMemoryManager, > assuming *acquireExecutionMemory* is the only narrow waist for acquiring > execution memory. > > It would also be nice to clean up/deprecate the poorly named `peakExecutionMemory`. > > Creating two follow-up sub-tickets: > * https://issues.apache.org/jira/browse/SPARK-48788 : accumulate task metrics > per stage and display them in the Spark UI > * https://issues.apache.org/jira/browse/SPARK-48789 : deprecate > `peakExecutionMemory` once we have a replacement for it. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
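For illustration only, here is a minimal sketch of how task-level peak on/off-heap execution memory could be tracked at that single narrow waist. This is not the actual Spark implementation; the class, method names and the MemoryMode enumeration below are placeholders, and the real change would hook into TaskMemoryManager's acquire/release paths.

{code:scala}
import java.util.concurrent.atomic.AtomicLong

// Placeholder for Spark's on-heap/off-heap memory modes.
object MemoryMode extends Enumeration { val OnHeap, OffHeap = Value }

// Hypothetical per-task tracker: updated on every acquire/release so the
// peak reflects all execution memory, not just shuffle/join/aggregate/sort.
class PeakExecutionMemoryTracker {
  private val current = Map(
    MemoryMode.OnHeap -> new AtomicLong(0L),
    MemoryMode.OffHeap -> new AtomicLong(0L))
  private val peak = Map(
    MemoryMode.OnHeap -> new AtomicLong(0L),
    MemoryMode.OffHeap -> new AtomicLong(0L))

  // Called whenever execution memory is granted (the "narrow waist").
  def acquired(bytes: Long, mode: MemoryMode.Value): Unit = {
    val now = current(mode).addAndGet(bytes)
    var prevPeak = peak(mode).get()
    // CAS loop so concurrent acquisitions never lose a peak update.
    while (now > prevPeak && !peak(mode).compareAndSet(prevPeak, now)) {
      prevPeak = peak(mode).get()
    }
  }

  // Called whenever execution memory is released.
  def released(bytes: Long, mode: MemoryMode.Value): Unit = {
    current(mode).addAndGet(-bytes)
  }

  def peakOnHeap: Long = peak(MemoryMode.OnHeap).get()
  def peakOffHeap: Long = peak(MemoryMode.OffHeap).get()
}
{code}

The two peak values would then be reported as task metrics on task completion, which is what the follow-up tickets above build on.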
[jira] [Created] (SPARK-48788) Accumulate task-level execution memory in stage and display in Spark UI
Ziqi Liu created SPARK-48788: Summary: Accumulate task-level execution memory in stage and display in Spark UI Key: SPARK-48788 URL: https://issues.apache.org/jira/browse/SPARK-48788 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Ziqi Liu As a follow-up to https://issues.apache.org/jira/browse/SPARK-48628: once we have task-level on-heap/off-heap execution memory metrics, we should accumulate them per stage and make them available in the Spark UI. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48628) Add task peak on/off heap execution memory metrics
Ziqi Liu created SPARK-48628: Summary: Add task peak on/off heap execution memory metrics Key: SPARK-48628 URL: https://issues.apache.org/jira/browse/SPARK-48628 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Ziqi Liu Currently there are no task-level on/off-heap execution memory metrics. There is an existing [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] metric; however, its semantics are a bit confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators. We can easily maintain the whole task-level peak memory in TaskMemoryManager, assuming *acquireExecutionMemory* is the only narrow waist for acquiring execution memory. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48610) Remove ExplainUtils.processPlan synchronize
Ziqi Liu created SPARK-48610: Summary: Remove ExplainUtils.processPlan synchronize Key: SPARK-48610 URL: https://issues.apache.org/jira/browse/SPARK-48610 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu [https://github.com/apache/spark/pull/45282] introduced synchronization in `ExplainUtils.processPlan` to avoid a race condition when multiple queries refer to the same cached plan. The granularity of that lock is too coarse. We can try to fix the root cause of this concurrency issue by refactoring away the usage of the mutable OP_ID_TAG, which conflicts with the immutable nature of SparkPlan. Instead, we can use an auxiliary id map with object identity as the key. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
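A minimal sketch of the proposed "auxiliary id map keyed by object identity", assuming a placeholder PlanNode type; this is not Spark's actual API, only an illustration of how operator ids could be assigned without mutating a tag on shared (cached) plan nodes.

{code:scala}
import java.util.{IdentityHashMap => JIdentityHashMap}

// One instance per processPlan call, so no shared mutable state is left on
// the plan itself and concurrent explains of a cached plan cannot interfere.
class OperatorIdMap[PlanNode <: AnyRef] {
  private val ids = new JIdentityHashMap[PlanNode, Int]()
  private var nextId = 0

  // Assign an id the first time a node is seen; object identity (eq), not
  // structural equality, distinguishes reused subtrees.
  def idFor(node: PlanNode): Int = {
    if (!ids.containsKey(node)) {
      nextId += 1
      ids.put(node, nextId)
    }
    ids.get(node)
  }
}
{code}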
[jira] [Created] (SPARK-48468) Add LogicalQueryStage interface in catalyst
Ziqi Liu created SPARK-48468: Summary: Add LogicalQueryStage interface in catalyst Key: SPARK-48468 URL: https://issues.apache.org/jira/browse/SPARK-48468 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu Add a `LogicalQueryStage` interface in catalyst so that it is visible to logical rules. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48465) Avoid no-op empty relation propagation in AQE
Ziqi Liu created SPARK-48465: Summary: Avoid no-op empty relation propagation in AQE Key: SPARK-48465 URL: https://issues.apache.org/jira/browse/SPARK-48465 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu We should avoid no-op empty relation propagation in AQE: if we convert an empty QueryStageExec to an empty relation, it will be wrapped into a new query stage again and executed -> produce an empty result -> trigger empty relation propagation again. This issue is currently not exposed because AQE will try to reuse the shuffle. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
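A hedged sketch of the guard being suggested, with made-up stand-in types (not Spark's AQE classes): empty-relation propagation is applied only when the materialized stage does not already wrap an empty relation, so AQE does not wrap, execute and propagate the same empty result again.

{code:scala}
// Stand-in types for illustration only.
sealed trait Plan
case class EmptyRelation(numColumns: Int) extends Plan
case class MaterializedStage(plan: Plan, rowCount: Long, numColumns: Int) extends Plan

// Guard: skip the no-op conversion when the stage already wraps an empty relation.
def propagateEmpty(stage: MaterializedStage): Plan = stage.plan match {
  case _: EmptyRelation          => stage                            // no-op case: leave as is
  case _ if stage.rowCount == 0L => EmptyRelation(stage.numColumns)  // genuine propagation
  case _                         => stage
}
{code}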
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet * maybe we just a coarse granularity lock in explain? * make innerChildren a function: clone the initial plan, every time checked for whether the original AQE plan is finalized (making the final flag atomic first, of course), if no return the cloned initial plan, if it's finalized, clone the final plan and return that one. But still this won't be able to reflect the AQE plan in real time, in a concurrent situation, but at least we have initial version and final version. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.filter("key > 10") df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(1) Filter (isnotnull(key#4L) AND (key#4L > 10)) +- TableCacheQueryStage 0 +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)] +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == Filter (isnotnull(key#4L) AND (key#4L > 10)) +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)] +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10){code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet * maybe we just a coarse granularity lock in explain? * make innerChildren a function: clone the initial plan, every time checked for whether the original AQE plan is finalized (making the final flag atomic first, of course), if no return the cloned initial plan, if it's finalized, clone the final plan and return that one. But still this won't be able to reflect the AQE plan in real time, in a concurrent situation, but at least we have initial version and final version. 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
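A rough sketch of the second option above ("make innerChildren a function"), with placeholder names rather than Spark's AdaptiveSparkPlanExec internals, assuming the final flag is made atomic: readers always get a defensive clone, of the initial plan until finalization and of the final plan afterwards, so explain never observes the in-flight mutable plan.

{code:scala}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical holder; `clonePlan` stands in for SparkPlan tree cloning.
class AdaptivePlanHolder[P <: AnyRef](initialPlan: P, clonePlan: P => P) {
  private val finalized = new AtomicBoolean(false)
  @volatile private var finalPlan: P = initialPlan

  // Called once by the execution thread when re-optimization finishes.
  def markFinal(plan: P): Unit = {
    finalPlan = plan
    finalized.set(true)
  }

  // What explain / innerChildren would read: never the mutable in-flight plan.
  def planForExplain: P =
    if (finalized.get()) clonePlan(finalPlan) else clonePlan(initialPlan)
}
{code}

As noted above, this still cannot show intermediate re-optimization steps in real time, but it guarantees a consistent initial or final view.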
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet * maybe we just a coarse granularity lock in explain? * make innerChildren a function: clone the initial plan, every time checked for whether the original AQE plan is finalized (making the final flag atomic first, of course), if no return the cloned initial plan, if it's finalized, clone the final plan and return that one. But still this won't be able to reflect the AQE plan in real time, in a concurrent situation, but at least we have initial version and final version. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet, maybe we just a coarse granularity lock in explain? 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +-
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet, maybe we just a coarse granularity lock in explain? A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, count(key2)=10), Row(key2=8, count(key2)=10)] >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L],
[jira] [Created] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
Ziqi Liu created SPARK-47177: Summary: Cached SQL plan do not display final AQE plan in explain string Key: SPARK-47177 URL: https://issues.apache.org/jira/browse/SPARK-47177 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.5.1, 3.5.0, 3.4.2, 4.0.0, 3.5.2 Reporter: Ziqi Liu AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, count(key2)=10), Row(key2=8, count(key2)=10)] >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan
[ https://issues.apache.org/jira/browse/SPARK-46995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-46995: - Component/s: SQL > Allow AQE coalesce final stage in SQL cached plan > - > > Key: SPARK-46995 > URL: https://issues.apache.org/jira/browse/SPARK-46995 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 4.0.0 >Reporter: Ziqi Liu >Priority: Major > > [https://github.com/apache/spark/pull/43435] and > [https://github.com/apache/spark/pull/43760] are fixing a correctness issue > which will be triggered when AQE applied on cached query plan, specifically, > when AQE coalescing the final result stage of the cached plan. > > The current semantic of > {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} > ([source > code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]): > * when true, we enable AQE, but disable coalescing final stage > ({*}default{*}) > * when false, we disable AQE > > But let’s revisit the semantic of this config: actually for caller the only > thing that matters is whether we change the output partitioning of the cached > plan. And we should only try to apply AQE if possible. Thus we want to > modify the semantic of > {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} > * when true, we enable AQE and allow coalescing final: this might lead to > perf regression, because it introduce extra shuffle > * when false, we enable AQE, but disable coalescing final stage. *(this is > actually the `true` semantic of old behavior)* > Also, to keep the default behavior unchanged, we might want to flip the > default value of > {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to `false` > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan
Ziqi Liu created SPARK-46995: Summary: Allow AQE coalesce final stage in SQL cached plan Key: SPARK-46995 URL: https://issues.apache.org/jira/browse/SPARK-46995 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Ziqi Liu [https://github.com/apache/spark/pull/43435] and [https://github.com/apache/spark/pull/43760] fix a correctness issue which is triggered when AQE is applied on a cached query plan, specifically when AQE coalesces the final result stage of the cached plan. The current semantics of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} ([source code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]): * when true, we enable AQE but disable coalescing the final stage ({*}default{*}) * when false, we disable AQE But let’s revisit the semantics of this config: for the caller, the only thing that matters is whether we change the output partitioning of the cached plan, and we should try to apply AQE whenever possible. Thus we want to modify the semantics of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}: * when true, we enable AQE and allow coalescing the final stage: this might lead to a perf regression, because it introduces an extra shuffle * when false, we enable AQE but disable coalescing the final stage. *(this is actually the `true` semantics of the old behavior)* Also, to keep the default behavior unchanged, we might want to flip the default value of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to `false`. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
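For illustration, the proposed semantics could be summarized as below; `AqeOptions` and the helper are made-up names, not a Spark API, and only restate the bullet points above.

{code:scala}
// Made-up holder for the two knobs the conf would now control.
case class AqeOptions(enableAqe: Boolean, allowCoalescingFinalStage: Boolean)

// Proposed: AQE is applied to the cached plan either way; the conf only
// decides whether the final stage may be coalesced, i.e. whether the cached
// plan's output partitioning may change.
def optionsForCachedPlan(canChangeCachedPlanOutputPartitioning: Boolean): AqeOptions =
  if (canChangeCachedPlanOutputPartitioning)
    AqeOptions(enableAqe = true, allowCoalescingFinalStage = true)
  else
    AqeOptions(enableAqe = true, allowCoalescingFinalStage = false) // the old `true` behavior
{code}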
[jira] [Created] (SPARK-44485) optimize generateTreeString code path
Ziqi Liu created SPARK-44485: Summary: optimize generateTreeString code path Key: SPARK-44485 URL: https://issues.apache.org/jira/browse/SPARK-44485 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.4.1 Reporter: Ziqi Liu In `TreeNode.generateTreeString`, we observed inefficiency from Scala collection operations and virtual function calls. This inefficiency becomes significant in large plans (we hit an example with more than 1000 nodes), so {*}it’s worth optimizing this super-hot code path{*}. By rewriting it in plain Java-style code (not as sweet as Scala syntax sugar, though), we should be able to get rid of most of the overhead. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
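Purely illustrative of the kind of rewrite described (generic names, not the actual generateTreeString code): replacing collection sugar on the hot path with an index-based while loop avoids per-element tuple and closure allocation and the associated virtual dispatch.

{code:scala}
def appendChildren(children: IndexedSeq[String], sb: StringBuilder): Unit = {
  // Before (idiomatic, but allocates a tuple and a closure per element):
  //   children.zipWithIndex.foreach { case (c, i) => sb.append(i).append(c) }
  var i = 0
  while (i < children.length) {          // after: a plain indexed loop
    sb.append(i).append(": ").append(children(i)).append('\n')
    i += 1
  }
}
{code}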
[jira] [Created] (SPARK-43300) Cascade failure in Guava cache due to fate-sharing
Ziqi Liu created SPARK-43300: Summary: Cascade failure in Guava cache due to fate-sharing Key: SPARK-43300 URL: https://issues.apache.org/jira/browse/SPARK-43300 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.4.0 Reporter: Ziqi Liu Guava cache is widely used in Spark; however, it suffers from fate-sharing behavior: if multiple requests try to access the same key in the {{cache}} at the same time while the key is not yet in the cache, Guava cache will block all of the requests and create the object only once. If the creation fails, all of the requests fail immediately without retry, so we might see task failures caused by unrelated failures in other queries. This fate-sharing behavior can lead to unexpected results in some situations. We can wrap Guava cache with a KeyLock to synchronize all requests on the same key, so that they run individually and fail as if they arrived one at a time. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
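A hedged sketch of the KeyLock idea (not Spark's actual class): loads of the same key are serialized outside the Guava cache, so a failed creation only fails the caller that performed it, and the next caller simply retries instead of inheriting the failure.

{code:scala}
import java.util.concurrent.ConcurrentHashMap
import com.google.common.cache.Cache

class KeyLockedCache[K <: AnyRef, V <: AnyRef](cache: Cache[K, V]) {
  // One lock object per key; the key space is assumed to be reasonably bounded.
  private val locks = new ConcurrentHashMap[K, Object]()

  def getOrLoad(key: K)(load: => V): V = {
    val lock = locks.computeIfAbsent(key, _ => new Object)
    lock.synchronized {
      val cached = cache.getIfPresent(key)
      if (cached != null) cached
      else {
        val value = load      // if this throws, only the current caller fails
        cache.put(key, value) // a later caller will retry the load
        value
      }
    }
  }
}
{code}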
[jira] [Created] (SPARK-42980) SmallBroadcast
Ziqi Liu created SPARK-42980: Summary: SmallBroadcast Key: SPARK-42980 URL: https://issues.apache.org/jira/browse/SPARK-42980 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 3.4.0 Reporter: Ziqi Liu The current TorrentBroadcast implementation was originally designed for large data transmission, where the driver might become the bottleneck and memory might also be a concern. Therefore it is fairly heavyweight and comes with some fixed overhead: * torrent protocol: more round trips * disk-level persistence, BlockManager, extra overhead which makes it inefficient for some small data transmissions. We can have a lightweight broadcast implementation, e.g. SmallBroadcast, which implements star-topology broadcast (avoiding the unnecessary round trips) and is possibly in-memory only. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
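A hypothetical sketch only (SmallBroadcast is the proposed name, not an existing API, and `fetchFromDriver` stands in for whatever RPC would actually be used): each executor pulls the value straight from the driver once and caches it in memory, with no torrent chunking and no BlockManager/disk persistence.

{code:scala}
import java.util.concurrent.ConcurrentHashMap

class SmallBroadcast[T <: AnyRef](val id: Long, fetchFromDriver: Long => T) {
  // Star topology: the first access on an executor fetches from the driver,
  // later accesses hit the in-memory, executor-local cache.
  def value: T =
    SmallBroadcast.localCache
      .computeIfAbsent(id, _ => fetchFromDriver(id))
      .asInstanceOf[T]
}

object SmallBroadcast {
  // One copy per executor JVM, kept in memory only.
  private val localCache = new ConcurrentHashMap[Long, AnyRef]()
}
{code}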
[jira] [Updated] (SPARK-42980) lightweight Broadcast implementation
[ https://issues.apache.org/jira/browse/SPARK-42980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-42980: - Summary: lightweight Broadcast implementation (was: SmallBroadcast) > lightweight Broadcast implementation > > > Key: SPARK-42980 > URL: https://issues.apache.org/jira/browse/SPARK-42980 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Priority: Major > > The current TorrentBroadcast implementation is originally designed for large > data transmission where driver might become the bottleneck and memory might > also be the concern. Therefore it's kind of heavy and comes with some fixed > overhead: > * torrent protocol: more round traffic > * disk level persistency, BlockManager, extra overhead > which makes it inefficient for some small data transmission. > We can have a lightweight broadcast implementation, e.g, SmallBroadcast, > which implement star-topology broadcast(which avoids the unnecessary round > traffic), and maybe in-memory only. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40622) Result of a single task in collect() must fit in 2GB
[ https://issues.apache.org/jira/browse/SPARK-40622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-40622: - Description: when collecting results, data from single partition/task is serialized through byte array or ByteBuffer(which is backed by byte array as well), therefore it's subject to java array max size limit(in terms of byte array, it's 2GB). Construct a single partition larger than 2GB and collect it can easily reproduce the issue {code:java} // create data of size ~3GB in single partition, which exceeds the byte array limit // random gen to make sure it's poorly compressed val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) as data") withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") { withSQLConf("spark.sql.useChunkedBuffer" -> "true") { df.queryExecution.executedPlan.executeCollect() } } {code} will get a OOM error from [https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125] Consider using ChunkedByteBuffer to replace byte array in order to bypassing this limit was: when collecting results, data from single partition/task is serialized through byte array or ByteBuffer(which is backed by byte array as well), therefore it's subject to java array max size limit(in terms of byte array, it's 2GB). Construct a single partition larger than 2GB and collect it can easily reproduce the issue {code:java} val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) as data") withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") { withSQLConf("spark.sql.useChunkedBuffer" -> "true") { df.queryExecution.executedPlan.executeCollect() } } {code} will get a OOM error from [https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125] Consider using ChunkedByteBuffer to replace byte array in order to bypassing this limit > Result of a single task in collect() must fit in 2GB > > > Key: SPARK-40622 > URL: https://issues.apache.org/jira/browse/SPARK-40622 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 3.3.0 >Reporter: Ziqi Liu >Priority: Major > > when collecting results, data from single partition/task is serialized > through byte array or ByteBuffer(which is backed by byte array as well), > therefore it's subject to java array max size limit(in terms of byte array, > it's 2GB). > > Construct a single partition larger than 2GB and collect it can easily > reproduce the issue > {code:java} > // create data of size ~3GB in single partition, which exceeds the byte array > limit > // random gen to make sure it's poorly compressed > val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) > as data") > withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") { > withSQLConf("spark.sql.useChunkedBuffer" -> "true") { > df.queryExecution.executedPlan.executeCollect() > } > } {code} > will get a OOM error from > [https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125] > > Consider using ChunkedByteBuffer to replace byte array in order to bypassing > this limit -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40622) Result of a single task in collect() must fit in 2GB
Ziqi Liu created SPARK-40622: Summary: Result of a single task in collect() must fit in 2GB Key: SPARK-40622 URL: https://issues.apache.org/jira/browse/SPARK-40622 Project: Spark Issue Type: Bug Components: Spark Core, SQL Affects Versions: 3.3.0 Reporter: Ziqi Liu When collecting results, data from a single partition/task is serialized through a byte array or ByteBuffer (which is backed by a byte array as well); therefore it is subject to the Java array max size limit (for a byte array, 2GB). Constructing a single partition larger than 2GB and collecting it can easily reproduce the issue {code:java} val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) as data") withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") { withSQLConf("spark.sql.useChunkedBuffer" -> "true") { df.queryExecution.executedPlan.executeCollect() } } {code} which will get an OOM error from [https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125] Consider using ChunkedByteBuffer to replace the byte array in order to bypass this limit. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
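To make the proposed direction concrete, here is a hedged sketch of the chunking idea, not Spark's actual ChunkedByteBuffer API: output is accumulated in fixed-size chunks, so the total result can exceed the 2GB cap of any single Java byte array.

{code:scala}
import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer

class ChunkedOutputStream(chunkSize: Int = 4 * 1024 * 1024) extends OutputStream {
  private val chunks = ArrayBuffer[Array[Byte]](new Array[Byte](chunkSize))
  private var posInChunk = 0

  override def write(b: Int): Unit = {
    if (posInChunk == chunkSize) {       // current chunk is full: open a new one
      chunks += new Array[Byte](chunkSize)
      posInChunk = 0
    }
    chunks.last(posInChunk) = b.toByte
    posInChunk += 1
  }

  // Total size is a Long: no single contiguous array ever holds all the bytes.
  def size: Long = (chunks.length - 1).toLong * chunkSize + posInChunk
}
{code}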
[jira] [Created] (SPARK-40418) Increase default initialNumPartitions to 10
Ziqi Liu created SPARK-40418: Summary: Increase default initialNumPartitions to 10 Key: SPARK-40418 URL: https://issues.apache.org/jira/browse/SPARK-40418 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.4.0 Reporter: Ziqi Liu This is actually a follow-up from SPARK-40211. The previous default value for initialNumPartitions is 1, which is way too small. Changing it to 10 might be a reasonable middle-ground trade-off, and will be beneficial in most cases (unless the partition count is very small, but in that case neither initialNumPartitions nor scaleUpFactor has a significant effect). -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40261) DirectTaskResult meta should not be counted into result size
[ https://issues.apache.org/jira/browse/SPARK-40261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-40261: - Summary: DirectTaskResult meta should not be counted into result size (was: TaskResult meta should not be counted into result size) > DirectTaskResult meta should not be counted into result size > > > Key: SPARK-40261 > URL: https://issues.apache.org/jira/browse/SPARK-40261 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Priority: Major > > This issue exists for a long time (since > [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)] > when calculating whether driver fetching result exceed > `spark.driver.maxResultSize` limit, the whole serialized result task size is > taken into account, including task metadata overhead > size([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41]) > as well. However, the metadata should not be counted because they will be > discarded by the driver immediately after being processed. > This will lead to exception when running jobs with tons of task but actually > return small results. > Therefore we should only count > `[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]` > when calculating result size limit. > cc [~joshrosen] > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40261) TaskResult meta should not be counted into result size
[ https://issues.apache.org/jira/browse/SPARK-40261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-40261: - Description: This issue exists for a long time (since [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)] when calculating whether driver fetching result exceed `spark.driver.maxResultSize` limit, the whole serialized result task size is taken into account, including task metadata overhead size([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41]) as well. However, the metadata should not be counted because they will be discarded by the driver immediately after being processed. This will lead to exception when running jobs with tons of task but actually return small results. Therefore we should only count `[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]` when calculating result size limit. cc [~joshrosen] was: This issue exists for a long time (since [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)] when calculating whether driver fetching result exceed `spark.driver.maxResultSize` limit, the whole serialized result task size is taken into account, including task metadata([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41]) as well. However, the metadata should not be counted because they will be discarded by the driver immediately after being processed. This will lead to exception when running jobs with tons of task but actually return small results. Therefore we should only count `[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]` when calculating result size limit. > TaskResult meta should not be counted into result size > -- > > Key: SPARK-40261 > URL: https://issues.apache.org/jira/browse/SPARK-40261 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Priority: Major > > This issue exists for a long time (since > [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686)] > when calculating whether driver fetching result exceed > `spark.driver.maxResultSize` limit, the whole serialized result task size is > taken into account, including task metadata overhead > size([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41]) > as well. However, the metadata should not be counted because they will be > discarded by the driver immediately after being processed. > This will lead to exception when running jobs with tons of task but actually > return small results. > Therefore we should only count > `[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]` > when calculating result size limit. > cc [~joshrosen] > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40261) TaskResult meta should not be counted into result size
Ziqi Liu created SPARK-40261: Summary: TaskResult meta should not be counted into result size Key: SPARK-40261 URL: https://issues.apache.org/jira/browse/SPARK-40261 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.4.0 Reporter: Ziqi Liu This issue has existed for a long time (since [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686]). When calculating whether the result fetched by the driver exceeds the `spark.driver.maxResultSize` limit, the whole serialized task result size is taken into account, including the task metadata ([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41]) as well. However, the metadata should not be counted because it is discarded by the driver immediately after being processed. This can lead to an exception when running jobs with tons of tasks that actually return small results. Therefore we should only count `[valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]` when calculating the result size limit. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
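A simplified sketch of the proposed accounting (field types reduced to byte arrays for illustration; the real DirectTaskResult carries a ByteBuffer plus accumulator updates): only the value payload is compared against spark.driver.maxResultSize, while the metadata is excluded.

{code:scala}
// Simplified stand-in for the serialized task result.
case class SerializedTaskResult(valueBytes: Array[Byte], accumUpdateBytes: Array[Byte])

def exceedsMaxResultSize(result: SerializedTaskResult, maxResultSizeBytes: Long): Boolean =
  // accumUpdateBytes is intentionally not counted: the driver discards it
  // right after processing, so it never contributes to retained result size.
  result.valueBytes.length.toLong > maxResultSizeBytes
{code}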
[jira] [Created] (SPARK-40238) support scaleUpFactor and initialNumPartition in pyspark rdd API
Ziqi Liu created SPARK-40238: Summary: support scaleUpFactor and initialNumPartition in pyspark rdd API Key: SPARK-40238 URL: https://issues.apache.org/jira/browse/SPARK-40238 Project: Spark Issue Type: Story Components: PySpark Affects Versions: 3.4.0 Reporter: Ziqi Liu This is a follow-up on https://issues.apache.org/jira/browse/SPARK-40211. The `scaleUpFactor` and `initialNumPartition` configs are not yet supported in the PySpark RDD take API (see [https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L2799]): basically it hardcoded `scaleUpFactor` as 1 and `initialNumPartition` as 4, therefore the PySpark RDD take API is inconsistent with the Scala API. Could anyone familiar with PySpark help support this (referring to the [scala implementation|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1448])? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-40221) Not able to format using scalafmt
[ https://issues.apache.org/jira/browse/SPARK-40221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17585191#comment-17585191 ] Ziqi Liu commented on SPARK-40221: -- [~hyukjin.kwon] I think running in master should always be good, since it only format git diff from master, so running in master actually didn't format anything if understand the [dev tools guidance|https://spark.apache.org/developer-tools.html] correctly... The issue happened in another branch, but very new branch(https://github.com/apache/spark/pull/37661), and I believe I didn't modify anything related to scalafmt It tried to format those files that changed since master then the error happened: {code:java} [INFO] Checking for files changed from master [INFO] Changed from master: /Users/ziqi.liu/code/spark/core/src/main/scala/org/apache/spark/internal/config/package.scala /Users/ziqi.liu/code/spark/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala /Users/ziqi.liu/code/spark/core/src/main/scala/org/apache/spark/rdd/RDD.scala /Users/ziqi.liu/code/spark/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala /Users/ziqi.liu/code/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala /Users/ziqi.liu/code/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala /Users/ziqi.liu/code/spark/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala [ERROR] org.scalafmt.dynamic.exceptions.ScalafmtException: missing setting 'version'. To fix this problem, add the following line to .scalafmt.conf: 'version=3.2.1'. {code} > Not able to format using scalafmt > - > > Key: SPARK-40221 > URL: https://issues.apache.org/jira/browse/SPARK-40221 > Project: Spark > Issue Type: Question > Components: Build >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Priority: Major > > I'm following the guidance in [https://spark.apache.org/developer-tools.html] > using > {code:java} > ./dev/scalafmt{code} > to format the code, but getting this error: > {code:java} > [ERROR] Failed to execute goal > org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) > on project spark-parent_2.12: Error formatting Scala files: missing setting > 'version'. To fix this problem, add the following line to .scalafmt.conf: > 'version=3.2.1'. -> [Help 1] > [ERROR] > [ERROR] To see the full stack trace of the errors, re-run Maven with the -e > switch. > [ERROR] Re-run Maven using the -X switch to enable full debug logging. > [ERROR] > [ERROR] For more information about the errors and possible solutions, please > read the following articles: > [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40221) Not able to format using scalafmt
[ https://issues.apache.org/jira/browse/SPARK-40221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-40221: - Description: I'm following the guidance in [https://spark.apache.org/developer-tools.html] using {code:java} ./dev/scalafmt{code} to format the code, but getting this error: {code:java} [ERROR] Failed to execute goal org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) on project spark-parent_2.12: Error formatting Scala files: missing setting 'version'. To fix this problem, add the following line to .scalafmt.conf: 'version=3.2.1'. -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code} was: I'm following the guidance in [https://spark.apache.org/developer-tools.html] using ./dev/scalafmt to format the code, but getting this error: {code:java} [ERROR] Failed to execute goal org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) on project spark-parent_2.12: Error formatting Scala files: missing setting 'version'. To fix this problem, add the following line to .scalafmt.conf: 'version=3.2.1'. -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code} > Not able to format using scalafmt > - > > Key: SPARK-40221 > URL: https://issues.apache.org/jira/browse/SPARK-40221 > Project: Spark > Issue Type: Question > Components: Build >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Priority: Major > > I'm following the guidance in [https://spark.apache.org/developer-tools.html] > using > {code:java} > ./dev/scalafmt{code} > to format the code, but getting this error: > {code:java} > [ERROR] Failed to execute goal > org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) > on project spark-parent_2.12: Error formatting Scala files: missing setting > 'version'. To fix this problem, add the following line to .scalafmt.conf: > 'version=3.2.1'. -> [Help 1] > [ERROR] > [ERROR] To see the full stack trace of the errors, re-run Maven with the -e > switch. > [ERROR] Re-run Maven using the -X switch to enable full debug logging. > [ERROR] > [ERROR] For more information about the errors and possible solutions, please > read the following articles: > [ERROR] [Help 1] > http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40221) Not able to format using scalafmt
Ziqi Liu created SPARK-40221: Summary: Not able to format using scalafmt Key: SPARK-40221 URL: https://issues.apache.org/jira/browse/SPARK-40221 Project: Spark Issue Type: Question Components: Build Affects Versions: 3.4.0 Reporter: Ziqi Liu I'm following the guidance in [https://spark.apache.org/developer-tools.html] using ./dev/scalafmt to format the code, but getting this error: {code:java} [ERROR] Failed to execute goal org.antipathy:mvn-scalafmt_2.12:1.1.1640084764.9f463a9:format (default-cli) on project spark-parent_2.12: Error formatting Scala files: missing setting 'version'. To fix this problem, add the following line to .scalafmt.conf: 'version=3.2.1'. -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-40211) Allow executeTake() / collectLimit's number of starting partitions to be customized
[ https://issues.apache.org/jira/browse/SPARK-40211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17584458#comment-17584458 ] Ziqi Liu commented on SPARK-40211: -- I'm actively working on this > Allow executeTake() / collectLimit's number of starting partitions to be > customized > --- > > Key: SPARK-40211 > URL: https://issues.apache.org/jira/browse/SPARK-40211 > Project: Spark > Issue Type: Story > Components: Spark Core, SQL >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Priority: Major > > Today, Spark’s executeTake() code allow for the limitScaleUpFactor to be > customized but does not allow for the initial number of partitions to be > customized: it’s currently hardcoded to {{{}1{}}}. > We should add a configuration so that the initial partition count can be > customized. By setting this new configuration to a high value we could > effectively mitigate the “run multiple jobs” overhead in {{take}} behavior. > We could also set it to higher-than-1-but-still-small values (like, say, > {{{}10{}}}) to achieve a middle-ground trade-off. > > Essentially, we need to make {{numPartsToTry = 1L}} > ([code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L481]) > customizable. We should do this via a new SQL conf, similar to the > {{limitScaleUpFactor}} conf. > > Spark has several near-duplicate versions of this code ([see code > search|https://github.com/apache/spark/search?q=numPartsToTry+%3D+1]) in: > * SparkPlan > * RDD > * pyspark rdd > Also, in pyspark {{limitScaleUpFactor}} is not supported either. So for > now, I will focus on scala side first, leaving python side untouched and > meanwhile sync with pyspark members. Depending on the progress we can do them > all in one PR or make scala side change first and leave pyspark change as a > follow-up. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40211) Allow executeTake() / collectLimit's number of starting partitions to be customized
Ziqi Liu created SPARK-40211: Summary: Allow executeTake() / collectLimit's number of starting partitions to be customized Key: SPARK-40211 URL: https://issues.apache.org/jira/browse/SPARK-40211 Project: Spark Issue Type: Story Components: Spark Core, SQL Affects Versions: 3.4.0 Reporter: Ziqi Liu Today, Spark’s executeTake() code allows the limitScaleUpFactor to be customized but does not allow the initial number of partitions to be customized: it’s currently hardcoded to {{{}1{}}}. We should add a configuration so that the initial partition count can be customized. By setting this new configuration to a high value we could effectively mitigate the “run multiple jobs” overhead in {{take}} behavior. We could also set it to higher-than-1-but-still-small values (like, say, {{{}10{}}}) to achieve a middle-ground trade-off. Essentially, we need to make {{numPartsToTry = 1L}} ([code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L481]) customizable. We should do this via a new SQL conf, similar to the {{limitScaleUpFactor}} conf. Spark has several near-duplicate versions of this code ([see code search|https://github.com/apache/spark/search?q=numPartsToTry+%3D+1]) in: * SparkPlan * RDD * pyspark rdd Also, {{limitScaleUpFactor}} is not supported in pyspark either. So for now, I will focus on the scala side first, leaving the python side untouched while syncing with pyspark members. Depending on the progress we can do it all in one PR, or make the scala-side change first and leave the pyspark change as a follow-up. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
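A hedged, heavily simplified sketch of the scan loop in question (not the real SparkPlan.executeTake / RDD.take code; `runJob` stands in for submitting a job over a set of partitions), showing where a configurable initial partition count and the existing scale-up factor would fit.

{code:scala}
import scala.collection.mutable.ArrayBuffer

def takeRows[T](numPartitions: Int, runJob: Seq[Int] => Seq[T], n: Int,
                initialNumPartitions: Int, limitScaleUpFactor: Int): Seq[T] = {
  val results = ArrayBuffer.empty[T]
  var scanned = 0
  // Proposed: start from a configurable value instead of the hardcoded 1.
  var numPartsToTry = initialNumPartitions
  while (results.length < n && scanned < numPartitions) {
    val upTo = math.min(scanned + numPartsToTry, numPartitions)
    results ++= runJob(scanned until upTo)   // one job per wave of partitions
    scanned = upTo
    numPartsToTry *= limitScaleUpFactor      // widen the next wave if still short of n
  }
  results.take(n).toSeq
}
{code}

With a larger starting value (for example the 10 discussed in the follow-up SPARK-40418), a small LIMIT over a many-partition relation needs fewer waves of jobs at the cost of a slightly larger first job.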