[ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179410#comment-17179410 ]
Apache Spark commented on SPARK-32456:
--------------------------------------

User 'HeartSaVioR' has created a pull request for this issue:
https://github.com/apache/spark/pull/29461

> Check the Distinct by assuming it as Aggregate for Structured Streaming
> -----------------------------------------------------------------------
>
>                 Key: SPARK-32456
>                 URL: https://issues.apache.org/jira/browse/SPARK-32456
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Assignee: Yuanjian Li
>            Priority: Major
>             Fix For: 3.0.1, 3.1.0
>
>
> We want to fix two things here:
>
> 1. Give a better error message for Distinct-related operations in append mode without a watermark.
>
> Consider the following example:
> {code:java}
> val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1")
> val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2")
> val unionRes = spark.sql("select s1.value, s1.timestamp from s1 union select s2.value, s2.timestamp from s2")
> unionRes.writeStream.option("checkpointLocation", ${pathA}).start(${pathB})
> {code}
> We get the following confusing exception:
> {code:java}
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:529)
>   at scala.None$.get(Option.scala:527)
>   at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
>   at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
>   ...
> {code}
> A UNION clause in SQL requires deduplication, so the parser generates {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} rewrites it to {{Aggregate(Union)}}. The root cause is that the checking logic applied to Aggregate is missing for Distinct.
>
> The same problem occurs for all Distinct-related operations in Structured Streaming, e.g.:
> {code:java}
> val df = spark.readStream.format("rate").load()
> df.createOrReplaceTempView("deduptest")
> val distinct = spark.sql("select distinct value from deduptest")
> distinct.writeStream.option("checkpointLocation", ${pathA}).start(${pathB})
> {code}
>
> 2. Make {{Distinct}} in complete mode runnable.
>
> A Distinct query in complete mode currently throws:
> {quote}
> {{Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;}}
> {quote}
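For reference, below is a minimal sketch (not taken from the ticket) of how the streaming DISTINCT from the description could be run once this fix is available in 3.0.1 / 3.1.0: since {{ReplaceDistinctWithAggregate}} rewrites DISTINCT into a streaming aggregation, complete output mode should be accepted. The console sink, checkpoint path, and trigger interval are illustrative placeholders, and append mode would still require a watermark.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Placeholder local session; any existing SparkSession works the same way.
val spark = SparkSession.builder()
  .appName("streaming-distinct-complete-mode-sketch")
  .master("local[2]")
  .getOrCreate()

// Same rate source as in the issue description.
spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()
  .createOrReplaceTempView("deduptest")

// DISTINCT is rewritten to Aggregate(...) by ReplaceDistinctWithAggregate,
// so it is checked and executed like any other streaming aggregation.
val distinct = spark.sql("select distinct value from deduptest")

val query = distinct.writeStream
  .outputMode("complete")                                   // item 2 of the ticket
  .format("console")                                        // placeholder sink
  .option("checkpointLocation", "/tmp/distinct-checkpoint") // placeholder path
  .trigger(Trigger.ProcessingTime("5 seconds"))             // placeholder trigger
  .start()

query.awaitTermination()
{code}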