[ 
https://issues.apache.org/jira/browse/SPARK-38395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishna Sangeeth KS updated SPARK-38395:
----------------------------------------
    Description: 
PySpark's `applyInPandas` has difficulty resolving columns when there is a 
dot in the column name. 

Here is an example that reproduces the issue. It was adapted from the 
doctest example 
[here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248]
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
4.0)],
("abc|database|10.159.154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10.159.154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 
string").show(){code}
This gives the following error:
{code:python}
AnalysisException                         Traceback (most recent call last)
<ipython-input-126-b1807bb28ae3> in <module>
      8     return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", 
by="id")
      9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 
double, v2 string").show()

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in applyInPandas(self, func, schema)
    295         udf = pandas_udf(
    296             func, returnType=schema, 
functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
--> 297         all_cols = self._extract_cols(self._gd1) + 
self._extract_cols(self._gd2)
    298         udf_column = udf(*all_cols)
    299         jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, 
udf_column._jc.expr())

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in _extract_cols(gd)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in <listcomp>(.0)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in 
__getitem__(self, item)
   1378         """
   1379         if isinstance(item, basestring):
-> 1380             jc = self._jdf.apply(item)
   1381             return Column(jc)
   1382         elif isinstance(item, Column):

~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in 
__call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
deco(*a, **kw)
    135                 # Hide where the exception came from that shows a 
non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
raise_from(e)

AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" 
among (abc|database|10.159.154|xef, id, v1); did you mean to quote the 
`abc|database|10.159.154|xef` column?;

{code}
As we can see, the column is present in the `among` list.

When I replace `.` (dot) with `_` (underscore), the code works.
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
4.0)],
("abc|database|10_159_154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10_159_154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 
string").show()
{code}
{code:java}
+---------------------------+---+---+---+
|abc|database|10_159_154|xef| id| v1| v2|
+---------------------------+---+---+---+
|                   20000101|  1|1.0|  x|
|                   20000102|  1|3.0|  x|
|                   20000101|  2|2.0|  y|
|                   20000102|  2|4.0|  y|
+---------------------------+---+---+---+
{code}

  was:
Pyspark apply in pandas have some difficult in resolving columns when there is 
dot in the column name. 

Here is an example that I have which reproduces the issue.  Example taken by 
modifying doctest example 
[here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248]
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
4.0)],
("abc|database|10.159.154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10.159.154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 
string").show(){code}
This gives the below error
{code:python}
AnalysisException                         Traceback (most recent call last)
<ipython-input-126-b1807bb28ae3> in <module>
      8     return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", 
by="id")
      9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 
double, v2 string").show()

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in applyInPandas(self, func, schema)
    295         udf = pandas_udf(
    296             func, returnType=schema, 
functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
--> 297         all_cols = self._extract_cols(self._gd1) + 
self._extract_cols(self._gd2)
    298         udf_column = udf(*all_cols)
    299         jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, 
udf_column._jc.expr())

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in _extract_cols(gd)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in <listcomp>(.0)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in 
__getitem__(self, item)
   1378         """
   1379         if isinstance(item, basestring):
-> 1380             jc = self._jdf.apply(item)
   1381             return Column(jc)
   1382         elif isinstance(item, Column):

~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in 
__call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
deco(*a, **kw)
    135                 # Hide where the exception came from that shows a 
non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
raise_from(e)

AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" 
among (abc|database|10.159.154|xef, id, v1); did you mean to quote the 
`abc|database|10.159.154|xef` column?;

{code}
As we can see the column is present there in the `among` list.

