Hey guys, BLUF: sorry for the length of this email. I'm trying to figure out how to batch Python UDF executions, and since this is my first time messing with Catalyst, I would like any feedback.

My team is starting to use PySpark UDFs quite heavily, and performance is a huge blocker. The extra round-trip serialization from Java to Python is not a huge concern if we only incur it ~once per column for most workflows, since it'll be in the same order of magnitude as reading files from disk. However, right now each Python UDF leads to its own round trip. There is definitely a lot we can do about this (all the prototyping code is here: https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc ):

1. We can't chain Python UDFs.

df.select(python_times_2(python_times_2("col1")))

throws an exception saying that the inner expression isn't evaluable. The workaround is to do

df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))

This can be solved in ExtractPythonUDFs by always extracting the innermost Python UDF first:

     // Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time)
     // If there is more than one, we will add another evaluation operator in a subsequent pass.
-    udfs.find(_.resolved) match {
+    udfs.find { udf =>
+      udf.resolved && udf.children.map { child: Expression =>
+        child.find { // really hacky way to find if a child of a udf has the PythonUDF node
+          case p: PythonUDF => true
+          case _ => false
+        }.isEmpty
+      }.reduce((x, y) => x && y)
+    } match {
       case Some(udf) =>
         var evaluation: EvaluatePython = null

2. If we have a Python UDF applied to many different columns that don't depend on each other, we can optimize them by collapsing them down into a single Python worker. Although we still have to serialize and send the same amount of data to the Python interpreter, in the case where I am applying the same function to 20 columns, the overhead and context switches of having 20 interpreters run at the same time cause a huge performance hit.
I have confirmed this by manually taking the 20 columns, converting them to a struct, and then writing a UDF that processes the whole struct at once, and the speed difference is 2x.

My approach to adding this to Catalyst is basically to write an optimizer rule called CombinePython, which joins adjacent EvaluatePython nodes that don't depend on each other's variables, and then to have BatchPythonEvaluation run multiple lambdas at once. I would also like to be able to handle the case

df.select(python_times_2("col1").alias("col1x2")).select(F.col("col1x2"), python_times_2("col1x2").alias("col1x4"))

To get around that, I add a PushDownPythonEvaluation optimizer rule that pushes the evaluation down through a select/project, so that the CombinePython rule can join the two.

3. I would like CombinePython to be able to handle UDFs that chain off of each other:

df.select(python_times_2(python_times_2("col1")))

I haven't prototyped this yet, since it's a lot more complex. The way I'm thinking about this is to still have a rule called CombinePython, except that BatchPythonEvaluation will need to be smart enough to build up the DAG of dependencies and then feed that information to the Python interpreter, so that it can compute things in the right order and reuse the in-memory objects it has already computed. Does this seem right? Should the code mainly be in BatchPythonEvaluation? In addition, we will need to change the protocol between the Java and Python sides to support sending this information. What is acceptable?

Any help would be much appreciated, especially w.r.t. the design choices, so that the PR has a chance of being accepted.

Justin
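
P.S. To make the idea in (3) a bit more concrete, here is a rough pure-Python sketch of what the worker side might do once it receives the dependency DAG: order the UDFs so that inputs come first, then evaluate each one once per row and reuse the result. Everything here is hypothetical (the names topo_order and evaluate_batch, and the (output_name, function, input_names) node format); none of it is from the prototype commit or from PySpark's actual worker protocol.

```python
from typing import Any, Callable, List, Sequence, Tuple

# Hypothetical node format: (output_name, function, input_names).
# An input name is either a raw column or the output of another node.
Node = Tuple[str, Callable[..., Any], List[str]]


def topo_order(nodes: Sequence[Node], columns: Sequence[str]) -> List[Node]:
    """Return the nodes ordered so that each one follows its dependencies."""
    by_name = {name: (name, fn, deps) for name, fn, deps in nodes}
    ordered: List[Node] = []
    done = set(columns)   # raw columns are available from the start
    visiting = set()      # names on the current DFS path, for cycle detection

    def visit(name: str) -> None:
        if name in done:
            return
        if name not in by_name:
            raise KeyError("unknown input: %s" % name)
        if name in visiting:
            raise ValueError("dependency cycle at %s" % name)
        visiting.add(name)
        for dep in by_name[name][2]:
            visit(dep)
        visiting.discard(name)
        done.add(name)
        ordered.append(by_name[name])

    for name, _, _ in nodes:
        visit(name)
    return ordered


def evaluate_batch(rows, columns, nodes, outputs):
    """Evaluate every node once per row, reusing intermediate results."""
    plan = topo_order(nodes, columns)
    results = []
    for row in rows:
        env = dict(zip(columns, row))  # name -> value computed so far
        for name, fn, deps in plan:
            env[name] = fn(*[env[d] for d in deps])
        results.append(tuple(env[o] for o in outputs))
    return results


def times_2(x):
    return x * 2


# col1x4 is listed before col1x2 on purpose; the ordering step sorts it out.
nodes = [
    ("col1x4", times_2, ["col1x2"]),
    ("col1x2", times_2, ["col1"]),
]
print(evaluate_batch([(1,), (5,)], ["col1"], nodes, ["col1x2", "col1x4"]))
# prints [(2, 4), (10, 20)]
```

In this sketch col1x2 is computed once per row and then fed into col1x4 from memory, which is the reuse I'd want. The real question is still how the JVM side would describe the DAG to the worker.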