This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch xinyuan-remove-passToAllDownstream in repository https://gitbox.apache.org/repos/asf/texera.git
commit fc8cdf83b081178ab1ea53c345410a099e1e6f38 Author: Xinyuan Lin <[email protected]> AuthorDate: Sun Apr 19 16:58:19 2026 -0700 init --- amber/src/main/python/core/models/operator.py | 6 ++---- amber/src/main/python/core/models/state.py | 5 +---- .../org/apache/texera/amber/core/executor/OperatorExecutor.scala | 8 +------- .../src/main/scala/org/apache/texera/amber/core/state/State.scala | 5 +---- 4 files changed, 5 insertions(+), 19 deletions(-) diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py index 7905083995..6c2b657d74 100644 --- a/amber/src/main/python/core/models/operator.py +++ b/amber/src/main/python/core/models/operator.py @@ -108,14 +108,12 @@ class Operator(ABC): def process_state(self, state: State, port: int) -> Optional[State]: """ Process an input State from the given link. - The default implementation is to pass the State to all downstream operators - if the State has pass_to_all_downstream set to True. + The default implementation is to pass the State to all downstream operators. :param state: State, a State from an input port to be processed. :param port: int, input port index of the current exhausted port. :return: State, producing one State object """ - if state.passToAllDownstream: - return state + return state def produce_state_on_start(self, port: int) -> State: """ diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 2c8a268dfb..feb35f2e27 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -26,11 +26,8 @@ from .schema.attribute_type import FROM_PYOBJECT_MAPPING @dataclass class State: - def __init__( - self, table: Optional[Table] = None, pass_to_all_downstream: bool = False - ): + def __init__(self, table: Optional[Table] = None): self.schema = Schema() - self.passToAllDownstream = pass_to_all_downstream if table is not None: self.__dict__.update(table.to_pandas().iloc[0].to_dict()) self.schema = Schema(table.schema) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala index f99739acc0..9837213abb 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala @@ -29,13 +29,7 @@ trait OperatorExecutor { def produceStateOnStart(port: Int): Option[State] = None - def processState(state: State, port: Int): Option[State] = { - if (state.isPassToAllDownstream) { - Some(state) - } else { - None - } - } + def processState(state: State, port: Int): Option[State] = Some(state) def processTupleMultiPort( tuple: Tuple, diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index 3226c9d2fe..2b3465473b 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ -23,9 +23,8 @@ import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tup import scala.collection.mutable -final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean = false) { +final case class State(tuple: Option[Tuple] = None) { val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap() - add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN) if (tuple.isDefined) { tuple.get.getSchema.getAttributes.foreach { attribute => add(attribute.getName, tuple.get.getField(attribute.getName), attribute.getType) @@ -37,8 +36,6 @@ final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean def get(key: String): Any = data(key)._2 - def isPassToAllDownstream: Boolean = get("passToAllDownstream").asInstanceOf[Boolean] - def apply(key: String): Any = get(key) def toTuple: Tuple =
