Jungtaek Lim created SPARK-46676:
------------------------------------

             Summary: dropDuplicatesWithinWatermark throws error on canonicalizing plan
                 Key: SPARK-46676
                 URL: https://issues.apache.org/jira/browse/SPARK-46676
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.5.0, 4.0.0
            Reporter: Jungtaek Lim
Simply put, the following test fails when it canonicalizes the executed plan of a query that uses dropDuplicatesWithinWatermark:

{code:java}
test("SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
  withTempDir { checkpoint =>
    val dedupeInputData = MemoryStream[(String, Int)]
    val dedupe = dedupeInputData.toDS()
      .withColumn("eventTime", timestamp_seconds($"_2"))
      .withWatermark("eventTime", "10 second")
      .dropDuplicatesWithinWatermark("_1")
      .select($"_1", $"eventTime".cast("long").as[Long])

    testStream(dedupe, Append)(
      StartStream(checkpointLocation = checkpoint.getCanonicalPath),
      AddData(dedupeInputData, "a" -> 1),
      CheckNewAnswer("a" -> 1),
      Execute { q =>
        // This throws an error!
        q.lastExecution.executedPlan.canonicalized
      }
    )
  }
}
{code}

It fails with the following error:

{code:java}
[info] - SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 237 milliseconds)
[info] Assert on query failed: Execute: None.get
[info] scala.None$.get(Option.scala:627)
[info] scala.None$.get(Option.scala:626)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
[info] org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
[info] org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
[info] org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
[info] org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323)
{code}
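The stack trace shows where the failure happens: while the plan is rebuilt through withNewChildren, withNewChildInternal calls copy, and the StreamingDeduplicateWithinWatermarkExec constructor then calls .get on an empty Option. The sketch below is a hypothetical, heavily simplified Scala model of that pattern and is not Spark's code. The names Child, EagerDedupe and LazyDedupe are illustrative, and the assumption that canonicalizing the child drops the information the constructor looks up is also illustrative; the report itself only establishes the None.get inside the constructor.

{code:scala}
// Hypothetical, simplified model of the failure pattern in the stack trace.
// None of these classes exist in Spark; they only mimic the call shape
// canonicalized -> withNewChild -> copy -> constructor -> Option.get.

// A child node that may or may not carry event-time information.
final case class Child(eventTimeCol: Option[String]) {
  // Assumption for illustration: canonicalization drops the event-time info.
  def canonicalized: Child = Child(None)
}

// Eager variant: `.get` runs in the constructor, so every construction,
// including the one performed by `copy`, fails if the child lacks the info.
final case class EagerDedupe(child: Child) {
  val eventTimeCol: String = child.eventTimeCol.get
  def withNewChild(newChild: Child): EagerDedupe = copy(child = newChild)
  def canonicalized: EagerDedupe = withNewChild(child.canonicalized)
}

// Lazy variant: `.get` runs only on first access, so building a copy
// around a canonicalized child succeeds as long as the value is not read.
final case class LazyDedupe(child: Child) {
  lazy val eventTimeCol: String = child.eventTimeCol.get
  def withNewChild(newChild: Child): LazyDedupe = copy(child = newChild)
  def canonicalized: LazyDedupe = withNewChild(child.canonicalized)
}

object CanonicalizationDemo {
  def main(args: Array[String]): Unit = {
    // Prints a Failure wrapping java.util.NoSuchElementException("None.get").
    println(scala.util.Try(EagerDedupe(Child(Some("eventTime"))).canonicalized))
    // Prints a Success holding the rebuilt node.
    println(scala.util.Try(LazyDedupe(Child(Some("eventTime"))).canonicalized))
  }
}
{code}

In this toy model, deferring the lookup with a lazy val is one way to keep constructor-time evaluation from breaking copy; it is shown only to illustrate the failure mode, not as the fix for this issue.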