[ 
https://issues.apache.org/jira/browse/SPARK-31370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Riccardo Delegà updated SPARK-31370:
------------------------------------
    Description: 
Python version: '3.7.6 | packaged by conda-forge | (default, Jan 7 2020, 
21:48:41) [MSC v.1916 64 bit (AMD64)]'

I'm getting an unexpected error when I'm joining tables with a large number of 
columns.

 
{code:java}
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

number_of_columns = 50

spark = SparkSession.builder.getOrCreate()

base_df = spark.range(1,100)

secondary_id_df = base_df\
    .withColumn('id1', F.col('id'))\
    .withColumn('id2', F.col('id'))\
    .withColumn('id3', F.col('id'))\
    .withColumn('id4', F.col('id'))

sales_df = base_df
for i in range(1, number_of_columns):
    sales_df = sales_df.withColumn(f'kpi{i}', (F.rand()*100000).cast("int"))

sales_df.registerTempTable('sales')
secondary_id_df.registerTempTable('secondary_id'){code}
 

If I run the following query:

 
{code:java}
spark.sql("""
SELECT
    *
FROM
    secondary_id
    LEFT OUTER JOIN sales s0 ON secondary_id.id = s0.id
    LEFT OUTER JOIN sales s1 ON secondary_id.id1 = s1.id
    LEFT OUTER JOIN sales s2 ON secondary_id.id2 = s2.id
""")
{code}
 

I get the following exception:

 
{code:java}
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark\sql\utils.py in 
deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o22.sql.
: org.apache.spark.sql.AnalysisException: cannot resolve '`secondary_id.id2`' 
given input columns: [s0.kpi41, s0.kpi5, s1.kpi25, s1.kpi13, s2.kpi14, 
s2.kpi48, s2.kpi20, s1.id, s1.kpi12, s1.kpi43, s1.kpi32, s2.kpi8, s0.kpi1, 
s1.kpi28, s0.kpi28, s0.kpi18, s0.kpi16, secondary_id.id1, s2.kpi7, s0.kpi2, 
s0.kpi7, s2.kpi12, s1.kpi36, s0.kpi4, s2.kpi41, s1.kpi34, s0.kpi34, s2.kpi34, 
s2.kpi32, s1.kpi40, s1.kpi39, s0.kpi15, s1.kpi2, s0.kpi43, s0.kpi26, s1.kpi1, 
s1.kpi37, s2.kpi37, s1.kpi46, s1.kpi47, s1.kpi41, s1.kpi18, s1.kpi35, s2.kpi10, 
s2.kpi1, s2.kpi49, s1.kpi17, s2.kpi39, s0.kpi47, s0.kpi37, s0.kpi13, s2.kpi30, 
s1.kpi6, s1.kpi45, s2.kpi25, s1.kpi30, s2.kpi33, s2.kpi26, s2.kpi3, s2.kpi23, 
s0.kpi27, s0.kpi32, s0.kpi24, s1.kpi21, s1.kpi8, s1.kpi33, s2.kpi31, s2.kpi2, 
s2.kpi45, s0.kpi46, s0.kpi30, s2.kpi15, s0.kpi23, s0.kpi25, s2.kpi42, s0.kpi19, 
s2.kpi22, s0.kpi29, s1.kpi9, s2.kpi46, s1.kpi10, s1.kpi14, s1.kpi7, s0.kpi3, 
s0.kpi45, s2.kpi21, s2.kpi47, s2.kpi40, s0.kpi14, s2.kpi28, s1.kpi15, s1.kpi24, 
s2.kpi19, s2.kpi38, s1.kpi42, s2.kpi29, s0.kpi39, s0.kpi6, s0.kpi9, s1.kpi20, 
s2.kpi6, s1.kpi11, s0.kpi12, secondary_id.id4, s0.id, s0.kpi44, s0.kpi11, 
s0.kpi20, s2.kpi27, s0.kpi42, s0.kpi33, s2.kpi16, s1.kpi26, s2.kpi43, s0.kpi48, 
s0.kpi8, secondary_id.id2, s1.kpi49, s1.kpi4, s1.kpi48, s1.kpi31, s2.kpi35, 
s2.kpi17, s0.kpi10, s1.kpi16, s1.kpi22, s1.kpi29, s2.kpi11, s0.kpi49, s0.kpi40, 
s2.kpi5, s2.kpi4, s1.kpi3, s2.kpi13, secondary_id.id, s1.kpi27, s2.kpi9, 
s2.kpi36, s0.kpi21, s2.kpi44, s1.kpi44, s0.kpi35, s1.kpi19, s2.kpi18, s0.kpi36, 
s0.kpi17, s0.kpi38, secondary_id.id3, s0.kpi31, s1.kpi38, s0.kpi22, s2.kpi24, 
s1.kpi5, s1.kpi23]; line 8 pos 32;
{code}
 

