Jungtaek Lim created SPARK-46676:
------------------------------------

             Summary: dropDuplicatesWithinWatermark throws error on canonicalizing plan
                 Key: SPARK-46676
                 URL: https://issues.apache.org/jira/browse/SPARK-46676
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.5.0, 4.0.0
            Reporter: Jungtaek Lim


In short, the following test fails:
{code:java}
test("SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
  withTempDir { checkpoint =>
    val dedupeInputData = MemoryStream[(String, Int)]
    val dedupe = dedupeInputData.toDS()
      .withColumn("eventTime", timestamp_seconds($"_2"))
      .withWatermark("eventTime", "10 second")
      .dropDuplicatesWithinWatermark("_1")
      .select($"_1", $"eventTime".cast("long").as[Long])

    testStream(dedupe, Append)(
      StartStream(checkpointLocation = checkpoint.getCanonicalPath),

      AddData(dedupeInputData, "a" -> 1),
      CheckNewAnswer("a" -> 1),

      Execute { q =>
        // This throws an error!
        q.lastExecution.executedPlan.canonicalized
      }
    )
  }
} {code}
with the following error:
{code:java}
[info] - SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 237 milliseconds)
[info]   Assert on query failed: Execute: None.get
[info]   scala.None$.get(Option.scala:627)
[info]       scala.None$.get(Option.scala:626)
[info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
[info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
[info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
[info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
[info]       org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
[info]       org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
[info]       org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
[info]       org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323)
{code}
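The trace shows the failure path: canonicalization rebuilds the operator through withNewChildInternal and copy, and the constructor (<init>) then calls .get on an empty Option. The Scala sketch below is a hypothetical, simplified model of that path, not Spark's actual code: Attr, Child, EagerDedupExec and LazyDedupExec are made-up names, and it assumes (not confirmed by this report) that canonicalization drops the event-time marker that the constructor looks up. It contrasts an eager lookup in the constructor body with a lazy one that only runs when the column is actually used.

```scala
// Hypothetical model only: these classes are NOT Spark's API.
final case class Attr(name: String, isEventTime: Boolean)

// Stand-in for a child plan, reduced to its output attributes.
final case class Child(output: Seq[Attr]) {
  // Assumption: canonicalization loses the event-time marker on attributes.
  def canonicalized: Child = Child(output.map(_.copy(isEventTime = false)))
}

// Eager variant: the lookup runs in the constructor body, so every copy(...)
// onto a child without an event-time attribute calls None.get immediately.
final case class EagerDedupExec(keys: Seq[String], child: Child) {
  private val eventTimeCol: Attr = child.output.find(_.isEventTime).get
  def eventTimeName: String = eventTimeCol.name
  def withNewChild(c: Child): EagerDedupExec = copy(child = c)
}

// Lazy variant: the lookup is deferred until eventTimeCol is first used,
// so rebuilding the node onto a canonicalized child does not throw.
final case class LazyDedupExec(keys: Seq[String], child: Child) {
  private lazy val eventTimeCol: Attr = child.output.find(_.isEventTime).get
  def eventTimeName: String = eventTimeCol.name
  def withNewChild(c: Child): LazyDedupExec = copy(child = c)
}
```

In this model, EagerDedupExec.withNewChild on a canonicalized child throws NoSuchElementException("None.get"), the same message reported above, while LazyDedupExec copies cleanly. Deferring the lookup is only one possible direction; whether it suits the real operator depends on where the event-time column is used.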