[ https://issues.apache.org/jira/browse/SPARK-45637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim updated SPARK-45637:
---------------------------------
    Labels: correctness  (was: )
  Priority: Blocker  (was: Major)

> Time window aggregation in separate streams followed by stream-stream join not returning results
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45637
>                 URL: https://issues.apache.org/jira/browse/SPARK-45637
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>         Environment: I'm using Spark 3.5.0 on Databricks Runtime 14.1
>            Reporter: Andrzej Zera
>            Priority: Blocker
>              Labels: correctness
>
> According to a documentation update (SPARK-42591) resulting from SPARK-42376, Spark 3.5.0 should support time-window aggregations in two separate streams followed by a stream-stream window join:
> [https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995]
> However, I failed to reproduce this example and the query I built doesn't return any results:
> {code:java}
> from pyspark.sql.functions import rand
> from pyspark.sql.functions import expr, window, window_time
>
> spark.conf.set("spark.sql.shuffle.partitions", "1")
>
> impressions = (
>     spark
>     .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
>     .selectExpr("value AS adId", "timestamp AS impressionTime")
> )
>
> impressionsWithWatermark = impressions \
>     .selectExpr("adId AS impressionAdId", "impressionTime") \
>     .withWatermark("impressionTime", "10 seconds")
>
> clicks = (
>     spark
>     .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
>     .where((rand() * 100).cast("integer") < 10)  # 10 out of every 100 impressions result in a click
>     .selectExpr("(value - 10) AS adId", "timestamp AS clickTime")  # -10 so that a click with the same id as an impression is generated later (i.e. delayed data)
>     .where("adId > 0")
> )
>
> clicksWithWatermark = clicks \
>     .selectExpr("adId AS clickAdId", "clickTime") \
>     .withWatermark("clickTime", "10 seconds")
>
> clicksWindow = clicksWithWatermark.groupBy(
>     window(clicksWithWatermark.clickTime, "1 minute")
> ).count()
>
> impressionsWindow = impressionsWithWatermark.groupBy(
>     window(impressionsWithWatermark.impressionTime, "1 minute")
> ).count()
>
> clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")
>
> clicksAndImpressions.writeStream \
>     .format("memory") \
>     .queryName("clicksAndImpressions") \
>     .outputMode("append") \
>     .start()
> {code}
>
> My intuition is that I'm getting no results because, to output results from the first stateful operator (the time-window aggregation), the watermark needs to pass the end timestamp of the window. And once the watermark is past the end timestamp of the window, that window is dropped at the second stateful operator (the stream-stream join) because it's behind the watermark. Indeed, a small hack applied to the event-time column (adding one minute) between the two stateful operators makes it possible to get results:
> {code:java}
> clicksWindow2 = clicksWithWatermark.groupBy(
>     window(clicksWithWatermark.clickTime, "1 minute")
> ).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")
>
> impressionsWindow2 = impressionsWithWatermark.groupBy(
>     window(impressionsWithWatermark.impressionTime, "1 minute")
> ).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")
>
> clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", "inner")
>
> clicksAndImpressions2.writeStream \
>     .format("memory") \
>     .queryName("clicksAndImpressions2") \
>     .outputMode("append") \
>     .start()
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
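
[Editorial note] The reporter's intuition about the watermark interaction can be sketched without Spark as a toy simulation. All helper names below are hypothetical (not Spark APIs), times are integer seconds, and the semantics are simplified to the two rules the report describes: append-mode aggregation emits a window only after the watermark passes its end, and a downstream stateful operator drops rows whose event time is already behind the watermark.

```python
# Toy model of the reported pipeline (hypothetical helpers, not Spark APIs).
WINDOW = 60  # 1-minute tumbling windows, in seconds

def aggregate(events, watermark):
    """Append-mode window aggregation: a window [start, start + WINDOW)
    is finalized and emitted only once the watermark passes its end."""
    counts = {}
    for t in events:
        start = (t // WINDOW) * WINDOW
        counts[start] = counts.get(start, 0) + 1
    return {w: c for w, c in counts.items() if watermark >= w + WINDOW}

def join_rows(left, right, watermark):
    """Downstream stateful join: rows whose event time (window end - 1)
    is behind the watermark are treated as late and dropped."""
    alive = lambda rows: {w: c for w, c in rows.items()
                          if (w + WINDOW - 1) >= watermark}
    l, r = alive(left), alive(right)
    return {w: (l[w], r[w]) for w in l if w in r}

watermark = 70  # watermark already passed the end (60) of window [0, 60)
clicks = aggregate([5, 20, 40], watermark)        # emits {0: 3}
impressions = aggregate([10, 30], watermark)      # emits {0: 2}

# The emitted window is immediately "late" at the join -> no results:
print(join_rows(clicks, impressions, watermark))  # {}

# The reporter's hack: shift event time forward by one window length,
# which keeps the rows ahead of the watermark at the join:
shifted = lambda rows: {w + WINDOW: c for w, c in rows.items()}
print(join_rows(shifted(clicks), shifted(impressions), watermark))  # {60: (3, 2)}
```

This only illustrates the reported behavior; the real fix belongs in how Spark tracks the watermark between chained stateful operators, not in user queries.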