This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 653d82e150 refactor(core): remove unused passToAllDownstream flag from 
State (#4415)
653d82e150 is described below

commit 653d82e15034e854f494b72fecf173a9c1f46796
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Apr 24 12:14:21 2026 -0700

    refactor(core): remove unused passToAllDownstream flag from State (#4415)
    
    ### What changes were proposed in this PR?
    This PR removes the `passToAllDownstream` flag from the `State` class
    and the corresponding branch in the default `processState`
    implementation, in both the Scala and Python sides of the engine.
    
    Background: when I originally introduced `passToAllDownstream`, I didn't
    think the design through carefully. In practice:
    
    - The flag is not set to `true` anywhere in the codebase.
    - The only code that reads it is the default `processState` itself,
    and because the flag is always `false`, that branch never forwards a
    state — the flag carries no useful information today.
    - I don't plan to build any feature on top of this flag. The idea it was
    gesturing at (limiting how far a state propagates) will be reintroduced
    later as a proper "scope" concept with a different mechanism, not as a
    boolean carried on every `State` instance.
    
    After this PR, the default behavior of `processState` is: **pass the
    incoming state to all downstream operators**, unless an operator
    explicitly consumes it by overriding `processState` and returning `None`
    (Scala) / not returning a `State` (Python). This matches what the
    previous default was *trying* to express, minus the dead flag.
    
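    This PR also fixes a minor inaccuracy in the Python return type hints:
    `produce_state_on_start` and `produce_state_on_finish` may return `None`,
    so they are now annotated as `Optional[State]`.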
    
    ### Any related issues, documentation, discussions?
    Closes #4414
    
    ### How was this PR tested?
    Tested with existing unit/integration tests — no test referenced
    `passToAllDownstream` / `pass_to_all_downstream`.

    Note that this is not a pure no-op. Because the flag defaulted to
    `false` and was never set to `true`, the previous default `processState`
    always consumed the incoming state:
    - Scala returned `None`.
    - Python fell through to an implicit `None`.

    The new default returns the state as-is, which forwards it to all
    downstream operators. Operators that do not override `processState`
    will therefore now propagate states that they previously dropped.

    No new tests were added to cover this new default-forwarding behavior.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    No.
    
    ---------
    
    Co-authored-by: Chen Li <[email protected]>
---
 amber/src/main/python/core/models/operator.py                  | 10 ++++------
 amber/src/main/python/core/models/state.py                     |  5 +----
 .../apache/texera/amber/core/executor/OperatorExecutor.scala   |  8 +-------
 .../main/scala/org/apache/texera/amber/core/state/State.scala  |  5 +----
 4 files changed, 7 insertions(+), 21 deletions(-)

diff --git a/amber/src/main/python/core/models/operator.py 
b/amber/src/main/python/core/models/operator.py
index 7905083995..952e2a12c8 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -108,16 +108,14 @@ class Operator(ABC):
     def process_state(self, state: State, port: int) -> Optional[State]:
         """
         Process an input State from the given link.
-        The default implementation is to pass the State to all downstream 
operators
-        if the State has pass_to_all_downstream set to True.
+        The default implementation is to pass the State to all downstream 
operators.
         :param state: State, a State from an input port to be processed.
         :param port: int, input port index of the current exhausted port.
         :return: State, producing one State object
         """
-        if state.passToAllDownstream:
-            return state
+        return state
 
-    def produce_state_on_start(self, port: int) -> State:
+    def produce_state_on_start(self, port: int) -> Optional[State]:
         """
         Produce a State when the given link started.
 
@@ -126,7 +124,7 @@ class Operator(ABC):
         """
         pass
 
-    def produce_state_on_finish(self, port: int) -> State:
+    def produce_state_on_finish(self, port: int) -> Optional[State]:
         """
         Produce a State after the input port is exhausted.
 
diff --git a/amber/src/main/python/core/models/state.py 
b/amber/src/main/python/core/models/state.py
index 2c8a268dfb..feb35f2e27 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -26,11 +26,8 @@ from .schema.attribute_type import FROM_PYOBJECT_MAPPING
 
 @dataclass
 class State:
-    def __init__(
-        self, table: Optional[Table] = None, pass_to_all_downstream: bool = 
False
-    ):
+    def __init__(self, table: Optional[Table] = None):
         self.schema = Schema()
-        self.passToAllDownstream = pass_to_all_downstream
         if table is not None:
             self.__dict__.update(table.to_pandas().iloc[0].to_dict())
             self.schema = Schema(table.schema)
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
index f99739acc0..9837213abb 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
@@ -29,13 +29,7 @@ trait OperatorExecutor {
 
   def produceStateOnStart(port: Int): Option[State] = None
 
-  def processState(state: State, port: Int): Option[State] = {
-    if (state.isPassToAllDownstream) {
-      Some(state)
-    } else {
-      None
-    }
-  }
+  def processState(state: State, port: Int): Option[State] = Some(state)
 
   def processTupleMultiPort(
       tuple: Tuple,
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 3226c9d2fe..2b3465473b 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -23,9 +23,8 @@ import org.apache.texera.amber.core.tuple.{Attribute, 
AttributeType, Schema, Tup
 
 import scala.collection.mutable
 
-final case class State(tuple: Option[Tuple] = None, passToAllDownstream: 
Boolean = false) {
+final case class State(tuple: Option[Tuple] = None) {
   val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap()
-  add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN)
   if (tuple.isDefined) {
     tuple.get.getSchema.getAttributes.foreach { attribute =>
       add(attribute.getName, tuple.get.getField(attribute.getName), 
attribute.getType)
@@ -37,8 +36,6 @@ final case class State(tuple: Option[Tuple] = None, 
passToAllDownstream: Boolean
 
   def get(key: String): Any = data(key)._2
 
-  def isPassToAllDownstream: Boolean = 
get("passToAllDownstream").asInstanceOf[Boolean]
-
   def apply(key: String): Any = get(key)
 
   def toTuple: Tuple =

Reply via email to