[ 
https://issues.apache.org/jira/browse/SPARK-11008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14955035#comment-14955035
 ] 

Johnathan Garrett commented on SPARK-11008:
-------------------------------------------

We are seeing this issue as well since upgrading to Spark 1.5.1, with both ORC and 
Parquet files on HDFS.  When using window functions, the first set of results after 
starting a new application is usually incorrect.  To verify the issue, I ran 
spark-shell and entered the sequence of commands from the issue description (quoted 
below), but replaced the s3 path with a path on HDFS; a sketch of that variant 
follows the spark-shell command.  After setting up the Parquet file, every time I 
restart spark-shell I get incorrect results on the first pass and correct results 
after that.

I am running spark-shell with:
spark-shell --master yarn-client --num-executors 4
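
For reference, the HDFS variant is essentially the same sequence as in the issue 
description below, with only the path changed (the hdfs:// path here is just a 
placeholder, not our actual cluster path):

// first shell: write the Parquet file to HDFS
val data = 1.to(100).map(x => (x, 1))
import sqlContext.implicits._
val tbl = sc.parallelize(data).toDF("id", "time")
tbl.write.parquet("hdfs:///tmp/id-time-tiny.pqt")

// exit, start a new spark-shell, read it back and apply the window function
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank
import sqlContext.implicits._
val df = sqlContext.read.parquet("hdfs:///tmp/id-time-tiny.pqt")
val win = Window.partitionBy("id").orderBy("time")
df.select($"id", rank().over(win).alias("rnk")).filter("rnk=1").select("id").count()
// first run after a restart returns fewer than 100; re-running the last statement returns 100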

> Spark window function returns inconsistent/wrong results
> --------------------------------------------------------
>
>                 Key: SPARK-11008
>                 URL: https://issues.apache.org/jira/browse/SPARK-11008
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 1.4.0, 1.5.0
>         Environment: Amazon Linux AMI (Amazon Linux version 2015.09)
>            Reporter: Prasad Chalasani
>            Priority: Minor
>
> Summary: applying a window function to a DataFrame, followed by count(), gives 
> widely varying results in repeated runs: none exceed the correct value, but of 
> course all but one are wrong. On large data-sets I sometimes get results as 
> small as HALF of the correct value.
> A minimal reproducible example is here: 
> (1) start spark-shell
> (2) run these:
>     val data = 1.to(100).map(x => (x,1))    
>     import sqlContext.implicits._
>     val tbl = sc.parallelize(data).toDF("id", "time")
>     tbl.write.parquet("s3n://path/to/mybucket/id-time-tiny.pqt")
> (3) exit the shell (this is important)
> (4) start spark-shell again
> (5) run these:
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.rank
> import sqlContext.implicits._
> val df = sqlContext.read.parquet("s3n://path/to/mybucket/id-time-tiny.pqt")
> val win = Window.partitionBy("id").orderBy("time")
> df.select($"id", rank().over(win).alias("rnk")).filter("rnk=1").select("id").count()
> I get 98, but the correct result is 100. 
> If I re-run the code in step 5 in the same shell, then the result gets 
> "fixed" and I always get 100.
> Note this is only a minimal example to reproduce the error. In my real 
> application the data is much larger and the window function is not as trivial 
> as the one above (i.e. there are multiple "time" values per "id", etc); an 
> illustrative sketch of that shape follows below. There I sometimes see results 
> as small as HALF of the correct value (e.g. 120,000 while the correct value is 
> 250,000). So this is a serious problem.
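> For illustration only (made-up values, not my real data), the shape is roughly:
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.rank
> import sqlContext.implicits._
> // several "time" values per "id", unlike the single-row-per-id example above
> val data = Seq((1, 10), (1, 20), (1, 30), (2, 10), (2, 40))
> val df = sc.parallelize(data).toDF("id", "time")
> val win = Window.partitionBy("id").orderBy("time")
> // rank()=1 keeps the earliest row per id, so the count should equal the number of distinct ids
> df.select($"id", rank().over(win).alias("rnk")).filter("rnk=1").select("id").count()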


