This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b75a329cb14d [SPARK-55317][SQL] Add SequentialUnion logical plan node 
and planning rule
b75a329cb14d is described below

commit b75a329cb14d1442ec05e5c1bcbc0000619c5691
Author: ericm-db <[email protected]>
AuthorDate: Fri Feb 6 15:07:55 2026 -0800

    [SPARK-55317][SQL] Add SequentialUnion logical plan node and planning rule
    
    ### What changes were proposed in this pull request?
    
    This PR introduces the SequentialUnion logical plan node, which enables 
sequential processing of multiple streaming sources. Unlike the existing Union 
operator that processes all children concurrently, SequentialUnion processes 
each child source to completion before moving to the next.
    
    The plan is for this logical operator to act as a placeholder for future 
consumption by the physical planning process to use specialized execution 
operators; there won't be a physical SequentialUnion mapping 1:1 to this 
logical operator.
    
    Key changes:
    - Added SequentialUnion case class extending UnionBase in 
basicLogicalOperators.scala
    - Supports byName and allowMissingCol parameters for schema compatibility
    - Includes flatten() method to handle nested SequentialUnion nodes 
(enabling chaining)
    - Validates minimum 2 children and parameter constraints
    - Added comprehensive test suite in SequentialUnionSuite.scala
    
    This is the foundational logical plan component. Execution support and 
user-facing APIs will be added in follow-up PRs.
    
    ### Why are the changes needed?
    Sequential Union enables backfill-to-live streaming scenarios where 
historical data must be processed completely before transitioning to live data, 
while preserving stateful operator state across the transition.
    
    Example use case: Process historical Parquet data completely, then 
seamlessly switch to live Kafka streaming - all within a single query that 
maintains aggregations, watermarks, and deduplication state.
    This pattern is not currently possible with the existing Union operator, 
which processes all sources concurrently.
    
    ### Does this PR introduce _any_ user-facing change?
    No. This PR only adds the internal logical plan node. No user-facing APIs 
or behavior changes are introduced yet.
    
    ### How was this patch tested?
    
    Added new test suite SequentialUnionSuite with tests covering:
    - Basic node creation and validation
    - Minimum children requirement (at least 2)
    - Output schema verification
    - byName and allowMissingCol parameter constraints
    - Flattening logic for nested SequentialUnions (single level, deeply 
nested, multiple nested scenarios)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54098 from ericm-db/sequential-union-logical-node.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  27 ++
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +
 .../analysis/SequentialUnionAnalysis.scala         |  80 ++++++
 .../analysis/UnsupportedOperationChecker.scala     |  11 +-
 .../plans/logical/SequentialStreamingUnion.scala   | 135 ++++++++++
 .../spark/sql/catalyst/trees/TreePatterns.scala    |   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  19 ++
 .../SequentialStreamingUnionAnalysisSuite.scala    | 284 +++++++++++++++++++++
 8 files changed, 554 insertions(+), 5 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 258fcfeda15e..9d228de9b590 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4710,6 +4710,15 @@
     ],
     "sqlState" : "07501"
   },
+  "NESTED_SEQUENTIAL_STREAMING_UNION" : {
+    "message" : [
+      "Nested SequentialStreamingUnion is not supported. <hint>",
+      "SequentialStreamingUnion can only be used at the top level of a 
streaming query.",
+      "Each source in followedBy() should be a base streaming source or simple 
transformations,",
+      "not another followedBy() result wrapped in other operators."
+    ],
+    "sqlState" : "42601"
+  },
   "NEW_CHECK_CONSTRAINT_VIOLATION" : {
     "message" : [
       "The new check constraint (<expression>) cannot be added because it 
would be violated by existing data in table <tableName>. Please ensure all 
existing rows satisfy the constraint before adding it."
@@ -4872,6 +4881,15 @@
     },
     "sqlState" : "42000"
   },
