[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030632#comment-15030632 ]

Maciej Bryński commented on SPARK-12030:
----------------------------------------

When I cache joined, the result of distinct(id1) is always the same, but still not equal to 5900729.

I saved the data from JDBC to Parquet, and without caching I have the same issue.

I attached both dataframes as Parquet files, read with sqlCtx.read.parquet:

{code}
t1 = sqlCtx.read.parquet('t1')
t2 = sqlCtx.read.parquet('t2')
t1.registerTempTable("t1")
t2.registerTempTable("t2")
joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
t1.count()  # -> 5904733
sqlCtx.sql("select distinct(id1) from t1").count()  # -> 5904733
t2.count()  # -> 54298
joined.count()  # -> 5904733
joined.registerTempTable("joined")
sqlCtx.sql("select distinct(id1) from joined").count()  # -> 5904435, varies between queries
{code}
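For reference, the same distinct count can be expressed with the DataFrame API (a sketch only, not part of the attached repro); it should go through the same aggregate and is expected to show the same mismatch:
{code}
# Sketch only: DataFrame-API equivalent of the last SQL query above.
# Expected result: 5904733 (one id1 per t1 row after the left outer join).
joined.select("id1").distinct().count()
{code}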

Plan for the last select:
{code}
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#3930L])
 TungstenExchange SinglePartition, None
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3933L])
   TungstenAggregate(key=[id1#3895], functions=[], output=[])
    TungstenExchange hashpartitioning(id1#3895,400), None
     TungstenAggregate(key=[id1#3895], functions=[], output=[id1#3895])
      Project [id1#3895]
       SortMergeOuterJoin [fk1#3896], [id2#3897], LeftOuter, None
        Sort [fk1#3896 ASC], false, 0
         TungstenExchange hashpartitioning(fk1#3896,400), None
          Scan ParquetRelation[file:t1] PushedFilter: [] [id1#3895,fk1#3896]
        Sort [id2#3897 ASC], false, 0
         TungstenExchange hashpartitioning(id2#3897,400), None
          Scan ParquetRelation[file:t2] PushedFilter: [] [id2#3897]
{code}
In the GUI I can see the problem in the first TungstenAggregate:
number of input rows: 5904733
number of output rows: 5904435
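To make the variation easy to see, the count can simply be re-run in a loop (a minimal sketch, not part of the attachment):
{code}
# Sketch: re-run the same distinct count several times; with correct results
# every iteration should print 5904733, but the observed values differ per run.
for _ in range(5):
    print(sqlCtx.sql("select distinct(id1) from joined").count())
{code}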

> Incorrect results when aggregate joined data
> --------------------------------------------
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
>            Priority: Critical
>
> I have the following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has a foreign key fk1 referencing t2.id2):
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
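> For clarity, t1/t2, id1/id2 and size1 above are Python variables; spelled out with the DataFrameReader.jdbc parameter names, the calls look roughly like this (a sketch with placeholder values):
> {code}
> # Sketch with explicit parameter names (url, table, column, lowerBound,
> # upperBound, numPartitions); "XXX" is a placeholder and size1 is the
> # reporter's variable holding the upper bound of the partition column.
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", table="t1", column="id1",
>                       lowerBound=0, upperBound=size1, numPartitions=200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", table="t2", column="id2",
>                       lowerBound=0, upperBound=size1, numPartitions=200).cache()
> {code}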
> Important: both tables are cached, so the results should be the same on every query.
> Then I did some counts:
> {code}
> t1.count()  # -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count()  # -> 5900729
> t2.count()  # -> 54298
> joined.count()  # -> 5900729
> {code}
> And here the magic begins: I counted distinct id1 from the joined table.
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> The results vary *(they are different on every run)* between 5899000 and 5900000, but are never equal to 5900729.
> In addition, I ran more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 1").collect()
> {code}
> This gives some results, but then this query (with result being one of the returned id1 values) returns *1*:
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
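> In other words (a hypothetical sketch; duplicate_id stands in for one of the id1 values returned by the GROUP BY query above):
> {code}
> # Hypothetical cross-check: pick one of the id1 values reported as duplicated
> # and count its rows in joined; it prints 1, contradicting the HAVING result.
> dups = sqlCtx.sql("select id1, count(*) as cnt from joined group by id1 having count(*) > 1").collect()
> duplicate_id = dups[0].id1
> print(len(sqlCtx.sql("select * from joined where id1 = {0}".format(duplicate_id)).collect()))
> {code}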
> What's wrong?


