[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030632#comment-15030632 ]
Maciej Bryński commented on SPARK-12030:
----------------------------------------

When I cache the join, the result of distinct(id1) is always the same, but still not equal to 5900729.

I also saved the data from JDBC to parquet, and I have the same issue without cache. I attached both dataframes as parquet, read with sqlCtx.read.parquet:

{code}
t1 = sqlCtx.read.parquet('t1')
t2 = sqlCtx.read.parquet('t2')
t1.registerTempTable("t1")
t2.registerTempTable("t2")
joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")

t1.count() -> 5904733
sqlCtx.sql("select distinct(id1) from t1").count() -> 5904733
t2.count() -> 54298
joined.count() -> 5904733

joined.registerTempTable("joined")
sqlCtx.sql("select distinct(id1) from joined").count() -> 5904435  # this varies between queries
{code}

Plan for the last select:

{code}
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#3930L])
 TungstenExchange SinglePartition, None
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3933L])
   TungstenAggregate(key=[id1#3895], functions=[], output=[])
    TungstenExchange hashpartitioning(id1#3895,400), None
     TungstenAggregate(key=[id1#3895], functions=[], output=[id1#3895])
      Project [id1#3895]
       SortMergeOuterJoin [fk1#3896], [id2#3897], LeftOuter, None
        Sort [fk1#3896 ASC], false, 0
         TungstenExchange hashpartitioning(fk1#3896,400), None
          Scan ParquetRelation[file:t1] PushedFilter: [] [id1#3895,fk1#3896]
        Sort [id2#3897 ASC], false, 0
         TungstenExchange hashpartitioning(id2#3897,400), None
          Scan ParquetRelation[file:t2] PushedFilter: [] [id2#3897]
{code}

In the UI I can see the problem in the first TungstenAggregate:
number of input rows: 5904733
number of output rows: 5904435

> Incorrect results when aggregating joined data
> ----------------------------------------------
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
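The plan above computes the distinct count in two phases: a partial aggregate per input partition, a TungstenExchange with hashpartitioning(id1, 400), and a final aggregate. The correctness of that scheme rests on one invariant: every occurrence of the same id1 must be routed to the same reduce partition, so no key is counted twice or lost. A minimal pure-Python sketch of that invariant (a hypothetical model for illustration, not Spark internals; partition count and data are made up):

```python
NUM_PARTITIONS = 400  # mirrors hashpartitioning(id1, 400) in the plan

def distinct_count(partitions, num_partitions=NUM_PARTITIONS):
    # Map side: partial TungstenAggregate(key=[id1]) drops duplicates
    # within each input partition.
    partials = [set(p) for p in partitions]

    # Shuffle: hash partitioning routes every occurrence of a key to
    # exactly one reduce partition, deterministically.
    reduce_side = [set() for _ in range(num_partitions)]
    for partial in partials:
        for key in partial:
            reduce_side[hash(key) % num_partitions].add(key)

    # Reduce side: final TungstenAggregate(key=[id1]), then count.
    return sum(len(bucket) for bucket in reduce_side)

# With deterministic partitioning the count is stable across runs:
data = [[1, 2, 2, 3], [3, 4, 5], [5, 6]]
assert distinct_count(data) == 6
assert distinct_count(data) == distinct_count(data)
```

If the rows feeding the first TungstenAggregate differ between runs (as the input/output row counts in the UI suggest), this invariant no longer guarantees a stable result.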
>            Priority: Critical
>
> I have the following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has a foreign key fk1 referencing t2):
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both tables are cached, so the results should be the same on every query.
> Then I did some counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here the magic begins - I counted distinct id1 from the joined table:
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> The results vary *(they are different on every run)* between 5899000 and 5900000, but are never equal to 5900729.
> In addition, I ran more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 1").collect()
> {code}
> This gives some results, yet for such an id1 the following query returns *1*:
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong?
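The reporter's expectation follows from plain left-outer-join semantics: a left outer join can duplicate left-side rows (when a key matches several right rows) but can never drop them, so the set of distinct id1 values in the joined result must equal the set of distinct id1 values in t1. A small self-contained Python sketch of that property (hypothetical toy data, not the attached tables):

```python
def left_outer_join(left, right, left_key, right_key):
    """Minimal left outer join: every left row appears at least once;
    left rows with no match get None for the right-side key."""
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    joined = []
    for row in left:
        matches = index.get(row[left_key])
        if matches:
            for m in matches:
                joined.append({**row, **m})
        else:
            joined.append({**row, right_key: None})
    return joined

t1 = [{"id1": i, "fk1": i % 3} for i in range(10)]  # 10 distinct id1 values
t2 = [{"id2": 0}, {"id2": 1}]                       # rows with fk1 == 2 have no match
joined = left_outer_join(t1, t2, "fk1", "id2")

# A left outer join may add rows but never removes left rows, so:
assert len(joined) >= len(t1)
assert {r["id1"] for r in joined} == {r["id1"] for r in t1}
```

Since every left row survives the join, a distinct count over id1 that comes back smaller than t1.count() (as reported above) indicates rows were lost downstream of the join, not by the join semantics themselves.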