Sandish Kumar HN created SPARK-33039: ----------------------------------------
Summary: Misleading watermark calculation in structure streaming Key: SPARK-33039 URL: https://issues.apache.org/jira/browse/SPARK-33039 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.4 Reporter: Sandish Kumar HN source code: {code:java} import org.apache.spark.sql.SparkSession import org.apache.hadoop.fs.Path import java.sql.Timestamp import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} object TestWaterMark extends App { val spark = SparkSession.builder().master("local").getOrCreate() val sc = spark.sparkContext val dir = new Path("/tmp/test-structured-streaming") val fs = dir.getFileSystem(sc.hadoopConfiguration) fs.mkdirs(dir) val schema = StructType(StructField("vilue", StringType) :: StructField("timestamp", TimestampType) :: Nil) val eventStream = spark .readStream .option("sep", ";") .option("header", "false") .schema(schema) .csv(dir.toString) // Watermarked aggregation val eventsCount = eventStream .withWatermark("timestamp", "5 seconds") .groupBy(window(col("timestamp"), "10 seconds")) .count def writeFile(path: Path, data: String) { val file = fs.create(path) file.writeUTF(data) file.close() } // Debug query val query = eventsCount.writeStream .format("console") .outputMode("complete") .option("truncate", "false") .trigger(Trigger.ProcessingTime("5 seconds")) .start() writeFile(new Path(dir, "file1"), """ |OLD;2019-08-09 10:05:00 |OLD;2019-08-09 10:10:00 |OLD;2019-08-09 10:15:00""".stripMargin) query.processAllAvailable() val lp1 = query.lastProgress println(lp1.eventTime) writeFile(new Path(dir, "file2"), """ |NEW;2020-08-29 10:05:00 |NEW;2020-08-29 10:10:00 |NEW;2020-08-29 10:15:00""".stripMargin) query.processAllAvailable() val lp2 = query.lastProgress println(lp2.eventTime) writeFile(new Path(dir, "file4"), """ |OLD;2017-08-10 10:05:00 |OLD;2017-08-10 10:10:00 |OLD;2017-08-10 10:15:00""".stripMargin) writeFile(new Path(dir, 
"file3"), "") query.processAllAvailable() val lp3 = query.lastProgress println(lp3.eventTime) query.awaitTermination() fs.delete(dir, true) } {code} OUTPUT: {code:java} ------------------------------------------- Batch: 0 ------------------------------------------- +------------------------------------------+-----+ |window |count| +------------------------------------------+-----+ |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | +------------------------------------------+-----+ {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z} ------------------------------------------- Batch: 1 ------------------------------------------- +------------------------------------------+-----+ |window |count| +------------------------------------------+-----+ |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 | +------------------------------------------+-----+ {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z} ------------------------------------------- Batch: 2 ------------------------------------------- +------------------------------------------+-----+ |window |count| +------------------------------------------+-----+ |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 | |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 | |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 | |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 | |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 | |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 | |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 | |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 | |[2019-08-09 
10:10:00, 2019-08-09 10:10:10]|1 | +------------------------------------------+-----+ {min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z} {code} EXPECTED: the events in the last batch (from file4) should be dropped as late data. Their event times (2017-08-10) are earlier than the watermark, which was already 2019-08-09T17:14:55.000Z (as reported in batch 1's progress) and had advanced to 2020-08-29T17:14:55.000Z by batch 2. Instead, batch 2 still counts them. Events expected to be dropped: |OLD;2017-08-10 10:05:00 |OLD;2017-08-10 10:10:00 |OLD;2017-08-10 10:15:00 -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org