[ https://issues.apache.org/jira/browse/SPARK-18014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Patterson updated SPARK-18014:
--------------------------------------
    Environment: Pyspark 2.0.0, Ipython 4.2  (was: Pyspark 2.0.1, Ipython 4.2)

> Filters are incorrectly being grouped together when there is processing in
> between
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-18014
>                 URL: https://issues.apache.org/jira/browse/SPARK-18014
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.1
>         Environment: Pyspark 2.0.0, Ipython 4.2
>            Reporter: Michael Patterson
>            Priority: Minor
>
> I created a dataframe that needed to filter the data on columnA, create a new
> columnB by applying a user-defined function to columnA, and then filter on
> columnB. However, the two filters were grouped together in the execution plan
> and pushed below the withColumn projection, which caused errors because the
> user-defined function received input that the first filter should have excluded.
> Example code to reproduce:
> {code}
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> from functools import partial
>
> data = [{'input': 0}, {'input': 1}, {'input': 2}]
> input_df = sc.parallelize(data).toDF()
>
> my_dict = {1: 'first', 2: 'second'}
>
> def apply_dict(input_dict, value):
>     return input_dict[value]
>
> test_udf = F.udf(partial(apply_dict, my_dict), T.StringType())
>
> test_df = input_df.filter('input > 0').withColumn('output', test_udf('input')).filter(F.col('output').rlike('^s'))
> test_df.explain(True)
> {code}
> Execution plan:
> {code}
> == Analyzed Logical Plan ==
> input: bigint, output: string
> Filter output#4 RLIKE ^s
> +- Project [input#0L, partial(input#0L) AS output#4]
>    +- Filter (input#0L > cast(0 as bigint))
>       +- LogicalRDD [input#0L]
>
> == Optimized Logical Plan ==
> Project [input#0L, partial(input#0L) AS output#4]
> +- Filter ((isnotnull(input#0L) && (input#0L > 0)) && partial(input#0L) RLIKE ^s)
>    +- LogicalRDD [input#0L]
> {code}
> Executing test_df.show() after the above code in pyspark 2.0.1 yields:
> KeyError: 0
> Executing test_df.show() in pyspark 1.6.2 yields:
> {code}
> +-----+------+
> |input|output|
> +-----+------+
> |    2|second|
> +-----+------+
> {code}
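> A possible workaround, sketched below under the assumption that the optimizer
> will keep combining the two filters and evaluating the UDF on rows the first
> filter was meant to exclude: make the lookup tolerant of missing keys by using
> dict.get instead of indexing, so evaluating the UDF on input == 0 returns null
> (which the rlike filter then drops) instead of raising KeyError. The names
> apply_dict_safe and safe_udf are illustrative, not part of the original report.
> {code}
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> from functools import partial
>
> data = [{'input': 0}, {'input': 1}, {'input': 2}]
> input_df = sc.parallelize(data).toDF()
>
> my_dict = {1: 'first', 2: 'second'}
>
> def apply_dict_safe(input_dict, value):
>     # dict.get returns None for keys that are not present, so the UDF is
>     # harmless even if the optimizer evaluates it on input == 0.
>     return input_dict.get(value)
>
> safe_udf = F.udf(partial(apply_dict_safe, my_dict), T.StringType())
>
> test_df = (input_df
>            .filter('input > 0')
>            .withColumn('output', safe_udf('input'))
>            .filter(F.col('output').rlike('^s')))
> test_df.show()  # expected: the single row (2, 'second'), as in 1.6.2
> {code}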