[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732556#comment-16732556 ]
Dongjoon Hyun commented on SPARK-25591:
---------------------------------------

Hi, [~viirya], [~hyukjin.kwon]. This fix is only in branch-2.4. Can we backport it to older branches such as branch-2.3 and branch-2.2?
cc [~AbdealiJK]

> PySpark Accumulators with multiple PythonUDFs
> ---------------------------------------------
>
>                 Key: SPARK-25591
>                 URL: https://issues.apache.org/jira/browse/SPARK-25591
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2
>            Reporter: Abdeali Kothari
>            Assignee: Liang-Chi Hsieh
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.4.0
>
>
> When there are multiple Python UDFs, only the last Python UDF's accumulator
> gets updated.
> {code:python}
> import pyspark
> from pyspark.sql import SparkSession, Row
> from pyspark.sql import functions as F
> from pyspark.sql import types as T
> from pyspark import AccumulatorParam
>
> spark = SparkSession.builder.getOrCreate()
> spark.sparkContext.setLogLevel("ERROR")
> test_accum = spark.sparkContext.accumulator(0.0)
>
> SHUFFLE = False
>
> def main(data):
>     print(">>> Check0", test_accum.value)
>     def test(x):
>         global test_accum
>         test_accum += 1.0
>         return x
>     print(">>> Check1", test_accum.value)
>
>     def test2(x):
>         global test_accum
>         test_accum += 100.0
>         return x
>     print(">>> Check2", test_accum.value)
>     func_udf = F.udf(test, T.DoubleType())
>     print(">>> Check3", test_accum.value)
>     func_udf2 = F.udf(test2, T.DoubleType())
>     print(">>> Check4", test_accum.value)
>
>     data = data.withColumn("out1", func_udf(data["a"]))
>     if SHUFFLE:
>         data = data.repartition(2)
>     print(">>> Check5", test_accum.value)
>     data = data.withColumn("out2", func_udf2(data["b"]))
>     if SHUFFLE:
>         data = data.repartition(2)
>     print(">>> Check6", test_accum.value)
>
>     data.show()  # ACTION
>     print(">>> Check7", test_accum.value)
>     return data
>
> df = spark.createDataFrame([
>     [1.0, 2.0]
> ], schema=T.StructType([T.StructField(field_name, T.DoubleType(), True)
>                         for field_name in ["a", "b"]]))
> df2 = main(df)
> {code}
> {code:python}
> ######## Output 1 - with SHUFFLE=False
> ...
> # >>> Check7 100.0
> ######## Output 2 - with SHUFFLE=True
> ...
> # >>> Check7 101.0
> {code}
> Basically, it looks like:
> - The accumulator works only for the last UDF before a shuffle-like operation.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
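A toy model of one mechanism consistent with the outputs above may help. It is an assumption, not something stated in this issue: suppose each Python UDF running in the same task deserializes its own copy of the accumulator, and each copy is registered under the same accumulator id, overwriting the previous one, so only the last UDF's copy is reported back to the driver. `ToyAccumulator`, `deserialize`, and `run_task` are hypothetical names; this is plain Python, not PySpark code.

```python
# Toy model (hypothetical, not PySpark code): a worker-side registry that maps
# accumulator ids to the objects whose values are sent back to the driver.

class ToyAccumulator:
    def __init__(self, aid):
        self.aid = aid
        self.value = 0.0

    def add(self, amount):
        self.value += amount


def deserialize(registry, aid, reuse_existing):
    """Simulate unpickling one UDF's reference to accumulator `aid`."""
    if reuse_existing and aid in registry:
        return registry[aid]      # share the object already registered
    acc = ToyAccumulator(aid)     # fresh copy for this UDF
    registry[aid] = acc           # may overwrite an earlier UDF's copy
    return acc


def run_task(increments, reuse_existing):
    """Run several UDFs in one task; each UDF deserializes its own reference."""
    registry = {}
    accs = [deserialize(registry, 0, reuse_existing) for _ in increments]
    for acc, inc in zip(accs, increments):
        acc.add(inc)              # one input row, one update per UDF
    # Only objects still in the registry are reported back to the driver.
    return sum(acc.value for acc in registry.values())


# Both UDFs in one task (like SHUFFLE=False): +1.0 then +100.0.
print(run_task([1.0, 100.0], reuse_existing=False))  # 100.0: first update lost
print(run_task([1.0, 100.0], reuse_existing=True))   # 101.0
# Each UDF in its own task (like SHUFFLE=True): nothing gets overwritten.
print(run_task([1.0], False) + run_task([100.0], False))  # 101.0
```

In this model, reusing an already-registered accumulator instead of overwriting it makes the single-task case agree with the shuffled case; whether the actual fix in 2.4.0 works this way is not stated here.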