[ 
https://issues.apache.org/jira/browse/SPARK-48869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Swinbank updated SPARK-48869:
-------------------------------------
    Description: 
When I run this query:
{code:python}
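from pyspark.sql.functions import col, first

# The "rate" source emits rows with a timestamp column and a long "value" column.
df_src = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("text_value", col("value").cast("string"))
    .withColumn("label", col("value"))
)

# Pivot the watermarked stream on the long-typed "value" column.
df_pivot = (
    df_src
    .withWatermark("timestamp", "10 minutes")
    .groupBy("label")
    .pivot("value", ["x", "y"])
    .agg(first("value"))
)
# display() is the Databricks notebook helper; calling it starts the streaming query.
display(df_pivot)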
{code}
 
it fails with error _AnalysisException: Detected pattern of possible 
'correctness' issue due to global watermark_.

This happens because the analyzer [resolves the pivot into two aggregate 
operations|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L817].
 Multiple aggregations in a watermarked streaming pipeline [cause this error to 
be 
thrown|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L123]
 to prevent correctness issues caused by stateful operations using a single 
global watermark.
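
To make the two plans concrete, here is a toy model in plain Python (not Spark code). It assumes, as the linked analyzer rule suggests, that the two-step form first aggregates by the grouping column plus the pivot column and then spreads the partial results into one column per pivot value, while the one-step form applies one conditional aggregate per pivot value. The row data, the function names and the use of a sum (rather than first) are illustrative assumptions.

```python
from collections import defaultdict

# Toy rows: (label, pivot_key, amount). Names and data are illustrative only.
ROWS = [
    ("a", "x", 1),
    ("a", "x", 2),
    ("a", "y", 5),
    ("b", "y", 7),
    ("b", "z", 9),  # "z" is not a requested pivot value, so it is dropped
]
PIVOT_VALUES = ["x", "y"]


def pivot_two_step(rows, pivot_values):
    """Aggregate by (label, pivot_key), then spread each group into columns."""
    # Step 1: first aggregate, grouped by label AND pivot key.
    partial = defaultdict(int)
    for label, key, amount in rows:
        partial[(label, key)] += amount
    # Step 2: second aggregate, grouped by label only.
    result = {}
    for (label, key), total in partial.items():
        out = result.setdefault(label, {v: None for v in pivot_values})
        if key in out:
            out[key] = total
    return result


def pivot_one_step(rows, pivot_values):
    """Aggregate by label once, using one conditional sum per pivot value."""
    result = {}
    for label, key, amount in rows:
        out = result.setdefault(label, {v: None for v in pivot_values})
        if key in out:
            out[key] = amount if out[key] is None else out[key] + amount
    return result
```

Both functions produce the same table for this batch input. The open question in this issue is whether the two chained aggregates are equally safe in a stream where both share one global watermark, which this model does not address.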

I've raised this as a bug because of the pivot failure, but please advise if 
that's inappropriate! Some observations:
 * We could say that pivot is not supported in a watermarked streaming 
pipeline. That would be disappointing, and it's not always true (depending on 
the datatype of the column being pivoted - for example, pivoting a string 
column causes the pivot to be implemented as a one-step aggregate).
 * I don't understand the internals well enough to know whether the two-step 
aggregate is vulnerable to global watermark correctness issues at all. If not, 
it would seem to me that the unsupported operation check is too aggressive - it 
would need to be able to distinguish between the general multiple stateful 
operation problem and the adjacent aggregates implemented by the analyzer for 
pivot.
 * If the two-step aggregate for pivot is safe, one can disable the correctness 
check with 
{{spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")}}, 
but that affects all operations - in principle a query could 
contain pivot operations *and* multiple operations vulnerable to global 
watermark correctness.
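
One way to at least limit how long the override is in force is to set it only while a single query is being started and restore the previous setting afterwards. The sketch below is a minimal illustration, not a recommended fix: the helper name `correctness_check_disabled` is hypothetical, and it assumes the check is evaluated when the streaming query is started rather than continuously.

```python
from contextlib import contextmanager

CHECK_KEY = "spark.sql.streaming.statefulOperator.checkCorrectness.enabled"


@contextmanager
def correctness_check_disabled(spark):
    """Temporarily set CHECK_KEY to "false", restoring the prior state on exit.

    `spark` is expected to expose conf.get(key, default), conf.set(key, value)
    and conf.unset(key), as a PySpark SparkSession does.
    """
    # None means the key was not explicitly set in this session.
    previous = spark.conf.get(CHECK_KEY, None)
    spark.conf.set(CHECK_KEY, "false")
    try:
        yield
    finally:
        if previous is None:
            spark.conf.unset(CHECK_KEY)
        else:
            spark.conf.set(CHECK_KEY, previous)
```

The query would be started inside the block, for example `with correctness_check_disabled(spark): display(df_pivot)`. This still switches off every correctness check within that one query, and any other query started in the same session while the block is active sees the same override, so it narrows the problem described above without removing it.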

Many thanks

  was:
When I run this query:
{code:python}
from pyspark.sql.functions import col, first

df_src = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("text_value", col("value").cast("string"))
    .withColumn("label", col("text_value"))
)

df_pivot = (
    df_src
    .withWatermark("timestamp", "10 minutes")
    .groupBy("label")
    .pivot("value", ["x", "y"])
    .agg(first("value"))
)
display(df_pivot)
{code}
 
it fails with error _AnalysisException: Detected pattern of possible 
'correctness' issue due to global watermark_.

This happens because the analyzer [resolves the pivot into two aggregate 
operations|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L817].
 Multiple aggregations in a watermarked streaming pipeline [cause this error to 
be 
thrown|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L123]
 to prevent correctness issues caused by stateful operations using a single 
global watermark.

I've raised this as a bug because of the pivot failure, but please advise if 
that's inappropriate! Some observations:
 * We could say that pivot is not supported in a watermarked streaming 
pipeline. That would be disappointing, and it's not always true (depending on 
the datatype of the column being pivoted - for example, pivoting a string 
column causes the pivot to be implemented as a one-step aggregate).
 * I don't understand the internals well enough to know whether the two-step 
aggregate is vulnerable to global watermark correctness issues at all. If not, 
it would seem to me that the unsupported operation check is too aggressive - it 
would need to be able to distinguish between the general multiple stateful 
operation problem and the adjacent aggregates implemented by the analyzer for 
pivot.
 * If the two-step aggregate for pivot is safe, one can disable the correctness 
check with 
{{spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")}}, 
but that affects all operations - in principle a query could 
contain pivot operations *and* multiple operations vulnerable to global 
watermark correctness.

Many thanks


> Pivoting watermarked streaming dataframe fails with correctness error
> ---------------------------------------------------------------------
>
>                 Key: SPARK-48869
>                 URL: https://issues.apache.org/jira/browse/SPARK-48869
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>         Environment: Azure Databricks runtime 14.3 LTS
>            Reporter: Richard Swinbank
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
