[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138858332

--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,33 @@ def __repr__(self):
         return "ArrowSerializer"


+class ArrowPandasSerializer(ArrowSerializer):
+
+    def __init__(self):
+        super(ArrowPandasSerializer, self).__init__()
+
+    def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
+        import pyarrow as pa
--- End diff --

Should we catch `ImportError`?
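For illustration, a minimal sketch of the kind of guard the reviewer is asking about, assuming the check sits at the import site; the error message wording is an assumption, not the PR's actual code:

```python
def dumps(self, series):
    """
    Make an ArrowRecordBatch from a Pandas Series and serialize
    """
    try:
        import pyarrow as pa
    except ImportError:
        # Hypothetical wording; the real message would be settled in review.
        raise ImportError("pyarrow must be installed to use the "
                          "ArrowPandasSerializer")
```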
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138855947

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -82,7 +82,6 @@ private[sql] object ArrowConverters {
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
     val arrowWriter = ArrowWriter.create(root)
-
--- End diff --

(Looks like an unrelated change)
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138447017

--- Diff: python/pyspark/sql/functions.py ---
@@ -2112,7 +2113,7 @@ def wrapper(*args):


 @since(1.3)
-def udf(f=None, returnType=StringType()):
+def udf(f=None, returnType=StringType(), vectorized=False):
--- End diff --

It seems like the consensus is for `pandas_udf` and I'm fine with that too. I'll make that change and the others brought up here.
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138273961

--- Diff: python/pyspark/sql/functions.py ---
@@ -2112,7 +2113,7 @@ def wrapper(*args):


 @since(1.3)
-def udf(f=None, returnType=StringType()):
+def udf(f=None, returnType=StringType(), vectorized=False):
--- End diff --

and we should also accept `**kwargs` to carry the size information.
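For illustration, a pure-Python sketch of how a batch-size hint could be threaded through `**kwargs` on the worker side; every name here (`call_udf`, the `size` keyword) is hypothetical, taken from the discussion rather than the PR:

```python
def call_udf(f, args, batch_size):
    # Zero-argument UDFs have no input Series to infer a length from,
    # so the worker could pass the current batch size as a keyword hint.
    if not args:
        return f(size=batch_size)
    return f(*args)
```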
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138273859

--- Diff: python/pyspark/sql/functions.py ---
@@ -2112,7 +2113,7 @@ def wrapper(*args):


 @since(1.3)
-def udf(f=None, returnType=StringType()):
+def udf(f=None, returnType=StringType(), vectorized=False):
--- End diff --

as we discussed in the email, we should also accept the data type in string format.
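For illustration, what the string-typed form being asked for could look like; a sketch assuming the entry point parses DDL-style type strings, which is a proposal here, not behavior that existed in this PR:

```python
@udf("double", vectorized=True)   # "double" in place of DoubleType()
def plus_one(v):
    # With vectorized=True, v would be a pandas.Series.
    return v + 1.0
```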
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138273168

--- Diff: python/pyspark/sql/functions.py ---
@@ -2112,7 +2113,7 @@ def wrapper(*args):


 @since(1.3)
-def udf(f=None, returnType=StringType()):
+def udf(f=None, returnType=StringType(), vectorized=False):
--- End diff --

I think `@pandas_udf(DoubleType())` is better than `@udf(DoubleType(), vectorized=True)`; it's more concise.
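Side by side, the two candidate spellings under discussion — both are sketches of a proposed API, not code that existed in Spark at the time:

```python
from pyspark.sql.types import DoubleType

@udf(DoubleType(), vectorized=True)   # flag-based form
def plus_one_flag(v):
    return v + 1.0

@pandas_udf(DoubleType())             # dedicated decorator (the emerging consensus)
def plus_one_pandas(v):
    return v + 1.0
```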
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138272979

--- Diff: python/pyspark/serializers.py ---
@@ -81,6 +81,12 @@ class SpecialLengths(object):
     NULL = -5


+class PythonEvalType(object):
+    NON_UDF = 0
+    SQL_BATCHED_UDF = 1
+    SQL_ARROW_UDF = 2
--- End diff --

ditto
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138272886

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -83,10 +83,23 @@ private[spark] case class PythonFunction(
  */
 private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])

+/**
+ * Enumerate the type of command that will be sent to the Python worker
+ */
+private[spark] object PythonEvalType {
+  val NON_UDF = 0
+  val SQL_BATCHED_UDF = 1
+  val SQL_ARROW_UDF = 2
--- End diff --

the new udf's parameter is a pandas `Series`, so I think it's more accurate to call it `SQL_PANDAS_UDF`.
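The suggested rename would apply equally to the mirrored constants in python/pyspark/serializers.py (quoted in the comment above); a sketch, assuming the suggestion is adopted:

```python
class PythonEvalType(object):
    NON_UDF = 0
    SQL_BATCHED_UDF = 1
    SQL_PANDAS_UDF = 2  # renamed from SQL_ARROW_UDF: the UDF receives pandas Series
```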
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r137412019

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
+    udf.children match {
+      case Seq(u: PythonUDF) =>
+        val (chained, children) = collectFunctions(u)
+        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+      case children =>
+        // There should not be any other UDFs, or the children can't be evaluated directly.
+        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+    }
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute().map(_.copy())
+    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+
+    inputRDD.mapPartitions { iter =>
+
+      // The queue used to buffer input rows so we can drain it to
+      // combine input with output from Python.
+      val queue = HybridRowQueue(TaskContext.get().taskMemoryManager(),
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+      TaskContext.get().addTaskCompletionListener({ ctx =>
+        queue.close()
+      })
+
+      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+
+      // flatten all the arguments
+      val allInputs = new ArrayBuffer[Expression]
+      val dataTypes = new ArrayBuffer[DataType]
+      val argOffsets = inputs.map { input =>
+        input.map { e =>
+          if (allInputs.exists(_.semanticEquals(e))) {
+            allInputs.indexWhere(_.semanticEquals(e))
+          } else {
+            allInputs += e
+            dataTypes += e.dataType
+            allInputs.length - 1
+          }
+        }.toArray
+      }.toArray
+      val projection = newMutableProjection(allInputs, child.output)
+      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+        StructField(s"_$i", dt)
+      })
+
+      // Input iterator to Python: input rows are grouped so we send them in batches to Python.
+      // For each row, add it to the queue.
+      val projectedRowIter = iter.map { inputRow =>
+        queue.add(inputRow.asInstanceOf[UnsafeRow])
+        projection(inputRow)
+      }
+
+      val context = TaskContext.get()
+
+      val inputIterator = ArrowConverters.toPayloadIterator(
+          projectedRowIter, schema, conf.arrowMaxRecordsPerBatch, context).
+        map(_.asPythonSerializable)
+
+      val schemaOut = St
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r137402913

--- Diff: python/pyspark/sql/functions.py ---
@@ -2112,7 +2113,7 @@ def wrapper(*args):


 @since(1.3)
-def udf(f=None, returnType=StringType()):
+def udf(f=None, returnType=StringType(), vectorized=False):
--- End diff --

@felixcheung does this fit your idea for a more generic decorator? Not exclusively labeled as `pandas_udf`, just enabling vectorization with a flag, e.g. `@udf(DoubleType(), vectorized=True)`
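For illustration, how the flag-based form could plug into the eval types quoted earlier in this thread — a sketch; the helper below is hypothetical, not the PR's code:

```python
from pyspark.serializers import PythonEvalType

def pick_eval_type(vectorized):
    # Hypothetical helper: the flag would only change which eval type
    # the wrapped function is registered with on the worker side.
    if vectorized:
        return PythonEvalType.SQL_ARROW_UDF
    return PythonEvalType.SQL_BATCHED_UDF
```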