Xiao-zhen-Liu commented on code in PR #4490:
URL: https://github.com/apache/texera/pull/4490#discussion_r3192837833
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala:
##########
@@ -57,6 +58,9 @@ object State {
def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
+ def uriFromResultUri(resultUri: URI): URI =
Review Comment:
This helper is hacky. State and Result are sibling resources: both belong
to the same `(workflowId, executionId, globalPortId)`, and neither is derived
from the other. Deriving one URI from the other by string replacement also breaks
if `/result` appears anywhere else in the URI (e.g. an operator named `result`).
A cleaner approach is to add a `VFSURIFactory.createStateURI(workflowId,
executionId, globalPortId)` that mirrors `createResultURI`, build state URIs
through it, and delete this helper.
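
To illustrate the failure mode, here is a minimal Python sketch. The URI layout, the `create_resource_uri` helper, and the naive `replace`-based derivation are all assumptions made for illustration; the real `VFSURIFactory` layout and the helper in this PR may differ.

```python
def uri_from_result_uri_naive(result_uri: str) -> str:
    # Hypothetical stand-in for the helper under review: swap every
    # "/result" segment for "/state" in the string.
    return result_uri.replace("/result", "/state")


def create_resource_uri(workflow_id: int, execution_id: int,
                        global_port_id: str, resource_type: str) -> str:
    # Hypothetical URI layout, assumed only for this sketch.
    return (f"vfs:///wid/{workflow_id}/eid/{execution_id}"
            f"/globalportid/{global_port_id}/{resource_type}")


def create_result_uri(workflow_id: int, execution_id: int,
                      global_port_id: str) -> str:
    return create_resource_uri(workflow_id, execution_id,
                               global_port_id, "result")


def create_state_uri(workflow_id: int, execution_id: int,
                     global_port_id: str) -> str:
    # Built from the same identifiers as the result URI, not derived from it.
    return create_resource_uri(workflow_id, execution_id,
                               global_port_id, "state")


# Hypothetical port id that contains an operator named "result".
port_id = "opid/result/portid/0"
result_uri = create_result_uri(1, 2, port_id)
naive_state_uri = uri_from_result_uri_naive(result_uri)
state_uri = create_state_uri(1, 2, port_id)
```

With these assumed inputs, `naive_state_uri` rewrites the operator segment as well as the trailing resource type, while `state_uri` leaves the operator name untouched.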
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -84,6 +89,19 @@ class InputPortMaterializationReaderThread(
// Notify the input port of start of input channel
emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
try {
+ val stateDocument =
+ DocumentFactory
+ .openDocument(State.uriFromResultUri(uri))
+ ._1
+ .asInstanceOf[VirtualDocument[Tuple]]
+ val stateReadIterator = stateDocument.get()
+ while (stateReadIterator.hasNext) {
+ val state = State.fromTuple(stateReadIterator.next())
+ inputMessageQueue.put(
Review Comment:
I think you are sending every state to every worker, which matches our design
decision. It would be good to note this in the code documentation: the
data-reading loop that follows filters by `partitioner.getBucketIndex`, while
this loop intentionally enqueues every state to every worker. Without a note,
the asymmetry reads like an oversight.
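
As a rough sketch of the asymmetry worth documenting (in Python for brevity; `messages_for_worker` and `bucket_index_of` are hypothetical names, not the reader's actual API):

```python
from typing import Any, Callable, List, Tuple


def messages_for_worker(
    states: List[Any],
    tuples: List[Any],
    worker_index: int,
    bucket_index_of: Callable[[Any], int],
) -> List[Tuple[str, Any]]:
    messages: List[Tuple[str, Any]] = []
    # States are broadcast by design: every worker enqueues every state,
    # so there is deliberately no partitioner check in this loop.
    for state in states:
        messages.append(("state", state))
    # Tuples are partitioned: a worker keeps only the tuples whose bucket
    # index matches its own index.
    for t in tuples:
        if bucket_index_of(t) == worker_index:
            messages.append(("tuple", t))
    return messages


# Two workers, tuples bucketed by parity (an assumption for the example).
worker0 = messages_for_worker(["s0"], [0, 1, 2, 3], 0, lambda t: t % 2)
worker1 = messages_for_worker(["s0"], [0, 1, 2, 3], 1, lambda t: t % 2)
```

Both workers receive `s0`, but each receives only its own half of the tuples. A one-line comment like the one on the state loop above would be enough in the real reader.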
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +124,7 @@ class InputPortMaterializationReaderThread(
}
// Flush any remaining tuples in the buffer.
if (buffer.nonEmpty) flush()
+
Review Comment:
Remove this unintentional blank line.
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -84,6 +89,19 @@ class InputPortMaterializationReaderThread(
// Notify the input port of start of input channel
emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
try {
+ val stateDocument =
+ DocumentFactory
+ .openDocument(State.uriFromResultUri(uri))
+ ._1
+ .asInstanceOf[VirtualDocument[Tuple]]
+ val stateReadIterator = stateDocument.get()
+ while (stateReadIterator.hasNext) {
Review Comment:
Does the order in which we read the states vs. the tuples matter? Here the
states are read before the data-reading loop starts. If that ordering is
intentional, it is a design decision that should be documented here.
##########
amber/src/main/python/core/models/state.py:
##########
@@ -41,6 +41,10 @@ def from_json(cls, payload: str) -> "State":
def from_tuple(cls, row: Tuple) -> "State":
return cls.from_json(row[cls.CONTENT])
+ @staticmethod
+ def uri_from_result_uri(result_uri: str) -> str:
Review Comment:
Ditto: same concern as for `State.uriFromResultUri` on the Scala side.
##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -17,8 +17,8 @@
import typing
from loguru import logger
-from pyarrow import Table
from typing import Union
+from pyarrow import Table
Review Comment:
Unrelated import reorder — please revert to keep the diff focused on the
state-materialization change.
##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -138,8 +147,15 @@ def run(self) -> None:
self.uri
)
self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT)
- storage_iterator = self.materialization.get()
+ state_document, _ = DocumentFactory.open_document(
Review Comment:
Ditto about the design documentation.
##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) ->
typing.Iterator[DataFrame
if receiver == self.worker_actor_id:
yield self.tuples_to_data_frame(tuples)
+ def emit_state_with_filter(self, state: State) -> typing.Iterator[DataPayload]:
Review Comment:
This Python implementation seems to differ from the Scala implementation. It
routes through `partitioner.flush_state` only to immediately filter the
broadcast back down to this worker. Given the design that every worker reads
every state, the partitioner detour is a no-op. Consider dropping this method
and emitting `StateFrame(State.from_tuple(row))` directly in `run()`, mirroring
the Scala reader. The current name (`_with_filter`) also implies routing logic
that doesn't actually exist.
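
A small sketch of why the detour is a no-op. `StateFrame` here is a stand-in dataclass, and both functions are simplified, hypothetical versions of the reader logic rather than the actual methods in this PR:

```python
from dataclasses import dataclass
from typing import Iterator, List


@dataclass(frozen=True)
class StateFrame:
    # Stand-in for the real StateFrame payload type.
    state: str


def emit_state_with_filter(
    state: str, worker_id: str, receivers: List[str]
) -> Iterator[StateFrame]:
    # Broadcast to every receiver, then keep only the frame addressed to us.
    for receiver in receivers:
        if receiver == worker_id:
            yield StateFrame(state)


def emit_state_direct(state: str) -> Iterator[StateFrame]:
    # Every worker reads every state, so just emit it.
    yield StateFrame(state)


receivers = ["worker-0", "worker-1", "worker-2"]
filtered = list(emit_state_with_filter("s0", "worker-1", receivers))
direct = list(emit_state_direct("s0"))
```

As long as this worker appears exactly once among the receivers, the two produce the same output, so the direct form is simpler and matches the Scala reader.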
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +239,10 @@ class OutputManager(
})
}
+ def saveStateToStorageIfNeeded(state: State): Unit = {
Review Comment:
This writes the same state into every output port's state table. For
multi-output-port operators that's a fan-out by N. Is this intended? If yes,
please add a comment explaining the behavior so it's not read as a bug. Same
goes for Python.
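
If the fan-out is intended, a comment along these lines would do. This is a Python sketch with a hypothetical in-memory writer standing in for the per-port state table writer:

```python
from typing import Any, Dict, List


class InMemoryStateWriter:
    # Hypothetical stand-in for a per-port state table writer.
    def __init__(self) -> None:
        self.rows: List[Any] = []

    def put_one(self, state: Any) -> None:
        self.rows.append(state)


def save_state_to_storage_if_needed(
    state: Any, state_writers: Dict[str, InMemoryStateWriter]
) -> None:
    # Assumed-intentional fan-out: the same state is written once per
    # output port, so an operator with N output ports stores N copies.
    for writer in state_writers.values():
        writer.put_one(state)


writers = {"port-0": InMemoryStateWriter(), "port-1": InMemoryStateWriter()}
save_state_to_storage_if_needed({"iteration": 1}, writers)
copies = sum(len(w.rows) for w in writers.values())
```

With two output ports, one emitted state results in two stored rows.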
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -245,7 +256,7 @@ class OutputManager(
writerThread.join()
case None =>
}
-
+ this.stateWriters.remove(outputPortId).foreach(_.close())
Review Comment:
Result tuples are written via a dedicated `OutputPortResultWriterThread` so
the DP thread isn't blocked on Iceberg I/O. State writes here run synchronously
on the caller's thread (DataProcessor), so every `putOne`, and eventually every
buffer flush or commit, stalls the DP thread. For loops that emit state
frequently this becomes a real cost. Please mirror the result path with an
`OutputPortStateWriterThread` (or generalize the existing one) so DP just
enqueues.
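
The shape I have in mind, sketched in Python with a list standing in for the storage writer. `StateWriterThread` and its methods are hypothetical names, not the existing result writer's API:

```python
import queue
import threading
from typing import Any, List


class StateWriterThread(threading.Thread):
    _SENTINEL = object()  # marks end of input

    def __init__(self, sink: List[Any]) -> None:
        super().__init__(daemon=True)
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._sink = sink

    def enqueue(self, state: Any) -> None:
        # Called from the DP thread: returns without touching storage.
        self._queue.put(state)

    def run(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._SENTINEL:
                break
            # Stand-in for the blocking storage write (putOne / flush).
            self._sink.append(item)

    def close(self) -> None:
        # Drain everything queued so far, then stop the thread.
        self._queue.put(self._SENTINEL)
        self.join()


written: List[Any] = []
writer = StateWriterThread(written)
writer.start()
for i in range(3):
    writer.enqueue({"iteration": i})
writer.close()
```

The DP side only pays for the `enqueue` call; the blocking write happens on the writer thread, and `close()` waits for the queue to drain before the port is torn down.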