[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15035164#comment-15035164 ]

Xiao Li commented on SPARK-12030:
---------------------------------

I verified the fix using my test cases. It works! I also posted a question in the PR. If possible, could anyone answer it? I really appreciate your help. Thank you!

> Incorrect results when aggregate joined data
> --------------------------------------------
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
>            Assignee: Nong Li
>            Priority: Blocker
>             Fix For: 1.5.3, 1.6.0
>
>         Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have the following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both tables are cached, so results should be the same on every query.
> Then I did some counts:
> {code}
> t1.count()  # -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count()  # -> 5900729
> t2.count()  # -> 54298
> joined.count()  # -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from the joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results vary *(they are different on every run)* between 5899000 and 590 but are never equal to 5900729.
> In addition, I ran more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 1").collect()
> {code}
> This returns some results, but the following query returns *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong?
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034936#comment-15034936 ]

Yin Huai commented on SPARK-12030:
----------------------------------

I also merged the patch to branch 1.5. Please note that 1.5 does not trigger the corner case. In 1.6, however, we use the problematic method in TimSort, which exposes the problem.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034652#comment-15034652 ]

Yin Huai commented on SPARK-12030:
----------------------------------

Yes, it will be in 1.6.0.

> Incorrect results when aggregate joined data
> --------------------------------------------
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
>            Assignee: Nong Li
>            Priority: Blocker
>             Fix For: 1.6.0
>
>         Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034641#comment-15034641 ]

Maciej Bryński commented on SPARK-12030:
----------------------------------------

Will the fix be included in 1.6.0?
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034615#comment-15034615 ]

Davies Liu commented on SPARK-12030:
------------------------------------

I also figured out the root cause last night. That's an interesting bug.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034141#comment-15034141 ]

Apache Spark commented on SPARK-12030:
--------------------------------------

User 'nongli' has created a pull request for this issue:
https://github.com/apache/spark/pull/10068

> Incorrect results when aggregate joined data
> --------------------------------------------
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
>            Priority: Blocker
>         Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033356#comment-15033356 ]

Xiao Li commented on SPARK-12030:
---------------------------------

[~nongli] Thank you very much! Your finding sounds reasonable. I observed that it happens only when the number of rows is larger than 65539. The corrupted area is always at row 65538, almost the last row.
Please submit the PR. I will verify it. Thanks again! You are SO GREAT!
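One possible reading of the row counts above, offered as an assumption rather than something confirmed in this thread: if each entry in the sort buffer takes 16 bytes (an 8-byte record pointer plus an 8-byte key prefix), then the 1M copy threshold mentioned elsewhere in this thread corresponds to 65536 entries, which is close to the observed 65538/65539 boundary. The constant and function names below are hypothetical and only illustrate the arithmetic.

```python
# Arithmetic sketch only; the 16-byte entry size is an assumption, not taken from this thread.
COPY_THRESHOLD_BYTES = 1024 * 1024   # the "threshold (1M)" mentioned in the thread
ASSUMED_BYTES_PER_ENTRY = 16         # assumed: 8-byte pointer + 8-byte key prefix


def entries_per_single_copy(threshold_bytes=COPY_THRESHOLD_BYTES,
                            bytes_per_entry=ASSUMED_BYTES_PER_ENTRY):
    """Number of sort-buffer entries that fit in one copy at the threshold."""
    return threshold_bytes // bytes_per_entry


if __name__ == "__main__":
    print(entries_per_single_copy())
```

Under that assumption, a single range copy would only be split into several chunks once it moves more than 65536 entries, which would explain why small data sets never show the corruption.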
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033321#comment-15033321 ]

Nong Li commented on SPARK-12030:
---------------------------------

I think I tracked it down. The bug comes from this PR, which exposed another bug:
[SPARK-7542][SQL] Support off-heap index/sort buffer
https://github.com/apache/spark/pull/9477

I think the issue is in
https://github.com/apache/spark/blob/master/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L108
which does not support overlapping regions. For example, if the src region is [100-1000), the dest region is [500-1500), and the copy size is 100, the first copy of 100 bytes would overwrite the bytes at offset 500, which are still part of the source and have not been copied yet. I think this needs a check for whether the src region is before the dest region and, if so, should run this loop backwards.

This only hits on larger data sets, when a single copy exceeds the threshold (1M).
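To illustrate the overlapping-region problem described in Nong Li's comment, here is a minimal Python sketch. It is not the Spark code: `copy_forward_chunked`, `copy_overlap_safe`, and the 4-byte `CHUNK` (standing in for the 1M threshold) are hypothetical names chosen for illustration, and a `bytearray` stands in for the memory region. It only shows why copying in ascending chunks can corrupt data when the destination starts inside the source, and how copying the chunks in descending order avoids it.

```python
CHUNK = 4  # hypothetical stand-in for the 1M copy threshold


def copy_forward_chunked(buf, src, dst, length, chunk=CHUNK):
    """Copy `length` bytes from src to dst in ascending chunks.

    Unsafe when src < dst and the regions overlap: an early chunk
    overwrites source bytes that a later chunk still has to read.
    """
    while length > 0:
        size = min(length, chunk)
        buf[dst:dst + size] = buf[src:src + size]
        src += size
        dst += size
        length -= size


def copy_overlap_safe(buf, src, dst, length, chunk=CHUNK):
    """Copy like memmove: go backwards when the source precedes the destination."""
    if src < dst:
        src_end = src + length
        dst_end = dst + length
        while length > 0:
            size = min(length, chunk)
            src_end -= size
            dst_end -= size
            buf[dst_end:dst_end + size] = buf[src_end:src_end + size]
            length -= size
    else:
        copy_forward_chunked(buf, src, dst, length, chunk)


def expected_after_copy(original, src, dst, length):
    """Reference result: what a correct overlapping copy must produce."""
    out = bytearray(original)
    out[dst:dst + length] = original[src:src + length]
    return out


if __name__ == "__main__":
    data = bytearray(range(12))
    broken = bytearray(data)
    copy_forward_chunked(broken, 0, 2, 8)
    fixed = bytearray(data)
    copy_overlap_safe(fixed, 0, 2, 8)
    print(list(broken))
    print(list(fixed))
    print(list(expected_after_copy(data, 0, 2, 8)))
```

Note that the forward copy still gives the right answer when the whole copy fits in one chunk, which matches the observation that only large copies are affected.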
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032939#comment-15032939 ]

Xiao Li commented on SPARK-12030:
---------------------------------

Let me post a simple case that can trigger the data corruption. The data set t1 is downloaded from this JIRA.
{code}
test("sort result") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
    val t1test = sqlContext.read.parquet("/Users/xiaoli/Downloads/t1")
      .dropDuplicates()
      .where("fk1=39 or (fk1=525 and id1 < 664618 and id1 >= 470050)")
      .repartition(1)
      .cache()
    // t1test.orderBy("fk1").explain(true)
    val t1 = t1test.orderBy("fk1").cache()
    checkAnswer(
      t1test,
      t1.collect()
    )
  }
}
{code}
I am not sure whether you can see the mismatch. I am unable to reproduce it on the ThinkPad, but I can easily reproduce it on my MacBook.

My case did not hit any exception, but I saw data corruption. After sorting, one row [664615,525] is replaced by another row [664611,525]. Thus one row disappears after sorting, and another row appears twice. The total number of rows does not change after the sort.
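Because the row count stays the same while one row is replaced by a copy of another, a plain count comparison cannot detect this corruption. A minimal sketch of a multiset comparison that would catch it is shown below. `multiset_diff` is a hypothetical helper, not part of Spark or its test utilities; the two rows with fk1=525 are the ones named in the comment above, and the third row is made up for illustration.

```python
from collections import Counter


def multiset_diff(before, after):
    """Return (missing, extra) rows, counting duplicates.

    `missing` holds rows that occur more often in `before` than in `after`;
    `extra` holds rows that occur more often in `after` than in `before`.
    """
    before_counts = Counter(before)
    after_counts = Counter(after)
    missing = dict(before_counts - after_counts)
    extra = dict(after_counts - before_counts)
    return missing, extra


if __name__ == "__main__":
    # (id1, fk1) pairs; (1, 39) is an illustrative row, not from the data set.
    rows_before_sort = [(664611, 525), (664615, 525), (1, 39)]
    rows_after_sort = [(1, 39), (664611, 525), (664611, 525)]
    print(len(rows_before_sort) == len(rows_after_sort))
    print(multiset_diff(rows_before_sort, rows_after_sort))
```

A sort that only reorders rows yields two empty dictionaries, so any non-empty result points to lost or duplicated rows.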
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032927#comment-15032927 ]

Yin Huai commented on SPARK-12030:
----------------------------------

[~smilegator] Can you post the case that triggers the problem? Also, is https://issues.apache.org/jira/browse/SPARK-12055 a related issue?
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032902#comment-15032902 ]

Xiao Li commented on SPARK-12030:
---------------------------------

I already excluded Exchange and Partitioning. It should be caused by Sort. Will continue the investigation tonight. Will keep you posted if I can locate the exact changes. Thanks!
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032887#comment-15032887 ]

Xiao Li commented on SPARK-12030:
---------------------------------

[SPARK-7542][SQL] Support off-heap index/sort buffer
https://github.com/apache/spark/pull/9477
and
[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
https://github.com/apache/spark/pull/9344

The problem does not exist if I take out the code changes made by these two JIRAs. The code changes of these two JIRAs are mixed together. Thus, I assume it is caused by #9477.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032857#comment-15032857 ]

Davies Liu commented on SPARK-12030:
------------------------------------

[~smilegator] Could you post the related PRs here, so we can also look into it? Thanks!
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032724#comment-15032724 ]

Xiao Li commented on SPARK-12030:
---------------------------------

I believe I have already found which PRs introduced the regression. However, these two PRs are huge. I need more time to locate the exact lines. Thanks!

> Incorrect results when aggregate joined data
> --------------------------------------------
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
>            Priority: Critical
>         Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15031206#comment-15031206 ]

Xiao Li commented on SPARK-12030:
---------------------------------

I can reproduce a similar issue in a Sort. I think the impact could be broader.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15031060#comment-15031060 ]

Xiao Li commented on SPARK-12030:

[~maver1ck] Yeah, the problem was introduced in 1.6.0. So far, I think it is caused by SortMergeJoin. I believe you will not hit the problem if your plan does not use sort merge join. Thanks!
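A quick way to act on the comment above is to check whether a physical plan mentions a sort merge join operator at all. The following is a minimal sketch, assuming the plan text has already been captured as a string (for example the text printed by `joined.explain()`). `uses_sort_merge_join` is a hypothetical helper written for illustration and is not part of Spark; the operator names it looks for are `SortMergeJoin` and `SortMergeOuterJoin`, the latter being the one that appears in the plan posted in this thread.

```python
import re

# Hypothetical helper, not a Spark API: looks for sort-merge join
# operators by name in the text of a physical plan.
_SORT_MERGE = re.compile(r"\bSortMerge(?:Outer)?Join\b")


def uses_sort_merge_join(plan_text):
    """Return True if any line of the plan names a sort-merge join operator."""
    return any(_SORT_MERGE.search(line) for line in plan_text.splitlines())


# Fragment of the plan reported in this thread.
plan = """\
Project [id1#3895]
 SortMergeOuterJoin [fk1#3896], [id2#3897], LeftOuter, None
  Sort [fk1#3896 ASC], false, 0
"""

print(uses_sort_merge_join(plan))
```

A text match like this only reports that the operator is present; it says nothing about whether a given query actually returns wrong results.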
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030887#comment-15030887 ]

Maciej Bryński commented on SPARK-12030:

[~smilegator] I tested 1.5.2 (binaries from the Spark page) and it works fine. So I think this is a problem only in Spark 1.6.0.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030883#comment-15030883 ]

Maciej Bryński commented on SPARK-12030:

[~smilegator] The problem is not only with distinct but with aggregate functions in general. Sample:
{code}
t1.rdd.groupBy(lambda row: row.id1).map(lambda t: len(t[1])).sum() # OK: 5904733
joined.rdd.groupBy(lambda row: row.id1).map(lambda t: len(t[1])).sum() # Not OK: 5904629
{code}
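The check in this comment rests on an invariant: if id2 is unique in t2, a left outer join keeps every row of t1 exactly once, so grouping by id1 must give the same totals before and after the join. Below is a minimal pure-Python sketch of that invariant. The rows are made up, the uniqueness of id2 is an assumption, and `left_outer_join` is an illustrative helper, not Spark code and not the reporter's data.

```python
from collections import Counter


def left_outer_join(left, right, left_key, right_key):
    """Left outer join over lists of dicts; illustration only, not Spark code."""
    right_cols = sorted({col for row in right for col in row})
    index = {}
    for r in right:
        index.setdefault(r[right_key], []).append(r)
    out = []
    for l in left:
        key = l[left_key]
        # As in SQL, a NULL (None) key never matches anything.
        matches = index.get(key, []) if key is not None else []
        if matches:
            for r in matches:
                out.append({**l, **r})
        else:
            # Unmatched left rows are kept, with right-side columns set to None.
            out.append({**l, **{col: None for col in right_cols}})
    return out


# Made-up rows: id1 is unique in t1, id2 is unique in t2.
t1 = [
    {"id1": 1, "fk1": 10},
    {"id1": 2, "fk1": 10},
    {"id1": 3, "fk1": None},
    {"id1": 4, "fk1": 99},
]
t2 = [{"id2": 10}, {"id2": 20}]
joined = left_outer_join(t1, t2, "fk1", "id2")

rows_per_id_t1 = Counter(row["id1"] for row in t1)
rows_per_id_joined = Counter(row["id1"] for row in joined)

print(len(joined) == len(t1))                # row count is preserved
print(rows_per_id_joined == rows_per_id_t1)  # per-id1 group sizes are preserved
```

Both comparisons hold for this toy data. The totals reported above (5904733 versus 5904629) are what a violation of the same invariant looks like at scale.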
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030694#comment-15030694 ]

Xiao Li commented on SPARK-12030:

I can reproduce it now. Will take a look at it and try to fix it. Thanks!
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030652#comment-15030652 ]

Xiao Li commented on SPARK-12030:

Thank you! [~maver1ck] It would be great to know whether this is an existing bug in 1.5.2. We need to deliver the fix to all the affected versions.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030643#comment-15030643 ]

Maciej Bryński commented on SPARK-12030:

I tried the following things:
- disable kryoserializer
- disable lz4
- change join type to inner

Without success. Next I'll try to reproduce this on 1.5.2.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030638#comment-15030638 ]

Xiao Li commented on SPARK-12030:

Trying to reproduce it using your parquet files. Thanks!
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030633#comment-15030633 ]

Maciej Bryński commented on SPARK-12030:

And spark-defaults.conf:
{code}
spark.master spark://somehost:7077
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 20g
spark.executor.extraJavaOptions -XX:-UseGCOverheadLimit -XX:+UseG1GC -Dhdp.version=current -XX:-UseCompressedOops
spark.driver.extraJavaOptions -XX:-UseGCOverheadLimit -XX:+UseG1GC -Dhdp.version=current -XX:-UseCompressedOops
spark.executor.memory 30g
spark.storage.memoryFraction 0.2
spark.shuffle.memoryFraction 0.4
spark.driver.maxResultSize 4g
spark.cores.max 30
spark.executor.extraClassPath spark/mysql-connector-java-5.1.35-bin.jar:spark/spark-csv_2.10-1.3.0-SNAPSHOT.jar:spark/commons-csv-1.2.jar:spark/aerospike-spark-0.3-SNAPSHOT-jar-with-dependencies.jar
spark.driver.extraClassPath spark/mysql-connector-java-5.1.35-bin.jar:spark/spark-csv_2.10-1.3.0-SNAPSHOT.jar:spark/commons-csv-1.2.jar:spark/aerospike-spark-0.3-SNAPSHOT-jar-with-dependencies.jar
spark.kryoserializer.buffer.max 1g
spark.default.parallelism 400
spark.sql.autoBroadcastJoinThreshold 0
spark.yarn.am.extraJavaOptions -Dhdp.version=current
spark.executor.instances 10
spark.yarn.queue dwh
spark.python.worker.reuse false
spark.sql.shuffle.partitions 400
spark.io.compression.codec lz4
spark.eventLog.enabled true
spark.eventLog.dir history/
{code}
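When comparing settings between runs (for example with and without Kryo or lz4), it can help to load the configuration into a dictionary. The following is a minimal sketch, assuming the usual spark-defaults.conf layout of one property per line with the key and value separated by whitespace. `parse_spark_defaults` is a hypothetical helper written for this illustration and is not part of Spark; the sample text reuses a few entries from the configuration posted above.

```python
def parse_spark_defaults(text):
    """Parse spark-defaults.conf style text into a dict (illustrative helper)."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        # The key ends at the first run of whitespace; the rest is the value,
        # which may itself contain spaces (e.g. JVM options).
        parts = line.split(None, 1)
        conf[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return conf


sample = """\
# a few entries taken from the configuration above
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.io.compression.codec lz4
spark.executor.extraJavaOptions -XX:-UseGCOverheadLimit -XX:+UseG1GC
"""

conf = parse_spark_defaults(sample)
print(conf["spark.io.compression.codec"])
print(conf["spark.executor.extraJavaOptions"])
```

Two parsed dictionaries can then be compared key by key to see exactly which settings changed between two attempts.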
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030632#comment-15030632 ]

Maciej Bryński commented on SPARK-12030:

When I cache joined, the result of distinct(id) is always the same but not equal to 5900729.
I saved the data from JDBC to parquet, and without cache I have the same issue.
I attached both dataframes as parquet. They are read with sqlCtx.read.parquet:
{code}
t1 = sqlCtx.read.parquet('t1')
t2 = sqlCtx.read.parquet('t2')
t1.registerTempTable("t1")
t2.registerTempTable("t2")
joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
t1.count() -> 5904733
sqlCtx.sql("select distinct(id1) from t1").count() -> 5904733
t2.count() -> 54298
joined.count() -> 5904733
joined.registerTempTable("joined")
sqlCtx.sql("select distinct(id1) from joined").count() -> 5904435 # this varies between queries
{code}
Plan for the last select:
{code}
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#3930L])
 TungstenExchange SinglePartition, None
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3933L])
   TungstenAggregate(key=[id1#3895], functions=[], output=[])
    TungstenExchange hashpartitioning(id1#3895,400), None
     TungstenAggregate(key=[id1#3895], functions=[], output=[id1#3895])
      Project [id1#3895]
       SortMergeOuterJoin [fk1#3896], [id2#3897], LeftOuter, None
        Sort [fk1#3896 ASC], false, 0
         TungstenExchange hashpartitioning(fk1#3896,400), None
          Scan ParquetRelation[file:t1] PushedFilter: [] [id1#3895,fk1#3896]
        Sort [id2#3897 ASC], false, 0
         TungstenExchange hashpartitioning(id2#3897,400), None
          Scan ParquetRelation[file:t2] PushedFilter: [] [id2#3897]
{code}
In the GUI I see the problem in the first TungstenAggregate:
number of input rows: 5904733
number of output rows: 5904435
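Because id1 is distinct in t1 (the distinct count equals the row count, 5904733), an aggregate keyed on id1 should emit as many rows as it receives. A short calculation of the shortfall, using only the two row counts reported in the comment above:

```python
# Row counts reported for the first TungstenAggregate in the comment above.
input_rows = 5904733
output_rows = 5904435

# With a unique id1, grouping by id1 should not reduce the row count,
# so any difference is rows that went missing.
missing = input_rows - output_rows
print(missing)               # 298
print(missing / input_rows)  # fraction of rows lost
```

The loss is small relative to the input, which fits the report that the totals look almost right and change from one run to the next.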
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030618#comment-15030618 ]

Xiao Li commented on SPARK-12030:

If you cache `joined`, can you see the same issue?
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030615#comment-15030615 ]

Xiu(Joe) Guo commented on SPARK-12030:

I tried your scenario with some TPCDS tables last night, joined on integer columns, but could not reproduce the incorrect results. Does your table have very large integer values which might overflow?
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030612#comment-15030612 ]

Maciej Bryński commented on SPARK-12030:

id1, id2 and fk1 are integers.
[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data
[ https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15030430#comment-15030430 ]

Xiao Li commented on SPARK-12030:

What is the data type of id1?

> Incorrect results when aggregate joined data
>
>                 Key: SPARK-12030
>                 URL: https://issues.apache.org/jira/browse/SPARK-12030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
>            Priority: Critical
>
> I have the following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both tables are cached, so results should be the same on every query.
> Then I did some counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here the magic begins - I counted distinct id1 from the joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results vary *(are different on every run)* between 5899000 and 590 but are never equal to 5900729.
> In addition, I ran more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 1").collect()
> {code}
> This gives some results, but this query returns *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?