[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-15 Thread GitBox


alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023155184


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -940,22 +1056,22 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
   def assertPassOnGlobalWatermarkLimit(
   testNamePostfix: String,
   plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
-testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure 
= false)
+  outputMode: OutputMode = OutputMode.Append()): Unit = {
+testGlobalWatermarkLimit(testNamePostfix, plan, expectFailure = false, 
outputMode)
   }
 
   def assertFailOnGlobalWatermarkLimit(
   testNamePostfix: String,
   plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
-testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure 
= true)
+  outputMode: OutputMode = OutputMode.Append()): Unit = {

Review Comment:
   ditto



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = 
e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  private def isStatefulOperationPossiblyEmitLateRows(
+  p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
+case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+  left.isStreaming && right.isStreaming &&
+otherCondition.isDefined && 
hasRangeExprAgainstEventTimeCol(otherCondition.get)
+// FlatMapGroupsWithState configured with event time
+case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, 
_, _, _, _)
+  if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
+  if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
+case a: Aggregate if a.isStreaming && outputMode != 
InternalOutputModes.Append => true
+// Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+// [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+// assuming it as Aggregate.
+case d @ Distinct(_: LogicalPlan) if d.isStreaming
+  && outputMode != InternalOutputModes.Append => true
+case _ => false
+  }
+
+  private def isStreamingStatefulOperation(p: LogicalPlan): Boolean = p match {
+case s: Aggregate if s.isStreaming => true
+// Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
+// [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+// assuming it as Aggregate.
+case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
+case _ @ Join(left, right, _, _, _) if left.isStreaming && 
right.isStreaming => true
+case f: FlatMapGroupsWithState if f.isStreaming => true
+case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
+// Deduplicate also works without event time column even in streaming,
+// in such cases, although Dedup is still a stateful operation in a 
streaming
+// query, it could be ignored in all checks below, so let it return false.
+case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => 
true
+case _ => false
+  }
   /**
* Checks for possible correctness issue in chained stateful operators. The 
behavior is
* controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
* print a warning message.
*/
-  def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
-def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
-  case Join(left, right, joinType, _, _)
-if left.isStreaming && right.isStreaming && joinType != Inner => true
-  case f: FlatMapGroupsWithState
-if f.isStreaming && f.outputMode == OutputMode.Append() => true
-  case _ => false
-}
-
-def isStatefulOperation(p: LogicalPlan): Boolean = p match {
-  case 

[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-08 Thread GitBox


alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017413562


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   Hmm, the above scenario (dropDuplicates -> aggregation) was supported before. 
So I was wrong: dropDuplicates, and also an inner join with a timestamp equality 
condition, can be followed by a stateful operator in any mode, because these 
operators do not delay the output records. I apologize for the back-and-forth; 
I think the above scenario is important to continue supporting.



##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -215,20 +220,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Complete)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("a" -> 3L, "b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default
+val exp = intercept[AnalysisException] {

Review Comment:
   same here - the above scenario should work



##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
 expectedMsgs = Seq("Complete"))
 
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
 assertNotSupportedInStreamingPlan(
   "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
 s"with aggregation in $outputMode mode",
   TestFlatMapGroupsWithState(
 null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,
 Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
   outputMode = outputMode,
-  expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with 
aggregation"))
+  expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
 
+  assertNotSupportedInStreamingPlan(
+"flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
+  s"with aggregation in Append mode",
+TestFlatMapGroupsWithState(
+  null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,

Review Comment:
   I am not sure what the semantics are of setting the output mode to Update on 
flatMapGroupsWithState, but it does not match the Append output mode below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014428293


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging {
 // Disallow multiple streaming aggregations
 val aggregates = collectStreamingAggregates(plan)
 
-if (aggregates.size > 1) {
+if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
   throwError(
 "Multiple streaming aggregations are not supported with " +

Review Comment:
   and we need to change the message accordingly - 'aggregations' -> 'stateful 
operators'



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014425174


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging {
 // Disallow multiple streaming aggregations
 val aggregates = collectStreamingAggregates(plan)
 
-if (aggregates.size > 1) {
+if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   Absolutely. I agree that we should allow multiple stateful ops only in 
append mode. The other modes are not implemented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org