[ 
https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-46676.
----------------------------------
    Fix Version/s: 3.5.1
                   4.0.0
       Resolution: Fixed

Issue resolved by pull request 44688
[https://github.com/apache/spark/pull/44688]

> dropDuplicatesWithinWatermark throws error on canonicalizing plan
> -----------------------------------------------------------------
>
>                 Key: SPARK-46676
>                 URL: https://issues.apache.org/jira/browse/SPARK-46676
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0, 4.0.0
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.5.1, 4.0.0
>
>
> Simply said, this test code fails:
> {code:java}
> test("SPARK-XXXXX: canonicalization of 
> StreamingDeduplicateWithinWatermarkExec should work") {
>   withTempDir { checkpoint =>
>     val dedupeInputData = MemoryStream[(String, Int)]
>     val dedupe = dedupeInputData.toDS()
>       .withColumn("eventTime", timestamp_seconds($"_2"))
>       .withWatermark("eventTime", "10 second")
>       .dropDuplicatesWithinWatermark("_1")
>       .select($"_1", $"eventTime".cast("long").as[Long])
>     testStream(dedupe, Append)(
>       StartStream(checkpointLocation = checkpoint.getCanonicalPath),
>       AddData(dedupeInputData, "a" -> 1),
>       CheckNewAnswer("a" -> 1),
>       Execute { q =>
>         // This threw out error!
>         q.lastExecution.executedPlan.canonicalized
>       }
>     )
>   }
> } {code}
> with below error:
> {code:java}
> [info] - SPARK-XXXXX: canonicalization of 
> StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 
> 237 milliseconds)
> [info]   Assert on query failed: Execute: None.get
> [info]   scala.None$.get(Option.scala:627)
> [info]       scala.None$.get(Option.scala:626)
> [info]       
> org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
> [info]       
> org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
> [info]       
> org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
> [info]       
> org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
> [info]       
> org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
> [info]       
> org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
> [info]       
> org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
> [info]       
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to