Copilot commented on code in PR #5900:
URL: https://github.com/apache/texera/pull/5900#discussion_r3456476332


##########
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala:
##########
@@ -220,7 +220,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
 
     val writer = stateDocument.writer(UUID.randomUUID().toString)
     writer.open()
-    writer.putOne(state.toTuple)
+    writer.putOne(state.toTuple())

Review Comment:
   This test still uses `"loop_counter"` inside the user `State` JSON (and 
asserts it via `State.fromTuple`), but the new format promotes loop bookkeeping 
to dedicated columns. Keeping `loop_counter` in the content makes the test 
inconsistent with the intended contract and doesn’t validate the new columns.
   
   Consider moving the counter into the `toTuple(loopCounter=..., 
loopStartId=..., loopStartStateUri=...)` arguments and asserting those columns 
on the stored `Tuple` instead of asserting a JSON key.
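
   A minimal sketch of the suggested contract, written in Python for brevity rather than in the Scala of the test itself. The `State` class, the default argument values, and the example URI are hypothetical stand-ins and not Texera's implementation; only the column names come from this review.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict

# Column names as mentioned in the review; the row layout is an assumption.
CONTENT = "content"
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"


@dataclass
class State:
    """Hypothetical stand-in for the user-visible state (not Texera's class)."""

    data: Dict[str, Any] = field(default_factory=dict)

    def to_tuple(
        self,
        loop_counter: int = 0,
        loop_start_id: str = "",
        loop_start_state_uri: str = "",
    ) -> Dict[str, Any]:
        # User data goes into the JSON content column; loop bookkeeping
        # goes into its own dedicated columns.
        return {
            CONTENT: json.dumps(self.data, sort_keys=True),
            LOOP_COUNTER: loop_counter,
            LOOP_START_ID: loop_start_id,
            LOOP_START_STATE_URI: loop_start_state_uri,
        }

    @staticmethod
    def from_tuple(row: Dict[str, Any]) -> "State":
        # Only the content column is decoded back into user state.
        return State(json.loads(row[CONTENT]))


row = State({"i": 3}).to_tuple(
    loop_counter=3,
    loop_start_id="loop-1",
    loop_start_state_uri="mem://state/1",
)
restored = State.from_tuple(row)
```

   With this shape, the test would assert `row[LOOP_COUNTER]` directly, and `restored.data` would contain only the user key.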



##########
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala:
##########
@@ -252,7 +252,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
 
     val writer = stateDocument.writer(UUID.randomUUID().toString)
     writer.open()
-    states.foreach(state => writer.putOne(state.toTuple))
+    states.foreach(state => writer.putOne(state.toTuple()))
     writer.close()

Review Comment:
   This multi-row state materialization test still uses `"loop_counter"` as a 
key inside the user `State` JSON and sorts/asserts on it after 
`State.fromTuple`. With loop bookkeeping now being first-class columns, this 
test is likely to become misleading (it validates a JSON field rather than the 
dedicated columns).
   
   Consider switching the test to use non-reserved user keys (e.g., `"i"`) and 
set/assert the loop bookkeeping via `state.toTuple(loopCounter = ..., ...)` and 
the stored row’s `getField("loop_counter")` instead.
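
   To illustrate the multi-row case, here is a small Python sketch (not the Scala test) in which rows are ordered by the dedicated column while the user JSON carries an unrelated key. The helper `make_row` and the plain-dict row representation are assumptions made for illustration.

```python
import json
from typing import Any, Dict, List


def make_row(user_state: Dict[str, Any], loop_counter: int) -> Dict[str, Any]:
    """Hypothetical row builder: user JSON plus a dedicated counter column."""
    return {
        "content": json.dumps(user_state, sort_keys=True),
        "loop_counter": loop_counter,
    }


# Rows are written out of order; the user key "i" is deliberately different
# from the counter so the two cannot be confused.
rows: List[Dict[str, Any]] = [
    make_row({"i": 10 * n}, loop_counter=n) for n in (2, 0, 1)
]

# Sort and assert on the stored column, not on a key inside the JSON.
ordered = sorted(rows, key=lambda r: r["loop_counter"])
counters = [r["loop_counter"] for r in ordered]
user_values = [json.loads(r["content"])["i"] for r in ordered]
reserved_key_leaks = [
    r for r in rows if "loop_counter" in json.loads(r["content"])
]
```

   Keeping the user key distinct from the counter makes it obvious which of the two a failing assertion is about.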



##########
amber/src/main/python/core/runnables/network_sender.py:
##########
@@ -100,7 +106,12 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None:
         elif isinstance(data_payload, StateFrame):
             data_header = PythonDataHeader(tag=to, payload_type="State")
             table = pa.Table.from_pydict(
-                {State.CONTENT: [data_payload.frame.to_json()]},
+                {
+                    State.CONTENT: [data_payload.frame.to_json()],

Review Comment:
   These new loop-bookkeeping columns are serialized into the Arrow table for 
`StateFrame`, but the Scala side currently deserializes incoming Python state 
as `State.fromTuple(ArrowUtils.getTexeraTuple(...))` and wraps it into 
`StateFrame(frame: State)` (dropping every non-`content` column). That means 
`loop_counter` / `loop_start_*` will not survive a real Python→Scala→Python 
hop, even though they’re being sent here.
   
   If the intent is for loop bookkeeping to be preserved end-to-end over the 
network, Scala needs to carry these columns as part of the state payload (e.g., 
extend Scala `StateFrame` with loop fields or forward the raw `Tuple`) and 
update `PythonProxyServer`/routing accordingly.
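
   The concern can be modeled in a few lines of Python. Both hop functions below are hypothetical simplifications of the bridge behavior described above, with rows represented as plain dicts; they are not the actual Scala code path.

```python
from typing import Any, Dict, List

BOOKKEEPING = ("loop_counter", "loop_start_id", "loop_start_state_uri")


def lossy_hop(row: Dict[str, Any]) -> Dict[str, Any]:
    """Models a bridge that rebuilds the state from `content` only."""
    return {"content": row["content"]}


def preserving_hop(row: Dict[str, Any]) -> Dict[str, Any]:
    """Models a bridge that forwards the raw row unchanged."""
    return dict(row)


sent = {
    "content": '{"i": 1}',
    "loop_counter": 1,
    "loop_start_id": "loop-1",
    "loop_start_state_uri": "mem://state/1",
}

# Which bookkeeping columns are missing or present after each kind of hop?
dropped: List[str] = [c for c in BOOKKEEPING if c not in lossy_hop(sent)]
kept: List[str] = [c for c in BOOKKEEPING if c in preserving_hop(sent)]
```

   An integration test along these lines, run against the real bridge, would show whether the columns survive the hop.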



##########
amber/src/main/python/core/runnables/network_receiver.py:
##########
@@ -96,7 +96,12 @@ def data_handler(command: bytes, table: Table) -> int:
                 "Data",
                 lambda _: DataFrame(table),
                 "State",
-                lambda _: StateFrame(State.from_json(table[State.CONTENT][0].as_py())),
+                lambda _: StateFrame(
+                    State.from_json(table[State.CONTENT][0].as_py()),
+                    loop_counter=int(table[State.LOOP_COUNTER][0].as_py()),
+                    loop_start_id=table[State.LOOP_START_ID][0].as_py(),
+                    loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(),

Review Comment:
   This receiver now expects loop-bookkeeping columns to be present in every 
incoming State Arrow table. However, the current Scala Arrow-flight bridge 
deserializes State payloads via `State.fromTuple(...)` and emits a Scala 
`StateFrame(frame: State)` without preserving extra columns, so those columns 
may never reach Python in practice.
   
   If loop bookkeeping must round-trip across the Scala↔Python boundary, the 
Scala bridge/model needs to preserve and re-emit these columns (otherwise this 
code will only work for sources that bypass Scala and send the full 4-column 
table directly).
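
   Until the bridge preserves these columns, one possible stopgap is a receiver that tolerates content-only tables. The sketch below is an assumption-laden illustration: `LoopInfo`, `read_loop_info`, and the default values are hypothetical, and the dict-of-lists input mirrors the shape returned by `pa.Table.to_pydict()` instead of using an Arrow table directly. Whether silent defaults are acceptable, as opposed to raising an explicit error, is a design decision for the PR author.

```python
from typing import Any, Dict, List, NamedTuple


class LoopInfo(NamedTuple):
    """Hypothetical container for the three bookkeeping values."""

    loop_counter: int
    loop_start_id: str
    loop_start_state_uri: str


def read_loop_info(columns: Dict[str, List[Any]]) -> LoopInfo:
    """Read bookkeeping from a column dict, defaulting when a column is absent."""

    def first(name: str, default: Any) -> Any:
        values = columns.get(name)
        # Treat a missing column, an empty column, or a null cell as absent.
        if not values or values[0] is None:
            return default
        return values[0]

    return LoopInfo(
        loop_counter=int(first("loop_counter", 0)),
        loop_start_id=str(first("loop_start_id", "")),
        loop_start_state_uri=str(first("loop_start_state_uri", "")),
    )


full = read_loop_info(
    {
        "content": ["{}"],
        "loop_counter": [2],
        "loop_start_id": ["loop-1"],
        "loop_start_state_uri": ["mem://state/1"],
    }
)
content_only = read_loop_info({"content": ["{}"]})
```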



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
