[ https://issues.apache.org/jira/browse/SPARK-48869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Richard Swinbank updated SPARK-48869:
-------------------------------------
Description:
When I run this query:
{code:python}
# Run in a Databricks notebook, where `spark` and `display` are predefined.
from pyspark.sql.functions import col, first

df_src = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("text_value", col("value").cast("string"))
    .withColumn("label", col("value"))
)

df_pivot = (
    df_src
    .withWatermark("timestamp", "10 minutes")
    .groupBy("label")
    .pivot("value", ["x", "y"])
    .agg(first("value"))
)

display(df_pivot)
{code}
it fails with error _AnalysisException: Detected pattern of possible 'correctness' issue due to global watermark_.

This happens because the analyzer [resolves the pivot into two aggregate operations|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L817]. Multiple aggregations in a watermarked streaming pipeline [cause this error to be thrown|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L123] to prevent correctness issues caused by stateful operations sharing a single global watermark.

I've raised this as a bug because of the pivot failure, but please advise if that's inappropriate! Some observations:
* We could say that pivot is not supported in a watermarked streaming pipeline. That would be disappointing, and it's not always true: it depends on the data type of the column being pivoted. For example, pivoting a string column causes the pivot to be implemented as a one-step aggregate.
* I don't understand the internals well enough to know whether the two-step aggregate is vulnerable to global watermark correctness issues at all. If not, it seems to me that the unsupported operation check is too aggressive: it would need to be able to distinguish between the general multiple-stateful-operator problem and the adjacent aggregates that the analyzer generates for pivot.
* If the two-step aggregate for pivot is safe, one can disable the correctness check with {{spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")}}, but that affects all operations: in principle, a query could contain pivot operations *and* multiple operations vulnerable to global watermark correctness issues.

Many thanks

was:
(Identical to the description above, except that the source query derived {{label}} from the string column: {{.withColumn("label", col("text_value"))}}.)

> Pivoting watermarked streaming dataframe fails with correctness error
> ---------------------------------------------------------------------
>
>                 Key: SPARK-48869
>                 URL: https://issues.apache.org/jira/browse/SPARK-48869
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>         Environment: Azure Databricks runtime 14.3 LTS
>            Reporter: Richard Swinbank
>            Priority: Minor
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
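To make the "two aggregate operations" point in the description concrete, here is a plain-Python model of the two plan shapes (not Spark code). It assumes an in-memory list of hypothetical {{(label, pivot_key, value)}} tuples and a {{first}} aggregate; {{pivot_one_step}} and {{pivot_two_step}} are illustrative names, not Spark APIs. The one-step shape is a single grouping with one conditional aggregate per pivot value. The two-step shape first aggregates per (label, pivot key) and then folds those partial results into pivot columns in a second grouping, roughly the role of Spark's internal PivotFirst aggregate. Only the second shape contains two chained aggregations, which is the pattern the streaming correctness check reacts to.

```python
# Plain-Python model of the two plan shapes for pivot(...).agg(first(...)).
# Rows are hypothetical (label, pivot_key, value) tuples; this is not Spark code.


def first_non_null(values):
    """first() with ignoreNulls semantics: the first value that is not None."""
    for v in values:
        if v is not None:
            return v
    return None


def pivot_one_step(rows, pivot_values):
    """One aggregation: group by label, one conditional first() per pivot value."""
    groups = {}
    for label, key, value in rows:
        groups.setdefault(label, []).append((key, value))
    return {
        label: {
            # Rows with a different pivot key contribute None, which first_non_null skips.
            pv: first_non_null(v if k == pv else None for k, v in members)
            for pv in pivot_values
        }
        for label, members in groups.items()
    }


def pivot_two_step(rows, pivot_values):
    """Two chained aggregations, mirroring the shape of the analyzer's rewrite."""
    # Aggregation 1: group by (label, pivot key), keeping the first value per pair.
    step1 = {}
    for label, key, value in rows:
        step1.setdefault((label, key), value)
    # Aggregation 2: group by label and slot each partial result into its column;
    # pivot keys outside pivot_values are dropped.
    result = {}
    for (label, key), agg in step1.items():
        row = result.setdefault(label, {pv: None for pv in pivot_values})
        if key in row:
            row[key] = agg
    return result


rows = [("a", "x", 1), ("a", "y", 2), ("b", "x", 3), ("a", "x", 4), ("b", "z", 5)]
# Both print {'a': {'x': 1, 'y': 2}, 'b': {'x': 3, 'y': None}}
print(pivot_one_step(rows, ["x", "y"]))
print(pivot_two_step(rows, ["x", "y"]))
```

Both shapes produce the same table for this input; the model only illustrates plan shape and does not simulate watermarks or state stores.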