[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034641#comment-15034641
 ] 

Maciej Bryński commented on SPARK-12030:


Will the fix be included in 1.6.0 ?

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Assignee: Nong Li
>Priority: Blocker
> Fix For: 1.6.0
>
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread Davies Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034615#comment-15034615
 ] 

Davies Liu commented on SPARK-12030:


I also figured out the root cause last night, that's an interesting bug.

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Assignee: Nong Li
>Priority: Blocker
> Fix For: 1.6.0
>
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034652#comment-15034652
 ] 

Yin Huai commented on SPARK-12030:
--

Yes, it will be in 1.6.0.

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Assignee: Nong Li
>Priority: Blocker
> Fix For: 1.6.0
>
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034936#comment-15034936
 ] 

Yin Huai commented on SPARK-12030:
--

I also merged the patch to branch 1.5. Please note that, in 1.5, we do not 
trigger the corner case. However, in 1.6, we use the problematic method in tim 
sort, which exposes the problem.

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Assignee: Nong Li
>Priority: Blocker
> Fix For: 1.5.3, 1.6.0
>
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035164#comment-15035164
 ] 

Xiao Li commented on SPARK-12030:
-

I did verify the fix using my test cases. It works! 

I posted a question in the PR. If possible, could anyone answer it? Really 
appreciate your help! 

Thank you! 

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Assignee: Nong Li
>Priority: Blocker
> Fix For: 1.5.3, 1.6.0
>
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034141#comment-15034141
 ] 

Apache Spark commented on SPARK-12030:
--

User 'nongli' has created a pull request for this issue:
https://github.com/apache/spark/pull/10068

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033356#comment-15033356
 ] 

Xiao Li commented on SPARK-12030:
-

[~nongli] Thank you very much! 

Your finding sounds reasonable. I observed it happens only when the number of 
rows is larger than 65539. The corrupted area always exists at 65538.  Almost 
the last row. 

Please submit the PR. I will verify it. 

Thanks again! You are SO GREAT!

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-12-01 Thread Nong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033321#comment-15033321
 ] 

Nong Li commented on SPARK-12030:
-

I think I tracked it down. The bug is from this PR which exposed another bug.

SPARK-7542[SQL] Support off-heap index/sort buffer
https://github.com/apache/spark/pull/9477

The issue I think is in 
https://github.com/apache/spark/blob/master/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L108
which does not support overlapping regions. For example,

If the src region is [100 - 1000) and the dest region is [500-1500) and the 
copy size is 100, the first copy of 100 bytes would overwrite the bytes at 
offset 500. This needs a check to see that the src region is before the dest 
and run this loop backwards I think.

This only hits on larger data sets when a single copy exceeds the threshold 
(1M).

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-30 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15032887#comment-15032887
 ] 

Xiao Li commented on SPARK-12030:
-

[SPARK-7542][SQL] Support off-heap index/sort buffer
https://github.com/apache/spark/pull/9477

and

[SPARK-11389][CORE] Add support for off-heap memory to MemoryManage
https://github.com/apache/spark/pull/9344

The problem does not exist if I took out the code changes by these two JIRAs. 
The code changes of these two JIRAs are mixed. Thus, I assume it should be 
caused by #9477.


> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-30 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15032939#comment-15032939
 ] 

Xiao Li commented on SPARK-12030:
-

Let me post a simple case that can trigger the data corruption. The data set t1 
is downloaded from this JIRA. 

{code}
test("sort result") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
  SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
  val t1test = 
sqlContext.read.parquet("/Users/xiaoli/Downloads/t1").dropDuplicates().where("fk1=39
 or (fk1=525 and id1 < 664618 and id1 >= 470050)").repartition(1).cache()

  //t1test.orderBy("fk1").explain(true)
  val t1 = t1test.orderBy("fk1").cache()

  checkAnswer( t1test, t1.collect() )
}
{code}

I am not sure if you can see the un-match. I am unable to reproduce it in the 
Thinkpad, but I can easily reproduce it in my macbook. 

My case did not hit any exception, but I saw a data corruption. After sorting, 
one row [664615,525] is replaced by another row [664611,525]. Thus one row 
disappears after sorting, but you can see a duplicate in another row. The 
number of total rows is not changed after the sort. 

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-30 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15032724#comment-15032724
 ] 

Xiao Li commented on SPARK-12030:
-

