aglinxinyuan commented on code in PR #5900:
URL: https://github.com/apache/texera/pull/5900#discussion_r3457100202


##########
amber/src/main/python/core/runnables/network_sender.py:
##########
@@ -100,7 +106,12 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None:
         elif isinstance(data_payload, StateFrame):
             data_header = PythonDataHeader(tag=to, payload_type="State")
             table = pa.Table.from_pydict(
-                {State.CONTENT: [data_payload.frame.to_json()]},
+                {
+                    State.CONTENT: [data_payload.frame.to_json()],

Review Comment:
   Good catch, and accurate — `PythonProxyServer` deserializes incoming Python 
state via `StateFrame(State.fromTuple(...))`, and Scala's `StateFrame(frame: 
State)` carries no loop fields, so loop bookkeeping doesn't survive a 
Python→Scala→Python hop.
   
   That's intentional for this PR, which is **dormant**: with no loop
operators, `loop_counter` is always 0, so Scala dropping or defaulting it is
observationally identical to preserving it. There's also no short-table risk:
this PR makes **both** the Python and Scala `toTuple()` emit all 4 columns, so
every State Arrow table on the wire has 4 columns and the receiver never raises
a `KeyError`.
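   To make the "observationally identical" argument concrete, here is a
minimal sketch that models the wire format in plain Python. Assumptions: a
dict of one-element column lists stands in for the `pa.Table`, the column-name
strings are hypothetical (the actual `State.CONTENT` / `State.LOOP_*` values
aren't shown in this PR excerpt), and the Scala bridge is modeled as keeping
only the state content and re-emitting default loop values (`0` / `None`),
which is an assumption about what Scala's `toTuple()` writes.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical column names standing in for State.CONTENT, State.LOOP_COUNTER, etc.
CONTENT = "content"
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"


@dataclass(frozen=True)
class StateFrame:
    """Hypothetical Python-side frame: state JSON plus loop bookkeeping."""
    content_json: str
    loop_counter: int = 0  # assumed default when no loop operator is involved
    loop_start_id: Optional[str] = None
    loop_start_state_uri: Optional[str] = None


def to_table(frame: StateFrame) -> dict:
    """Model of the sender: one row, all four columns (dict stands in for pa.Table)."""
    return {
        CONTENT: [frame.content_json],
        LOOP_COUNTER: [frame.loop_counter],
        LOOP_START_ID: [frame.loop_start_id],
        LOOP_START_STATE_URI: [frame.loop_start_state_uri],
    }


def from_table(table: dict) -> StateFrame:
    """Model of the receiver: read row 0 of every column."""
    return StateFrame(
        content_json=table[CONTENT][0],
        loop_counter=int(table[LOOP_COUNTER][0]),
        loop_start_id=table[LOOP_START_ID][0],
        loop_start_state_uri=table[LOOP_START_STATE_URI][0],
    )


def scala_hop(table: dict) -> dict:
    """Assumed model of the Scala bridge: StateFrame(frame: State) keeps only the
    state content, then all four columns are re-emitted with default loop values."""
    return to_table(StateFrame(content_json=table[CONTENT][0]))


# Dormant case: loop_counter is 0, so the Python->Scala->Python hop is invisible.
dormant = StateFrame('{"x": 1}')
assert from_table(scala_hop(to_table(dormant))) == dormant

# Looping case (future PR): the hop resets the bookkeeping to its defaults.
looping = StateFrame('{"x": 1}', loop_counter=3, loop_start_id="op-1")
assert from_table(scala_hop(to_table(looping))).loop_counter == 0
```

The second assertion is exactly the gap that the loop-operator PR has to close.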
   
   Preserving loop bookkeeping **end-to-end** is part of the loop-operator PR,
where the durable channel is the Iceberg materialization columns (read directly
by the Python materialization reader), not the live Scala bridge. Extending
Scala `StateFrame` (or confirming that loops only use the materialization path)
will land there.



##########
amber/src/main/python/core/runnables/network_receiver.py:
##########
@@ -96,7 +96,12 @@ def data_handler(command: bytes, table: Table) -> int:
                 "Data",
                 lambda _: DataFrame(table),
                 "State",
-                lambda _: StateFrame(State.from_json(table[State.CONTENT][0].as_py())),
+                lambda _: StateFrame(
+                    State.from_json(table[State.CONTENT][0].as_py()),
+                    loop_counter=int(table[State.LOOP_COUNTER][0].as_py()),
+                    loop_start_id=table[State.LOOP_START_ID][0].as_py(),
+                    loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(),

Review Comment:
   Good catch, and accurate — `PythonProxyServer` deserializes incoming Python 
state via `StateFrame(State.fromTuple(...))`, and Scala's `StateFrame(frame: 
State)` carries no loop fields, so loop bookkeeping doesn't survive a 
Python→Scala→Python hop.
   
   That's intentional for this PR, which is **dormant**: with no loop
operators, `loop_counter` is always 0, so Scala dropping or defaulting it is
observationally identical to preserving it. There's also no short-table risk:
this PR makes **both** the Python and Scala `toTuple()` emit all 4 columns, so
every State Arrow table on the wire has 4 columns and the receiver never raises
a `KeyError`.
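   The short-table point can be illustrated with a small sketch of the
receiver's decode step. This is a hypothetical stand-in, not the actual
`network_receiver.py` code: a dict of column lists replaces the pyarrow
`Table`, and the column names are placeholders for the `State.*` constants.
Reading a column that the sender didn't emit fails with `KeyError`, which is
why both `toTuple()` implementations must emit all four columns.

```python
# Placeholder names for State.CONTENT, State.LOOP_COUNTER,
# State.LOOP_START_ID and State.LOOP_START_STATE_URI (hypothetical values).
COLUMNS = ("content", "loop_counter", "loop_start_id", "loop_start_state_uri")


def decode_state_row(table: dict) -> tuple:
    """Read row 0 of each column, like the receiver's "State" handler.

    `table` maps column name -> list of values (a stand-in for a pyarrow Table).
    A missing column raises KeyError on lookup.
    """
    content, counter, start_id, uri = (table[name][0] for name in COLUMNS)
    return content, int(counter), start_id, uri


full = {
    "content": ['{"x": 1}'],
    "loop_counter": [0],
    "loop_start_id": [None],
    "loop_start_state_uri": [None],
}
short = {"content": ['{"x": 1}']}  # a content-only State table

print(decode_state_row(full))  # ('{"x": 1}', 0, None, None)
try:
    decode_state_row(short)
except KeyError as err:
    print("short table rejected:", err)  # short table rejected: 'loop_counter'
```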
   
   Preserving loop bookkeeping **end-to-end** is part of the loop-operator PR,
where the durable channel is the Iceberg materialization columns (read directly
by the Python materialization reader), not the live Scala bridge. Extending
Scala `StateFrame` (or confirming that loops only use the materialization path)
will land there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
