Hey guys,

BLUF: Sorry for the length of this email. I'm trying to figure out how to
batch Python UDF executions, and since this is my first time messing with
Catalyst, I'd appreciate any feedback.

My team is starting to use PySpark UDFs quite heavily, and performance is a
major blocker. The extra serialization roundtrip from Java to Python is not
a big concern if we only incur it roughly once per column for most
workflows, since it's in the same order of magnitude as reading files from
disk. However, right now each Python UDF incurs its own roundtrip. There is
definitely a lot we can do to improve this:

(all the prototyping code is here:
https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc
)

1. We can't chain Python UDFs.

    df.select(python_times_2(python_times_2("col1")))

throws an exception saying that the inner expression isn't evaluable. The
workaround is to do


    df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))

This can be solved in ExtractPythonUDFs by always extracting the innermost
Python UDF first.

         // Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time)
         // If there is more than one, we will add another evaluation operator in a subsequent pass.
-        udfs.find(_.resolved) match {
+        udfs.find { udf =>
+          udf.resolved && udf.children.map { child: Expression =>
+            // really hacky way to check whether a child of the UDF contains a PythonUDF node
+            child.find {
+              case p: PythonUDF => true
+              case _ => false
+            }.isEmpty
+          }.reduce((x, y) => x && y)
+        } match {
           case Some(udf) =>
             var evaluation: EvaluatePython = null
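
For what it's worth, the same "evaluate the innermost UDF first" check can
also be written with forall, which avoids the reduce call throwing on a UDF
with no children. This is only an untested sketch; hasNoPythonUDFChild is a
hypothetical helper, not something that exists in Spark:

    // Hypothetical helper: true if none of the UDF's children contain
    // another PythonUDF anywhere in their expression tree.
    def hasNoPythonUDFChild(udf: PythonUDF): Boolean =
      udf.children.forall { child =>
        child.find {
          case _: PythonUDF => true
          case _ => false
        }.isEmpty
      }

    // ExtractPythonUDFs would then pick the innermost resolved UDF first;
    // any outer UDFs get extracted in subsequent passes of the rule.
    udfs.find(udf => udf.resolved && hasNoPythonUDFChild(udf)) match {
      case Some(udf) => // ... existing extraction logic ...
      case None => // nothing left to extract
    }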

2. If we have a Python UDF applied to many different columns that don't
depend on each other, we can optimize them by collapsing them down into a
single Python worker. Although we have to serialize and send the same
amount of data to the Python interpreter, in the case where I am applying
the same function to 20 columns, the overhead and context switches of
having 20 interpreters run at the same time cause huge performance hits. I
have confirmed this by manually packing the 20 columns into a struct and
writing a UDF that processes the whole struct at once, and the speed
difference is 2x. My approach to adding this to Catalyst is to write an
optimizer rule called CombinePython that merges adjacent EvaluatePython
nodes that don't depend on each other's variables, and to have
BatchPythonEvaluation run multiple lambdas at once (rough sketch below). I
would also like to be able to handle the case

    df.select(python_times_2("col1").alias("col1x2"))
      .select(F.col("col1x2"), python_times_2("col1x2").alias("col1x4"))

To get around that, I add a PushDownPythonEvaluation optimizer rule that
pushes the evaluation through a select/project, so that the CombinePython
rule can join the two.
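
To make the CombinePython idea concrete, here is a very rough, untested
sketch of the rule. It assumes EvaluatePython is generalized to carry a
sequence of (PythonUDF, result attribute) pairs instead of a single UDF,
which is not what Spark has today; PushDownPythonEvaluation would follow
the same Rule[LogicalPlan] pattern.

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Assumed (not current Spark) shape of the logical node:
    //   case class EvaluatePython(udfs: Seq[(PythonUDF, AttributeReference)],
    //                             child: LogicalPlan) extends UnaryNode
    object CombinePython extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
        // Merge two adjacent EvaluatePython nodes when none of the upper
        // node's UDFs read a column produced by the lower node's UDFs.
        case EvaluatePython(upperUdfs, EvaluatePython(lowerUdfs, child))
            if upperUdfs.forall { case (udf, _) =>
              lowerUdfs.forall { case (_, lowerAttr) =>
                !udf.references.contains(lowerAttr)
              }
            } =>
          EvaluatePython(lowerUdfs ++ upperUdfs, child)
      }
    }

BatchPythonEvaluation would then receive all of the combined lambdas and
run them in one Python worker instead of one worker per UDF.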

3. I would like CombinePython to be able to handle UDFs that chain off of
each other.

    df.select(python_times_2(python_times_2("col1")))

I haven't prototyped this yet, since it's a lot more complex. The way I'm
thinking about it is to still have a rule called CombinePython, except that
BatchPythonEvaluation will need to be smart enough to build up the DAG of
dependencies, then feed that information to the Python interpreter so it
can compute things in the right order and reuse the in-memory objects it
has already computed (rough ordering sketch below). Does this seem right?
Should the code mainly live in BatchPythonEvaluation? In addition, we will
need to change the protocol between the Java and Python sides to send this
information. What would be acceptable there?
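
For the chained case, the Java side could topologically sort the UDFs it is
about to ship to a single Python worker, so the worker evaluates them in
dependency order and can reuse earlier outputs that are still in memory.
Another rough, untested sketch, using the same assumed (PythonUDF, result
attribute) pairs as above; the protocol change (telling the worker, per
UDF, which inputs are original columns and which are earlier UDF outputs)
would sit alongside this:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference

    // Order the UDFs so each one comes after every UDF whose output it reads.
    def topoSortUdfs(
        udfs: Seq[(PythonUDF, AttributeReference)]): Seq[(PythonUDF, AttributeReference)] = {
      var remaining = udfs
      val ordered = Seq.newBuilder[(PythonUDF, AttributeReference)]
      while (remaining.nonEmpty) {
        // A UDF is ready once it no longer references the output of any
        // UDF that has not been scheduled yet.
        val (ready, blocked) = remaining.partition { case (udf, _) =>
          !remaining.exists { case (_, pendingAttr) =>
            udf.references.contains(pendingAttr)
          }
        }
        require(ready.nonEmpty, "Python UDF dependencies should never form a cycle")
        ordered ++= ready
        remaining = blocked
      }
      ordered.result()
    }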

Any help would be much appreciated, especially w.r.t. the design choices,
so that the PR has a chance of being accepted.

Justin