When i replace `.` (dot) with `_` (underscore) the code actually works.
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
4.0)],
("abc|database|10_159_154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10_159_154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 
string").show()
{code}
{code:java}
+---------------------------+---+---+---+
|abc|database|10_159_154|xef| id| v1| v2|
+---------------------------+---+---+---+
|                   20000101|  1|1.0|  x|
|                   20000102|  1|3.0|  x|
|                   20000101|  2|2.0|  y|
|                   20000102|  2|4.0|  y|
+---------------------------+---+---+---+
{code}


> Pyspark issue in resolving column when there is dot (.)
> -------------------------------------------------------
>
>                 Key: SPARK-38395
>                 URL: https://issues.apache.org/jira/browse/SPARK-38395
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>         Environment: Issue found in Mac OS Catalina, Pyspark 3.0
>            Reporter: Krishna Sangeeth KS
>            Priority: Major
>
> PySpark's `applyInPandas` has difficulty resolving columns when there is a 
> dot in the column name. 
> Here is an example that reproduces the issue. It was adapted from the 
> doctest example 
> [here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248]
> {code:python}
> df1 = spark.createDataFrame(
> [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
> 4.0)],
> ("abc|database|10.159.154|xef", "id", "v1"))
> df2 = spark.createDataFrame(
> [(20000101, 1, "x"), (20000101, 2, "y")],
> ("abc|database|10.159.154|xef", "id", "v2"))
> def asof_join(l, r):
>     return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
> asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 
> string").show(){code}
> This gives the following error:
> {code:python}
> AnalysisException                         Traceback (most recent call last)
> <ipython-input-126-b1807bb28ae3> in <module>
>       8     return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", 
> by="id")
>       9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
> ---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 
> double, v2 string").show()
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
>  in applyInPandas(self, func, schema)
>     295         udf = pandas_udf(
>     296             func, returnType=schema, 
> functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
> --> 297         all_cols = self._extract_cols(self._gd1) + 
> self._extract_cols(self._gd2)
>     298         udf_column = udf(*all_cols)
>     299         jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, 
> udf_column._jc.expr())
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
>  in _extract_cols(gd)
>     303     def _extract_cols(gd):
>     304         df = gd._df
> --> 305         return [df[col] for col in df.columns]
>     306 
>     307 
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
>  in <listcomp>(.0)
>     303     def _extract_cols(gd):
>     304         df = gd._df
> --> 305         return [df[col] for col in df.columns]
>     306 
>     307 
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in 
> __getitem__(self, item)
>    1378         """
>    1379         if isinstance(item, basestring):
> -> 1380             jc = self._jdf.apply(item)
>    1381             return Column(jc)
>    1382         elif isinstance(item, Column):
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in 
> __call__(self, *args)
>    1303         answer = self.gateway_client.send_command(command)
>    1304         return_value = get_return_value(
> -> 1305             answer, self.gateway_client, self.target_id, self.name)
>    1306 
>    1307         for temp_arg in temp_args:
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
> deco(*a, **kw)
>     135                 # Hide where the exception came from that shows a 
> non-Pythonic
>     136                 # JVM exception message.
> --> 137                 raise_from(converted)
>     138             else:
>     139                 raise
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
> raise_from(e)
> AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" 
> among (abc|database|10.159.154|xef, id, v1); did you mean to quote the 
> `abc|database|10.159.154|xef` column?;
> {code}
> As we can see, the column is present in the `among` list.
> When I replace `.` (dot) with `_` (underscore), the code works.
> {code:python}
> df1 = spark.createDataFrame(
> [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
> 4.0)],
> ("abc|database|10_159_154|xef", "id", "v1"))
> df2 = spark.createDataFrame(
> [(20000101, 1, "x"), (20000101, 2, "y")],
> ("abc|database|10_159_154|xef", "id", "v2"))
> def asof_join(l, r):
>     return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
> asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 
> string").show()
> {code}
> {code:java}
> +---------------------------+---+---+---+
> |abc|database|10_159_154|xef| id| v1| v2|
> +---------------------------+---+---+---+
> |                   20000101|  1|1.0|  x|
> |                   20000102|  1|3.0|  x|
> |                   20000101|  2|2.0|  y|
> |                   20000102|  2|4.0|  y|
> +---------------------------+---+---+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to