even though the column "secondary_id.id2" is present in the input columns list.

If I remove the last join, the query runs fine:
{code:java}
spark.sql("""
SELECT
    *
FROM
    secondary_id
    LEFT OUTER JOIN sales s0 ON secondary_id.id = s0.id
    LEFT OUTER JOIN sales s1 ON secondary_id.id1 = s1.id
""")
{code}
At the same time, if I reimplement the initial query using Spark APIs, the code 
doesn't raise an exception: 
{code:java}
secondary_id_df.alias('secondary_id')\
    .join(sales_df.alias('s0'), on=col('secondary_id.id')==col('s0.id'))\
    .join(sales_df.alias('s1'), on=col('secondary_id.id')==col('s1.id'))\
    .join(sales_df.alias('s2'), on=col('secondary_id.id')==col('s2.id'))
{code}
Raising the parameter "number_of_columns" to 150 makes the DataFrame API version 
raise an exception as well, so the bug seems connected to the number of columns.

  was:
Python version: '3.7.6 | packaged by conda-forge | (default, Jan 7 2020, 
21:48:41) [MSC v.1916 64 bit (AMD64)]'

I'm getting a weird problem when I'm joining tables with too many columns.

 
{code:java}
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

number_of_columns = 50

spark = SparkSession.builder.getOrCreate()

base_df = spark.range(1,100)

secondary_id_df = base_df\
    .withColumn('id1', F.col('id'))\
    .withColumn('id2', F.col('id'))\
    .withColumn('id3', F.col('id'))\
    .withColumn('id4', F.col('id'))

sales_df = base_df
for i in range(1, number_of_columns):
    sales_df = sales_df.withColumn(f'kpi{i}', (F.rand()*100000).cast("int"))

sales_df.registerTempTable('sales')
secondary_id_df.registerTempTable('secondary_id'){code}
 

If I run the following query:

 
{code:java}
spark.sql("""
SELECT
    *
FROM
    secondary_id
    LEFT OUTER JOIN sales s0 ON secondary_id.id = s0.id
    LEFT OUTER JOIN sales s1 ON secondary_id.id1 = s1.id
    LEFT OUTER JOIN sales s2 ON secondary_id.id2 = s2.id
""")
{code}
 

I get the following exception:

 
{code:java}
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark\sql\utils.py in 
deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o22.sql.
: org.apache.spark.sql.AnalysisException: cannot resolve '`secondary_id.id2`' 
given input columns: [s0.kpi41, s0.kpi5, s1.kpi25, s1.kpi13, s2.kpi14, 
s2.kpi48, s2.kpi20, s1.id, s1.kpi12, s1.kpi43, s1.kpi32, s2.kpi8, s0.kpi1, 
s1.kpi28, s0.kpi28, s0.kpi18, s0.kpi16, secondary_id.id1, s2.kpi7, s0.kpi2, 
s0.kpi7, s2.kpi12, s1.kpi36, s0.kpi4, s2.kpi41, s1.kpi34, s0.kpi34, s2.kpi34, 
s2.kpi32, s1.kpi40, s1.kpi39, s0.kpi15, s1.kpi2, s0.kpi43, s0.kpi26, s1.kpi1, 
s1.kpi37, s2.kpi37, s1.kpi46, s1.kpi47, s1.kpi41, s1.kpi18, s1.kpi35, s2.kpi10, 
s2.kpi1, s2.kpi49, s1.kpi17, s2.kpi39, s0.kpi47, s0.kpi37, s0.kpi13, s2.kpi30, 
s1.kpi6, s1.kpi45, s2.kpi25, s1.kpi30, s2.kpi33, s2.kpi26, s2.kpi3, s2.kpi23, 
s0.kpi27, s0.kpi32, s0.kpi24, s1.kpi21, s1.kpi8, s1.kpi33, s2.kpi31, s2.kpi2, 
s2.kpi45, s0.kpi46, s0.kpi30, s2.kpi15, s0.kpi23, s0.kpi25, s2.kpi42, s0.kpi19, 
s2.kpi22, s0.kpi29, s1.kpi9, s2.kpi46, s1.kpi10, s1.kpi14, s1.kpi7, s0.kpi3, 
s0.kpi45, s2.kpi21, s2.kpi47, s2.kpi40, s0.kpi14, s2.kpi28, s1.kpi15, s1.kpi24, 
s2.kpi19, s2.kpi38, s1.kpi42, s2.kpi29, s0.kpi39, s0.kpi6, s0.kpi9, s1.kpi20, 
s2.kpi6, s1.kpi11, s0.kpi12, secondary_id.id4, s0.id, s0.kpi44, s0.kpi11, 
s0.kpi20, s2.kpi27, s0.kpi42, s0.kpi33, s2.kpi16, s1.kpi26, s2.kpi43, s0.kpi48, 
s0.kpi8, secondary_id.id2, s1.kpi49, s1.kpi4, s1.kpi48, s1.kpi31, s2.kpi35, 
s2.kpi17, s0.kpi10, s1.kpi16, s1.kpi22, s1.kpi29, s2.kpi11, s0.kpi49, s0.kpi40, 
s2.kpi5, s2.kpi4, s1.kpi3, s2.kpi13, secondary_id.id, s1.kpi27, s2.kpi9, 
s2.kpi36, s0.kpi21, s2.kpi44, s1.kpi44, s0.kpi35, s1.kpi19, s2.kpi18, s0.kpi36, 
s0.kpi17, s0.kpi38, secondary_id.id3, s0.kpi31, s1.kpi38, s0.kpi22, s2.kpi24, 
s1.kpi5, s1.kpi23]; line 8 pos 32;
{code}
 