I believe I already found which PRs introduced the regression. However, these 
two PRs are huge. I need more times to locate the exact lines. 

Thanks!

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-30 Thread Davies Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15032857#comment-15032857
 ] 

Davies Liu commented on SPARK-12030:


[~smilegator] Could you post the related PRs here? So we can also looking into 
it, thanks!

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-30 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15032902#comment-15032902
 ] 

Xiao Li commented on SPARK-12030:
-

I already excluded Exchange and Partitioning. It should be caused by Sort. Will 
continue the investigation tonight. Will keep you posted if I can locate the 
exact changes. Thanks!

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-30 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15032927#comment-15032927
 ] 

Yin Huai commented on SPARK-12030:
--

[~smilegator] Can you post the case that triggers the problem? Also, is 
https://issues.apache.org/jira/browse/SPARK-12055 a related issue?

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Blocker
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-29 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15031206#comment-15031206
 ] 

Xiao Li commented on SPARK-12030:
-

I can reproduced a similar issue in a Sort. I think the impact could be 
broader. 

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-29 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030883#comment-15030883
 ] 

Maciej Bryński commented on SPARK-12030:


[~smilegator]
Problem is not only with distinct but with aggregate functions in general. 
Sample:
{code}
t1.rdd.groupBy(lambda row: row.id1).map(lambda t: len(t[1])).sum() # OK: 5904733
joined.rdd.groupBy(lambda row: row.id1).map(lambda t: len(t[1])).sum() # Not 
OK: 5904629
{code}

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-29 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030887#comment-15030887
 ] 

Maciej Bryński commented on SPARK-12030:


[~smilegator]
I tested 1.5.2 (binaries from spark page) and it works great.
So I think it's only Spark 1.6.0 problem.

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-29 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15031060#comment-15031060
 ] 

Xiao Li commented on SPARK-12030:
-

[~maver1ck] Yeah, the problem was introduced in 1.6.0. So far, I think it is 
caused by SortMergeJoin. I believe you will not hit the problem if your plan 
does not use sort merge join. Thanks!

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030430#comment-15030430
 ] 

Xiao Li commented on SPARK-12030:
-

What is the data type of id1?

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030618#comment-15030618
 ] 

Xiao Li commented on SPARK-12030:
-

If you cache `joined`, can you see the same issue? 

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030612#comment-15030612
 ] 

Maciej Bryński commented on SPARK-12030:


id1, id2 and fk1 are integers.

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread Xiu(Joe) Guo (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030615#comment-15030615
 ] 

Xiu(Joe) Guo commented on SPARK-12030:
--

I tried your scenario with some TPCDS table last night, joined on integer 
columns, but could not reproduce incorrect results.

Does your table have very large integer values which might overflow?

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030632#comment-15030632
 ] 

Maciej Bryński commented on SPARK-12030:


When I cache joined the result of distinct(id) is always the same but not equal 
5900729.

I save data from JDBC to parquet. 
And without cache I have same issue.

I attached both dataframes as parquet. Readed by sqlCtx.read.parquet

{code}
t1 = sqlCtx.read.parquet('t1')
t2 = sqlCtx.read.parquet('t2')
t1.registerTempTable("t1")
t2.registerTempTable("t2")
joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
t1.count()  -> 5904733
sqlCtx.sql("select distinct(id1) from t1").count() -> 5904733
t2.count() -> 54298
joined.count() -> 5904733
joined.registerTempTable("joined")
sqlCtx.sql("select distinct(id1) from joined").count() -> 5904435 # this varies 
between queries
{code}

