[ https://issues.apache.org/jira/browse/SPARK-45637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrzej Zera updated SPARK-45637:
---------------------------------
    Description: 
According to the documentation update (SPARK-42591) resulting from SPARK-42376, 
Spark 3.5.0 should support time-window aggregations in two separate streams 
followed by a stream-stream window join:

https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995

However, I failed to reproduce this example, and the query I built doesn't 
return any results:
{code:java}
from pyspark.sql.functions import rand
from pyspark.sql.functions import expr, window, window_time

spark.conf.set("spark.sql.shuffle.partitions", "1")

impressions = (
    spark
    .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
    .selectExpr("value AS adId", "timestamp AS impressionTime")
)

impressionsWithWatermark = impressions \
    .selectExpr("adId AS impressionAdId", "impressionTime") \
    .withWatermark("impressionTime", "10 seconds")

clicks = (
    spark
    .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
    .where((rand() * 100).cast("integer") < 10)  # 10 out of every 100 impressions result in a click
    .selectExpr("(value - 10) AS adId", "timestamp AS clickTime")  # -10 so that a click with the same id as an impression is generated later (i.e. delayed data)
    .where("adId > 0")
)

clicksWithWatermark = clicks \
    .selectExpr("adId AS clickAdId", "clickTime") \
    .withWatermark("clickTime", "10 seconds")

clicksWindow = clicksWithWatermark.groupBy(
    window(clicksWithWatermark.clickTime, "1 minute")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
    window(impressionsWithWatermark.impressionTime, "1 minute")
).count()

clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")

clicksAndImpressions.writeStream \
    .format("memory") \
    .queryName("clicksAndImpressions") \
    .outputMode("append") \
    .start() {code}
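My understanding of why this produces nothing can be sketched as a plain-Python model of the two operators (this is an illustration of the behavior I believe I'm seeing, not Spark's actual implementation):

```python
from datetime import datetime, timedelta

def aggregation_emits(window_end, watermark):
    # In append mode, the windowed aggregation only emits a window's row
    # once the watermark has passed the window's end timestamp.
    return watermark >= window_end

def join_keeps(event_time, watermark):
    # The stream-stream join discards input rows whose event time is
    # already behind the watermark.
    return event_time >= watermark

window_end = datetime(2023, 10, 23, 12, 1, 0)   # end of the 12:00-12:01 window
watermark = window_end + timedelta(seconds=10)  # watermark just past the window end

# The aggregation emits the window only after the watermark passes its end...
assert aggregation_emits(window_end, watermark)
# ...but by then the window's event time is behind the watermark, so the
# downstream join drops it and the query never produces output.
assert not join_keeps(window_end, watermark)
```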
 

My intuition is that I'm getting no results because, to output results from the 
first stateful operator (the time-window aggregation), the watermark needs to pass 
the end timestamp of the window. And once the watermark is past the end 
timestamp of the window, that window is dropped at the second stateful operator 
(the stream-stream join) because it's behind the watermark. Indeed, a small hack 
applied to the event-time column (adding one minute) between the two stateful 
operators makes it possible to get results:
{code:java}
clicksWindow2 = clicksWithWatermark.groupBy(
    window(clicksWithWatermark.clickTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")

impressionsWindow2 = impressionsWithWatermark.groupBy(
    window(impressionsWithWatermark.impressionTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")

clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", "inner")

clicksAndImpressions2.writeStream \
    .format("memory") \
    .queryName("clicksAndImpressions2") \
    .outputMode("append") \
    .start()  {code}
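The arithmetic behind this hack can be checked directly: window_time() evaluates to the window's end minus one microsecond, so shifting it forward by one minute keeps the emitted row's event time ahead of a watermark that has just passed the window end (a sketch assuming the 1-minute window and 10-second watermark delay used above):

```python
from datetime import datetime, timedelta

window_end = datetime(2023, 10, 23, 12, 1, 0)

# window_time() returns window.end - 1 microsecond
window_time = window_end - timedelta(microseconds=1)

# Watermark shortly after the window closed, i.e. right when the
# aggregation has emitted the window's row.
watermark = window_end + timedelta(seconds=10)

# Without the hack the emitted row is behind the watermark, so the join drops it:
assert window_time < watermark

# Shifting the event-time column forward by one minute keeps it ahead:
shifted = window_time + timedelta(minutes=1)
assert shifted > watermark
```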
 

  was: Same description; the only change is the example link, which previously pointed to a fork:

[https://github.com/HeartSaVioR/spark/blob/eb0b09f0f2b518915421365a61d1f3d7d58b4404/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995]


> Time window aggregation in separate streams followed by stream-stream join 
> not returning results
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45637
>                 URL: https://issues.apache.org/jira/browse/SPARK-45637
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>         Environment: I'm using Spark 3.5.0 on Databricks Runtime 14.1
>            Reporter: Andrzej Zera
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
