[ https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Liang-Chi Hsieh updated SPARK-22541:
------------------------------------
    Component/s: Documentation

> Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22541
>                 URL: https://issues.apache.org/jira/browse/SPARK-22541
>             Project: Spark
>          Issue Type: Bug
>          Components: Documentation, PySpark
>    Affects Versions: 2.2.0
>         Environment: pyspark 2.2.0, ubuntu
>            Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be
> executed in parallel, not in sequence, which messes with the accumulators I'm
> using to keep track of filtered data.
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
>
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>         return False
>
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
>
> def myfilter1(val):
>     return __myfilter(val, acc1)
>
> def myfilter2(val):
>     return __myfilter(val, acc2)
>
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
>
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
>
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
>
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
>
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}
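
One possible reading of the numbers reported above is that the two chained filters are merged into a single predicate and both UDFs are evaluated on every input row before the results are combined. The plain-Python sketch below models only that assumption; it does not use Spark, and `Counter`, `make_filter`, `run_sequential` and `run_merged` are hypothetical names introduced purely for illustration.

```python
# Plain-Python model of the report above; this is NOT Spark code.
# Assumption: the two chained filters are merged into one predicate and
# both UDFs are evaluated on every input row before the results are combined.


class Counter:
    """Hypothetical stand-in for a Spark accumulator."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


def make_filter(acc):
    """Build a predicate that keeps val < 2 and counts each rejected row in acc."""
    def predicate(val):
        if val < 2:
            return True
        acc.add(1)
        return False
    return predicate


# Same rows as the DataFrame in the report: (key, val1, val2).
ROWS = [("a", 1, 1), ("b", 2, 2), ("c", 3, 3)]


def run_sequential(rows):
    """What the reporter expected: filter 2 only sees rows kept by filter 1."""
    acc1, acc2 = Counter(), Counter()
    f1, f2 = make_filter(acc1), make_filter(acc2)
    kept = [r for r in rows if f1(r[1])]
    kept = [r for r in kept if f2(r[2])]
    return kept, acc1.value, acc2.value


def run_merged(rows):
    """Assumed behaviour: both predicates run on every row, then are ANDed."""
    acc1, acc2 = Counter(), Counter()
    f1, f2 = make_filter(acc1), make_filter(acc2)
    kept = []
    for r in rows:
        keep1 = f1(r[1])  # always evaluated
        keep2 = f2(r[2])  # also always evaluated, no short-circuit
        if keep1 and keep2:
            kept.append(r)
    return kept, acc1.value, acc2.value


print(run_sequential(ROWS))  # ([('a', 1, 1)], 2, 0)
print(run_merged(ROWS))      # ([('a', 1, 1)], 2, 2)
```

Under this model both variants return the same rows, but only the merged variant reproduces the reported `acc2` value of 2, which is why side effects inside filter UDFs are a fragile way to count dropped rows.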