[jira] [Assigned] (SPARK-46676) dropDuplicatesWithinWatermark throws error on canonicalizing plan

2024-01-18 Thread Jungtaek Lim (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim reassigned SPARK-46676:


Assignee: Jungtaek Lim

> dropDuplicatesWithinWatermark throws error on canonicalizing plan
> -
>
>                 Key: SPARK-46676
>                 URL: https://issues.apache.org/jira/browse/SPARK-46676
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0, 4.0.0
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>            Priority: Major
>              Labels: pull-request-available
>
> Simply put, the following test fails:
> {code:java}
> // Assumes it runs in a StreamTest-based suite with the usual test implicits and imports in scope.
> test("SPARK-X: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
>   withTempDir { checkpoint =>
>     val dedupeInputData = MemoryStream[(String, Int)]
>     val dedupe = dedupeInputData.toDS()
>       .withColumn("eventTime", timestamp_seconds($"_2"))
>       .withWatermark("eventTime", "10 second")
>       .dropDuplicatesWithinWatermark("_1")
>       .select($"_1", $"eventTime".cast("long").as[Long])
>
>     testStream(dedupe, Append)(
>       StartStream(checkpointLocation = checkpoint.getCanonicalPath),
>       AddData(dedupeInputData, "a" -> 1),
>       CheckNewAnswer("a" -> 1),
>       Execute { q =>
>         // Canonicalizing the executed plan throws an error here.
>         q.lastExecution.executedPlan.canonicalized
>       }
>     )
>   }
> }
> {code}
> with the following error:
> {code:java}
> [info] - SPARK-X: canonicalization of StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 237 milliseconds)
> [info]   Assert on query failed: Execute: None.get
> [info]   scala.None$.get(Option.scala:627)
> [info]       scala.None$.get(Option.scala:626)
> [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
> [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
> [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
> [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
> [info]       org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
> [info]       org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
> [info]       org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
> [info]       org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323)
> {code}
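The trace shows canonicalization rebuilding the node through `withNewChildInternal` and `copy`, with the `None.get` thrown from the constructor itself (statefulOperators.scala:1101). The Scala sketch below is a hypothetical, heavily simplified model of that failure pattern, not Spark's code: `Attr`, `Leaf`, `DedupWithinWatermark`, and the assumption that canonicalization drops an event-time marker are invented for illustration, and the report does not say what the real constructor looks up at that line.

```scala
// Hypothetical, simplified model of the failure pattern; NOT Spark's classes.

// An output column; `isEventTime` stands in for whatever marks the event-time column.
final case class Attr(name: String, isEventTime: Boolean)

sealed trait Node {
  def output: Seq[Attr]
  def canonicalized: Node
}

final case class Leaf(output: Seq[Attr]) extends Node {
  // Assumption for illustration: canonicalization renames columns and
  // drops the event-time marker.
  def canonicalized: Node =
    Leaf(output.indices.map(i => Attr(s"c$i", isEventTime = false)))
}

final case class DedupWithinWatermark(child: Node) extends Node {
  // Eager lookup in the constructor body: it runs on every construction,
  // including the copy() made while canonicalizing, and throws
  // NoSuchElementException("None.get") when no event-time column is found.
  val eventTimeCol: Attr = child.output.find(_.isEventTime).get

  def output: Seq[Attr] = child.output

  // Rebuild this node around the canonicalized child via copy().
  def canonicalized: Node = copy(child = child.canonicalized)
}

object CanonicalizationSketch {
  // Builds a valid node, then canonicalizes it; returns the error message on failure.
  def run(): Either[String, Node] = {
    val plan = DedupWithinWatermark(
      Leaf(Seq(Attr("_1", isEventTime = false), Attr("eventTime", isEventTime = true))))
    try Right(plan.canonicalized)
    catch { case e: NoSuchElementException => Left(e.getMessage) }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Running `CanonicalizationSketch.main` prints `Left(None.get)`: building the original node succeeds, but the `copy` made during canonicalization repeats the eager lookup against a child that no longer carries the marker. The general point is that any eager computation in a case class constructor body runs again on every `copy`.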



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46676) dropDuplicatesWithinWatermark throws error on canonicalizing plan

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot reassigned SPARK-46676:
--

Assignee: (was: Apache Spark)




[jira] [Assigned] (SPARK-46676) dropDuplicatesWithinWatermark throws error on canonicalizing plan

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot reassigned SPARK-46676:
--

Assignee: Apache Spark




[jira] [Assigned] (SPARK-46676) dropDuplicatesWithinWatermark throws error on canonicalizing plan

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot reassigned SPARK-46676:
--

Assignee: (was: Apache Spark)




[jira] [Assigned] (SPARK-46676) dropDuplicatesWithinWatermark throws error on canonicalizing plan

2024-01-11 Thread ASF GitHub Bot (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-46676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot reassigned SPARK-46676:
--

Assignee: Apache Spark
