[ https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun closed SPARK-46676.
---------------------------------
> dropDuplicatesWithinWatermark throws error on canonicalizing plan
> -----------------------------------------------------------------
>
> Key: SPARK-46676
> URL: https://issues.apache.org/jira/browse/SPARK-46676
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.0, 4.0.0
> Reporter: Jungtaek Lim
> Assignee: Jungtaek Lim
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.5.1, 4.0.0
>
>
> Put simply, the following test fails:
> {code:java}
> test("SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
>   withTempDir { checkpoint =>
>     val dedupeInputData = MemoryStream[(String, Int)]
>     val dedupe = dedupeInputData.toDS()
>       .withColumn("eventTime", timestamp_seconds($"_2"))
>       .withWatermark("eventTime", "10 second")
>       .dropDuplicatesWithinWatermark("_1")
>       .select($"_1", $"eventTime".cast("long").as[Long])
>
>     testStream(dedupe, Append)(
>       StartStream(checkpointLocation = checkpoint.getCanonicalPath),
>       AddData(dedupeInputData, "a" -> 1),
>       CheckNewAnswer("a" -> 1),
>       Execute { q =>
>         // Canonicalizing the executed plan throws here.
>         q.lastExecution.executedPlan.canonicalized
>       }
>     )
>   }
> }
> {code}
> It fails with the following error:
> {code:java}
> [info] - SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 237 milliseconds)
> [info] Assert on query failed: Execute: None.get
> [info] scala.None$.get(Option.scala:627)
> [info] scala.None$.get(Option.scala:626)
> [info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
> [info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
> [info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
> [info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
> [info] org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
> [info] org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
> [info] org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
> [info] org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323)
> {code}
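The stack trace shows `None.get` thrown from the constructor (`<init>`) of `StreamingDeduplicateWithinWatermarkExec`, reached through `copy` from `withNewChildInternal` while the plan is being canonicalized. The Scala sketch below is a hypothetical, simplified model of that failure mode, not Spark's actual code: the names `Attr`, `Leaf`, `EagerDedupExec`, `LazyDedupExec` and `tryCanonicalize` are invented for illustration, and it assumes (without confirmation from this report) that the canonicalized child no longer yields the event-time column that the constructor looks up. It contrasts a constructor that unwraps the `Option` eagerly with one that defers the lookup via `lazy val`; whether the actual patch takes this approach is not stated here.

```scala
// Hypothetical, simplified model of the failure mode (not Spark's actual classes).
// An attribute that may or may not be marked as the event-time column.
final case class Attr(name: String, isEventTime: Boolean)

final case class Leaf(output: Seq[Attr]) {
  // ASSUMPTION: in this model, canonicalization drops the event-time marker,
  // standing in for whatever made the real lookup return None.
  def canonicalized: Leaf = Leaf(output.map(a => a.copy(isEventTime = false)))
}

// Eager variant: the Option is unwrapped while the constructor runs,
// so any copy() with a child lacking an event-time column throws None.get.
final case class EagerDedupExec(keys: Seq[String], child: Leaf) {
  private val eventTimeCol: Attr = child.output.find(_.isEventTime).get
  def withNewChild(newChild: Leaf): EagerDedupExec = copy(child = newChild)
  def timeColName: String = eventTimeCol.name
}

// Lazy variant: the lookup is deferred until something actually reads it,
// so building a canonicalized copy no longer calls Option.get.
final case class LazyDedupExec(keys: Seq[String], child: Leaf) {
  private lazy val eventTimeCol: Attr = child.output.find(_.isEventTime).get
  def withNewChild(newChild: Leaf): LazyDedupExec = copy(child = newChild)
  def timeColName: String = eventTimeCol.name
}

val child = Leaf(Seq(Attr("_1", isEventTime = false), Attr("eventTime", isEventTime = true)))

// Returns Some(message) if evaluating `build` throws NoSuchElementException, else None.
def tryCanonicalize(build: => Any): Option[String] =
  try { build; None }
  catch { case e: NoSuchElementException => Some(e.getMessage) }

// Mimic withNewChildren during canonicalization: copy the node onto a canonicalized child.
val eagerResult = tryCanonicalize(EagerDedupExec(Seq("_1"), child).withNewChild(child.canonicalized))
val lazyResult = tryCanonicalize(LazyDedupExec(Seq("_1"), child).withNewChild(child.canonicalized))

println(s"eager: $eagerResult")
println(s"lazy:  $lazyResult")
```

In this model the eager variant fails exactly where the reported trace does (inside the constructor invoked by `copy`), while the lazy variant only evaluates the lookup if `timeColName` is later called on the canonicalized copy.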
--
This message was sent by Atlassian Jira
(v8.20.10#820010)