[ https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-46676.
----------------------------------
    Fix Version/s: 3.5.1
                   4.0.0
       Resolution: Fixed

Issue resolved by pull request 44688
[https://github.com/apache/spark/pull/44688]

> dropDuplicatesWithinWatermark throws error on canonicalizing plan
> -----------------------------------------------------------------
>
>                 Key: SPARK-46676
>                 URL: https://issues.apache.org/jira/browse/SPARK-46676
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0, 4.0.0
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.5.1, 4.0.0
>
>
> Put simply, the following test fails:
> {code:java}
> test("SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
>   withTempDir { checkpoint =>
>     val dedupeInputData = MemoryStream[(String, Int)]
>     val dedupe = dedupeInputData.toDS()
>       .withColumn("eventTime", timestamp_seconds($"_2"))
>       .withWatermark("eventTime", "10 second")
>       .dropDuplicatesWithinWatermark("_1")
>       .select($"_1", $"eventTime".cast("long").as[Long])
>
>     testStream(dedupe, Append)(
>       StartStream(checkpointLocation = checkpoint.getCanonicalPath),
>       AddData(dedupeInputData, "a" -> 1),
>       CheckNewAnswer("a" -> 1),
>       Execute { q =>
>         // This threw an error!
>         q.lastExecution.executedPlan.canonicalized
>       }
>     )
>   }
> }
> {code}
> with the following error:
> {code:java}
> [info] - SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 237 milliseconds)
> [info]   Assert on query failed: Execute: None.get
> [info]   scala.None$.get(Option.scala:627)
> [info]   scala.None$.get(Option.scala:626)
> [info]   org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
> [info]   org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
> [info]   org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
> [info]   org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
> [info]   org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
> [info]   org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
> [info]   org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
> [info]   org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323)
> {code}
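The trace shows `None.get` thrown from the `StreamingDeduplicateWithinWatermarkExec` constructor while `withNewChildInternal` calls `copy` during canonicalization. The following is a simplified, hypothetical model of that failure mode, not Spark code. It assumes the node looks up its event-time column eagerly, via `Option.get` over a metadata marker on the child's output attributes, and that canonicalizing the child strips that metadata. `Attr`, `LeafNode`, `EagerDedupe`, `LazyDedupe` and `DelayKey` are illustrative names only, and deferring the lookup with a `lazy val` is shown as one plausible remedy; the actual change is whatever pull request 44688 contains.

```scala
import scala.util.Try

// Hypothetical marker key; stands in for the event-time metadata a watermark attaches.
val DelayKey = "eventTimeMarker"

// Simplified attribute: a name plus metadata (NOT Spark's AttributeReference).
final case class Attr(name: String, metadata: Map[String, String]) {
  // Assumption: canonicalization drops cosmetic fields, including the metadata.
  def canonicalized: Attr = Attr("none", Map.empty)
}

// Simplified child plan exposing its output attributes.
final case class LeafNode(output: Seq[Attr]) {
  def canonicalized: LeafNode = LeafNode(output.map(_.canonicalized))
}

// Eager variant: the event-time column is resolved while constructing the node,
// so copy(child = ...) over a child without the marker throws None.get.
final case class EagerDedupe(child: LeafNode) {
  private val eventTimeCol: Attr =
    child.output.find(_.metadata.contains(DelayKey)).get
  def eventTimeName: String = eventTimeCol.name
  def withNewChild(newChild: LeafNode): EagerDedupe = copy(child = newChild)
}

// Lazy variant: the lookup runs only when eventTimeName is first called.
final case class LazyDedupe(child: LeafNode) {
  private lazy val eventTimeCol: Attr =
    child.output.find(_.metadata.contains(DelayKey)).get
  def eventTimeName: String = eventTimeCol.name
  def withNewChild(newChild: LeafNode): LazyDedupe = copy(child = newChild)
}

val child = LeafNode(Seq(
  Attr("_1", Map.empty),
  Attr("eventTime", Map(DelayKey -> "10000"))
))

// Building over the original child works; copying onto the canonicalized child does not.
val eagerCopy = Try(EagerDedupe(child).withNewChild(child.canonicalized))
// The lazy copy is built without ever touching eventTimeCol.
val lazyCopy = Try(LazyDedupe(child).withNewChild(child.canonicalized))
println(s"eager: $eagerCopy, lazy succeeded: ${lazyCopy.isSuccess}")
```

Under these assumptions the eager copy fails with `NoSuchElementException: None.get`, mirroring the trace above, while the lazy copy is constructed without error. In this model the canonicalized lazy node would still fail if its event-time column were requested, which is presumably acceptable because a canonicalized plan is used for comparison rather than execution.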