[ 
https://issues.apache.org/jira/browse/SPARK-39347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682907#comment-17682907
 ] 

Apache Spark commented on SPARK-39347:
--------------------------------------

User 'WweiL' has created a pull request for this issue:
https://github.com/apache/spark/pull/39843

> Generate wrong time window when (timestamp-startTime) % slideDuration < 0
> -------------------------------------------------------------------------
>
>                 Key: SPARK-39347
>                 URL: https://issues.apache.org/jira/browse/SPARK-39347
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.0
>            Reporter: nyingping
>            Priority: Major
>
> The sliding-window generation strategy was changed to the current one in PR 
> [#35362](https://github.com/apache/spark/pull/35362), and that change 
> introduced a new problem.
> Wrong windows are generated when the timestamp of a record is negative and 
> (timestamp - startTime) % slideDuration is less than 0. The current test 
> case does not expose this bug:
> [test("negative timestamps")](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala#L299)
>  
> {code:java}
> val df1 = Seq(
>   ("1970-01-01 00:00:02", 1),
>   ("1970-01-01 00:00:12", 2)).toDF("time", "value")
> val df2 = Seq(
>   (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
>   (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")
> Seq(df1, df2).foreach { df =>
>   checkAnswer(
>     df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
>       .orderBy($"window.start".asc)
>       .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
>     Seq(
>       Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
>       Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
>   )
> } {code}
>  
>  
> The timestamps in the test data above are not negative, and neither is the 
> value modulo the window length, so the test case passes.
> The problem appears when the timestamps are changed to something like this:
>  
> {code:java}
> val df3 = Seq(
>   ("1969-12-31 00:00:02", 1),
>   ("1969-12-31 00:00:12", 2)).toDF("time", "value")
> val df4 = Seq(
>   (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
>   (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")
> Seq(df3, df4).foreach { df =>
>   checkAnswer(
>     df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
>       .orderBy($"window.start".asc)
>       .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
>     Seq(
>       Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
>       Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
>   )
> } {code}
>  
> Running it gives an unexpected result:
>  
> {code:java}
> == Results ==
> !== Correct Answer - 2 ==                      == Spark Answer - 2 ==
> !struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
> ![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
> ![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2] {code}
>  
> *benchmark result*
>  
> Old logic ([#18364](https://github.com/apache/spark/pull/18364)) vs. the 
> fixed version:
> {code:java}
> Running benchmark: tumbling windows
> Running case: old logic
> Stopped after 407 iterations, 10012 ms
> Running case: new logic
> Stopped after 615 iterations, 10007 ms
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
> Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
> tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
> ------------------------------------------------------------------------------------------------------------------------
> old logic                                            17             25           9        580.1           1.7       1.0X
> new logic                                            15             16           2        680.8           1.5       1.2X
> Running benchmark: sliding windows
> Running case: old logic
> Stopped after 10 iterations, 10296 ms
> Running case: new logic
> Stopped after 15 iterations, 10391 ms
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
> Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
> sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
> ------------------------------------------------------------------------------------------------------------------------
> old logic                                          1000           1030          19         10.0         100.0       1.0X
> new logic                                           668            693          21         15.0          66.8       1.5X
> {code}
>  
>  
> Compared with PR [#38069](https://github.com/apache/spark/pull/35362), the 
> fixed version loses a little performance.
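
The arithmetic behind the report can be reproduced without Spark. The sketch below is only an illustration under stated assumptions: timestamps are treated as UTC epoch seconds, and `naiveStart` and `safeStart` are hypothetical helper names, not functions from Spark. `naiveStart` adds the slide once before taking the remainder, which is assumed to resemble the affected logic; `safeStart` normalizes a negative remainder, which is one possible way to make the window start sign-safe. It relies on the fact that `%` on JVM integers truncates toward zero, so the remainder of a negative dividend is negative or zero.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object WindowStartSketch {
  // Epoch seconds for a wall-clock time, read as UTC (an assumption of this sketch).
  def epoch(s: String): Long = LocalDateTime.parse(s).toEpochSecond(ZoneOffset.UTC)

  // Render epoch seconds back as a UTC wall-clock string.
  def show(sec: Long): String = LocalDateTime.ofEpochSecond(sec, 0, ZoneOffset.UTC).toString

  // Hypothetical "naive" start: adds the slide once before the remainder.
  // The remainder can still be negative when ts - startTime + slide < 0.
  def naiveStart(ts: Long, startTime: Long, slide: Long): Long =
    ts - (ts - startTime + slide) % slide

  // Hypothetical sign-safe start: shift a negative remainder into [0, slide).
  def safeStart(ts: Long, startTime: Long, slide: Long): Long = {
    val r = (ts - startTime) % slide
    ts - (if (r < 0) r + slide else r)
  }

  def main(args: Array[String]): Unit = {
    // Same parameters as the test: 10 s window, 10 s slide, 5 s start offset.
    val inputs = Seq("1970-01-01T00:00:02", "1969-12-31T00:00:02", "1969-12-31T00:00:12")
    for (t <- inputs) {
      val ts = epoch(t)
      println(s"$t naive=${show(naiveStart(ts, 5L, 10L))} safe=${show(safeStart(ts, 5L, 10L))}")
    }
  }
}
```

Under these assumptions, both helpers agree for 1970-01-01 00:00:02, while for the two 1969-12-31 timestamps the naive start lands one slide too late (00:00:05 and 00:00:15), which is the same shift shown in the "Spark Answer" column above.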



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
