This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9b43a9f3ea5  [SPARK-44361][SQL] Use PartitionEvaluator API in MapInBatchExec
9b43a9f3ea5 is described below

commit 9b43a9f3ea551a594835a4742f7b2d1fdb1cf518
Author: Vinod KC <vinod.kc...@gmail.com>
AuthorDate: Wed Jul 19 12:02:52 2023 +0800

    [SPARK-44361][SQL] Use PartitionEvaluator API in MapInBatchExec

    ### What changes were proposed in this pull request?

    The SQL operator `MapInBatchExec` is updated to use the `PartitionEvaluator` API for execution. A new method, `mapPartitionsWithEvaluator`, is added to `RDDBarrier`.

    ### Why are the changes needed?

    To avoid the use of lambdas during distributed execution. See SPARK-43061 for more details.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing test cases. Once all SQL operators are refactored, `spark.sql.execution.usePartitionEvaluator` will be enabled by default so that all tests cover this code path.

    Closes #42024 from vinodkc/br_SPARK-44361.

    Authored-by: Vinod KC <vinod.kc...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
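For context before the diff, a minimal sketch of the PartitionEvaluator pattern this commit adopts. The sketch is not part of the commit; `UpperCaseEvaluatorFactory` is a hypothetical name, but the trait shapes match `PartitionEvaluator` and `PartitionEvaluatorFactory` as used in the diff below.

    import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}

    // The factory is a plain serializable object. It is what gets shipped to
    // executors; each task then creates its own evaluator and feeds it the
    // partition's iterator, so no anonymous lambda has to be serialized.
    class UpperCaseEvaluatorFactory extends PartitionEvaluatorFactory[String, String] {
      override def createEvaluator(): PartitionEvaluator[String, String] =
        new PartitionEvaluator[String, String] {
          override def eval(
              partitionIndex: Int,
              inputs: Iterator[String]*): Iterator[String] = {
            // Operators pass exactly one input iterator per partition.
            inputs.head.map(_.toUpperCase)
          }
        }
    }

    // Usage on a plain RDD (assuming an existing SparkContext `sc`):
    //   sc.parallelize(Seq("a", "b")).mapPartitionsWithEvaluator(new UpperCaseEvaluatorFactory)

Because only the factory object travels to executors, no closure is captured and shipped, which is the motivation cited above via SPARK-43061.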
---
 .../scala/org/apache/spark/rdd/RDDBarrier.scala    | 16 +++-
 .../python/MapInBatchEvaluatorFactory.scala        | 92 ++++++++++++++++++++++
 .../sql/execution/python/MapInBatchExec.scala      | 80 ++++++++-----------
 3 files changed, 137 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
index b70ea0073c9..13ce8f1e1b5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -19,8 +19,8 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.TaskContext
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.{PartitionEvaluatorFactory, TaskContext}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 
 /**
  * :: Experimental ::
@@ -76,5 +76,17 @@ class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
     )
   }
 
+  /**
+   * Return a new RDD by applying an evaluator to each partition of the wrapped RDD. The given
+   * evaluator factory will be serialized and sent to executors, and each task will create an
+   * evaluator with the factory, and use the evaluator to transform the data of the input
+   * partition.
+   */
+  @DeveloperApi
+  @Since("3.5.0")
+  def mapPartitionsWithEvaluator[U: ClassTag](
+      evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = rdd.withScope {
+    new MapPartitionsWithEvaluatorRDD(rdd, evaluatorFactory)
+  }
 
   // TODO: [SPARK-25247] add extra conf to RDDBarrier, e.g., timeout.
 }
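A usage sketch for the new barrier method (illustrative only, not from the commit; it assumes a live `SparkContext` named `sc`, enough executor slots to launch all barrier tasks at once, and the hypothetical `UpperCaseEvaluatorFactory` sketched earlier):

    // The factory is serialized and sent to executors, exactly as the
    // scaladoc above describes; each barrier task builds its own evaluator.
    val words = sc.parallelize(Seq("spark", "barrier"), numSlices = 2)
    val upper = words.barrier().mapPartitionsWithEvaluator(new UpperCaseEvaluatorFactory)
    upper.collect()  // Array("SPARK", "BARRIER")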
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
new file mode 100644
index 00000000000..efb063476a4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{ContextAwareIterator, TaskContext}
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+class MapInBatchEvaluatorFactory(
+    output: Seq[Attribute],
+    chainedFunc: Seq[ChainedPythonFunctions],
+    outputTypes: StructType,
+    batchSize: Int,
+    pythonEvalType: Int,
+    sessionLocalTimeZone: String,
+    largeVarTypes: Boolean,
+    pythonRunnerConf: Map[String, String],
+    pythonMetrics: Map[String, SQLMetric],
+    jobArtifactUUID: Option[String])
+  extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
+
+  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] =
+    new MapInBatchEvaluator
+
+  private class MapInBatchEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
+    override def eval(
+        partitionIndex: Int,
+        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
+      assert(inputs.length == 1)
+      val inputIter = inputs.head
+      // Single function with one struct.
+      val argOffsets = Array(Array(0))
+      val context = TaskContext.get()
+      val contextAwareIterator = new ContextAwareIterator(context, inputIter)
+
+      // Here we wrap it via another row so that Python sides understand it
+      // as a DataFrame.
+      val wrappedIter = contextAwareIterator.map(InternalRow(_))
+
+      // DO NOT use iter.grouped(). See BatchIterator.
+      val batchIter =
+        if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else Iterator(wrappedIter)
+
+      val columnarBatchIter = new ArrowPythonRunner(
+        chainedFunc,
+        pythonEvalType,
+        argOffsets,
+        StructType(Array(StructField("struct", outputTypes))),
+        sessionLocalTimeZone,
+        largeVarTypes,
+        pythonRunnerConf,
+        pythonMetrics,
+        jobArtifactUUID).compute(batchIter, context.partitionId(), context)
+
+      val unsafeProj = UnsafeProjection.create(output, output)
+
+      columnarBatchIter
+        .flatMap { batch =>
+          // Scalar Iterator UDF returns a StructType column in ColumnarBatch, select
+          // the children here
+          val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+          val outputVectors = output.indices.map(structVector.getChild)
+          val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
+          flattenedBatch.setNumRows(batch.numRows())
+          flattenedBatch.rowIterator.asScala
+        }
+        .map(unsafeProj)
+    }
+  }
+}
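The `// DO NOT use iter.grouped()` comment above is about laziness: `Iterator.grouped(n)` buffers each group into a collection and can read ahead of the consumer, which the row-streaming Arrow writer must avoid. A rough sketch of the lazy batching shape that `BatchIterator` provides (illustrative, not Spark's actual implementation; as with the real class, each inner iterator must be exhausted before requesting the next batch):

    // Yields the underlying rows in batches of at most `batchSize`,
    // without materializing any batch into a collection.
    class LazyBatchIterator[T](iter: Iterator[T], batchSize: Int)
        extends Iterator[Iterator[T]] {
      override def hasNext: Boolean = iter.hasNext

      override def next(): Iterator[T] = new Iterator[T] {
        private var consumed = 0
        override def hasNext: Boolean = consumed < batchSize && iter.hasNext
        override def next(): T = {
          consumed += 1
          iter.next()
        }
      }
    }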
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
index b4af3db3c83..0703f57c33d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
@@ -17,18 +17,14 @@
 
 package org.apache.spark.sql.execution.python
 
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{ContextAwareIterator, JobArtifactSet, TaskContext}
+import org.apache.spark.JobArtifactSet
 import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
 
 /**
  * A relation produced by applying a function that takes an iterator of batches
@@ -56,53 +52,39 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics {
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override protected def doExecute(): RDD[InternalRow] = {
-    def mapper(inputIter: Iterator[InternalRow]): Iterator[InternalRow] = {
-      // Single function with one struct.
-      val argOffsets = Array(Array(0))
-      val chainedFunc = Seq(ChainedPythonFunctions(Seq(pythonFunction)))
-      val sessionLocalTimeZone = conf.sessionLocalTimeZone
-      val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
-      val outputTypes = child.schema
-
-      val context = TaskContext.get()
-      val contextAwareIterator = new ContextAwareIterator(context, inputIter)
-
-      // Here we wrap it via another row so that Python sides understand it
-      // as a DataFrame.
-      val wrappedIter = contextAwareIterator.map(InternalRow(_))
-
-      // DO NOT use iter.grouped(). See BatchIterator.
-      val batchIter =
-        if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else Iterator(wrappedIter)
-
-      val columnarBatchIter = new ArrowPythonRunner(
-        chainedFunc,
-        pythonEvalType,
-        argOffsets,
-        StructType(Array(StructField("struct", outputTypes))),
-        sessionLocalTimeZone,
-        largeVarTypes,
-        pythonRunnerConf,
-        pythonMetrics,
-        jobArtifactUUID).compute(batchIter, context.partitionId(), context)
-
-      val unsafeProj = UnsafeProjection.create(output, output)
-
-      columnarBatchIter.flatMap { batch =>
-        // Scalar Iterator UDF returns a StructType column in ColumnarBatch, select
-        // the children here
-        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
-        val outputVectors = output.indices.map(structVector.getChild)
-        val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
-        flattenedBatch.setNumRows(batch.numRows())
-        flattenedBatch.rowIterator.asScala
-      }.map(unsafeProj)
-    }
+    val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
+    val pythonFunction = func.asInstanceOf[PythonUDF].func
+    val chainedFunc = Seq(ChainedPythonFunctions(Seq(pythonFunction)))
+    val evaluatorFactory = new MapInBatchEvaluatorFactory(
+      output,
+      chainedFunc,
+      child.schema,
+      conf.arrowMaxRecordsPerBatch,
+      pythonEvalType,
+      conf.sessionLocalTimeZone,
+      conf.arrowUseLargeVarTypes,
+      pythonRunnerConf,
+      pythonMetrics,
+      jobArtifactUUID)
 
     if (isBarrier) {
-      child.execute().barrier().mapPartitions(mapper)
+      val rddBarrier = child.execute().barrier()
+      if (conf.usePartitionEvaluator) {
+        rddBarrier.mapPartitionsWithEvaluator(evaluatorFactory)
+      } else {
+        rddBarrier.mapPartitions { iter =>
+          evaluatorFactory.createEvaluator().eval(0, iter)
+        }
+      }
     } else {
-      child.execute().mapPartitionsInternal(mapper)
+      val inputRdd = child.execute()
+      if (conf.usePartitionEvaluator) {
+        inputRdd.mapPartitionsWithEvaluator(evaluatorFactory)
+      } else {
+        inputRdd.mapPartitionsInternal { iter =>
+          evaluatorFactory.createEvaluator().eval(0, iter)
+        }
+      }
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org