Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/17361#discussion_r107307253 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -147,49 +147,68 @@ object UnsupportedOperationChecker { throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") - // mapGroupsWithState: Allowed only when no aggregation + Update output mode - case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState => - if (collectStreamingAggregates(plan).isEmpty) { - if (outputMode != InternalOutputModes.Update) { - throwError("mapGroupsWithState is not supported with " + - s"$outputMode output mode on a streaming DataFrame/Dataset") - } else { - // Allowed when no aggregation + Update output mode - } - } else { - throwError("mapGroupsWithState is not supported with aggregation " + - "on a streaming DataFrame/Dataset") - } - - // flatMapGroupsWithState without aggregation - case m: FlatMapGroupsWithState - if m.isStreaming && collectStreamingAggregates(plan).isEmpty => - m.outputMode match { - case InternalOutputModes.Update => - if (outputMode != InternalOutputModes.Update) { - throwError("flatMapGroupsWithState in update mode is not supported with " + + // mapGroupsWithState and flatMapGroupsWithState + case m: FlatMapGroupsWithState if m.isStreaming => + + // Check compatibility with output modes and aggregations in query + val aggsAfterFlatMapGroups = collectStreamingAggregates(plan) + + if (m.isMapGroupsWithState) { // check mapGroupsWithState + // allowed only in update query output mode and without aggregation + if (aggsAfterFlatMapGroups.nonEmpty) { + throwError( + "mapGroupsWithState is not supported with aggregation " + + "on a streaming DataFrame/Dataset") + } else if (outputMode != InternalOutputModes.Update) { + throwError( + "mapGroupsWithState is not supported with " + s"$outputMode output mode on a streaming DataFrame/Dataset") + } + } else { // check latMapGroupsWithState + if (aggsAfterFlatMapGroups.isEmpty) { + // flatMapGroupsWithState without aggregation: operation's output mode must + // match query output mode + m.outputMode match { + case InternalOutputModes.Update if outputMode != InternalOutputModes.Update => + throwError( + "flatMapGroupsWithState in update mode is not supported with " + + s"$outputMode output mode on a streaming DataFrame/Dataset") + + case InternalOutputModes.Append if outputMode != InternalOutputModes.Append => + throwError( + "flatMapGroupsWithState in append mode is not supported with " + + s"$outputMode output mode on a streaming DataFrame/Dataset") + + case _ => } - case InternalOutputModes.Append => - if (outputMode != InternalOutputModes.Append) { - throwError("flatMapGroupsWithState in append mode is not supported with " + - s"$outputMode output mode on a streaming DataFrame/Dataset") + } else { + // flatMapGroupsWithState with aggregation: update operation mode not allowed, and + // *groupsWithState after aggregation not allowed + if (m.outputMode == InternalOutputModes.Update) { + throwError( + "flatMapGroupsWithState in update mode is not supported with " + + "aggregation on a streaming DataFrame/Dataset") + } else if (collectStreamingAggregates(m).nonEmpty) { + throwError( + "flatMapGroupsWithState in append mode is not supported after " + + s"aggregation on a streaming DataFrame/Dataset") } + } } - // flatMapGroupsWithState(Update) with aggregation - case m: FlatMapGroupsWithState - if m.isStreaming && m.outputMode == InternalOutputModes.Update - && collectStreamingAggregates(plan).nonEmpty => - throwError("flatMapGroupsWithState in update mode is not supported with " + - "aggregation on a streaming DataFrame/Dataset") - - // flatMapGroupsWithState(Append) with aggregation - case m: FlatMapGroupsWithState - if m.isStreaming && m.outputMode == InternalOutputModes.Append - && collectStreamingAggregates(m).nonEmpty => - throwError("flatMapGroupsWithState in append mode is not supported after " + - s"aggregation on a streaming DataFrame/Dataset") + // Check compatibility with timeout configs + if (m.timeout == EventTimeTimeout) { + // With event time timeout, watermark must be defined. + val watermarkAttributes = m.child.output.collect { + case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a + } + if (watermarkAttributes.isEmpty) { + throwError( + "Event time timeout is not supported in a [map|flatMap]GroupsWithState " + --- End diff -- are we? I didnt know there was a policy. I am fine hyphenating.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org