[ https://issues.apache.org/jira/browse/SPARK-38395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Krishna Sangeeth KS updated SPARK-38395: ---------------------------------------- Description: Pyspark apply in pandas have some difficulty in resolving columns when there is dot in the column name. Here is an example that I have which reproduces the issue. Example taken by modifying doctest example [here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248] {code:python} df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("abc|database|10.159.154|xef", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("abc|database|10.159.154|xef", "id", "v2")) def asof_join(l, r): return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id") df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show(){code} This gives the below error {code:python} AnalysisException Traceback (most recent call last) <ipython-input-126-b1807bb28ae3> in <module> 8 return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id") 9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( ---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show() ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in applyInPandas(self, func, schema) 295 udf = pandas_udf( 296 func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF) --> 297 all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2) 298 udf_column = udf(*all_cols) 299 jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr()) ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in _extract_cols(gd) 303 def _extract_cols(gd): 304 df = gd._df --> 305 return [df[col] for col in df.columns] 306 307 
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in <listcomp>(.0) 303 def _extract_cols(gd): 304 df = gd._df --> 305 return [df[col] for col in df.columns] 306 307 ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in __getitem__(self, item) 1378 """ 1379 if isinstance(item, basestring): -> 1380 jc = self._jdf.apply(item) 1381 return Column(jc) 1382 elif isinstance(item, Column): ~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args) 1303 answer = self.gateway_client.send_command(command) 1304 return_value = get_return_value( -> 1305 answer, self.gateway_client, self.target_id, self.name) 1306 1307 for temp_arg in temp_args: ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw) 135 # Hide where the exception came from that shows a non-Pythonic 136 # JVM exception message. --> 137 raise_from(converted) 138 else: 139 raise ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in raise_from(e) AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" among (abc|database|10.159.154|xef, id, v1); did you mean to quote the `abc|database|10.159.154|xef` column?; {code} As we can see the column is present there in the `among` list. When I replace `.` (dot) with `_` (underscore) the code actually works. 
{code:python} df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("abc|database|10_159_154|xef", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("abc|database|10_159_154|xef", "id", "v2")) def asof_join(l, r): return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id") df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 string").show() {code} {code:java} +---------------------------+---+---+---+ |abc|database|10_159_154|xef| id| v1| v2| +---------------------------+---+---+---+ | 20000101| 1|1.0| x| | 20000102| 1|3.0| x| | 20000101| 2|2.0| y| | 20000102| 2|4.0| y| +---------------------------+---+---+---+ {code} was: Pyspark apply in pandas have some difficult in resolving columns when there is dot in the column name. Here is an example that I have which reproduces the issue. Example taken by modifying doctest example [here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248] {code:python} df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("abc|database|10.159.154|xef", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("abc|database|10.159.154|xef", "id", "v2")) def asof_join(l, r): return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id") df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show(){code} This gives the below error {code:python} AnalysisException Traceback (most recent call last) <ipython-input-126-b1807bb28ae3> in <module> 8 return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id") 9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( ---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id 
int, v1 double, v2 string").show() ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in applyInPandas(self, func, schema) 295 udf = pandas_udf( 296 func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF) --> 297 all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2) 298 udf_column = udf(*all_cols) 299 jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr()) ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in _extract_cols(gd) 303 def _extract_cols(gd): 304 df = gd._df --> 305 return [df[col] for col in df.columns] 306 307 ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in <listcomp>(.0) 303 def _extract_cols(gd): 304 df = gd._df --> 305 return [df[col] for col in df.columns] 306 307 ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in __getitem__(self, item) 1378 """ 1379 if isinstance(item, basestring): -> 1380 jc = self._jdf.apply(item) 1381 return Column(jc) 1382 elif isinstance(item, Column): ~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args) 1303 answer = self.gateway_client.send_command(command) 1304 return_value = get_return_value( -> 1305 answer, self.gateway_client, self.target_id, self.name) 1306 1307 for temp_arg in temp_args: ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw) 135 # Hide where the exception came from that shows a non-Pythonic 136 # JVM exception message. --> 137 raise_from(converted) 138 else: 139 raise ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in raise_from(e) AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" among (abc|database|10.159.154|xef, id, v1); did you mean to quote the `abc|database|10.159.154|xef` column?; {code} As we can see the column is present there in the `among` list. 
When i replace `.` (dot) with `_` (underscore) the code actually works. {code:python} df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("abc|database|10_159_154|xef", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("abc|database|10_159_154|xef", "id", "v2")) def asof_join(l, r): return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id") df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 string").show() {code} {code:java} +---------------------------+---+---+---+ |abc|database|10_159_154|xef| id| v1| v2| +---------------------------+---+---+---+ | 20000101| 1|1.0| x| | 20000102| 1|3.0| x| | 20000101| 2|2.0| y| | 20000102| 2|4.0| y| +---------------------------+---+---+---+ {code} > Pyspark issue in resolving column when there is dot (.) > ------------------------------------------------------- > > Key: SPARK-38395 > URL: https://issues.apache.org/jira/browse/SPARK-38395 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 3.0.0 > Environment: Issue found in Mac OS Catalina, Pyspark 3.0 > Reporter: Krishna Sangeeth KS > Priority: Major > > Pyspark apply in pandas have some difficulty in resolving columns when there > is dot in the column name. > Here is an example that I have which reproduces the issue. 
Example taken by > modifying doctest example > [here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248] > {code:python} > df1 = spark.createDataFrame( > [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, > 4.0)], > ("abc|database|10.159.154|xef", "id", "v1")) > df2 = spark.createDataFrame( > [(20000101, 1, "x"), (20000101, 2, "y")], > ("abc|database|10.159.154|xef", "id", "v2")) > def asof_join(l, r): > return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id") > df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( > asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 > string").show(){code} > This gives the below error > {code:python} > AnalysisException Traceback (most recent call last) > <ipython-input-126-b1807bb28ae3> in <module> > 8 return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", > by="id") > 9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( > ---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 > double, v2 string").show() > ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py > in applyInPandas(self, func, schema) > 295 udf = pandas_udf( > 296 func, returnType=schema, > functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF) > --> 297 all_cols = self._extract_cols(self._gd1) + > self._extract_cols(self._gd2) > 298 udf_column = udf(*all_cols) > 299 jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, > udf_column._jc.expr()) > ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py > in _extract_cols(gd) > 303 def _extract_cols(gd): > 304 df = gd._df > --> 305 return [df[col] for col in df.columns] > 306 > 307 > ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py > in <listcomp>(.0) > 303 def _extract_cols(gd): > 304 df = gd._df > --> 305 return [df[col] for col in df.columns] > 306 > 307 > 
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in > __getitem__(self, item) > 1378 """ > 1379 if isinstance(item, basestring): > -> 1380 jc = self._jdf.apply(item) > 1381 return Column(jc) > 1382 elif isinstance(item, Column): > ~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in > __call__(self, *args) > 1303 answer = self.gateway_client.send_command(command) > 1304 return_value = get_return_value( > -> 1305 answer, self.gateway_client, self.target_id, self.name) > 1306 > 1307 for temp_arg in temp_args: > ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in > deco(*a, **kw) > 135 # Hide where the exception came from that shows a > non-Pythonic > 136 # JVM exception message. > --> 137 raise_from(converted) > 138 else: > 139 raise > ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in > raise_from(e) > AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" > among (abc|database|10.159.154|xef, id, v1); did you mean to quote the > `abc|database|10.159.154|xef` column?; > {code} > As we can see the column is present there in the `among` list. > When i replace `.` (dot) with `_` (underscore) the code actually works. 
> {code:python} > df1 = spark.createDataFrame( > [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, > 4.0)], > ("abc|database|10_159_154|xef", "id", "v1")) > df2 = spark.createDataFrame( > [(20000101, 1, "x"), (20000101, 2, "y")], > ("abc|database|10_159_154|xef", "id", "v2")) > def asof_join(l, r): > return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id") > df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( > asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 > string").show() > {code} > {code:java} > +---------------------------+---+---+---+ > |abc|database|10_159_154|xef| id| v1| v2| > +---------------------------+---+---+---+ > | 20000101| 1|1.0| x| > | 20000102| 1|3.0| x| > | 20000101| 2|2.0| y| > | 20000102| 2|4.0| y| > +---------------------------+---+---+---+ > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org