[ 
https://issues.apache.org/jira/browse/SPARK-45637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-45637:
---------------------------------
      Labels: correctness  (was: )
    Priority: Blocker  (was: Major)

> Time window aggregation in separate streams followed by stream-stream join 
> not returning results
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45637
>                 URL: https://issues.apache.org/jira/browse/SPARK-45637
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>         Environment: I'm using Spark 3.5.0 on Databricks Runtime 14.1
>            Reporter: Andrzej Zera
>            Priority: Blocker
>              Labels: correctness
>
> According to the documentation update (SPARK-42591) that resulted from
> SPARK-42376, Spark 3.5.0 should support time-window aggregations in two
> separate streams followed by a stream-stream time-window join:
> [https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995]
> However, I could not reproduce this example. The query I built returns no
> results:
> {code:python}
> from pyspark.sql.functions import expr, rand, window, window_time
> spark.conf.set("spark.sql.shuffle.partitions", "1")
> impressions = (
>     spark.readStream.format("rate")
>     .option("rowsPerSecond", "5")
>     .option("numPartitions", "1")
>     .load()
>     .selectExpr("value AS adId", "timestamp AS impressionTime")
> )
> impressionsWithWatermark = impressions \
>     .selectExpr("adId AS impressionAdId", "impressionTime") \
>     .withWatermark("impressionTime", "10 seconds")
> clicks = (
>     spark.readStream.format("rate")
>     .option("rowsPerSecond", "5")
>     .option("numPartitions", "1")
>     .load()
>     # 10 out of every 100 impressions result in a click
>     .where((rand() * 100).cast("integer") < 10)
>     # -10 so that a click with the same id as an impression is generated
>     # later (i.e. delayed data)
>     .selectExpr("(value - 10) AS adId", "timestamp AS clickTime")
>     .where("adId > 0")
> )
> clicksWithWatermark = clicks \
>     .selectExpr("adId AS clickAdId", "clickTime") \
>     .withWatermark("clickTime", "10 seconds")
> clicksWindow = clicksWithWatermark.groupBy(      
>     window(clicksWithWatermark.clickTime, "1 minute")
> ).count()
> impressionsWindow = impressionsWithWatermark.groupBy(
>     window(impressionsWithWatermark.impressionTime, "1 minute")
> ).count()
> clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")
> clicksAndImpressions.writeStream \
>     .format("memory") \
>     .queryName("clicksAndImpressions") \
>     .outputMode("append") \
>     .start() {code}
>  
> My intuition is that no results appear because the first stateful operator
> (the time-window aggregation) emits a window only after the watermark has
> passed the window's end timestamp. By then the window is already behind the
> watermark, so the second stateful operator (the stream-stream join) ignores
> it as late data. Indeed, a small hack that shifts the event-time column
> forward by one minute between the two stateful operators makes it possible
> to get results:
> {code:python}
> clicksWindow2 = clicksWithWatermark.groupBy(
>     window(clicksWithWatermark.clickTime, "1 minute")
> ).count().withColumn(
>     "window_time", window_time("window") + expr('INTERVAL 1 MINUTE')
> ).drop("window")
> impressionsWindow2 = impressionsWithWatermark.groupBy(
>     window(impressionsWithWatermark.impressionTime, "1 minute")
> ).count().withColumn(
>     "window_time", window_time("window") + expr('INTERVAL 1 MINUTE')
> ).drop("window")
> clicksAndImpressions2 = clicksWindow2.join(
>     impressionsWindow2, "window_time", "inner"
> )
> clicksAndImpressions2.writeStream \
>     .format("memory") \
>     .queryName("clicksAndImpressions2") \
>     .outputMode("append") \
>     .start()  {code}
>  
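
The hypothesis in the description can be illustrated without Spark. What follows is a minimal pure-Python sketch of that reasoning, not Spark's actual implementation. The function names (window_end, aggregation_emits, join_drops_as_late), the sample timestamps, and both comparison rules are assumptions made for illustration. The emitted row's event time is modeled as the window end minus one microsecond, which is what window_time("window") returns; the first query joins on the window struct itself, so this is only a stand-in.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)            # tumbling window size used in the report
WATERMARK_DELAY = timedelta(seconds=10)  # watermark delay used in the report


def window_end(event_time):
    """End of the 1-minute tumbling window that contains event_time."""
    start = event_time.replace(second=0, microsecond=0)
    return start + WINDOW


def aggregation_emits(end, watermark):
    """Assumed append-mode rule: a window is emitted once watermark >= end."""
    return watermark >= end


def join_drops_as_late(row_event_time, watermark):
    """Assumed late-row rule: a row at or behind the watermark is dropped."""
    return row_event_time <= watermark


# Hypothetical sample: one click at 12:00:30, latest event seen at 12:01:15.
click_time = datetime(2023, 10, 23, 12, 0, 30)
max_event_time = datetime(2023, 10, 23, 12, 1, 15)
watermark = max_event_time - WATERMARK_DELAY  # 12:01:05

end = window_end(click_time)                  # 12:01:00
row_time = end - timedelta(microseconds=1)    # models window_time("window")
shifted_row_time = row_time + WINDOW          # models the "+ INTERVAL 1 MINUTE" hack

emitted = aggregation_emits(end, watermark)
dropped = join_drops_as_late(row_time, watermark)
dropped_after_shift = join_drops_as_late(shifted_row_time, watermark)

print(f"emitted by aggregation: {emitted}")
print(f"dropped by join: {dropped}")
print(f"dropped by join after shift: {dropped_after_shift}")
```

Under these assumptions the unshifted row is emitted by the aggregation and then immediately treated as late by the join, while the shifted row survives. That matches the behavior described above, but it does not confirm that this is the actual cause inside Spark.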