Plan for last select:
{code}
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
output=[count#3930L])
 TungstenExchange SinglePartition, None
  TungstenAggregate(key=[], 
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3933L])
   TungstenAggregate(key=[id1#3895], functions=[], output=[])
TungstenExchange hashpartitioning(id1#3895,400), None
 TungstenAggregate(key=[id1#3895], functions=[], output=[id1#3895])
  Project [id1#3895]
   SortMergeOuterJoin [fk1#3896], [id2#3897], LeftOuter, None
Sort [fk1#3896 ASC], false, 0
 TungstenExchange hashpartitioning(fk1#3896,400), None
  Scan ParquetRelation[file:t1] PushedFilter: [] [id1#3895,fk1#3896]
Sort [id2#3897 ASC], false, 0
 TungstenExchange hashpartitioning(id2#3897,400), None
  Scan ParquetRelation[file:t2] PushedFilter: [] [id2#3897]
{code}
On GUI I see the problem in first TungstenAggregate:
number of input rows: 5904733
number of output rows: 5904435

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030652#comment-15030652
 ] 

Xiao Li commented on SPARK-12030:
-

Thank you! [~maver1ck]

That will be great if we can know if this is an existing bug in 1.5.2. We need 
to deliver the fix to all the affected versions. 

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030633#comment-15030633
 ] 

Maciej Bryński commented on SPARK-12030:


And spark-defaults.conf:
{code}
spark.master spark://somehost:7077
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory  20g
spark.executor.extraJavaOptions -XX:-UseGCOverheadLimit -XX:+UseG1GC  
-Dhdp.version=current -XX:-UseCompressedOops
spark.driver.extraJavaOptions -XX:-UseGCOverheadLimit -XX:+UseG1GC  
-Dhdp.version=current -XX:-UseCompressedOops
spark.executor.memory   30g
spark.storage.memoryFraction0.2
spark.shuffle.memoryFraction0.4
spark.driver.maxResultSize  4g
spark.cores.max 30
spark.executor.extraClassPath   
spark/mysql-connector-java-5.1.35-bin.jar:spark/spark-csv_2.10-1.3.0-SNAPSHOT.jar:spark/commons-csv-1.2.jar:spark/aerospike-spark-0.3-SNAPSHOT-jar-with-dependencies.jar
spark.driver.extraClassPath
spark/mysql-connector-java-5.1.35-bin.jar:spark/spark-csv_2.10-1.3.0-SNAPSHOT.jar:spark/commons-csv-1.2.jar:spark/aerospike-spark-0.3-SNAPSHOT-jar-with-dependencies.jar
spark.kryoserializer.buffer.max 1g
spark.default.parallelism   400
spark.sql.autoBroadcastJoinThreshold0
spark.yarn.am.extraJavaOptions -Dhdp.version=current
spark.executor.instances10
spark.yarn.queuedwh
spark.python.worker.reuse   false
spark.sql.shuffle.partitions400
spark.io.compression.codec  lz4
spark.eventLog.enabled  true
spark.eventLog.dir  history/
{code}

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030643#comment-15030643
 ] 

Maciej Bryński commented on SPARK-12030:


I tried following things:
- disable kryoserializer
- disable lz4
- change join type to inner
Without success. 
Next I'll try to reproduce this on 1.5.2.

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030638#comment-15030638
 ] 

Xiao Li commented on SPARK-12030:
-

Trying to reproduce it using your parquet files. Thanks!

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12030) Incorrect results when aggregate joined data

2015-11-28 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15030694#comment-15030694
 ] 

Xiao Li commented on SPARK-12030:
-

I can reproduce it now. Will take a look at it and try to fix it. Thanks!

> Incorrect results when aggregate joined data
> 
>
> Key: SPARK-12030
> URL: https://issues.apache.org/jira/browse/SPARK-12030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Maciej Bryński
>Priority: Critical
> Attachments: spark.jpg, t1.tar.gz, t2.tar.gz
>
>
> I have following issue.
> I created 2 dataframes from JDBC (MySQL) and joined them (t1 has fk1 to t2)
> {code}
> t1 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t1, id1, 0, size1, 200).cache()
> t2 = sqlCtx.read.jdbc("jdbc:mysql://XXX", t2, id2, 0, size1, 200).cache()
> joined = t1.join(t2, t1.fk1 == t2.id2, "left_outer")
> {code}
> Important: both table are cached, so results should be the same on every 
> query.
> Then I did come counts:
> {code}
> t1.count() -> 5900729
> t1.registerTempTable("t1")
> sqlCtx.sql("select distinct(id1) from t1").count() -> 5900729
> t2.count() -> 54298
> joined.count() -> 5900729
> {code}
> And here magic begins - I counted distinct id1 from joined table
> {code}
> joined.registerTempTable("joined")
> sqlCtx.sql("select distinct(id1) from joined").count()
> {code}
> Results varies *(are different on every run)* between 5899000 and 
> 590 but never are equal to 5900729.
> In addition. I did more queries:
> {code}
> sqlCtx.sql("select id1, count(*) from joined group by id1 having count(*) > 
> 1").collect() 
> {code}
> This gives some results but this query return *1*
> {code}
> len(sqlCtx.sql("select * from joined where id1 = result").collect())
> {code}
> What's wrong ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org