This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-remove-passToAllDownstream
in repository https://gitbox.apache.org/repos/asf/texera.git

commit fc8cdf83b081178ab1ea53c345410a099e1e6f38
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 19 16:58:19 2026 -0700

    init
---
 amber/src/main/python/core/models/operator.py                     | 6 ++----
 amber/src/main/python/core/models/state.py                        | 5 +----
 .../org/apache/texera/amber/core/executor/OperatorExecutor.scala  | 8 +-------
 .../src/main/scala/org/apache/texera/amber/core/state/State.scala | 5 +----
 4 files changed, 5 insertions(+), 19 deletions(-)

diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py
index 7905083995..6c2b657d74 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -108,14 +108,12 @@ class Operator(ABC):
     def process_state(self, state: State, port: int) -> Optional[State]:
         """
         Process an input State from the given link.
-        The default implementation is to pass the State to all downstream operators
-        if the State has pass_to_all_downstream set to True.
+        The default implementation is to pass the State to all downstream operators.
         :param state: State, a State from an input port to be processed.
         :param port: int, input port index of the current exhausted port.
         :return: State, producing one State object
         """
-        if state.passToAllDownstream:
-            return state
+        return state
 
     def produce_state_on_start(self, port: int) -> State:
         """
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 2c8a268dfb..feb35f2e27 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -26,11 +26,8 @@ from .schema.attribute_type import FROM_PYOBJECT_MAPPING
 
 @dataclass
 class State:
-    def __init__(
-        self, table: Optional[Table] = None, pass_to_all_downstream: bool = False
-    ):
+    def __init__(self, table: Optional[Table] = None):
         self.schema = Schema()
-        self.passToAllDownstream = pass_to_all_downstream
         if table is not None:
             self.__dict__.update(table.to_pandas().iloc[0].to_dict())
             self.schema = Schema(table.schema)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
index f99739acc0..9837213abb 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
@@ -29,13 +29,7 @@ trait OperatorExecutor {
 
   def produceStateOnStart(port: Int): Option[State] = None
 
-  def processState(state: State, port: Int): Option[State] = {
-    if (state.isPassToAllDownstream) {
-      Some(state)
-    } else {
-      None
-    }
-  }
+  def processState(state: State, port: Int): Option[State] = Some(state)
 
   def processTupleMultiPort(
       tuple: Tuple,
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 3226c9d2fe..2b3465473b 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -23,9 +23,8 @@ import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tup
 
 import scala.collection.mutable
 
-final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean = false) {
+final case class State(tuple: Option[Tuple] = None) {
   val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap()
-  add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN)
   if (tuple.isDefined) {
     tuple.get.getSchema.getAttributes.foreach { attribute =>
       add(attribute.getName, tuple.get.getField(attribute.getName), attribute.getType)
@@ -37,8 +36,6 @@ final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean
 
   def get(key: String): Any = data(key)._2
 
-  def isPassToAllDownstream: Boolean = get("passToAllDownstream").asInstanceOf[Boolean]
-
   def apply(key: String): Any = get(key)
 
   def toTuple: Tuple =

Reply via email to