Copilot commented on code in PR #5900:
URL: https://github.com/apache/texera/pull/5900#discussion_r3456476332
##########
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala:
##########
@@ -220,7 +220,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
val writer = stateDocument.writer(UUID.randomUUID().toString)
writer.open()
- writer.putOne(state.toTuple)
+ writer.putOne(state.toTuple())
Review Comment:
This test still uses `"loop_counter"` inside the user `State` JSON (and
asserts it via `State.fromTuple`), but the new format promotes loop bookkeeping
to dedicated columns. Keeping `loop_counter` in the content makes the test
inconsistent with the intended contract and doesn’t validate the new columns.
Consider moving the counter into the
`toTuple(loopCounter=..., loopStartId=..., loopStartStateUri=...)` arguments
and asserting those columns on the stored `Tuple` instead of asserting a JSON key.
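To make the suggested test shape concrete, here is a minimal Python sketch that models a stored state row as a plain dict. `FakeState`, its `to_tuple` signature, and the column names `"content"`, `"loop_start_id"` and `"loop_start_state_uri"` are assumptions for illustration; only `"loop_counter"` is named in the review. The point is that the counter is passed as an argument and asserted on the row rather than read back from the user JSON.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

# Assumed column names; only "loop_counter" is confirmed by the review.
CONTENT = "content"
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"


@dataclass
class FakeState:
    """Hypothetical stand-in for State: user data only, no loop bookkeeping."""

    data: Dict[str, Any] = field(default_factory=dict)

    def to_tuple(
        self,
        loop_counter: int = 0,
        loop_start_id: Optional[str] = None,
        loop_start_state_uri: Optional[str] = None,
    ) -> Dict[str, Any]:
        # Loop bookkeeping goes into dedicated columns; user data stays in CONTENT.
        return {
            CONTENT: json.dumps(self.data),
            LOOP_COUNTER: loop_counter,
            LOOP_START_ID: loop_start_id,
            LOOP_START_STATE_URI: loop_start_state_uri,
        }

    @staticmethod
    def from_tuple(row: Dict[str, Any]) -> "FakeState":
        # Only the content column is decoded back into user data.
        return FakeState(json.loads(row[CONTENT]))


state = FakeState({"x": 1})
row = state.to_tuple(
    loop_counter=3,
    loop_start_id="op-1",  # hypothetical id
    loop_start_state_uri="file:///tmp/loop-start",  # hypothetical URI
)

# Assert bookkeeping on the stored row, not on a key inside the user JSON.
assert row[LOOP_COUNTER] == 3
assert row[LOOP_START_ID] == "op-1"
assert LOOP_COUNTER not in FakeState.from_tuple(row).data
```

In the Scala spec the equivalent assertions would read the stored `Tuple` fields after `writer.close()`.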
##########
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala:
##########
@@ -252,7 +252,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
val writer = stateDocument.writer(UUID.randomUUID().toString)
writer.open()
- states.foreach(state => writer.putOne(state.toTuple))
+ states.foreach(state => writer.putOne(state.toTuple()))
writer.close()
Review Comment:
This multi-row state materialization test still uses `"loop_counter"` as a
key inside the user `State` JSON and sorts/asserts on it after
`State.fromTuple`. Now that loop bookkeeping lives in first-class columns, this
test is likely to become misleading (it validates a JSON field rather than the
dedicated columns).
Consider switching the test to use non-reserved user keys (e.g., `"i"`) and
set/assert the loop bookkeeping via `state.toTuple(loopCounter = ..., ...)` and
the stored row’s `getField("loop_counter")` instead.
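For the multi-row case, the same idea can be sketched in Python with rows as plain dicts: the user JSON uses the non-reserved key `"i"`, and rows are sorted and checked on the `"loop_counter"` column. The helper `make_row` and the `"content"` column name are hypothetical; the real spec reads the Iceberg document and would call `getField("loop_counter")` on each `Tuple`.

```python
import json
import random
from typing import Any, Dict

CONTENT = "content"  # assumed name of the user-JSON column
LOOP_COUNTER = "loop_counter"


def make_row(user_data: Dict[str, Any], loop_counter: int) -> Dict[str, Any]:
    """Hypothetical analogue of state.toTuple(loopCounter = ...)."""
    return {CONTENT: json.dumps(user_data), LOOP_COUNTER: loop_counter}


# User data uses "i", not the reserved "loop_counter".
written = [make_row({"i": n}, loop_counter=n) for n in range(5)]

# The existing test sorts after reading, so shuffle to simulate an arbitrary order.
read_back = written[:]
random.shuffle(read_back)

# Sort on the dedicated column, then check both the column and the user data.
ordered = sorted(read_back, key=lambda r: r[LOOP_COUNTER])
assert [r[LOOP_COUNTER] for r in ordered] == list(range(5))
assert [json.loads(r[CONTENT])["i"] for r in ordered] == list(range(5))
assert all(LOOP_COUNTER not in json.loads(r[CONTENT]) for r in ordered)
```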
##########
amber/src/main/python/core/runnables/network_sender.py:
##########
@@ -100,7 +106,12 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None:
elif isinstance(data_payload, StateFrame):
data_header = PythonDataHeader(tag=to, payload_type="State")
table = pa.Table.from_pydict(
- {State.CONTENT: [data_payload.frame.to_json()]},
+ {
+ State.CONTENT: [data_payload.frame.to_json()],
Review Comment:
These new loop-bookkeeping columns are serialized into the Arrow table for
`StateFrame`, but the Scala side currently deserializes incoming Python state
as `State.fromTuple(ArrowUtils.getTexeraTuple(...))` and wraps it in
`StateFrame(frame: State)` (dropping every non-`content` column). That means
`loop_counter` / `loop_start_*` will not survive a real Python→Scala→Python
hop, even though they’re being sent here.
If the intent is for loop bookkeeping to be preserved end-to-end over the
network, Scala needs to carry these columns as part of the state payload (e.g.,
extend Scala `StateFrame` with loop fields or forward the raw `Tuple`) and
update `PythonProxyServer`/routing accordingly.
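The loss can be illustrated with a small Python model of the hop, where plain dicts stand in for Arrow tables. `scala_bridge_current` mimics the described `State.fromTuple(...)` path that keeps only the content column, and `scala_bridge_forward_row` is a hypothetical alternative that forwards the raw row. Neither function is Texera code, and the loop column names other than `loop_counter` are assumed.

```python
from typing import Any, Dict

CONTENT = "content"  # assumed column name
LOOP_COLUMNS = ("loop_counter", "loop_start_id", "loop_start_state_uri")  # partly assumed


def scala_bridge_current(row: Dict[str, Any]) -> Dict[str, Any]:
    # Models StateFrame(frame: State): only the content survives re-emission.
    return {CONTENT: row[CONTENT]}


def scala_bridge_forward_row(row: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical fix: carry the whole tuple, including loop bookkeeping.
    return dict(row)


sent = {
    CONTENT: '{"x": 1}',
    "loop_counter": 2,
    "loop_start_id": "op-1",
    "loop_start_state_uri": "file:///tmp/loop-start",
}

dropped = [c for c in LOOP_COLUMNS if c not in scala_bridge_current(sent)]
kept = [c for c in LOOP_COLUMNS if c in scala_bridge_forward_row(sent)]
print("dropped by current bridge:", dropped)
print("kept by forwarding bridge:", kept)
```

Forwarding the raw row is only one option; extending the Scala `StateFrame` with explicit loop fields would give the same round-trip with a typed model.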
##########
amber/src/main/python/core/runnables/network_receiver.py:
##########
@@ -96,7 +96,12 @@ def data_handler(command: bytes, table: Table) -> int:
"Data",
lambda _: DataFrame(table),
"State",
- lambda _: StateFrame(State.from_json(table[State.CONTENT][0].as_py())),
+ lambda _: StateFrame(
+ State.from_json(table[State.CONTENT][0].as_py()),
+ loop_counter=int(table[State.LOOP_COUNTER][0].as_py()),
+ loop_start_id=table[State.LOOP_START_ID][0].as_py(),
+ loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(),
Review Comment:
This receiver now expects loop-bookkeeping columns to be present in every
incoming State Arrow table. However, the current Scala Arrow Flight bridge
deserializes State payloads via `State.fromTuple(...)` and emits a Scala
`StateFrame(frame: State)` without preserving extra columns, so those columns
may never reach Python in practice.
If loop bookkeeping must round-trip across the Scala↔Python boundary, the
Scala bridge/model needs to preserve and re-emit these columns (otherwise this
code will only work for sources that bypass Scala and send the full 4-column
table directly).
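As a sketch of how the receiver could surface this mismatch with a clear message instead of failing on the first missing column lookup, the Python below checks for the loop columns before decoding. This guard is not what the review proposes (it asks for the Scala side to preserve the columns); `decode_state_row`, `MissingLoopColumnsError`, and the column names other than `loop_counter` are hypothetical, and a dict of column lists stands in for the `pyarrow.Table`.

```python
import json
from typing import Any, Dict, List

# Assumed column names; only "loop_counter" is confirmed by the review.
CONTENT = "content"
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"
REQUIRED = (CONTENT, LOOP_COUNTER, LOOP_START_ID, LOOP_START_STATE_URI)


class MissingLoopColumnsError(ValueError):
    """Raised when a State table lacks the loop-bookkeeping columns."""


def decode_state_row(table: Dict[str, List[Any]]) -> Dict[str, Any]:
    # Report every missing column at once rather than failing on the first lookup.
    missing = [name for name in REQUIRED if name not in table]
    if missing:
        raise MissingLoopColumnsError(
            f"State table is missing {missing}; was it re-emitted by the Scala bridge?"
        )
    return {
        "state": json.loads(table[CONTENT][0]),
        "loop_counter": int(table[LOOP_COUNTER][0]),
        "loop_start_id": table[LOOP_START_ID][0],
        "loop_start_state_uri": table[LOOP_START_STATE_URI][0],
    }


full = {
    CONTENT: ['{"x": 1}'],
    LOOP_COUNTER: [4],
    LOOP_START_ID: ["op-1"],
    LOOP_START_STATE_URI: ["file:///tmp/loop-start"],
}
print(decode_state_row(full)["loop_counter"])  # 4

# A content-only table, like the one the Scala bridge currently re-emits.
content_only = {CONTENT: ['{"x": 1}']}
try:
    decode_state_row(content_only)
except MissingLoopColumnsError as err:
    print("rejected:", err)
```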
--
This is an automated message from the Apache Git Service.