This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 42790905668 [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache
42790905668 is described below

commit 42790905668effc2c0c081bae7d081faa1e18424
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Oct 17 19:22:51 2022 +0800

    [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache
    
    ### What changes were proposed in this pull request?
    
    1. On the SQL side:
    - Make AQE explicitly invoke `stage.cleanupResources()` on the driver after each stage finishes. The existing AQE `cleanupResources` mechanism has the following limitations:
    -- the method `cleanupResources` is only implemented in `SortExec` and is only called from `SortMergeJoinExec`, so a physical operator's `cleanupResources` is invoked only if the operator is a descendant of a `SortMergeJoinExec`;
    -- the above invocations in `SortMergeJoinScanner` happen on the executor side, so a physical operator's `cleanupResources` is never invoked on the driver;
    - Invoke `plan.cleanupResources()` in `QueryStageExec`, since it is itself a leaf node;
    2. On the Pandas-API-on-Spark side:
    - apply `persist` instead of `localCheckpoint`;
    - add a new config `compute.default_index_cache` to control the storage level of the temporary RDDs (see the usage sketch below);
    - unpersist the cached RDD in `cleanupResources` of `AttachDistributedSequenceExec`.
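
    A minimal usage sketch of the new option from the Python side (assuming an active SparkSession named `spark`; the column name and values below are illustrative only and not part of this patch):

    ```python
    import pyspark.pandas as ps
    from pyspark.sql import functions as F

    # Cache the temporary distributed-sequence RDD on disk only within this block;
    # 'NONE' disables caching and 'LOCAL_CHECKPOINT' restores the previous behavior.
    with ps.option_context(
        "compute.default_index_type", "distributed-sequence",
        "compute.default_index_cache", "DISK_ONLY",
    ):
        psdf = spark.range(0, 100, 1, 10).withColumn("key", F.col("id") % 33).pandas_api()
        print(len(psdf))
    ```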
    
    ### Why are the changes needed?
    
    `distributed-sequence` is the default index type in Pandas-API-on-Spark; it `localCheckpoint`s (and thereby also caches internally) a temporary RDD to avoid re-computation. For large-scale datasets, this is prone to failure because local checkpointing is unreliable:
    
    ```
    Caused by: org.apache.spark.SparkException: Checkpoint block rdd_448_38 not found! Either the executor that originally checkpointed this partition is no longer alive, or the original RDD is unpersisted. If this problem persists, you may consider using `rdd.checkpoint()` instead, which is slower than local checkpointing but more fault-tolerant.
        at org.apache.spark.errors.SparkCoreErrors$.checkpointRDDBlockIdNotFoundError(SparkCoreErrors.scala:82)
        at
    ```
    
    We should use `persist` instead, and clean up the cached RDDs as soon as possible.
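
    A minimal sketch of the difference at the plain-RDD level (illustrative only, assuming an active SparkSession named `spark`; the actual change lives in `AttachDistributedSequenceExec` below):

    ```python
    from pyspark.storagelevel import StorageLevel

    rdd = spark.range(0, 100, 1, 10).rdd

    # Previous behavior: rdd.localCheckpoint() truncates the lineage, so a block whose
    # executor is lost cannot be recomputed and the query fails.

    # New behavior: persist keeps the lineage, so lost partitions can be recomputed,
    # and the storage level is configurable.
    cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)
    print(cached.zipWithIndex().count())
    cached.unpersist(blocking=False)  # release the cache as soon as it is no longer needed
    ```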
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, a new config `compute.default_index_cache`.
    
    ### How was this patch tested?
    Added a unit test (`test_index_distributed_sequence_cleanup`).
    
    Closes #38130 from zhengruifeng/ps_indexing_cleanup.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../source/user_guide/pandas_on_spark/options.rst  | 11 +++-
 python/pyspark/pandas/config.py                    | 37 ++++++++++++
 python/pyspark/pandas/tests/test_default_index.py  | 53 +++++++++++++++++
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  2 +
 .../sql/execution/adaptive/QueryStageExec.scala    |  5 ++
 .../python/AttachDistributedSequenceExec.scala     | 67 +++++++++++++++++++---
 6 files changed, 167 insertions(+), 8 deletions(-)

diff --git a/python/docs/source/user_guide/pandas_on_spark/options.rst b/python/docs/source/user_guide/pandas_on_spark/options.rst
index 67b8f6841f5..cbad934c844 100644
--- a/python/docs/source/user_guide/pandas_on_spark/options.rst
+++ b/python/docs/source/user_guide/pandas_on_spark/options.rst
@@ -271,6 +271,14 @@ compute.ops_on_diff_frames      False                   This determines whether
                                                         that method throws an exception.
 compute.default_index_type      'distributed-sequence'  This sets the default index type: sequence,
                                                         distributed and distributed-sequence.
+compute.default_index_cache     'MEMORY_AND_DISK_SER'   This sets the default storage level for temporary
+                                                        RDDs cached in distributed-sequence indexing: 'NONE',
+                                                        'DISK_ONLY', 'DISK_ONLY_2', 'DISK_ONLY_3',
+                                                        'MEMORY_ONLY', 'MEMORY_ONLY_2', 'MEMORY_ONLY_SER',
+                                                        'MEMORY_ONLY_SER_2', 'MEMORY_AND_DISK',
+                                                        'MEMORY_AND_DISK_2', 'MEMORY_AND_DISK_SER',
+                                                        'MEMORY_AND_DISK_SER_2', 'OFF_HEAP',
+                                                        'LOCAL_CHECKPOINT'.
 compute.ordered_head            False                   'compute.ordered_head' sets whether or not to operate
                                                         head with natural ordering. pandas-on-Spark does not
                                                         guarantee the row ordering so `head` could return
@@ -288,7 +296,8 @@ compute.eager_check             True                    'compute.eager_check' se
                                                         `Series.asof`, `Series.compare`,
                                                         `FractionalExtensionOps.astype`,
                                                         `IntegralExtensionOps.astype`,
-                                                        `FractionalOps.astype`, `DecimalOps.astype`.
+                                                        `FractionalOps.astype`, `DecimalOps.astype`, `skipna
+                                                        of statistical functions`.
 compute.isin_limit              80                      'compute.isin_limit' sets the limit for filtering by
                                                         'Column.isin(list)'. If the length of the ‘list’ is
                                                         above the limit, broadcast join is used instead for
diff --git a/python/pyspark/pandas/config.py b/python/pyspark/pandas/config.py
index dc42a7c813b..990ae547a12 100644
--- a/python/pyspark/pandas/config.py
+++ b/python/pyspark/pandas/config.py
@@ -183,6 +183,43 @@ _options: List[Option] = [
             "Index type should be one of 'sequence', 'distributed', 
'distributed-sequence'.",
         ),
     ),
+    Option(
+        key="compute.default_index_cache",
+        doc=(
+            "This sets the default storage level for temporary RDDs cached in "
+            "distributed-sequence indexing: 'NONE', 'DISK_ONLY', 
'DISK_ONLY_2', "
+            "'DISK_ONLY_3', 'MEMORY_ONLY', 'MEMORY_ONLY_2', 'MEMORY_ONLY_SER', 
"
+            "'MEMORY_ONLY_SER_2', 'MEMORY_AND_DISK', 'MEMORY_AND_DISK_2', "
+            "'MEMORY_AND_DISK_SER', 'MEMORY_AND_DISK_SER_2', 'OFF_HEAP', "
+            "'LOCAL_CHECKPOINT'."
+        ),
+        default="MEMORY_AND_DISK_SER",
+        types=str,
+        check_func=(
+            lambda v: v
+            in (
+                "NONE",
+                "DISK_ONLY",
+                "DISK_ONLY_2",
+                "DISK_ONLY_3",
+                "MEMORY_ONLY",
+                "MEMORY_ONLY_2",
+                "MEMORY_ONLY_SER",
+                "MEMORY_ONLY_SER_2",
+                "MEMORY_AND_DISK",
+                "MEMORY_AND_DISK_2",
+                "MEMORY_AND_DISK_SER",
+                "MEMORY_AND_DISK_SER_2",
+                "OFF_HEAP",
+                "LOCAL_CHECKPOINT",
+            ),
+            "Index type should be one of 'NONE', 'DISK_ONLY', 'DISK_ONLY_2', "
+            "'DISK_ONLY_3', 'MEMORY_ONLY', 'MEMORY_ONLY_2', 'MEMORY_ONLY_SER', 
"
+            "'MEMORY_ONLY_SER_2', 'MEMORY_AND_DISK', 'MEMORY_AND_DISK_2', "
+            "'MEMORY_AND_DISK_SER', 'MEMORY_AND_DISK_SER_2', 'OFF_HEAP', "
+            "'LOCAL_CHECKPOINT'.",
+        ),
+    ),
     Option(
         key="compute.ordered_head",
         doc=(
diff --git a/python/pyspark/pandas/tests/test_default_index.py b/python/pyspark/pandas/tests/test_default_index.py
index fe744d12797..dcb120aee40 100644
--- a/python/pyspark/pandas/tests/test_default_index.py
+++ b/python/pyspark/pandas/tests/test_default_index.py
@@ -17,6 +17,7 @@
 
 import pandas as pd
 
+from pyspark.sql import functions as F
 from pyspark import pandas as ps
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 
@@ -38,6 +39,58 @@ class DefaultIndexTest(PandasOnSparkTestCase):
             pdf = ps.DataFrame(sdf)._to_pandas()
             self.assertEqual(len(set(pdf.index)), len(pdf))
 
+    def test_index_distributed_sequence_cleanup(self):
+        with ps.option_context(
+            "compute.default_index_type", "distributed-sequence"
+        ), ps.option_context("compute.ops_on_diff_frames", True):
+
+            with ps.option_context("compute.default_index_cache", 
"LOCAL_CHECKPOINT"):
+                cached_rdd_ids = [rdd_id for rdd_id in 
self.spark._jsc.getPersistentRDDs()]
+
+                psdf1 = (
+                    self.spark.range(0, 100, 1, 10).withColumn("Key", F.col("id") % 33).pandas_api()
+                )
+
+                psdf2 = psdf1["Key"].reset_index()
+                psdf2["index"] = (psdf2.groupby(["Key"]).cumcount() == 
0).astype(int)
+                psdf2["index"] = psdf2["index"].cumsum()
+
+                psdf3 = ps.merge(psdf1, psdf2, how="inner", left_on=["Key"], right_on=["Key"])
+                _ = len(psdf3)
+
+                # newly cached rdd
+                self.assertTrue(
+                    any(
+                        rdd_id not in cached_rdd_ids
+                        for rdd_id in self.spark._jsc.getPersistentRDDs()
+                    )
+                )
+
+            for storage_level in ["NONE", "DISK_ONLY_2", 
"MEMORY_AND_DISK_SER"]:
+                with ps.option_context("compute.default_index_cache", 
storage_level):
+                    cached_rdd_ids = [rdd_id for rdd_id in 
self.spark._jsc.getPersistentRDDs()]
+
+                    psdf1 = (
+                        self.spark.range(0, 100, 1, 10)
+                        .withColumn("Key", F.col("id") % 33)
+                        .pandas_api()
+                    )
+
+                    psdf2 = psdf1["Key"].reset_index()
+                    psdf2["index"] = (psdf2.groupby(["Key"]).cumcount() == 
0).astype(int)
+                    psdf2["index"] = psdf2["index"].cumsum()
+
+                    psdf3 = ps.merge(psdf1, psdf2, how="inner", left_on=["Key"], right_on=["Key"])
+                    _ = len(psdf3)
+
+                    # no newly cached rdd
+                    self.assertTrue(
+                        all(
+                            rdd_id in cached_rdd_ids
+                            for rdd_id in self.spark._jsc.getPersistentRDDs()
+                        )
+                    )
+
 
 if __name__ == "__main__":
     import unittest
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 36d74fdd24d..828a129a876 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -265,6 +265,8 @@ case class AdaptiveSparkPlanExec(
                 } else {
                   events.offer(StageFailure(stage, res.failed.get))
                 }
+                // explicitly clean up the resources in this stage
+                stage.cleanupResources()
               }(AdaptiveSparkPlanExec.executionContext)
             } catch {
               case e: Throwable =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index c30ff1f514d..0aee6c21f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -151,6 +151,11 @@ abstract class QueryStageExec extends LeafExecNode {
     plan.generateTreeString(
       depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId, indent)
   }
+
+  override protected[sql] def cleanupResources(): Unit = {
+    plan.cleanupResources()
+    super.cleanupResources()
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
index 5f722826fc7..a1df89a20cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.storage.StorageLevel
 
 /**
  * A physical plan that adds a new long column with `sequenceAttr` that
@@ -40,15 +42,55 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
-    } else {
-      childRDD
+    val childRDD = child.execute()
+    // before `compute.default_index_cache` is explicitly set via
+    // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value);
+    // after `ps.set_option`, `SQLConf.get` can get its value:
+    //
+    //    In [1]: import pyspark.pandas as ps
+    //    In [2]: ps.get_option("compute.default_index_cache")
+    //    Out[2]: 'MEMORY_AND_DISK_SER'
+    //    In [3]: spark.conf.get("pandas_on_Spark.compute.default_index_cache")
+    //    ...
+    //    Py4JJavaError: An error occurred while calling o40.get.
+    //      : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_...
+    //    at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError...
+    //    at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766)
+    //    ...
+    //    In [4]: ps.set_option("compute.default_index_cache", "NONE")
+    //    In [5]: spark.conf.get("pandas_on_Spark.compute.default_index_cache")
+    //    Out[5]: '"NONE"'
+    //    In [6]: ps.set_option("compute.default_index_cache", "DISK_ONLY")
+    //    In [7]: spark.conf.get("pandas_on_Spark.compute.default_index_cache")
+    //    Out[7]: '"DISK_ONLY"'
+
+    // The string is double quoted because of JSON ser/deser for pandas API on Spark
+    val storageLevel = SQLConf.get.getConfString(
+      "pandas_on_Spark.compute.default_index_cache",
+      "MEMORY_AND_DISK_SER"
+    ).stripPrefix("\"").stripSuffix("\"")
+
+    val cachedRDD = storageLevel match {
+      // zipWithIndex launches a Spark job only if #partition > 1
+      case _ if childRDD.getNumPartitions <= 1 => childRDD
+
+      case "NONE" => childRDD
+
+      case "LOCAL_CHECKPOINT" =>
+        // localcheckpointing is unreliable so should not eagerly release it in 'cleanupResources'
+        childRDD.map(_.copy()).localCheckpoint()
+          .setName(s"Temporary RDD locally checkpointed in AttachDistributedSequenceExec($id)")
+
+      case _ =>
+        cached = childRDD.map(_.copy()).persist(StorageLevel.fromString(storageLevel))
+          .setName(s"Temporary RDD cached in AttachDistributedSequenceExec($id)")
+        cached
     }
-    checkpointed.zipWithIndex().mapPartitions { iter =>
+
+    cachedRDD.zipWithIndex().mapPartitions { iter =>
       val unsafeProj = UnsafeProjection.create(output, output)
       val joinedRow = new JoinedRow
       val unsafeRowWriter =
@@ -63,6 +105,17 @@ case class AttachDistributedSequenceExec(
     }
   }
 
+  override protected[sql] def cleanupResources(): Unit = {
+    try {
+      if (cached != null && cached.getStorageLevel != StorageLevel.NONE) {
+        logWarning(s"clean up cached RDD(${cached.id}) in 
AttachDistributedSequenceExec($id)")
+        cached.unpersist(blocking = false)
+      }
+    } finally {
+      super.cleanupResources()
+    }
+  }
+
   override protected def withNewChildInternal(newChild: SparkPlan): AttachDistributedSequenceExec =
     copy(child = newChild)
 

