Copilot commented on code in PR #4490:
URL: https://github.com/apache/texera/pull/4490#discussion_r3191948113


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala:
##########
@@ -126,6 +126,7 @@ class DataProcessor(
       val outputState = executor.processState(state, port)
       if (outputState.isDefined) {
         outputManager.emitState(outputState.get)
+        outputManager.saveStateToStorageIfNeeded(outputState.get)
       }
     } catch safely {

Review Comment:
   State materialization is only triggered from `processInputState` (i.e., when 
a `StateFrame` is received). However, the Scala worker also emits state via 
`produceStateOnStart` / `produceStateOnFinish` in `StartChannelHandler` and 
`EndChannelHandler`, which call `outputManager.emitState(...)` directly. Those 
start/finish states are never persisted to the new state table, so downstream 
regions reading from materialization will miss them. Consider persisting every 
emitted state: either also call `saveStateToStorageIfNeeded` in the start/end 
channel handlers, or move the persistence hook into (or alongside) `emitState` 
so that all emission paths are covered.
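
   A minimal sketch of the second option, with persistence folded into a 
single emission entry point. `State`, `OutputManagerSketch`, and the 
`materializationEnabled` flag are hypothetical stand-ins used only for 
illustration; they are not the actual Texera classes, and the real 
`saveStateToStorageIfNeeded` may decide whether to persist differently.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for the engine's state type.
final case class State(values: Map[String, Any])

// Hypothetical stand-in for the output manager; not the real Texera class.
class OutputManagerSketch(materializationEnabled: Boolean) {
  // In-memory buffers stand in for downstream channels and the state table.
  val sentDownstream: ArrayBuffer[State] = ArrayBuffer.empty
  val persisted: ArrayBuffer[State] = ArrayBuffer.empty

  // Single emission entry point: every caller (input-state processing,
  // start handler, finish handler) goes through the persistence hook.
  def emitState(state: State): Unit = {
    sentDownstream += state
    saveStateToStorageIfNeeded(state)
  }

  // Assumed behavior: persist only when materialization is enabled.
  private def saveStateToStorageIfNeeded(state: State): Unit =
    if (materializationEnabled) persisted += state
}

object EmitStateDemo {
  def main(args: Array[String]): Unit = {
    val om = new OutputManagerSketch(materializationEnabled = true)
    om.emitState(State(Map("phase" -> "start")))  // e.g. from a start handler
    om.emitState(State(Map("phase" -> "input")))  // e.g. from input-state processing
    om.emitState(State(Map("phase" -> "finish"))) // e.g. from an end handler
    assert(om.persisted == om.sentDownstream)
  }
}
```

   With this shape, the explicit `saveStateToStorageIfNeeded` call added in 
this hunk would become redundant, and a new emission path could not bypass 
persistence by accident.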



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

