This is an automated email from the ASF dual-hosted git repository.
Xiao-zhen-Liu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 653d82e150 refactor(core): remove unused passToAllDownstream flag from
State (#4415)
653d82e150 is described below
commit 653d82e15034e854f494b72fecf173a9c1f46796
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Apr 24 12:14:21 2026 -0700
refactor(core): remove unused passToAllDownstream flag from State (#4415)
### What changes were proposed in this PR?
This PR removes the `passToAllDownstream` flag from the `State` class
and the corresponding branch in the default `processState`
implementation, in both the Scala and Python sides of the engine.
Background: when I originally introduced `passToAllDownstream`, I didn't
think the design through carefully. In practice:
- The flag is not set to `true` anywhere in the codebase.
- The only code that reads it is the default `processState` itself,
which means the flag has no observable effect today.
- I don't plan to build any feature on top of this flag. The idea it was
gesturing at (limiting how far a state propagates) will be reintroduced
later as a proper "scope" concept with a different mechanism, not as a
boolean carried on every `State` instance.
After this PR, the default behavior of `processState` is: **pass the
incoming state to all downstream operators**, unless an operator
explicitly consumes it by overriding `processState` and returning `None`
(Scala) / not returning a `State` (Python). This matches what the
previous default was *trying* to express, minus the dead flag.
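This PR also fixes the return type hints of the Python
`produce_state_on_start` and `produce_state_on_finish` methods, changing
them from `State` to `Optional[State]`.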
### Any related issues, documentation, discussions?
Closes #4414
### How was this PR tested?
Tested with existing unit/integration tests — no test referenced
`passToAllDownstream` / `pass_to_all_downstream`. Note that the default
behavior does change for operators that do not override `processState`:
the previous default returned the state only when the flag was `true`
and dropped it otherwise (`None` in Scala, an implicit `None` in
Python), and the flag defaulted to `false`; the new default returns the
state as-is. No new tests were added because the change was treated as
a pure removal of dead configuration.
### Was this PR authored or co-authored using generative AI tooling?
No.
---------
Co-authored-by: Chen Li <[email protected]>
---
amber/src/main/python/core/models/operator.py | 10 ++++------
amber/src/main/python/core/models/state.py | 5 +----
.../apache/texera/amber/core/executor/OperatorExecutor.scala | 8 +-------
.../main/scala/org/apache/texera/amber/core/state/State.scala | 5 +----
4 files changed, 7 insertions(+), 21 deletions(-)
diff --git a/amber/src/main/python/core/models/operator.py
b/amber/src/main/python/core/models/operator.py
index 7905083995..952e2a12c8 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -108,16 +108,14 @@ class Operator(ABC):
def process_state(self, state: State, port: int) -> Optional[State]:
"""
Process an input State from the given link.
- The default implementation is to pass the State to all downstream
operators
- if the State has pass_to_all_downstream set to True.
+ The default implementation is to pass the State to all downstream
operators.
:param state: State, a State from an input port to be processed.
:param port: int, input port index of the current exhausted port.
:return: State, producing one State object
"""
- if state.passToAllDownstream:
- return state
+ return state
- def produce_state_on_start(self, port: int) -> State:
+ def produce_state_on_start(self, port: int) -> Optional[State]:
"""
Produce a State when the given link started.
@@ -126,7 +124,7 @@ class Operator(ABC):
"""
pass
- def produce_state_on_finish(self, port: int) -> State:
+ def produce_state_on_finish(self, port: int) -> Optional[State]:
"""
Produce a State after the input port is exhausted.
diff --git a/amber/src/main/python/core/models/state.py
b/amber/src/main/python/core/models/state.py
index 2c8a268dfb..feb35f2e27 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -26,11 +26,8 @@ from .schema.attribute_type import FROM_PYOBJECT_MAPPING
@dataclass
class State:
- def __init__(
- self, table: Optional[Table] = None, pass_to_all_downstream: bool =
False
- ):
+ def __init__(self, table: Optional[Table] = None):
self.schema = Schema()
- self.passToAllDownstream = pass_to_all_downstream
if table is not None:
self.__dict__.update(table.to_pandas().iloc[0].to_dict())
self.schema = Schema(table.schema)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
index f99739acc0..9837213abb 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
@@ -29,13 +29,7 @@ trait OperatorExecutor {
def produceStateOnStart(port: Int): Option[State] = None
- def processState(state: State, port: Int): Option[State] = {
- if (state.isPassToAllDownstream) {
- Some(state)
- } else {
- None
- }
- }
+ def processState(state: State, port: Int): Option[State] = Some(state)
def processTupleMultiPort(
tuple: Tuple,
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 3226c9d2fe..2b3465473b 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -23,9 +23,8 @@ import org.apache.texera.amber.core.tuple.{Attribute,
AttributeType, Schema, Tup
import scala.collection.mutable
-final case class State(tuple: Option[Tuple] = None, passToAllDownstream:
Boolean = false) {
+final case class State(tuple: Option[Tuple] = None) {
val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap()
- add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN)
if (tuple.isDefined) {
tuple.get.getSchema.getAttributes.foreach { attribute =>
add(attribute.getName, tuple.get.getField(attribute.getName),
attribute.getType)
@@ -37,8 +36,6 @@ final case class State(tuple: Option[Tuple] = None,
passToAllDownstream: Boolean
def get(key: String): Any = data(key)._2
- def isPassToAllDownstream: Boolean =
get("passToAllDownstream").asInstanceOf[Boolean]
-
def apply(key: String): Any = get(key)
def toTuple: Tuple =