[ 
https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972270#comment-15972270
 ] 

Hyukjin Kwon commented on SPARK-20169:
--------------------------------------

Yeah, I was confused too when I tried to reproduce this before. Let me 
rephrase the steps to reproduce as below:

Both sources below build the same edge data; the {{graph.csv}} file (read in source B) contains:

{code}
src,dst
1,2
1,3
1,4
2,1
3,1
4,1
{code}

*source A*

{code}
from pyspark.sql.functions import *

e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
e.show()
{code}

{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}


*source B*


{code}
from pyspark.sql.functions import *

e = spark.read.csv("graph.csv", header=True)
e.show()
{code}

{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}
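
One difference between the two sources that may be worth noting (my own observation, not part of the original report): source A ends up with {{long}} columns, while {{spark.read.csv}} with only {{header=True}} leaves the columns as {{string}}, since {{inferSchema}} is off by default. A quick check:

{code}
# Compare the schemas of the two sources (my observation; not part of the original steps).
e_a = sc.parallelize([(1, 2), (1, 3), (1, 4), (2, 1), (3, 1), (4, 1)]).toDF(["src", "dst"])
e_b = spark.read.csv("graph.csv", header=True)  # inferSchema is off by default

e_a.printSchema()  # src: long, dst: long
e_b.printSchema()  # src: string, dst: string
{code}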


*Reproducer*

{code}
# Join the edges with a vertex list, aggregate by dst, rename, join again, and aggregate again.
r = sc.parallelize([(1,), (2,), (3,), (4,)]).toDF(['src'])
r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst', 'src')
jr = e.join(r1, 'src')
r2 = jr.groupBy('dst').count()
r2.show()
{code}
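
If it helps, the plans produced in each case can be compared right after running the reproducer (a debugging suggestion from my side, not part of the original steps):

{code}
# Print the parsed, analyzed, and optimized logical plans plus the physical plan,
# so the output from source A and source B can be diffed.
r2.explain(True)
{code}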



*Results from source A and the reproducer*

{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    3|
|  3|    1|
|  2|    1|
|  4|    1|
+---+-----+
{code}


*Results from source B and the reproducer*

{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    1|
|  4|    1|
|  3|    1|
|  2|    1|
|  1|    1|
|  1|    1|
+---+-----+
{code}
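
One variation that might help narrow down whether the column types are involved (a hypothetical check on my side; I have not verified how it behaves for this issue):

{code}
# Same reproducer, but with the CSV columns inferred as integers instead of strings.
# Hypothetical check only, not a confirmed workaround.
e_typed = spark.read.csv("graph.csv", header=True, inferSchema=True)
r = sc.parallelize([(1,), (2,), (3,), (4,)]).toDF(["src"])
r1 = e_typed.join(r, "src").groupBy("dst").count().withColumnRenamed("dst", "src")
r2 = e_typed.join(r1, "src").groupBy("dst").count()
r2.show()
{code}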

> Groupby Bug with Sparksql
> -------------------------
>
>                 Key: SPARK-20169
>                 URL: https://issues.apache.org/jira/browse/SPARK-20169
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Bin Wu
>
> We found a potential bug in the Catalyst optimizer which cannot correctly 
> process "groupBy". You can reproduce it with the following simple example:
> =========================
> from pyspark.sql.functions import *
> #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
> e = spark.read.csv("graph.csv", header=True)
> r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src'])
> r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src')
> jr = e.join(r1, 'src')
> jr.show()
> r2 = jr.groupBy('dst').count()
> r2.show()
> =========================
> FYI, "graph.csv" contains exactly the same data as the commented line.
> You can find that jr is:
> |src|dst|count|
> |  3|  1|    1|
> |  1|  4|    3|
> |  1|  3|    3|
> |  1|  2|    3|
> |  4|  1|    1|
> |  2|  1|    1|
> But, after the last groupBy, the 3 rows with dst = 1 are not grouped together:
> |dst|count|
> |  1|    1|
> |  4|    1|
> |  3|    1|
> |  2|    1|
> |  1|    1|
> |  1|    1|
> If we build jr directly from the raw data (the commented line), this error will not 
> show up. So we suspect that there is a bug in the Catalyst optimizer when multiple 
> joins and groupBy operations are being optimized.