even though the column "secondary_id.id2" is present in the input columns list.

If I remove the last join, the query runs fine:
{code:java}
spark.sql("""
SELECT
    *
FROM
    secondary_id
    LEFT OUTER JOIN sales s0 ON secondary_id.id = s0.id
    LEFT OUTER JOIN sales s1 ON secondary_id.id1 = s1.id
""")
{code}
At the same time, if I reimplement the initial query using Spark APIs, the code 
doesn't raise an exception: 
{code:java}
secondary_id_df.alias('secondary_id')\
    .join(sales_df.alias('s0'), on=col('secondary_id.id')==col('s0.id'))\
    .join(sales_df.alias('s1'), on=col('secondary_id.id')==col('s1.id'))\
    .join(sales_df.alias('s2'), on=col('secondary_id.id')==col('s2.id'))
{code}
Raising the parameter "number_of_columns" to 150 makes this piece of code raise 
an exception as well, so the bug seems connected to the number of columns.


> AnalysisException when too many columns in join
> -----------------------------------------------
>
>                 Key: SPARK-31370
>                 URL: https://issues.apache.org/jira/browse/SPARK-31370
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 2.4.4
>            Reporter: Riccardo Delegà
>            Priority: Minor
>
> Python version: '3.7.6 | packaged by conda-forge | (default, Jan 7 2020, 
> 21:48:41) [MSC v.1916 64 bit (AMD64)]'
> I'm getting an unexpected error when I'm joining tables with a large number 
> of columns.
>  
> {code:java}
> from pyspark.sql import SparkSession
> import  pyspark.sql.functions as F
> number_of_columns = 50
> spark = SparkSession.builder.getOrCreate()
> base_df = spark.range(1,100)
> secondary_id_df = base_df\
>     .withColumn('id1', F.col('id'))\
>     .withColumn('id2', F.col('id'))\
>     .withColumn('id3', F.col('id'))\
>     .withColumn('id4', F.col('id'))
> sales_df = base_df
> for i in range(1, number_of_columns):
>     sales_df = sales_df.withColumn(f'kpi{i}', (F.rand()*100000).cast("int"))
> sales_df.registerTempTable('sales')
> secondary_id_df.registerTempTable('secondary_id'){code}
>  
> If I run the following query:
>  
> {code:java}
> spark.sql("""
> SELECT
>     *
> FROM
>     secondary_id
>     LEFT OUTER JOIN sales s0 ON secondary_id.id = s0.id
>     LEFT OUTER JOIN sales s1 ON secondary_id.id1 = s1.id
>     LEFT OUTER JOIN sales s2 ON secondary_id.id2 = s2.id
> """)
> {code}
>  
> I get the following exception:
>  
> {code:java}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> ~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark\sql\utils.py in 
> deco(*a, **kw)
>      62         try:
> ---> 63             return f(*a, **kw)
>      64         except py4j.protocol.Py4JJavaError as e:
> ~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)
>     327                     "An error occurred while calling {0}{1}{2}.\n".
> --> 328                     format(target_id, ".", name), value)
>     329             else:
> Py4JJavaError: An error occurred while calling o22.sql.
> : org.apache.spark.sql.AnalysisException: cannot resolve '`secondary_id.id2`' 
> given input columns: [s0.kpi41, s0.kpi5, s1.kpi25, s1.kpi13, s2.kpi14, 
> s2.kpi48, s2.kpi20, s1.id, s1.kpi12, s1.kpi43, s1.kpi32, s2.kpi8, s0.kpi1, 
> s1.kpi28, s0.kpi28, s0.kpi18, s0.kpi16, secondary_id.id1, s2.kpi7, s0.kpi2, 
> s0.kpi7, s2.kpi12, s1.kpi36, s0.kpi4, s2.kpi41, s1.kpi34, s0.kpi34, s2.kpi34, 
> s2.kpi32, s1.kpi40, s1.kpi39, s0.kpi15, s1.kpi2, s0.kpi43, s0.kpi26, s1.kpi1, 
> s1.kpi37, s2.kpi37, s1.kpi46, s1.kpi47, s1.kpi41, s1.kpi18, s1.kpi35, 
> s2.kpi10, s2.kpi1, s2.kpi49, s1.kpi17, s2.kpi39, s0.kpi47, s0.kpi37, 
> s0.kpi13, s2.kpi30, s1.kpi6, s1.kpi45, s2.kpi25, s1.kpi30, s2.kpi33, 
> s2.kpi26, s2.kpi3, s2.kpi23, s0.kpi27, s0.kpi32, s0.kpi24, s1.kpi21, s1.kpi8, 
> s1.kpi33, s2.kpi31, s2.kpi2, s2.kpi45, s0.kpi46, s0.kpi30, s2.kpi15, 
> s0.kpi23, s0.kpi25, s2.kpi42, s0.kpi19, s2.kpi22, s0.kpi29, s1.kpi9, 
> s2.kpi46, s1.kpi10, s1.kpi14, s1.kpi7, s0.kpi3, s0.kpi45, s2.kpi21, s2.kpi47, 
> s2.kpi40, s0.kpi14, s2.kpi28, s1.kpi15, s1.kpi24, s2.kpi19, s2.kpi38, 
> s1.kpi42, s2.kpi29, s0.kpi39, s0.kpi6, s0.kpi9, s1.kpi20, s2.kpi6, s1.kpi11, 
> s0.kpi12, secondary_id.id4, s0.id, s0.kpi44, s0.kpi11, s0.kpi20, s2.kpi27, 
> s0.kpi42, s0.kpi33, s2.kpi16, s1.kpi26, s2.kpi43, s0.kpi48, s0.kpi8, 
> secondary_id.id2, s1.kpi49, s1.kpi4, s1.kpi48, s1.kpi31, s2.kpi35, s2.kpi17, 
> s0.kpi10, s1.kpi16, s1.kpi22, s1.kpi29, s2.kpi11, s0.kpi49, s0.kpi40, 
> s2.kpi5, s2.kpi4, s1.kpi3, s2.kpi13, secondary_id.id, s1.kpi27, s2.kpi9, 
> s2.kpi36, s0.kpi21, s2.kpi44, s1.kpi44, s0.kpi35, s1.kpi19, s2.kpi18, 
> s0.kpi36, s0.kpi17, s0.kpi38, secondary_id.id3, s0.kpi31, s1.kpi38, s0.kpi22, 
> s2.kpi24, s1.kpi5, s1.kpi23]; line 8 pos 32;
> {code}
>  
> even though the column "secondary_id.id2" is present in the input columns 
> list.
> If I remove the last join, the query runs fine:
> {code:java}
> spark.sql("""
> SELECT
>     *
> FROM
>     secondary_id
>     LEFT OUTER JOIN sales s0 ON secondary_id.id = s0.id
>     LEFT OUTER JOIN sales s1 ON secondary_id.id1 = s1.id
> """)
> {code}
> At the same time, if I reimplement the initial query using Spark APIs, the 
> code doesn't raise an exception: 
> {code:java}
> secondary_id_df.alias('secondary_id')\
>     .join(sales_df.alias('s0'), on=col('secondary_id.id')==col('s0.id'))\
>     .join(sales_df.alias('s1'), on=col('secondary_id.id')==col('s1.id'))\
>     .join(sales_df.alias('s2'), on=col('secondary_id.id')==col('s2.id'))
> {code}
> Raising the parameter "number_of_columns" to 150 makes the DataFrame API 
> version raise an exception as well, so the bug seems connected to the number 
> of columns.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

Reply via email to