[ https://issues.apache.org/jira/browse/SPARK-11008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14949308#comment-14949308 ]

Prasad Chalasani commented on SPARK-11008:
------------------------------------------

I don't think you can reproduce this in local mode. I get the error when I run 
spark-shell on an EMR cluster on AWS, and I've seen it with both Spark 1.4.0 and 
1.5.0. No, there is no other field named "rnk". Everything needed to reproduce 
the error is in the code I showed.
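
For reference, here is a sketch of what I ran in the second shell session (steps 4 and 5 below), plus the explain call that produced the plan output; the S3 path is the placeholder from the issue description rather than my actual bucket, and the explicit imports are only there to make the snippet self-contained (spark-shell already brings most of them into scope):

    // fresh spark-shell session on the EMR cluster (Spark 1.4.0 / 1.5.0)
    // the path is the placeholder from the issue description, not the real bucket
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.rank
    import sqlContext.implicits._

    val df = sqlContext.read.parquet("s3n://path/to/mybucket/id-time-tiny.pqt")
    val win = Window.partitionBy("id").orderBy("time")

    // should return 100 (one rank-1 row per id); on the first run in a fresh shell it does not
    df.select($"id", rank().over(win).alias("rnk")).filter("rnk = 1").select("id").count()

    // prints the parsed/analyzed/optimized/physical plans shown below
    df.explain(true)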

The output of df.explain(true) is below:

15/10/08 20:12:38 INFO MemoryStore: ensureFreeSpace(93288) called with curMem=90469, maxMem=560497950
15/10/08 20:12:38 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 91.1 KB, free 534.4 MB)
15/10/08 20:12:38 INFO MemoryStore: ensureFreeSpace(21585) called with curMem=183757, maxMem=560497950
15/10/08 20:12:38 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 21.1 KB, free 534.3 MB)
15/10/08 20:12:38 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.31.55.113:36035 (size: 21.1 KB, free: 534.5 MB)
15/10/08 20:12:38 INFO SparkContext: Created broadcast 1 from explain at <console>:23
== Parsed Logical Plan ==
Relation[id#0,time#1] ParquetRelation[s3n://mm-datascience-dev-pc/bugs/id-time-tiny.pqt]

== Analyzed Logical Plan ==
id: int, time: int
Relation[id#0,time#1] ParquetRelation[s3n://mm-datascience-dev-pc/bugs/id-time-tiny.pqt]

== Optimized Logical Plan ==
Relation[id#0,time#1] ParquetRelation[s3n://mm-datascience-dev-pc/bugs/id-time-tiny.pqt]

== Physical Plan ==
Scan ParquetRelation[s3n://mm-datascience-dev-pc/bugs/id-time-tiny.pqt][id#0,time#1]

Code Generation: true


> Spark window function returns inconsistent/wrong results
> --------------------------------------------------------
>
>                 Key: SPARK-11008
>                 URL: https://issues.apache.org/jira/browse/SPARK-11008
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 1.4.0, 1.5.0
>         Environment: Amazon Linux AMI (Amazon Linux version 2015.09)
>            Reporter: Prasad Chalasani
>            Priority: Blocker
>
> Summary: applying a window function to a DataFrame, followed by count(), 
> gives widely varying results across repeated runs: none exceed the correct 
> value, but all but one are wrong. On large data sets I sometimes get as 
> little as HALF of the correct value.
> A minimal reproducible example is here: 
> (1) start spark-shell
> (2) run these:
>     val data = 1.to(100).map(x => (x,1))    
>     import sqlContext.implicits._
>     val tbl = sc.parallelize(data).toDF("id", "time")
>     tbl.write.parquet("s3n://path/to/mybucket/id-time-tiny.pqt")
> (3) exit the shell (this is important)
> (4) start spark-shell again
> (5) run these:
> import org.apache.spark.sql.expressions.Window
> val df = sqlContext.read.parquet("s3n://path/to/mybucket/id-time-tiny.pqt")
> val win = Window.partitionBy("id").orderBy("time")
> df.select($"id", 
> (rank().over(win)).alias("rnk")).filter("rnk=1").select("id").count()
> I get 98, but the correct result is 100. 
> If I re-run the code in step 5 in the same shell, then the result gets 
> "fixed" and I always get 100.
> Note this is only a minimal example to reproduce the error. In my real 
> application the data is much larger and the window function is not as 
> trivial as the one above (i.e. there are multiple "time" values per "id", 
> etc.), and I sometimes see results as small as HALF of the correct value 
> (e.g. 120,000 when the correct value is 250,000). So this is a serious 
> problem.


