WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]

-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss it later. We only care about the parents (downstream operators) of each operator.
- Dedup: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
  - Note that Dedup without an event-time column is not counted here.
- `flatMapGroupsWithState` (and `mapGroupsWithState`, as well as the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed. Note that Dedup without an event-time column is not counted here.
  - After this PR: an aggregation followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with an aggregation (whether before or after it) is not allowed in Update mode -> [keep this behavior]
- stream-stream join:
  - Only allowed in Append mode, as an inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: a time-interval join followed by any stateful operator is disallowed; an equality inner or outer join followed by any stateful operator is supported.
  - Stream-stream join is not possible in the other two modes.
- Currently: `MapGroupsWithState` with aggregation is disallowed.
- Currently: `MapGroupsWithState` is only allowed in Update mode.
- [Q] Why doesn't Dedup require an event-time column? It must keep some kind of state store to do the deduplication. Without a watermark, are we holding this state for the whole lifetime of the query? [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.
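To make the discussion concrete, the chaining rules in the list can be condensed into a small Scala model. This is a hypothetical sketch of the list as written, not of what the PR or Spark's checker implements. `Mode`, `Op`, `DedupOp`, `AggOp`, `FlatMapGroupsOp`, `JoinOp` and `ChainingRules` are made-up names, not Spark classes. A pipeline is assumed to be a `Seq[Op]` listed from upstream to downstream. `processingTime = true` stands in for the "configured with processing time, no need to check" case. The `MapGroupsWithState`-specific rules are left out.

```scala
// Hypothetical model of the operator-chaining rules from the review comment.
// None of these types are Spark classes; they exist only for discussion.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode

sealed trait Op
// Dedup, optionally keyed on an event-time column.
final case class DedupOp(hasEventTimeCol: Boolean) extends Op
// Streaming aggregation.
case object AggOp extends Op
// flatMapGroupsWithState; processingTime = true models the "no need to check" case.
final case class FlatMapGroupsOp(processingTime: Boolean) extends Op
// Stream-stream join; timeInterval = false means an equality (inner or outer) join.
final case class JoinOp(timeInterval: Boolean) extends Op

object ChainingRules {
  // Dedup only counts as stateful when it has an event-time column.
  def countsAsStateful(op: Op): Boolean = op match {
    case DedupOp(hasEventTimeCol) => hasEventTimeCol
    case _                        => true
  }

  // May `op` have a stateful operator somewhere downstream of it in `mode`?
  def mayFeedStatefulOp(op: Op, mode: Mode): Boolean = op match {
    case AggOp                       => mode == Append
    case FlatMapGroupsOp(procTime)   => procTime
    case JoinOp(timeInterval)        => !timeInterval
    case DedupOp(_)                  => true
  }

  // Returns one message per violated rule; the pipeline is listed upstream -> downstream.
  def violations(pipeline: Seq[Op], mode: Mode): Seq[String] = {
    // Stream-stream joins are only possible in Append mode.
    val joinMode = pipeline.collect {
      case j: JoinOp if mode != Append => s"$j is only supported in Append mode"
    }
    // flatMapGroupsWithState together with an aggregation stays disallowed in Update mode.
    val hasFlatMapGroups = pipeline.exists {
      case _: FlatMapGroupsOp => true
      case _                  => false
    }
    val flatMapGroupsWithAgg =
      if (mode == Update && hasFlatMapGroups && pipeline.contains(AggOp))
        Seq("flatMapGroupsWithState with aggregation is disallowed in Update mode")
      else Seq.empty[String]
    // Check every operator against all operators downstream of it, not just the next one.
    val chaining = for {
      (op, i)    <- pipeline.zipWithIndex
      if !mayFeedStatefulOp(op, mode)
      downstream <- pipeline.drop(i + 1)
      if countsAsStateful(downstream)
    } yield s"$op followed by stateful $downstream is disallowed in $mode mode"
    joinMode ++ flatMapGroupsWithAgg ++ chaining
  }
}
```

The model checks each operator against every operator downstream of it, not only its direct parent, because the list says "followed by any stateful op". For example, `ChainingRules.violations(Seq(DedupOp(false), AggOp), Update)` is empty, because Dedup without an event-time column is not counted.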
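The [Q]/[A] about Dedup without an event-time column comes down to how big the dedup state can grow. The following toy model illustrates this. `ToyDedup` is a hypothetical class, not Spark's deduplication operator. It keeps one entry per key. When a watermark delay is given, it evicts entries whose event time is below `maxEventTime - delay` at the end of each batch, which roughly mirrors how a watermark bounds state. Without a watermark, the state stays bounded only if the key space is bounded.

```scala
import scala.collection.mutable

// Hypothetical toy model of dedup state; not Spark's implementation.
final class ToyDedup(watermarkDelayMs: Option[Long]) {
  // key -> event time of the first row seen for that key
  private val seen = mutable.Map.empty[String, Long]
  private var maxEventTime = Long.MinValue

  // Returns true if the row is emitted, i.e. its key is not currently in state.
  def process(key: String, eventTime: Long): Boolean = {
    maxEventTime = math.max(maxEventTime, eventTime)
    val emit = !seen.contains(key)
    if (emit) seen(key) = eventTime
    emit
  }

  // End of a micro-batch: with a watermark, evict keys older than it;
  // without one, state is kept for the lifetime of the query.
  def endBatch(): Unit = watermarkDelayMs.foreach { delay =>
    if (seen.nonEmpty) {
      val watermark = maxEventTime - delay
      seen.filterInPlace { case (_, t) => t >= watermark }
    }
  }

  def stateSize: Int = seen.size
}
```

With 1000 rows over only 3 distinct keys and no watermark, state stays at 3 entries. With 1000 distinct keys and no watermark, it grows to 1000. With a 10 ms delay, only the last 11 keys survive. In this toy model, a key evicted by the watermark would be treated as new if it reappeared, which is the trade-off the watermark makes.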