[ https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-27340:
------------------------------------

    Assignee: Apache Spark

Alias on TimeWindow expression may cause watermark metadata lost
-----------------------------------------------------------------

                 Key: SPARK-27340
                 URL: https://issues.apache.org/jira/browse/SPARK-27340
             Project: Spark
          Issue Type: Bug
          Components: SQL, Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Kevin Zhang
            Assignee: Apache Spark
            Priority: Major

When we use the DataFrame API to write a Structured Streaming query, we usually specify a watermark on the event time column. If we define a window on that event time column, the delayKey metadata of the event time column is supposed to be propagated to the new column generated by the time window expression. But if we add an additional alias on the time window column, the delayKey metadata is lost.

So far I have only found that the bug affects stream-stream joins with equal window join keys. For aggregations, the grouping expressions can be trimmed (in the CleanupAliases rule), so the additional aliases are removed and the metadata is kept.

Here is an example:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()

import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Watermark on the event time column, then a time window with an extra alias.
val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

// Stream-stream left outer join with the aliased window columns as join keys.
val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}

This program ends with the following exception, and from the analyzed plan we can see there is no delayKey metadata on 'fooWindow':

{code:java}
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;
Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
:- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
:  +- Filter isnotnull(fooTime#5-T2000ms)
:     +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType, TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
:        +- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
:           +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
:              +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
+- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19]
   +- Filter isnotnull(barTime#15-T2000ms)
      +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType, TimestampType)) AS window#20-T2000ms, barId#14L, barTime#15-T2000ms]
         +- EventTimeWatermark barTime#15: timestamp, interval 2 seconds
            +- Project [value#1L AS barId#14L, timestamp#0 AS barTime#15]
               +- Filter ((value#1L % cast(2 as bigint)) = cast(0 as bigint))
                  +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
{code}
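For illustration, here is a minimal sketch of a possible workaround implied by the description above: since the metadata appears to be dropped only when an extra alias is placed on the window column, the join can be expressed against the un-aliased, auto-generated `window` column instead. It reuses `rateStream` and the names from the example above, and is only a sketch under that assumption, not a verified fix.

{code:scala}
// Sketch of a possible workaround (assumption: the delayKey metadata is lost only
// because of the extra .alias(...) on the window column, as described above).
// Reuses sparkSession, rateStream and the imports from the example above.
val fooWindowed = rateStream
  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds"))  // no extra alias

val barWindowed = rateStream
  .where(col("value") % 2 === 0)
  .select(col("value").as("barId"), col("timestamp").as("barTime"))
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds"))  // no extra alias

// Join on the auto-generated "window" column of each side, qualified through the
// parent Dataset so the two identically named columns are disambiguated.
val joined = fooWindowed.join(
  barWindowed,
  $"fooId" === $"barId" && fooWindowed.col("window") === barWindowed.col("window"),
  joinType = "LeftOuter"
)
{code}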
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org