Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12057#discussion_r57978405

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala ---
    @@ -69,11 +69,14 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
         // combine input with output from Python.
         val queue = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]()
    -    val (pyFuncs, children) = collectFunctions(udf)
    +    val (pyFuncs, children) = udfs.map(collectFunctions).unzip
    +    val numArgs = children.map(_.length)
         val pickle = new Pickler
    -    val currentRow = newMutableProjection(children, child.output)()
    -    val fields = children.map(_.dataType)
    +    // flatten all the arguments
    +    val allChildren = children.flatMap(x => x)
    --- End diff --

    Quick clarification: if I have a query like `select udf(x), udf2(x), udf3(x), udf4(x) from ...`, will we send the `x` column's value four times to PySpark? I know that we have a conceptually similar problem when we're evaluating multiple aggregates in parallel in JVM Spark SQL, but in that case I think we only project each column once and end up rebinding the references / offsets to point at the single copy. My hunch is that this extra copy isn't a huge performance issue compared to the slow multiple-Python-UDF evaluation strategy we were using before, so I think it's fine to leave this as-is for now. If it does become a problem, we could optimize later.
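    To make the concern concrete, here is a toy Python sketch (hypothetical names, not actual Spark code) of the duplication described above, contrasted with the project-once-and-rebind-offsets approach used for JVM aggregates:

    ```python
    # Each UDF contributes its list of argument columns; udf(x), udf2(x),
    # udf3(x), udf4(x) all reference the same column "x".
    udf_children = [["x"], ["x"], ["x"], ["x"]]

    # Naive flattening (analogous to children.flatMap(x => x)) ships the
    # same column once per UDF.
    all_children = [c for cs in udf_children for c in cs]
    print(all_children)  # ['x', 'x', 'x', 'x'] -- x would be sent four times

    # Dedup-and-rebind: project each distinct column once and rewrite each
    # UDF's argument list to offsets into the deduplicated projection.
    unique = []
    offsets = []
    for cs in udf_children:
        row = []
        for c in cs:
            if c not in unique:
                unique.append(c)
            row.append(unique.index(c))
        offsets.append(row)
    print(unique)   # ['x'] -- column projected and sent only once
    print(offsets)  # [[0], [0], [0], [0]]
    ```

    This is only an illustration of the bookkeeping involved; the real fix would operate on Catalyst expressions, not strings.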