This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 42790905668 [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache 42790905668 is described below commit 42790905668effc2c0c081bae7d081faa1e18424 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Mon Oct 17 19:22:51 2022 +0800 [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache ### What changes were proposed in this pull request? 1. On the SQL side: - Make AQE explicitly invoke `stage.cleanupResources()` on the driver after each stage finishes. The existing `cleanupResources` mechanism has the following limitations: -- method `cleanupResources` is only implemented in `SortExec`, and it is only called in `SortMergeJoinExec`; so a physical operator's `cleanupResources` can be called iff it is a descendant of `SortMergeJoinExec`; -- the above invocations in `SortMergeJoinScanner` happen on the executor side, so a physical operator's `cleanupResources` will never be invoked on the driver; - Invoke `plan.cleanupResources()` in `QueryStageExec`, since it is itself a leaf node and its wrapped `plan` is not one of its children; 2. On the Pandas-API-on-Spark side: - apply `persist` instead of `localCheckpoint`; - add a new config `compute.default_index_cache` to control the storage level of temporary RDDs; - unpersist the cached RDD in `cleanupResources` in `AttachDistributedSequenceExec` ### Why are the changes needed? `distributed_sequence` is the default index type in Pandas-API-on-Spark; it calls `localCheckpoint` (which also caches internally) on a temporary RDD to avoid re-computation. For large-scale datasets, this is prone to failure due to unreliable checkpointing: ``` Caused by: org.apache.spark.SparkException: Checkpoint block rdd_448_38 not found! Either the executor that originally checkpointed this partition is no longer alive, or the original RDD is unpersisted.
If this problem persists, you may consider using `rdd.checkpoint()` instead, which is slower than local checkpointing but more fault-tolerant. at org.apache.spark.errors.SparkCoreErrors$.checkpointRDDBlockIdNotFoundError(SparkCoreErrors.scala:82) at ``` We should use `persist` instead and clean up the cached RDDs as soon as possible. ### Does this PR introduce _any_ user-facing change? Yes, a new config `compute.default_index_cache`. ### How was this patch tested? Added a unit test. Closes #38130 from zhengruifeng/ps_indexing_cleanup. Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../source/user_guide/pandas_on_spark/options.rst | 11 +++- python/pyspark/pandas/config.py | 37 ++++++++++++ python/pyspark/pandas/tests/test_default_index.py | 53 +++++++++++++++++ .../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 + .../sql/execution/adaptive/QueryStageExec.scala | 5 ++ .../python/AttachDistributedSequenceExec.scala | 67 +++++++++++++++++++--- 6 files changed, 167 insertions(+), 8 deletions(-) diff --git a/python/docs/source/user_guide/pandas_on_spark/options.rst b/python/docs/source/user_guide/pandas_on_spark/options.rst index 67b8f6841f5..cbad934c844 100644 --- a/python/docs/source/user_guide/pandas_on_spark/options.rst +++ b/python/docs/source/user_guide/pandas_on_spark/options.rst @@ -271,6 +271,14 @@ compute.ops_on_diff_frames False This determines whether that method throws an exception. compute.default_index_type 'distributed-sequence' This sets the default index type: sequence, distributed and distributed-sequence. +compute.default_index_cache 'MEMORY_AND_DISK_SER' This sets the default storage level for temporary + RDDs cached in distributed-sequence indexing: 'NONE', + 'DISK_ONLY', 'DISK_ONLY_2', 'DISK_ONLY_3', + 'MEMORY_ONLY', 'MEMORY_ONLY_2', 'MEMORY_ONLY_SER', + 'MEMORY_ONLY_SER_2', 'MEMORY_AND_DISK', + 'MEMORY_AND_DISK_2', 'MEMORY_AND_DISK_SER', + 'MEMORY_AND_DISK_SER_2', 'OFF_HEAP', + 'LOCAL_CHECKPOINT'.
compute.ordered_head False 'compute.ordered_head' sets whether or not to operate head with natural ordering. pandas-on-Spark does not guarantee the row ordering so `head` could return @@ -288,7 +296,8 @@ compute.eager_check True 'compute.eager_check' se `Series.asof`, `Series.compare`, `FractionalExtensionOps.astype`, `IntegralExtensionOps.astype`, - `FractionalOps.astype`, `DecimalOps.astype`. + `FractionalOps.astype`, `DecimalOps.astype`, `skipna + of statistical functions`. compute.isin_limit 80 'compute.isin_limit' sets the limit for filtering by 'Column.isin(list)'. If the length of the ‘list’ is above the limit, broadcast join is used instead for diff --git a/python/pyspark/pandas/config.py b/python/pyspark/pandas/config.py index dc42a7c813b..990ae547a12 100644 --- a/python/pyspark/pandas/config.py +++ b/python/pyspark/pandas/config.py @@ -183,6 +183,43 @@ _options: List[Option] = [ "Index type should be one of 'sequence', 'distributed', 'distributed-sequence'.", ), ), + Option( + key="compute.default_index_cache", + doc=( + "This sets the default storage level for temporary RDDs cached in " + "distributed-sequence indexing: 'NONE', 'DISK_ONLY', 'DISK_ONLY_2', " + "'DISK_ONLY_3', 'MEMORY_ONLY', 'MEMORY_ONLY_2', 'MEMORY_ONLY_SER', " + "'MEMORY_ONLY_SER_2', 'MEMORY_AND_DISK', 'MEMORY_AND_DISK_2', " + "'MEMORY_AND_DISK_SER', 'MEMORY_AND_DISK_SER_2', 'OFF_HEAP', " + "'LOCAL_CHECKPOINT'." 
+ ), + default="MEMORY_AND_DISK_SER", + types=str, + check_func=( + lambda v: v + in ( + "NONE", + "DISK_ONLY", + "DISK_ONLY_2", + "DISK_ONLY_3", + "MEMORY_ONLY", + "MEMORY_ONLY_2", + "MEMORY_ONLY_SER", + "MEMORY_ONLY_SER_2", + "MEMORY_AND_DISK", + "MEMORY_AND_DISK_2", + "MEMORY_AND_DISK_SER", + "MEMORY_AND_DISK_SER_2", + "OFF_HEAP", + "LOCAL_CHECKPOINT", + ), + "Index type should be one of 'NONE', 'DISK_ONLY', 'DISK_ONLY_2', " + "'DISK_ONLY_3', 'MEMORY_ONLY', 'MEMORY_ONLY_2', 'MEMORY_ONLY_SER', " + "'MEMORY_ONLY_SER_2', 'MEMORY_AND_DISK', 'MEMORY_AND_DISK_2', " + "'MEMORY_AND_DISK_SER', 'MEMORY_AND_DISK_SER_2', 'OFF_HEAP', " + "'LOCAL_CHECKPOINT'.", + ), + ), Option( key="compute.ordered_head", doc=( diff --git a/python/pyspark/pandas/tests/test_default_index.py b/python/pyspark/pandas/tests/test_default_index.py index fe744d12797..dcb120aee40 100644 --- a/python/pyspark/pandas/tests/test_default_index.py +++ b/python/pyspark/pandas/tests/test_default_index.py @@ -17,6 +17,7 @@ import pandas as pd +from pyspark.sql import functions as F from pyspark import pandas as ps from pyspark.testing.pandasutils import PandasOnSparkTestCase @@ -38,6 +39,58 @@ class DefaultIndexTest(PandasOnSparkTestCase): pdf = ps.DataFrame(sdf)._to_pandas() self.assertEqual(len(set(pdf.index)), len(pdf)) + def test_index_distributed_sequence_cleanup(self): + with ps.option_context( + "compute.default_index_type", "distributed-sequence" + ), ps.option_context("compute.ops_on_diff_frames", True): + + with ps.option_context("compute.default_index_cache", "LOCAL_CHECKPOINT"): + cached_rdd_ids = [rdd_id for rdd_id in self.spark._jsc.getPersistentRDDs()] + + psdf1 = ( + self.spark.range(0, 100, 1, 10).withColumn("Key", F.col("id") % 33).pandas_api() + ) + + psdf2 = psdf1["Key"].reset_index() + psdf2["index"] = (psdf2.groupby(["Key"]).cumcount() == 0).astype(int) + psdf2["index"] = psdf2["index"].cumsum() + + psdf3 = ps.merge(psdf1, psdf2, how="inner", left_on=["Key"], right_on=["Key"]) + _ = 
len(psdf3) + + # newly cached rdd + self.assertTrue( + any( + rdd_id not in cached_rdd_ids + for rdd_id in self.spark._jsc.getPersistentRDDs() + ) + ) + + for storage_level in ["NONE", "DISK_ONLY_2", "MEMORY_AND_DISK_SER"]: + with ps.option_context("compute.default_index_cache", storage_level): + cached_rdd_ids = [rdd_id for rdd_id in self.spark._jsc.getPersistentRDDs()] + + psdf1 = ( + self.spark.range(0, 100, 1, 10) + .withColumn("Key", F.col("id") % 33) + .pandas_api() + ) + + psdf2 = psdf1["Key"].reset_index() + psdf2["index"] = (psdf2.groupby(["Key"]).cumcount() == 0).astype(int) + psdf2["index"] = psdf2["index"].cumsum() + + psdf3 = ps.merge(psdf1, psdf2, how="inner", left_on=["Key"], right_on=["Key"]) + _ = len(psdf3) + + # no newly cached rdd + self.assertTrue( + all( + rdd_id in cached_rdd_ids + for rdd_id in self.spark._jsc.getPersistentRDDs() + ) + ) + if __name__ == "__main__": import unittest diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 36d74fdd24d..828a129a876 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -265,6 +265,8 @@ case class AdaptiveSparkPlanExec( } else { events.offer(StageFailure(stage, res.failed.get)) } + // explicitly clean up the resources in this stage + stage.cleanupResources() }(AdaptiveSparkPlanExec.executionContext) } catch { case e: Throwable => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index c30ff1f514d..0aee6c21f86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -151,6 +151,11 @@ abstract class QueryStageExec extends LeafExecNode { plan.generateTreeString( depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId, indent) } + + override protected[sql] def cleanupResources(): Unit = { + plan.cleanupResources() + super.cleanupResources() + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala index 5f722826fc7..a1df89a20cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.storage.StorageLevel /** * A physical plan that adds a new long column with `sequenceAttr` that @@ -40,15 +42,55 @@ case class AttachDistributedSequenceExec( override def outputPartitioning: Partitioning = child.outputPartitioning + @transient private var cached: RDD[InternalRow] = _ + override protected def doExecute(): RDD[InternalRow] = { - val childRDD = child.execute().map(_.copy()) - val checkpointed = if (childRDD.getNumPartitions > 1) { - // to avoid execute multiple jobs. zipWithIndex launches a Spark job. 
- childRDD.localCheckpoint() - } else { - childRDD + val childRDD = child.execute() + // before `compute.default_index_cache` is explicitly set via + // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value); + // after `ps.set_option`, `SQLConf.get` can get its value: + // + // In [1]: import pyspark.pandas as ps + // In [2]: ps.get_option("compute.default_index_cache") + // Out[2]: 'MEMORY_AND_DISK_SER' + // In [3]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // ... + // Py4JJavaError: An error occurred while calling o40.get. + // : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_... + // at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError... + // at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766) + // ... + // In [4]: ps.set_option("compute.default_index_cache", "NONE") + // In [5]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // Out[5]: '"NONE"' + // In [6]: ps.set_option("compute.default_index_cache", "DISK_ONLY") + // In [7]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // Out[7]: '"DISK_ONLY"' + + // The string is double quoted because of JSON ser/deser for pandas API on Spark + val storageLevel = SQLConf.get.getConfString( + "pandas_on_Spark.compute.default_index_cache", + "MEMORY_AND_DISK_SER" + ).stripPrefix("\"").stripSuffix("\"") + + val cachedRDD = storageLevel match { + // zipWithIndex launches a Spark job only if #partition > 1 + case _ if childRDD.getNumPartitions <= 1 => childRDD + + case "NONE" => childRDD + + case "LOCAL_CHECKPOINT" => + // localcheckpointing is unreliable so should not eagerly release it in 'cleanupResources' + childRDD.map(_.copy()).localCheckpoint() + .setName(s"Temporary RDD locally checkpointed in AttachDistributedSequenceExec($id)") + + case _ => + cached = childRDD.map(_.copy()).persist(StorageLevel.fromString(storageLevel)) + 
.setName(s"Temporary RDD cached in AttachDistributedSequenceExec($id)") + cached } - checkpointed.zipWithIndex().mapPartitions { iter => + + cachedRDD.zipWithIndex().mapPartitions { iter => val unsafeProj = UnsafeProjection.create(output, output) val joinedRow = new JoinedRow val unsafeRowWriter = @@ -63,6 +105,17 @@ case class AttachDistributedSequenceExec( } } + override protected[sql] def cleanupResources(): Unit = { + try { + if (cached != null && cached.getStorageLevel != StorageLevel.NONE) { + logWarning(s"clean up cached RDD(${cached.id}) in AttachDistributedSequenceExec($id)") + cached.unpersist(blocking = false) + } + } finally { + super.cleanupResources() + } + } + override protected def withNewChildInternal(newChild: SparkPlan): AttachDistributedSequenceExec = copy(child = newChild) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org