[ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-32456:
------------------------------------

    Assignee: Yuanjian Li

> Check the Distinct by assuming it as Aggregate for Structured Streaming
> -----------------------------------------------------------------------
>
>                 Key: SPARK-32456
>                 URL: https://issues.apache.org/jira/browse/SPARK-32456
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Assignee: Yuanjian Li
>            Priority: Major
>
> We want to fix 2 things here:
> 1. Give a better error message for Distinct-related operations in append mode
> that don't have a watermark.
> Check the following example:
> {code:java}
> // pathA (checkpoint location) and pathB (output path) are placeholders for real paths
> spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1")
> spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2")
> val unionRes = spark.sql(
>   "select s1.value, s1.timestamp from s1 union select s2.value, s2.timestamp from s2")
> // The output mode defaults to append
> unionRes.writeStream.option("checkpointLocation", pathA).start(pathB){code}
> We'll get the following confusing exception:
> {code:java}
> java.util.NoSuchElementException: None.get
>       at scala.None$.get(Option.scala:529)
>       at scala.None$.get(Option.scala:527)
>       at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
>       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
>       at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
> ...
> {code}
> The SQL UNION clause requires deduplication, so the parser generates
> {{Distinct(Union)}}, and the optimizer rule {{ReplaceDistinctWithAggregate}}
> then rewrites it to {{Aggregate(Union)}}. The root cause is that the checks
> applied to Aggregate are missing for Distinct, so the query passes the checks
> and only fails at runtime in {{StateStoreSaveExec}}.
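A minimal Scala sketch of this rewrite, over a hypothetical toy plan model (`Relation`, `Union`, `Distinct` and `Aggregate` below are illustrative stand-ins, not Catalyst's real classes or signatures). It shows that a `Distinct` is equivalent to an `Aggregate` grouped by all of its child's output columns, which is why a check that only matches `Aggregate` nodes misses it if, as the description implies, the check sees the plan before this optimizer rule has run.

```scala
// Toy logical-plan model. These case classes are hypothetical stand-ins,
// not Catalyst's real LogicalPlan classes.
object DistinctRewriteSketch {
  sealed trait Plan
  case class Relation(name: String, output: Seq[String]) extends Plan
  case class Union(children: Seq[Plan]) extends Plan
  case class Distinct(child: Plan) extends Plan
  // Grouping by every output column with no aggregate functions = deduplication.
  case class Aggregate(groupBy: Seq[String], child: Plan) extends Plan

  // Output columns of a node; a Union takes its first child's columns.
  def output(plan: Plan): Seq[String] = plan match {
    case Relation(_, out)   => out
    case Union(children)    => output(children.head)
    case Distinct(child)    => output(child)
    case Aggregate(keys, _) => keys
  }

  // Recursively rewrite every Distinct(child) into Aggregate(output(child), child),
  // in the spirit of ReplaceDistinctWithAggregate.
  def replaceDistinctWithAggregate(plan: Plan): Plan = plan match {
    case Distinct(child) =>
      val newChild = replaceDistinctWithAggregate(child)
      Aggregate(output(newChild), newChild)
    case Union(children)        => Union(children.map(replaceDistinctWithAggregate))
    case Aggregate(keys, child) => Aggregate(keys, replaceDistinctWithAggregate(child))
    case r: Relation            => r
  }

  def main(args: Array[String]): Unit = {
    // "select ... from s1 union select ... from s2" parses to Distinct(Union(...)).
    val s1 = Relation("s1", Seq("value", "timestamp"))
    val s2 = Relation("s2", Seq("value", "timestamp"))
    val parsed = Distinct(Union(Seq(s1, s2)))
    println(replaceDistinctWithAggregate(parsed))
  }
}
```

The rewritten plan is `Aggregate(Seq("value", "timestamp"), Union(...))`: the same deduplication, now in the shape that the streaming aggregation checks know how to handle.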
> In fact, this happens for all Distinct-related operations in Structured
> Streaming, e.g.:
> {code:java}
> val df = spark.readStream.format("rate").load()
> df.createOrReplaceTempView("deduptest")
> val distinct = spark.sql("select distinct value from deduptest")
> // pathA (checkpoint location) and pathB (output path) are placeholders for real paths
> distinct.writeStream.option("checkpointLocation", pathA).start(pathB){code}
>  
> 2. Make {{Distinct}} runnable in complete mode.
> Currently, Distinct in complete mode throws the following exception:
> {quote}
> {{Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;}}
> {quote}
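Both goals reduce to the idea in the issue title: wherever the streaming checks look for an `Aggregate`, also treat a `Distinct` as one. Below is a minimal sketch of that idea under heavy simplifying assumptions: the toy plan classes, the per-source `hasWatermark` flag and the error strings are hypothetical, and this is not Spark's `UnsupportedOperationChecker`, whose watermark rules are stricter.

```scala
// Hypothetical sketch of checking Distinct as if it were an Aggregate.
// Plan classes, the hasWatermark flag and the messages are illustrative only.
object DistinctCheckSketch {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Complete extends OutputMode

  sealed trait Plan
  case class StreamingRelation(name: String, hasWatermark: Boolean) extends Plan
  case class Union(children: Seq[Plan]) extends Plan
  case class Distinct(child: Plan) extends Plan
  case class Aggregate(child: Plan) extends Plan

  def childrenOf(p: Plan): Seq[Plan] = p match {
    case _: StreamingRelation => Nil
    case Union(cs)            => cs
    case Distinct(c)          => Seq(c)
    case Aggregate(c)         => Seq(c)
  }

  // All nodes of the plan, in pre-order.
  def nodes(p: Plan): Seq[Plan] = p +: childrenOf(p).flatMap(nodes)

  // The core idea of the fix: a Distinct counts as a streaming aggregation.
  def isAggregateLike(p: Plan): Boolean = p match {
    case _: Aggregate | _: Distinct => true
    case _                          => false
  }

  // Simplification: a subtree is "watermarked" if any source under it has one.
  def hasWatermark(p: Plan): Boolean = nodes(p).exists {
    case StreamingRelation(_, wm) => wm
    case _                        => false
  }

  def check(plan: Plan, mode: OutputMode): Either[String, Unit] = {
    val aggs = nodes(plan).filter(isAggregateLike)
    mode match {
      case Append if aggs.exists(a => !hasWatermark(a)) =>
        Left("append mode: aggregation (including Distinct) without a watermark")
      case Complete if aggs.isEmpty =>
        Left("complete mode: no streaming aggregation (Distinct counts as one)")
      case _ => Right(())
    }
  }
}
```

For example, with two sources `s1` and `s2` that have no watermark, `check(Distinct(Union(Seq(s1, s2))), Append)` returns a `Left` up front (instead of a runtime `None.get`), while the same plan in `Complete` mode returns `Right(())`.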



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

