Abdeali Kothari created SPARK-25591:
---------------------------------------
             Summary: PySpark Accumulators with multiple PythonUDFs
                 Key: SPARK-25591
                 URL: https://issues.apache.org/jira/browse/SPARK-25591
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.2
            Reporter: Abdeali Kothari


When multiple Python UDFs are used, only the last Python UDF's accumulator gets updated.

{code:python}
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark import AccumulatorParam

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

test_accum = spark.sparkContext.accumulator(0.0)

SHUFFLE = False

def main(data):
    print(">>> Check0", test_accum.value)

    def test(x):
        global test_accum
        test_accum += 1.0
        return x

    print(">>> Check1", test_accum.value)

    def test2(x):
        global test_accum
        test_accum += 100.0
        return x

    print(">>> Check2", test_accum.value)
    func_udf = F.udf(test, T.DoubleType())
    print(">>> Check3", test_accum.value)
    func_udf2 = F.udf(test2, T.DoubleType())
    print(">>> Check4", test_accum.value)

    data = data.withColumn("out1", func_udf(data["a"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check5", test_accum.value)

    data = data.withColumn("out2", func_udf2(data["b"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check6", test_accum.value)

    data.show()  # ACTION
    print(">>> Check7", test_accum.value)
    return data

df = spark.createDataFrame(
    [[1.0, 2.0]],
    schema=T.StructType([
        T.StructField(field_name, T.DoubleType(), True)
        for field_name in ["a", "b"]
    ]))

df2 = main(df)
{code}

######## Output 1 - with SHUFFLE=False
...
# >>> Check7 100.0

######## Output 2 - with SHUFFLE=True
...
# >>> Check7 101.0

With SHUFFLE=False, only the second UDF's increment (100.0) reaches the accumulator; with SHUFFLE=True, both increments are counted (1.0 + 100.0 = 101.0).

Basically it looks like:
- The accumulator works only for the last UDF before a shuffle-like operation
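For reference, below is a minimal sketch of the workaround implied by the SHUFFLE=True run: forcing a shuffle-like boundary between the two UDF applications keeps each UDF in its own evaluation step, and both increments then reach the accumulator. It reuses {{spark}}, {{test_accum}}, {{func_udf}}, {{func_udf2}}, and {{df}} from the reproduction script above; it only works around the symptom and is not a fix for the underlying issue.

{code:python}
# Workaround sketch (reuses spark, test_accum, func_udf, func_udf2, df from above).
# Without the repartition the two UDFs run back to back and only the second
# UDF's accumulator updates are reported (Check7 == 100.0 above).
data = df.withColumn("out1", func_udf(df["a"]))
data = data.repartition(2)  # shuffle-like boundary between the two UDFs
data = data.withColumn("out2", func_udf2(data["b"]))

data.show()  # ACTION
print(">>> accumulator:", test_accum.value)  # observed 101.0 with the shuffle in place
{code}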