[ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17209030#comment-17209030 ]
Aoyuan Liao commented on SPARK-33039: ------------------------------------- Since the window: [2020-08-29 10:15:00, 2020-08-29 10:15:10] ended after the watermark: 2020-08-29T17:14:55.000, the partial count is maintained as internal state while waiting for later data, so it is not yet added to the result table. Please take a deeper look at the append mode example shown in the documentation: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking > Misleading watermark calculation in structure streaming > ------------------------------------------------------- > > Key: SPARK-33039 > URL: https://issues.apache.org/jira/browse/SPARK-33039 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.4.4 > Reporter: Sandish Kumar HN > Priority: Major > > source code: > {code:java} > import org.apache.spark.sql.SparkSession > import org.apache.hadoop.fs.Path > import java.sql.Timestamp > import org.apache.spark.sql.types._ > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} > object TestWaterMark extends App { > val spark = SparkSession.builder().master("local").getOrCreate() > val sc = spark.sparkContext > val dir = new Path("/tmp/test-structured-streaming") > val fs = dir.getFileSystem(sc.hadoopConfiguration) > fs.mkdirs(dir) > val schema = StructType(StructField("vilue", StringType) :: > StructField("timestamp", TimestampType) :: > Nil) > val eventStream = spark > .readStream > .option("sep", ";") > .option("header", "false") > .schema(schema) > .csv(dir.toString) > // Watermarked aggregation > val eventsCount = eventStream > .withWatermark("timestamp", "5 seconds") > .groupBy(window(col("timestamp"), "10 seconds")) > .count > def writeFile(path: Path, data: String) { > val file = fs.create(path) > file.writeUTF(data) > file.close() > } > // Debug query > val query = eventsCount.writeStream > .format("console") > 
.outputMode("complete") > .option("truncate", "false") > .trigger(Trigger.ProcessingTime("5 seconds")) > .start() > writeFile(new Path(dir, "file1"), """ > |OLD;2019-08-09 10:05:00 > |OLD;2019-08-09 10:10:00 > |OLD;2019-08-09 10:15:00""".stripMargin) > query.processAllAvailable() > val lp1 = query.lastProgress > println(lp1.eventTime) > writeFile(new Path(dir, "file2"), """ > |NEW;2020-08-29 10:05:00 > |NEW;2020-08-29 10:10:00 > |NEW;2020-08-29 10:15:00""".stripMargin) > query.processAllAvailable() > val lp2 = query.lastProgress > println(lp2.eventTime) > writeFile(new Path(dir, "file4"), """ > |OLD;2017-08-10 10:05:00 > |OLD;2017-08-10 10:10:00 > |OLD;2017-08-10 10:15:00""".stripMargin) > writeFile(new Path(dir, "file3"), "") > query.processAllAvailable() > val lp3 = query.lastProgress > println(lp3.eventTime) > query.awaitTermination() > fs.delete(dir, true) > } > {code} > OUTPUT: > > {code:java} > ------------------------------------------- > Batch: 0 > ------------------------------------------- > +------------------------------------------+-----+ > |window |count| > +------------------------------------------+-----+ > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +------------------------------------------+-----+ > {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, > watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z} > ------------------------------------------- > Batch: 1 > ------------------------------------------- > +------------------------------------------+-----+ > |window |count| > +------------------------------------------+-----+ > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | 
> +------------------------------------------+-----+ > {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, > watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z} > ------------------------------------------- > Batch: 2 > ------------------------------------------- > +------------------------------------------+-----+ > |window |count| > +------------------------------------------+-----+ > |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 | > |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | > |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 | > |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | > |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | > |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 | > |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | > |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | > |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | > +------------------------------------------+-----+ > {min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, > watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z} > {code} > EXPECTED: > expected the last batch's events to be dropped, as the watermark is > 2019-08-09T17:14:55.000Z. > expected events to get dropped: > |OLD;2017-08-10 10:05:00 > |OLD;2017-08-10 10:10:00 > |OLD;2017-08-10 10:15:00 -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org