[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-18 Thread Domagoj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365468#comment-17365468
 ] 

Domagoj commented on SPARK-35089:
-

[~revans2], thanks for the detailed explanation.

I still have a problem understanding why only one record was dropped in one 
case. Where did the other record go? Why did it disappear? Filtering is the 
method of removing data, not sorting. A sort should not drop any data, 
ambiguous or not.

So if we look at Task 1 (sorting 1, redundant if you ask me; there is no point 
in sorting when the column data is equal) and Task 2 before applying the filter 
on duration, they both have all 3 rows.

And after filtering, sorting 1 keeps both records, but sorting 2 loses the one 
with the big duration? How can that happen? I cannot understand the relation 
between sorting and missing data.

I believe you (I tried it and it worked) that adding monotonically_increasing_id 
helps, but I cannot understand why.

If one worker has calculated the duration with the window function, the next 
step should just remove the rows where the filter condition is not satisfied, 
regardless of how the data is sorted.

It looks to me that sorting has some implications for the data exchange between 
worker nodes, but I cannot understand how.

So, because data goes missing for strange reasons, I still believe that this is 
something that should be taken care of in code, instead of relying on users to 
remember that this is a dangerous situation.

It looks like we should add an id to every dataset to be sure this will not 
happen. That will surely slow the process down, and it is prone to errors 
(forgetting to add it, or something like that).

Furthermore, it is interesting that there are no problems on single instance 
jobs.

 

> non consistent results running count for same dataset after filter and lead 
> window function
> ---
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Domagoj
>Priority: Major
>
>    edit 2021-05-18
> I have made it simpler to reproduce; I've put already generated data 
> (24.000.000 records) on an S3 bucket that is publicly available.
> Now all you need to do is run this code:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1).over(w), lit(3000))
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>  withColumn("end", ts_lead).
>  withColumn("duration", col("end")-col("start")).
>  where("type='TypeA' and duration>4").count()
> {code}
> These were my results:
>  - run 1: 2547559
>  - run 2: 2547559
>  - run 3: 2547560
>  - run 4: 2547558
>  - run 5: 2547558
>  - run 6: 2547559
>  - run 7: 2547558
> These results are from a new EMR cluster, version 6.3.0, so nothing has changed.
>    end edit 2021-05-18
> I have found an inconsistency in count results after a lead window function 
> and a filter.
> I have a dataframe (this is a simplified version, but it's enough to reproduce) 
> with millions of records, with these columns:
>  * df1:
>  ** start(timestamp)
>  ** user_id(int)
>  ** type(string)
> I need to define the duration between two rows, and filter on that duration and 
> type. I used the window lead function to get the next event time (which defines 
> the end of the current event), so every row now gets start and stop times. If 
> it is NULL (the last row, for example), add the next midnight as stop. Data is 
> stored in an ORC file (I tried Parquet format, no difference).
> This only happens with multiple cluster nodes, for example an AWS EMR cluster 
> or a local docker cluster setup. If I run it on a single instance (locally on a 
> laptop), I get consistent results every time. Spark version is 3.0.1, in both 
> the AWS and the local docker setup.
> Here is some simple code that you can use to reproduce it; I've used a 
> JupyterLab notebook on AWS EMR. Spark version is 3.0.1.
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
> import spark.implicits._
> // this dataframe generation code should be executed only once, and the data 
> // has to be saved and then opened from disk, so it's always the same
> val getRandomUser = udf(()=>{
>     val users = Seq("John","Eve","Anna","Martin","Joe","Steve","Katy")
>     users(scala.util.Random.nextInt(7))
> })
> val getRandomType = udf(()=>{
>     val types = Seq("TypeA","TypeB","TypeC","TypeD","TypeE")
>     types(scala.util.Random.nextInt(5))
> })
> val getRandomStart = udf((x:Int)=>{
>     x+scala.util.Random.nextInt(47)
> })
> // for loop is used to avoid out of memory error during creation of dataframe
> for( a <- 0 to 23){
>     // use iterator a to continue with next million, repeat 1 mil times
>     val x = Range(a*100,(a*100)+100).toDF("id").
>         withColumn("start",getRandomStart(col("id"))).
>         withColumn("user",getRandomUser()).
>         withColumn("type",getRandomType()).
>         drop("id")
>     x.write.mode("append").orc("hdfs:///random.orc")
> }
> // above code should be run only once, I used a cell in Jupyter
> // define window and lead
> val w = Window.partitionBy("user").orderBy("start")
> // if null, replace with 30.000.000
> val ts_lead = coalesce(lead("start", 1).over(w), lit(3000))
> // read data to dataframe, create stop column and calculate duration
> val fox2 = spark.read.orc("hdfs:///random.orc").
>     withColumn("end", ts_lead).
>     withColumn("duration", col("end")-col("start"))
> // repeated executions of this line return different results for the count
> // I have it in a separate cell in JupyterLab
> fox2.where("type='TypeA' and duration>4").count()
> {code}
> My results for three consecutive runs of the last line were:
>  * run 1: 2551259
>  * run 2: 2550756
>  * run 3: 2551279
> It's very important to say that if I use the filter
> fox2.where("type='TypeA' ")
> or
> fox2.where("duration>4"),
> each of them can be executed repeatedly and I get a consistent result every 
> time.
> I can save the dataframe after creating the stop and duration columns, and 
> after that I get consistent results every time.
> It is not a very practical workaround, as I need a lot of space and time to 
> implement it.
> This dataset is really big (in my eyes at least, approx. 100.000.000 new 
> records per day).
> If I run this same example on my local machine using master = local[*], 
> everything works as expected; it's just on a cluster setup. I tried to create 
> a cluster using docker on my local machine, created 3.0.1 and 3.1.1 clusters 
> with one master and two workers, and successfully reproduced the issue.

[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364334#comment-17364334
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

{quote}I understand ordering the data, but I don't see how it impacts the results.{quote}

OK, let's change my example just a bit and run completely through your query. 
Let's say we have two tasks.

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|10| Joe | TypeB|

Task 2:
|Start| User | Type|
|44| Anna| TypeB|
|21| Joe | TypeB|

The first thing that Spark will do is partition the data by {{User}} (which is 
what the window function asked for). So we end up with:

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|44| Anna| TypeB|

Task 2:
|Start| User | Type|
|21| Joe | TypeB|
|10| Joe | TypeB|

Then each task will sort the data ascending by {{User, Start}}. (We are going to 
ignore Task 2 for now because there is no ambiguity in the sorting there, but I 
will show both options of sorting for Task 1.)

Task 1 (sorting 1):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeB|
|44| Anna | TypeA|

Task 1 (sorting 2):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeA|
|44| Anna | TypeB|

Then the window function will run to create the end column and the duration 
(actually 2 steps, but I'll put them into one here).

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeB| 44| 0 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeA|44| 0 |
|44| Anna | TypeB|3000| 2956 |

Now let's filter on {{type = 'TypeA' and duration > 4}}:

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |

In sorting 2 we dropped 2 entries, but in sorting 1 we dropped only 1. This is 
because the order of the rows can matter if there is ambiguity in the ordering 
and the types differ within that ambiguity too.

All {{monotonically_increasing_id}} did was make sure that only one order would 
be produced. So is sorting 1 correct or is sorting 2 correct? From a SQL 
perspective either of them is a correct answer, and Spark happened to pick one 
of them.
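
Here is a minimal sketch of the two orderings walked through above, assuming a 
running SparkSession named {{spark}}. The explicit {{tiebreak}} column is an 
artificial device used only to force Spark to materialize each of the two 
equally valid sorts of the ambiguous start=44 rows; the real query has no such 
column, which is exactly the problem.

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

def countTypeA(df: DataFrame): Long = {
  // same shape as the reported query, plus the artificial tiebreaker
  val w = Window.partitionBy("user").orderBy("start", "tiebreak")
  val tsLead = coalesce(lead("start", 1).over(w), lit(3000))
  df.withColumn("end", tsLead)
    .withColumn("duration", col("end") - col("start"))
    .where("type = 'TypeA' and duration > 4")
    .count()
}

// sorting 1: the TypeB row sorts between the two TypeA rows
val sorting1 = Seq((39, "Anna", "TypeA", 0), (44, "Anna", "TypeB", 1), (44, "Anna", "TypeA", 2))
  .toDF("start", "user", "type", "tiebreak")
// sorting 2: the TypeB row sorts last
val sorting2 = Seq((39, "Anna", "TypeA", 0), (44, "Anna", "TypeA", 1), (44, "Anna", "TypeB", 2))
  .toDF("start", "user", "type", "tiebreak")

println(countTypeA(sorting1))  // 2: both TypeA rows survive the filter
println(countTypeA(sorting2))  // 1: only the start=39 row survives
{code}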


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-16 Thread Domagoj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364306#comment-17364306
 ] 

Domagoj commented on SPARK-35089:
-

[~revans2], by error I mean a result that is not correct. What is suspicious to 
me is that I can get more records without monotonically_increasing_id() than 
with monotonically_increasing_id(). Where did those extra records come from?

I understand ordering the data, but I don't see how it should impact the 
results. The data is partitioned by the partitionBy clause of the Window 
function, and the order just says in which order it is accessed.

So, since my original data uses a timestamp with milliseconds as the start 
column, and creates a delta from the next data row, even if two starts are 
identical and you therefore need an additional column to sort those identical 
starts, the delta (difference between start and stop) should still be the same.

What is more interesting is that this only happens on a multi-node cluster, so 
it has to have something to do with sharding data across nodes and aggregating 
the results of the calculation from all nodes back to the master.

To put it simply, if you and I are two nodes, and the first time I get 52/100 
records and you get 48/100 records, we both do our calculations and return the 
results to the master node; that should not differ from the results we would 
get if you processed 60/100 and I processed 40/100.

I know that this is oversimplified, and that there is plenty of complicated 
code and logic in sharding data between nodes; it just looks incorrect to me 
and unusable in calculations that need precision.


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364277#comment-17364277
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

[~Tonzetic], I don't know what you mean by an error.  You have asked Spark to 
calculate something that has multiple "correct" answers.  Each time Spark runs, 
one of those "correct" answers is selected, somewhat at random.  The 
{{monotonically_increasing_id()}} change reduces the number of "correct" 
answers to 1.

For example:

| Start | User | Type |
|40|Anna| TypeA |
|41|Anna| TypeB |
|40|Anna| TypeB |

You are asking Spark to sort the data by Start, and then do a window operation 
that depends on the order of the data.  But there are two correct ways to sort 
the data.


| Start | User | Type |
|40|Anna| *TypeA* |
|40|Anna| *TypeB* |
|41|Anna| TypeB |


| Start | User | Type |
|40|Anna| *TypeB* |
|40|Anna| *TypeA* |
|41|Anna| TypeB |

So which of these is the "correct" way to sort the data?  Each of them will 
produce a different answer from the window operation: because the order of 
{{TypeA}} vs {{TypeB}} differs between the two, the relative distance between 
{{start}} and {{end}} will be different (in this case 0 vs 1).  So if you can 
tell me what the correct ordering should be, then I can tell you whether adding 
the new id has made it correct, or whether it is just consistent.


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-16 Thread Domagoj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364114#comment-17364114
 ] 

Domagoj commented on SPARK-35089:
-

[~revans2], I've tried your method with monotonically_increasing_id() and you 
are right, I get the same result every time.

I've tried it on my production data.

But I've noticed that this result is always 9397299 in my example, while 
without monotonically_increasing_id() I sometimes get a bigger number (like 
9397301 and 9397304).

So I wonder: is this a correct result, or just a consistent error? How else 
could I get a bigger number of records? I believe that the simple sum of 
results from the nodes is probably implemented correctly...


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-15 Thread Domagoj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17363657#comment-17363657
 ] 

Domagoj commented on SPARK-35089:
-

I can build a similar solution by creating a snapshot after the stop column is 
calculated, but that is pretty expensive in time, especially on large datasets.

I believe that something should be done about this, and that it should be fixed 
without the need for a workaround.

I actually have a unique id (in the real data the start column is a timestamp 
with millisecond precision), but I will try this workaround with 
monotonically_increasing_id. I'm not sure that I understand why it would help, 
since there is no primary key in ORC (I may be wrong, but I think not).

In the meantime, I have disabled public access to the S3 bucket, as it was 
generating some cost and I can no longer keep it publicly accessible.


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-01 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17355088#comment-17355088
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

I should add that the above "solution" is fragile because it relies on Spark 
keeping the call to {{monotonically_increasing_id}} in the same task that reads 
in the ORC data. It really would be best if Spark could insert something like 
this automatically and then drop it later, before writing/returning results.
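
One way to make the workaround less dependent on task placement (a sketch, 
assuming a running SparkSession named {{spark}}; the paths are illustrative, 
mirroring the hdfs:///random.orc example from the description) is to 
materialize the id once and read it back, so the tiebreaker is fixed on disk:

{code:scala}
import org.apache.spark.sql.functions.monotonically_increasing_id

// write the id out once so it no longer depends on how any later read is
// partitioned; hypothetical output path
spark.read.orc("hdfs:///random.orc")
  .withColumn("unambiguous_id", monotonically_increasing_id())
  .write.mode("overwrite").orc("hdfs:///random_with_id.orc")

// every subsequent job can order by ("start", "unambiguous_id"), which is now
// a stable, unambiguous ordering regardless of cluster shape
val stable = spark.read.orc("hdfs:///random_with_id.orc")
{code}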


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-01 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17355083#comment-17355083
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

[~Tonzetic], to be clear, my point was just to provide more information about 
the problem. I agree with you that this feels very much like a bug, and I would 
like to see it fixed. My hope was that with the added information someone in 
the Spark community could look at ways to fix it, and at a minimum you could 
look at ways to work around it for your particular use case.  One such option 
is to remove the ambiguity by adding a total ordering with 
{{monotonically_increasing_id}} early on in your processing (when you read the 
data in).  You should not rely on the exact value in this column (it can change 
based on the shape of the cluster you are running on), but you can use it as a 
part of your ordering to get unambiguous results.

For example:

{code:scala}
// define window and lead; note the extra tiebreaker column in the ordering
val w = Window.partitionBy("user").orderBy("start", "unambiguous_id")
// if null, replace with 30.000.000
val ts_lead = coalesce(lead("start", 1).over(w), lit(3000))

// read data to dataframe, add the tiebreaker, create stop column and calculate duration
val fox2 = spark.read.orc("hdfs:///random.orc").
    withColumn("unambiguous_id", monotonically_increasing_id()).
    withColumn("end", ts_lead).
    withColumn("duration", col("end")-col("start"))

// repeated executions of this line should now return the same count every time
// I have it in a separate cell in JupyterLab
fox2.where("type='TypeA' and duration>4").count()
{code}

The above code should produce the exact same result, every time, no matter 
where it is run, or how it is run.  If you have a separate unique ID per row, 
which often exists as a primary key, you could use that instead of the 
{{monotonically_increasing_id}} to remove the ambiguity.


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-01 Thread Domagoj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354889#comment-17354889
 ] 

Domagoj commented on SPARK-35089:
-

It's really not important to look at the random functions.

They are there only because I do not have permission to share the data with 
you. I therefore created a test set of data using these random functions, 
merely to have some data to work with.

You can see that I've made sample data publicly available on an S3 bucket.

So, to cut a long story short, you only need to download the data and try to 
run the code with the window functions.

Regarding whether this is a bug, a design flaw, or something else, I must say 
that I (and probably everyone else) expect consistent results from the same 
data.

So yes, this is a problem, and an explanation of why it is probably happening 
will not solve the issue.

This means that using Spark in, for example, financial calculations is 
practically a big no-no, because you cannot get consistent results from the 
calculations.


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-05-29 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17353778#comment-17353778
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

For window functions, if the {{order by}} clause is ambiguous you can get 
different results from one run to the next.  This is because the order in which 
shuffle data is read in is not deterministic, even though the sorting is.  In 
this example you are generating start with

{code}
val getRandomStart = udf((x:Int)=>{
    x+scala.util.Random.nextInt(47)
})
{code}

The input to this, {{x}}, appears to be unambiguous (0 to some very large 
number), but because of the + random(0 to 47) there is the possibility of 
multiple start values being the same.

So for operations where order matters you can get ambiguous results. For lead 
and lag a different lead/lag value can show up, because the row right after 
this one, {{lead(1)}}, is different.  For operations like rank, dense_rank, and 
row_number the order of the output values is the same, but the rows are in a 
different order, so the value at each row/rank is different. This can also 
impact operations like SUM, MIN, and MAX that use row bounds on a window 
instead of value ranges.  I'm not sure if this should be considered a bug or 
not.  Spark treats all window operations as deterministic, so in theory if 
there is a crash you can get inconsistent results within the same query, but 
that only happens if the end user put in a non-deterministic ordering.
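
To make the row-frame vs. value-range distinction above concrete, here is a 
small sketch, assuming a running SparkSession named {{spark}} and made-up 
column names: with duplicate {{start}} values, a running sum over a row-based 
frame depends on which of the tied rows the sort happens to emit first, while a 
range-based frame does not.

{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((40, "Anna", 1), (40, "Anna", 10), (41, "Anna", 100))
  .toDF("start", "user", "value")

val byRows = Window.partitionBy("user").orderBy("start")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val byRange = Window.partitionBy("user").orderBy("start")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

// running_rows can come out as (1, 11, 111) or (10, 11, 111) depending on how
// the tie at start=40 is ordered; running_range is (11, 11, 111) either way
df.withColumn("running_rows", sum("value").over(byRows))
  .withColumn("running_range", sum("value").over(byRange))
  .show()
{code}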


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-05-18 Thread Domagoj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346704#comment-17346704
 ] 

Domagoj commented on SPARK-35089:
-

I've made the test data available via a public S3 bucket, so it's easier to 
reproduce now.


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-04-17 Thread Domagoj (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324422#comment-17324422
 ] 

Domagoj commented on SPARK-35089:
-

I can, but it is not the randomness that is the problem.

Once you generate a set of random data, you write it out and read it back into 
a separate dataframe, and then run the window function and filter on that 
dataframe.

So, in repeated runs you use the same data every time.

I generated this random dataset just to have some data to make a proof with.

I actually discovered this running on real data that is read from JSONL files.

There are no UDFs and nothing is random, just data read from a file.

So you need to perform two separate steps here:

1. generate random data and write it to a file (here it is ORC) - THIS STEP 
SHOULD BE RUN ONLY ONCE

2. read that data into a new dataframe and run the window lead and the complex 
filter (with two conditions) - THIS STEP SHOULD BE REPEATED

Again, it has to be a cluster setup, not a single node.

On my docker setup (I can provide you with the dockerfile) I need to 
repartition the dataframe on read to get different results, for example:

spark.read.orc("bla_bla").repartition(200)

and

spark.read.orc("bla_bla").repartition(100)

On the AWS EMR Spark setup it does not matter; it returns a different number on 
every count.


[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-04-17 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324392#comment-17324392
 ] 

Hyukjin Kwon commented on SPARK-35089:
--

Can you try enabling {{asNondeterministic()}}? e.g. 
{{getRandomType.asNondeterministic()}}
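
Applied to one of the generator UDFs from the issue description, the suggestion 
would look like this sketch (UDF body copied from the reproduction code):

{code:scala}
import org.apache.spark.sql.functions.udf

// marking the UDF as non-deterministic tells Spark it must not freely
// re-evaluate or reorder it during optimization
val getRandomType = udf(() => {
  val types = Seq("TypeA", "TypeB", "TypeC", "TypeD", "TypeE")
  types(scala.util.Random.nextInt(5))
}).asNondeterministic()
{code}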


