[ https://issues.apache.org/jira/browse/SPARK-38204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-38204:
------------------------------------

    Assignee: (was: Apache Spark)

All state operators are at a risk of inconsistency between state partitioning and operator partitioning
-------------------------------------------------------------------------------------------------------

                 Key: SPARK-38204
                 URL: https://issues.apache.org/jira/browse/SPARK-38204
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.3, 2.3.4, 2.4.8, 3.0.3, 3.1.2, 3.2.1, 3.3.0
            Reporter: Jungtaek Lim
            Priority: Blocker
              Labels: correctness

Except for stream-stream join, all stateful operators use ClusteredDistribution as the requirement on their child distribution.

ClusteredDistribution is a very relaxed requirement: any output partitioning satisfies the distribution as long as it ensures that all tuples having the same grouping keys are placed in the same partition.

To illustrate with an example, suppose we run a streaming aggregation like the code below:

{code:java}
df
  .withWatermark("timestamp", "30 minutes")
  .groupBy("group1", "group2", window("timestamp", "10 minutes"))
  .agg(count("*"))
{code}

Here the streaming aggregation operator appears in the physical plan with ClusteredDistribution("group1", "group2", "window"). The problem is that many different output partitionings can satisfy this distribution:

* RangePartitioning
** Accepts the exact grouping key or any subset of it, in any key order (combination), with any sort order (asc/desc).
* HashPartitioning
** Accepts the exact grouping key or any subset of it, in any key order (combination).
* (upcoming Spark 3.3.0+) DataSourcePartitioning
** An output partitioning provided by a data source will also be able to satisfy ClusteredDistribution, which makes things worse (assuming a data source can provide different output partitionings relatively easily).
For example, even if we only consider HashPartitioning, all of HashPartitioning("group1"), HashPartitioning("group2"), HashPartitioning("group1", "group2"), HashPartitioning("group2", "group1"), HashPartitioning("group1", "group2", "window"), etc. satisfy the distribution.

The requirement on state partitioning is much stricter, since we must not change the partitioning once the state has been partitioned and built. *It must ensure that all tuples having the same grouping keys are placed in the same partition (same partition ID) across the query's lifetime.*

*This mismatch between the ClusteredDistribution requirement and the state partitioning requirement silently leads to a correctness issue.*

For example, assume we have a streaming query like below:

{code:java}
df
  .withWatermark("timestamp", "30 minutes")
  .repartition("group2")
  .groupBy("group1", "group2", window("timestamp", "10 minutes"))
  .agg(count("*"))
{code}

repartition("group2") satisfies ClusteredDistribution("group1", "group2", "window"), so Spark won't introduce an additional shuffle there, and the state partitioning will be HashPartitioning("group2").

Now suppose we run this query for a while, stop it, and change the manual partitioning like below:

{code:java}
df
  .withWatermark("timestamp", "30 minutes")
  .repartition("group1")
  .groupBy("group1", "group2", window("timestamp", "10 minutes"))
  .agg(count("*"))
{code}

repartition("group1") also satisfies ClusteredDistribution("group1", "group2", "window"), so Spark again won't introduce an additional shuffle. That said, the child output partitioning of the streaming aggregation operator is now HashPartitioning("group1"), whereas the state partitioning is still HashPartitioning("group2").

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query

The Structured Streaming guide enumerates the unsupported modifications of a query during the lifetime of a streaming query, but this case is not mentioned there.
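The mismatch can be sketched with a small toy model in Python. The hash function below is a deliberately simple stand-in for illustration only (Spark's HashPartitioning actually uses Murmur3 over the key row), and the group values are hypothetical:

```python
# Toy model of the partitioning mismatch described above. The hash function is
# an illustrative stand-in, not Spark's actual Murmur3-based HashPartitioning.
NUM_PARTITIONS = 5

def partition_id(key):
    # Stand-in hash partitioner: sum of character codes, mod #partitions.
    return sum(ord(c) for c in key) % NUM_PARTITIONS

# Phase 1: the query runs with repartition("group2"), so state for the
# grouping key (group1, group2) is built in partition_id(group2).
state_location = {}
for g1, g2 in [("a", "x"), ("a", "y"), ("b", "x")]:
    state_location[(g1, g2)] = partition_id(g2)

# Phase 2: the query restarts with repartition("group1"); rows now arrive at
# partition_id(group1), while their state still sits at partition_id(group2).
misrouted = [
    (g1, g2)
    for (g1, g2), state_pid in state_location.items()
    if partition_id(g1) != state_pid
]

# Each misrouted key silently starts a fresh aggregation state in its new
# partition instead of continuing from the existing one.
print(misrouted)
```

In this toy run every key is misrouted after the restart, which is the silent correctness issue: the query keeps running, but aggregations restart from empty state.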
Making this worse, Spark doesn't store any information about the state partitioning (which means there is no way to validate it), so *Spark simply allows the change and produces incorrect results while the streaming query appears to run without any problem at all.* The only signal of the correctness issue is the result of the query itself.

We have no idea whether end users are already suffering from this in their queries. *The only way to find out is to list all state rows, apply the hash function with the expected grouping keys, and confirm that every row resides in the exact partition its keys hash to.* If the state turns out to be broken, we will need a tool to "re"partition the state correctly, or in the worst case, have to ask users to throw away the checkpoint and reprocess.

*This issue has been latent since the introduction of stateful operators (Spark 2.2+)*, because HashClusteredDistribution (the strict requirement) was introduced in Spark 2.3 and we did not change the stateful operators to use that distribution. stream-stream join, fortunately, has used HashClusteredDistribution since Spark 2.3, so it appears to be safe.
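The validation pass described above could be sketched as follows. The state layout and hash function here are hypothetical stand-ins (Spark's real state store format and Murmur3 partitioner differ); this only shows the shape of the check:

```python
# Sketch of the validation idea: enumerate every state row, recompute the
# expected partition ID from its grouping keys, and compare with the partition
# the row actually lives in. Layout and hash are illustrative stand-ins only.
NUM_PARTITIONS = 5

def expected_partition(grouping_keys):
    # Stand-in hash partitioner (Spark really uses Murmur3 over the key row).
    return sum(ord(c) for key in grouping_keys for c in key) % NUM_PARTITIONS

def find_misplaced(state):
    # state: partition_id -> list of grouping-key tuples stored there.
    return [
        (pid, keys)
        for pid, rows in state.items()
        for keys in rows
        if expected_partition(keys) != pid
    ]

# Hypothetical checkpoint contents: ("a", "x") hashes to partition 2 and is
# stored there (fine); ("b", "y") hashes to partition 4 but sits in 0 (broken).
state = {
    2: [("a", "x")],
    0: [("b", "y")],
}

print(find_misplaced(state))
```

An empty result would mean the state partitioning is consistent; any entries returned would indicate the checkpoint needs repartitioning or reprocessing.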