+  "NOT_STREAMING_DATASET" : {
+    "message" : [
+      "All sources in <operator> must be streaming relations.",
+      "Ensure all DataFrames are created using spark.readStream() rather than 
spark.read().",
+      "Example: df1 = spark.readStream.format(\"delta\").load(...); df2 = 
spark.readStream.format(\"kafka\").load(...)",
+      "Then use: df1.followedBy(df2)"
+    ],
+    "sqlState" : "42K0A"
+  },
   "NOT_SUPPORTED_CHANGE_COLUMN" : {
     "message" : [
       "ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing <table>'s 
column <originName> with type <originType> to <newName> with type <newType>."
@@ -5533,6 +5551,15 @@
     ],
     "sqlState" : "0A000"
   },
+  "STATEFUL_CHILDREN_NOT_SUPPORTED_IN_SEQUENTIAL_STREAMING_UNION" : {
+    "message" : [
+      "Stateful operations are not supported as direct children of 
SequentialStreamingUnion. Please apply stateful operations (aggregations, 
joins, deduplication, etc.) after the SequentialStreamingUnion.",
+      "Incorrect: 
historical.groupBy(\"key\").count().followedBy(live.groupBy(\"key\").count())",
+      "Correct: historical.followedBy(live).groupBy(\"key\").count()",
+      "This ensures state is preserved across source transitions from 
historical to live data."
+    ],
+    "sqlState" : "42601"
+  },
   "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE" : {
     "message" : [
       "Failed to perform stateful processor operation=<operationType> with 
invalid handle state=<handleState>."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c15f0a5a30ca..979bb6f9cff2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -496,6 +496,8 @@ class Analyzer(
       ResolveBinaryArithmetic ::
       new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
+      FlattenSequentialStreamingUnion ::
+      ValidateSequentialStreamingUnion ::
       ResolveRowLevelCommandAssignments ::
       RewriteDeleteFromTable ::
       RewriteUpdateTable ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
new file mode 100644
index 000000000000..3b0e3ede9cbc
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SequentialUnionAnalysis.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
SequentialStreamingUnion}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern._
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * Flattens nested SequentialStreamingUnion nodes into a single level.
+ * This allows chaining: df1.followedBy(df2).followedBy(df3)
+ */
+object FlattenSequentialStreamingUnion extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(SEQUENTIAL_STREAMING_UNION)) {
+    case SequentialStreamingUnion(children, byName, allowMissingCol) =>
+      val flattened = SequentialStreamingUnion.flatten(children)
+      SequentialStreamingUnion(flattened, byName, allowMissingCol)
+  }
+}
+
+/**
+ * Validates SequentialStreamingUnion constraints:
+ * - All children must be streaming relations
+ * - No nested SequentialStreamingUnions (should be flattened first)
+ * - No stateful operations in any child subtrees
+ *
+ * Note: Minimum 2 children is enforced by the resolved property, not explicit 
validation.
+ */
+object ValidateSequentialStreamingUnion extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.foreach {
+      case su: SequentialStreamingUnion =>
+        validateAllStreaming(su)
+        validateNoNesting(su)
+        validateNoStatefulDescendants(su)
+      case _ =>
+    }
+    plan
+  }
+
+  private def validateAllStreaming(su: SequentialStreamingUnion): Unit = {
+    val nonStreamingChildren = su.children.filterNot(_.isStreaming)
+    if (nonStreamingChildren.nonEmpty) {
+      throw 
QueryCompilationErrors.notStreamingDatasetError("SequentialStreamingUnion")
+    }
+  }
+
+  private def validateNoNesting(su: SequentialStreamingUnion): Unit = {
+    su.children.foreach { child =>
+      if (child.containsPattern(SEQUENTIAL_STREAMING_UNION)) {
+        throw QueryCompilationErrors.nestedSequentialStreamingUnionError()
+      }
+    }
+  }
+
+  private def validateNoStatefulDescendants(su: SequentialStreamingUnion): 
Unit = {
+    su.children.foreach { child =>
+      if (child.exists(UnsupportedOperationChecker.isStatefulOperation)) {
+        throw 
QueryCompilationErrors.statefulChildrenNotSupportedInSequentialStreamingUnionError()
+      }
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index d658d83f066f..60b952b285e1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -56,7 +56,7 @@ object UnsupportedOperationChecker extends Logging {
    * @param exp the expression to be checked
    * @return true if it is a event time column.
    */
-  private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
+  def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
     case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
     case _ => false
   }
@@ -88,13 +88,14 @@ object UnsupportedOperationChecker extends Logging {
   }
 
   /**
-   * This method is only used with ifCannotBeFollowedByStatefulOperation.
-   * Here we list up stateful operators but there is an exception for 
Deduplicate:
-   * it is only counted here when it has an event time column.
+   * Checks if a logical plan is a streaming stateful operation.
+   * Stateful operations include aggregations, stream-stream joins, 
deduplication,
+   * and stateful transformations (FlatMapGroupsWithState, TransformWithState, 
etc.).
+   * Note: Deduplicate is only counted as stateful when it has an event time 
column.
    * @param p the logical plan to be checked
    * @return true if there is a streaming stateful operation
    */
-  private def isStatefulOperation(p: LogicalPlan): Boolean = p match {
+  def isStatefulOperation(p: LogicalPlan): Boolean = p match {
     case s: Aggregate if s.isStreaming => true
     // Since the Distinct node will be replaced to Aggregate in the optimizer 
rule
     // [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
new file mode 100644
index 000000000000..e2f1a1aed5d3
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SequentialStreamingUnion.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.trees.TreePattern._
+
+/**
+ * Logical plan for unioning multiple streaming plans sequentially, processing 
each child to
+ * completion before moving to the next. This is used for backfill-to-live 
streaming scenarios
+ * where historical data should be processed completely before switching to 
live data.
+ *
+ * This is a logical placeholder node that acts as a marker in the query plan. 
During physical
+ * planning, the streaming execution engine will match against this node and 
generate specific
+ * execution steps for sequential processing, rather than creating a 
corresponding physical
+ * SequentialStreamingUnion operator.
+ *
+ * Unlike [[Union]] which processes all children concurrently in streaming 
queries,
+ * SequentialStreamingUnion processes each child source sequentially:
+ * 1. First child processes until complete (bounded sources reach their end)
+ * 2. Second child begins processing
+ * 3. And so on...
+ *
+ * Requirements:
+ * - Minimum 2 children required
+ * - All children must be streaming sources
+ * - All non-final children (i.e., all children except the last) must support 
bounded execution
+ *   (SupportsTriggerAvailableNow). The last child typically remains unbounded 
for live streaming.
+ * - All children must have explicit names when used in streaming queries
+ * - Children cannot contain stateful operations (aggregations, joins, etc.)
+ * - Schema compatibility is enforced via UnionBase
+ *
+ * State preservation: Stateful operators applied AFTER 
SequentialStreamingUnion (aggregations,
+ * watermarks, deduplication, joins) preserve their state across source 
transitions,
+ * enabling seamless backfill-to-live scenarios.
+ *
+ * Example:
+ * {{{
+ *   val historical = 
spark.readStream.format("delta").name("historical").load("/data")
+ *   val live = spark.readStream.format("kafka").name("live").load()
+ *   // Correct: stateful operations after SequentialStreamingUnion
+ *   historical.followedBy(live).groupBy("key").count()
+ *
+ *   // Incorrect: stateful operations before SequentialStreamingUnion
+ *   // 
historical.groupBy("key").count().followedBy(live.groupBy("key").count()) // 
Not allowed
+ * }}}
+ *
+ * @param children        The logical plans to union sequentially (must be 
streaming sources)
+ * @param byName          Whether to resolve columns by name (vs. by position)
+ * @param allowMissingCol When true (requires byName = true), allows children 
to have different
+ *                        columns. Missing columns in any child are filled 
with nulls. When false,
+ *                        all children must have the same set of columns.
+ */
+case class SequentialStreamingUnion(
+    children: Seq[LogicalPlan],
+    byName: Boolean,
+    allowMissingCol: Boolean) extends UnionBase {
+  assert(!allowMissingCol || byName,
+    "`allowMissingCol` can be true only if `byName` is true.")
+
+  final override val nodePatterns: Seq[TreePattern] = 
Seq(SEQUENTIAL_STREAMING_UNION)
+
+  /**
+   * This node is considered resolved when:
+   * 1. children.length >= 2: Has at least 2 children (cannot create 
sequential union
+   *    with < 2 sources)
+   * 2. !(byName || allowMissingCol): Column resolution is by position 
(default). When
+   *    byName or allowMissingCol is true, the ResolveUnion rule must first 
transform
+   *    this into a resolved form with schema alignment projections.
+   * 3. childrenResolved: All child nodes have been resolved by the analyzer
+   * 4. allChildrenCompatible: All children have compatible schemas (enforced 
by
+   *    UnionBase)
+   *
+   * This matches Union's resolution logic but without duplicate column 
checking, since
+   * SequentialStreamingUnion is validated separately for streaming-specific
+   * constraints.
+   */
+  override lazy val resolved: Boolean = {
+    children.length >= 2 &&
+    !(byName || allowMissingCol) &&
+    childrenResolved &&
+    allChildrenCompatible
+  }
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[LogicalPlan]): SequentialStreamingUnion = {
+    copy(children = newChildren)
+  }
+}
+
+object SequentialStreamingUnion {
+  def apply(left: LogicalPlan, right: LogicalPlan): SequentialStreamingUnion = 
{
+    SequentialStreamingUnion(left :: right :: Nil, byName = false, 
allowMissingCol = false)
+  }
+
+  /**
+   * Recursively flattens direct SequentialStreamingUnion children.
+   * This enables chaining: df1.followedBy(df2).followedBy(df3).followedBy(df4)
+   *
+   * Example:
+   *   SequentialStreamingUnion(SequentialStreamingUnion(df1, df2), df3)
+   * Flattens to:
+   *   SequentialStreamingUnion(df1, df2, df3)
+   *
+   * Note: This only handles direct children. SequentialStreamingUnions nested
+   * through other operators (e.g., Project(SequentialStreamingUnion(...))) are
+   * not flattened and will be caught by validation as invalid.
+   *
+   * @param plans The plans to flatten
+   * @return Flattened sequence of plans
+   */
+  def flatten(plans: Seq[LogicalPlan]): Seq[LogicalPlan] = {
+    plans.flatMap {
+      case SequentialStreamingUnion(children, _, _) =>
+        // Recursively flatten direct SequentialStreamingUnion children
+        flatten(children)
+      case other =>
+        Seq(other)
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 69e726fe2f85..882c05594c05 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -167,6 +167,7 @@ object TreePattern extends Enumeration  {
   val REPARTITION_OPERATION: Value = Value
   val REBALANCE_PARTITIONS: Value = Value
   val RESOLVED_METRIC_VIEW: Value = Value
+  val SEQUENTIAL_STREAMING_UNION: Value = Value
   val SERIALIZE_FROM_OBJECT: Value = Value
   val SORT: Value = Value
   val SQL_TABLE_FUNCTION: Value = Value
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 1a7eed39d69e..b4b01d680367 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -4539,4 +4539,23 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       errorClass = "UNSUPPORTED_TIME_TYPE",
       messageParameters = Map.empty)
   }
+
+  def nestedSequentialStreamingUnionError(): Throwable = {
+    new AnalysisException(
+      errorClass = "NESTED_SEQUENTIAL_STREAMING_UNION",
+      messageParameters = Map(
+        "hint" -> "Use chained followedBy calls instead: 
df1.followedBy(df2).followedBy(df3)"))
+  }
+
+  def notStreamingDatasetError(operator: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NOT_STREAMING_DATASET",
+      messageParameters = Map("operator" -> operator))
+  }
+
+  def statefulChildrenNotSupportedInSequentialStreamingUnionError(): Throwable 
= {
+    new AnalysisException(
+      errorClass = 
"STATEFUL_CHILDREN_NOT_SUPPORTED_IN_SEQUENTIAL_STREAMING_UNION",
+      messageParameters = Map.empty)
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
new file mode 100644
index 000000000000..2cfe2d271043
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SequentialStreamingUnionAnalysisSuite.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, 
Project, SequentialStreamingUnion}
+import org.apache.spark.sql.errors.DataTypeErrorsBase
+
+class SequentialStreamingUnionAnalysisSuite extends AnalysisTest with 
DataTypeErrorsBase {
+
+  private val testRelation1 = LocalRelation($"a".int, $"b".int)
+  private val testRelation2 = LocalRelation($"a".int, $"b".int)
+  private val testRelation3 = LocalRelation($"a".int, $"b".int)
+
+  test("FlattenSequentialStreamingUnion - single level") {
+    val union = SequentialStreamingUnion(testRelation1, testRelation2)
+    val result = FlattenSequentialStreamingUnion(union)
+
+    assert(result.isInstanceOf[SequentialStreamingUnion])
+    val su = result.asInstanceOf[SequentialStreamingUnion]
+    assert(su.children.length == 2)
+  }
+
+  test("FlattenSequentialStreamingUnion - nested unions get flattened") {
+    val innerUnion = SequentialStreamingUnion(testRelation1, testRelation2)
+    val outerUnion = SequentialStreamingUnion(innerUnion, testRelation3)
+
+    val result = FlattenSequentialStreamingUnion(outerUnion)
+
+    assert(result.isInstanceOf[SequentialStreamingUnion])
+    val su = result.asInstanceOf[SequentialStreamingUnion]
+    assert(su.children.length == 3, "Should flatten to 3 children")
+    assert(!su.children.exists(_.isInstanceOf[SequentialStreamingUnion]),
+      "No nested SequentialStreamingUnions")
+  }
+
+  test("FlattenSequentialStreamingUnion - deeply nested unions") {
+    val union1 = SequentialStreamingUnion(testRelation1, testRelation2)
+    val union2 = SequentialStreamingUnion(union1, testRelation3)
+
+    val result = FlattenSequentialStreamingUnion(union2)
+
+    val su = result.asInstanceOf[SequentialStreamingUnion]
+    assert(su.children.length == 3)
+  }
+
+  test("FlattenSequentialStreamingUnion - multiple nested unions at same 
level") {
+    val union1 = SequentialStreamingUnion(testRelation1, testRelation2)
+    val union2 = SequentialStreamingUnion(testRelation2, testRelation3)
+    val outerUnion = SequentialStreamingUnion(
+      Seq(union1, union2), byName = false, allowMissingCol = false)
+
+    val result = FlattenSequentialStreamingUnion(outerUnion)
+
+    val su = result.asInstanceOf[SequentialStreamingUnion]
+    assert(su.children.length == 4, "Should have 4 flattened children")
+  }
+
+  test("FlattenSequentialStreamingUnion - handles multi-level direct nesting") 
{
+    // Test flattening when we have multiple levels of direct 
SequentialStreamingUnion nesting
+    // e.g., df1.followedBy(df2).followedBy(df3).followedBy(df4)
+    val union1 = SequentialStreamingUnion(testRelation1, testRelation2)
+    val union2 = SequentialStreamingUnion(union1, testRelation3)
+    val testRelation4 = LocalRelation($"a".int, $"b".int)
+    val union3 = SequentialStreamingUnion(union2, testRelation4)
+
+    val result = FlattenSequentialStreamingUnion(union3)
+
+    val su = result.asInstanceOf[SequentialStreamingUnion]
+    // Should recursively flatten all levels: rel1, rel2, rel3, rel4
+    assert(su.children.length == 4, "Should recursively flatten all nested 
levels")
+    assert(!su.children.exists(_.isInstanceOf[SequentialStreamingUnion]),
+      "No SequentialStreamingUnion children should remain")
+  }
+
+  test("FlattenSequentialStreamingUnion - preserves byName parameter") {
+    val innerUnion = SequentialStreamingUnion(testRelation1, testRelation2)
+    val outerUnion = SequentialStreamingUnion(
+      Seq(innerUnion, testRelation3), byName = true, allowMissingCol = false)
+
+    val result = FlattenSequentialStreamingUnion(outerUnion)
+
+    val su = result.asInstanceOf[SequentialStreamingUnion]
+    assert(su.byName == true, "byName parameter should be preserved")
+  }
+
+  test("FlattenSequentialStreamingUnion - preserves allowMissingCol 
parameter") {
+    val innerUnion = SequentialStreamingUnion(testRelation1, testRelation2)
+    val outerUnion = SequentialStreamingUnion(
+      Seq(innerUnion, testRelation3),
+      byName = true,
+      allowMissingCol = true)
+
+    val result = FlattenSequentialStreamingUnion(outerUnion)
+
+    val su = result.asInstanceOf[SequentialStreamingUnion]
+    assert(su.allowMissingCol == true, "allowMissingCol parameter should be 
preserved")
+  }
+
+  test("ValidateSequentialStreamingUnion - valid plan passes") {
+    // Create streaming relations
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+
+    val union = SequentialStreamingUnion(streamingRelation1, 
streamingRelation2)
+
+    // Should not throw exception
+    ValidateSequentialStreamingUnion(union)
+  }
+
+  test("ValidateSequentialStreamingUnion - rejects non-streaming children") {
+    // Non-streaming relations
+    val union = SequentialStreamingUnion(testRelation1, testRelation2)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        ValidateSequentialStreamingUnion(union)
+      },
+      condition = "NOT_STREAMING_DATASET",
+      parameters = Map("operator" -> "SequentialStreamingUnion"))
+  }
+
+  test("ValidateSequentialStreamingUnion - rejects directly nested unions") {
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+    val streamingRelation3 = testRelation3.copy(isStreaming = true)
+
+    // Manually create a nested union without running flatten
+    // (In practice, flatten would handle this, but validation catches it as a 
safeguard)
+    val innerUnion = SequentialStreamingUnion(streamingRelation1, 
streamingRelation2)
+    val outerUnion = SequentialStreamingUnion(innerUnion, streamingRelation3)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        ValidateSequentialStreamingUnion(outerUnion)
+      },
+      condition = "NESTED_SEQUENTIAL_STREAMING_UNION",
+      parameters = Map(
+        "hint" -> "Use chained followedBy calls instead: 
df1.followedBy(df2).followedBy(df3)"))
+  }
+
+  test("ValidateSequentialStreamingUnion - rejects nested unions through other 
operators") {
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+    val streamingRelation3 = testRelation3.copy(isStreaming = true)
+
+    // Create a nested union through a Project operator
+    // e.g., from df1.select("a", "b").followedBy(df2)
+    val innerUnion = SequentialStreamingUnion(streamingRelation1, 
streamingRelation2)
+    val projectOverUnion = Project(Seq($"a", $"b"), innerUnion)
+    val outerUnion = SequentialStreamingUnion(projectOverUnion, 
streamingRelation3)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        ValidateSequentialStreamingUnion(outerUnion)
+      },
+      condition = "NESTED_SEQUENTIAL_STREAMING_UNION",
+      parameters = Map(
+        "hint" -> "Use chained followedBy calls instead: 
df1.followedBy(df2).followedBy(df3)"))
+  }
+
+  test("ValidateSequentialStreamingUnion - three or more children allowed") {
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+    val streamingRelation3 = testRelation3.copy(isStreaming = true)
+
+    val union = SequentialStreamingUnion(
+      Seq(streamingRelation1, streamingRelation2, streamingRelation3),
+      byName = false,
+      allowMissingCol = false)
+
+    // Should not throw exception
+    ValidateSequentialStreamingUnion(union)
+  }
+
+  test("ValidateSequentialStreamingUnion - rejects stateful children") {
+    import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+
+    // Create an aggregation (stateful operation) as a child
+    val agg = Aggregate(Seq($"a"), Seq($"a", 
Count($"b").toAggregateExpression().as("count")),
+      streamingRelation2)
+
+    val union = SequentialStreamingUnion(streamingRelation1, agg)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        ValidateSequentialStreamingUnion(union)
+      },
+      condition = 
"STATEFUL_CHILDREN_NOT_SUPPORTED_IN_SEQUENTIAL_STREAMING_UNION",
+      parameters = Map.empty)
+  }
+
+  test("ValidateSequentialStreamingUnion - rejects indirect stateful 
descendants") {
+    import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+
+    // Create an aggregation wrapped in a Project (indirect stateful 
descendant)
+    val agg = Aggregate(Seq($"a"), Seq($"a", 
Count($"b").toAggregateExpression().as("count")),
+      streamingRelation2)
+    val projectOverAgg = Project(Seq($"a"), agg)
+
+    val union = SequentialStreamingUnion(streamingRelation1, projectOverAgg)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        ValidateSequentialStreamingUnion(union)
+      },
+      condition = 
"STATEFUL_CHILDREN_NOT_SUPPORTED_IN_SEQUENTIAL_STREAMING_UNION",
+      parameters = Map.empty)
+  }
+
+  test("ValidateSequentialStreamingUnion - allows non-stateful operations like 
select") {
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+    val streamingRelation3 = testRelation3.copy(isStreaming = true)
+
+    // Project (select) is NOT a stateful operation, so this should be valid
+    val project1 = Project(Seq($"a"), streamingRelation1)
+    val project2 = Project(Seq($"a", $"b"), streamingRelation2)
+    val project3 = Project(Seq($"a"), streamingRelation3)
+
+    val union = SequentialStreamingUnion(
+      Seq(project1, project2, project3),
+      byName = false,
+      allowMissingCol = false)
+
+    // Should not throw exception - Project is not a stateful operation
+    ValidateSequentialStreamingUnion(union)
+  }
+
+  test("Chained followedBy with select operations - full analyzer flow") {
+    val streamingRelation1 = testRelation1.copy(isStreaming = true)
+    val streamingRelation2 = testRelation2.copy(isStreaming = true)
+    val streamingRelation3 = testRelation3.copy(isStreaming = true)
+
+    // Simulate: df1.select("a").followedBy(df2.select("a", 
"b")).followedBy(df3.select("a"))
+    val project1 = Project(Seq($"a"), streamingRelation1)
+    val project2 = Project(Seq($"a", $"b"), streamingRelation2)
+    val project3 = Project(Seq($"a"), streamingRelation3)
+
+    // This creates the nested structure from chaining:
+    // df1.select("a").followedBy(df2.select("a", "b"))
+    val innerUnion = SequentialStreamingUnion(project1, project2)
+    // .followedBy(df3.select("a"))
+    val outerUnion = SequentialStreamingUnion(innerUnion, project3)
+
+    // Step 1: Flatten the nested unions
+    val flattened = FlattenSequentialStreamingUnion(outerUnion)
+    val flattenedUnion = flattened.asInstanceOf[SequentialStreamingUnion]
+
+    // Verify flattening worked
+    assert(flattenedUnion.children.length == 3, "Should flatten to 3 children")
+    
assert(!flattenedUnion.children.exists(_.isInstanceOf[SequentialStreamingUnion]),
+      "No nested SequentialStreamingUnions after flattening")
+
+    // Step 2: Validate the flattened plan
+    // Should not throw exception - Project is not stateful, all are streaming
+    ValidateSequentialStreamingUnion(flattenedUnion)
+
+    // Verify the children are the expected Project nodes
+    assert(flattenedUnion.children(0).isInstanceOf[Project])
+    assert(flattenedUnion.children(1).isInstanceOf[Project])
+    assert(flattenedUnion.children(2).isInstanceOf[Project])
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to