[ https://issues.apache.org/jira/browse/SPARK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537082#comment-16537082 ]
Apache Spark commented on SPARK-24208:
--------------------------------------

User 'mgaido91' has created a pull request for this issue:
https://github.com/apache/spark/pull/21737

> Cannot resolve column in self join after applying Pandas UDF
> ------------------------------------------------------------
>
>                 Key: SPARK-24208
>                 URL: https://issues.apache.org/jira/browse/SPARK-24208
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: AWS EMR 5.13.0
> Amazon Hadoop distribution 2.8.3
> Spark 2.3.0
> Pandas 0.22.0
>            Reporter: Rafal Ganczarek
>            Priority: Minor
>
> I noticed that after applying a Pandas UDF, a self join of the resulting DataFrame fails to resolve columns. The workaround I found is to recreate the DataFrame from its RDD and schema.
> Below you can find Python code that reproduces the issue.
> {code:java}
> from pyspark.sql import Row
> import pyspark.sql.functions as F
>
> @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
> def dummy_pandas_udf(df):
>     return df[['key', 'col']]
>
> df = spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'), Row(key=2, col='C')])
>
> # transformation that causes the issue
> df = df.groupBy('key').apply(dummy_pandas_udf)
>
> # WORKAROUND that fixes the issue
> # df = spark.createDataFrame(df.rdd, df.schema)
>
> df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
> {code}
> If the workaround line stays commented out, the code above fails with the following error:
> {code:java}
> AnalysisExceptionTraceback (most recent call last)
> <ipython-input-88-8de763656d6d> in <module>()
>      12 # df = spark.createDataFrame(df.rdd, df.schema)
>      13
> ---> 14 df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
>
> /usr/lib/spark/python/pyspark/sql/dataframe.py in join(self, other, on, how)
>     929                 on = self._jseq([])
>     930             assert isinstance(how, basestring), "how should be basestring"
> --> 931             jdf = self._jdf.join(other._jdf, on, how)
>     932         return DataFrame(jdf, self.sql_ctx)
>     933
>
> /usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1158         answer = self.gateway_client.send_command(command)
>    1159         return_value = get_return_value(
> -> 1160             answer, self.gateway_client, self.target_id, self.name)
>    1161
>    1162         for temp_arg in temp_args:
>
> /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>      67                                              e.java_exception.getStackTrace()))
>      68         if s.startswith('org.apache.spark.sql.AnalysisException: '):
> ---> 69             raise AnalysisException(s.split(': ', 1)[1], stackTrace)
>      70         if s.startswith('org.apache.spark.sql.catalyst.analysis'):
>      71             raise AnalysisException(s.split(': ', 1)[1], stackTrace)
>
> AnalysisException: u"cannot resolve '`temp0.key`' given input columns: [temp0.key, temp0.col];;\n'Join Inner, ('temp0.key = 'temp1.key)\n:- AnalysisBarrier\n:  +- SubqueryAlias temp0\n:     +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n:        +- Project [key#4099L, col#4098, key#4099L]\n:           +- LogicalRDD [col#4098, key#4099L], false\n+- AnalysisBarrier\n   +- SubqueryAlias temp1\n      +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n         +- Project [key#4099L, col#4098, key#4099L]\n            +- LogicalRDD [col#4098, key#4099L], false\n"
> {code}
> The same happens if, instead of the DataFrame API, I use Spark SQL to do the self join:
> {code:java}
> # df is a DataFrame after applying dummy_pandas_udf
> df.createOrReplaceTempView('df')
>
> spark.sql('''
> SELECT
>     *
> FROM df temp0
> LEFT JOIN df temp1 ON
>     temp0.key == temp1.key
> ''').show()
> {code}
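For reference, here is a minimal, self-contained sketch of the reporter's workaround applied end to end, i.e. with the spark.createDataFrame(df.rdd, df.schema) line uncommented before the self join. It assumes Spark 2.3 with PyArrow installed (required for pandas UDFs); the SparkSession construction and the app name are added only so the snippet runs outside a shell and are not part of the original report.

{code:java}
# Sketch of the workaround from the report: rebuild the DataFrame from its RDD
# and schema before the self join. Spark 2.3 with PyArrow is assumed; the
# SparkSession setup below is illustrative, not from the original report.
from pyspark.sql import Row, SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName('spark-24208-workaround').getOrCreate()

@F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
def dummy_pandas_udf(df):
    return df[['key', 'col']]

df = spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'), Row(key=2, col='C')])
df = df.groupBy('key').apply(dummy_pandas_udf)

# Workaround: round-trip through the RDD and schema. This rebuilds the plan as
# a fresh LogicalRDD, so the two aliases of the self join presumably no longer
# share the duplicated attribute references seen in the AnalysisException above.
df = spark.createDataFrame(df.rdd, df.schema)

df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
{code}

With the round-trip in place the join should resolve and print five rows (four combinations for key 1, one for key 2) instead of raising the AnalysisException.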