Krishna Sangeeth KS created SPARK-38395:
-------------------------------------------

             Summary: Pyspark issue in resolving column when there is dot (.)
                 Key: SPARK-38395
                 URL: https://issues.apache.org/jira/browse/SPARK-38395
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.0
         Environment: Issue found in Mac OS Catalina, Pyspark 3.0
            Reporter: Krishna Sangeeth KS


Pyspark apply in pandas have some difficult in resolving columns when there is 
dot in the column name. 

Here is an example that I have which reproduces the issue.  Example taken by 
modifying doctest example 
[here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248]
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
4.0)],
("abc|database|10.159.154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10.159.154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 
string").show(){code}
This gives the below error
{code:python}
AnalysisException                         Traceback (most recent call last)
<ipython-input-126-b1807bb28ae3> in <module>
      8     return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", 
by="id")
      9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 
double, v2 string").show()

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in applyInPandas(self, func, schema)
    295         udf = pandas_udf(
    296             func, returnType=schema, 
functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
--> 297         all_cols = self._extract_cols(self._gd1) + 
self._extract_cols(self._gd2)
    298         udf_column = udf(*all_cols)
    299         jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, 
udf_column._jc.expr())

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in _extract_cols(gd)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py
 in <listcomp>(.0)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in 
__getitem__(self, item)
   1378         """
   1379         if isinstance(item, basestring):
-> 1380             jc = self._jdf.apply(item)
   1381             return Column(jc)
   1382         elif isinstance(item, Column):

~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in 
__call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
deco(*a, **kw)
    135                 # Hide where the exception came from that shows a 
non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in 
raise_from(e)

AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" 
among (abc|database|10.159.154|xef, id, v1); did you mean to quote the 
`abc|database|10.159.154|xef` column?;

{code}
As we can see the column is present there in the `among` list.

When i replace `.` (dot) with `_` (underscore) the code actually works.
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 
4.0)],
("abc|database|10_159_154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10_159_154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 
string").show()
{code}
{code:java}
+---------------------------+---+---+---+
|abc|database|10_159_154|xef| id| v1| v2|
+---------------------------+---+---+---+
|                   20000101|  1|1.0|  x|
|                   20000102|  1|3.0|  x|
|                   20000101|  2|2.0|  y|
|                   20000102|  2|4.0|  y|
+---------------------------+---+---+---+
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to