Ma77Ball opened a new pull request, #4985:
URL: https://github.com/apache/texera/pull/4985

   ### What changes were proposed in this PR?
   
     Fixes the Difference operator hanging when one upstream operator (e.g., a 
single CSV) is wired to both of its input ports.                                
                                                                         
      
     **The bug**                                                                
                                                                                
                                                                       
     - Scheduler materializes the upstream to break the self-join cycle.
     - Each downstream worker spawns one `InputPortMaterializationReaderThread` 
per upstream URI.
     - Each thread's virtual "from" actor ID was built from `(uri, 
workerActorId)` only.
     - With a self-join, both threads share the same URI and worker → same 
`ChannelIdentity` → FIFO sequence numbers and end-of-channel markers are 
cross-routed between the two readers → one port never drains → Difference hangs.
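
The collision described above can be illustrated with a small Python sketch. It is not the engine's code: `ChannelIdentity` here is a hypothetical stand-in, the pre-fix name format in `old_reader_actor_id` is assumed from the description, and the URI and worker ID are made up.

```python
from typing import NamedTuple


class ChannelIdentity(NamedTuple):
    # Hypothetical stand-in for the engine's channel key.
    from_actor: str
    to_actor: str


def old_reader_actor_id(uri: str, worker_actor_id: str) -> str:
    # Assumed shape of the pre-fix name: built from (uri, worker) only.
    return f"MATERIALIZATION_READER_{uri}_{worker_actor_id}"


uri = "vfs:///materialized/csv-output"  # made-up URI
worker = "Worker:Difference-0"          # made-up worker actor ID

# One reader thread per input port, both reading the same URI.
channel_for_port = {
    port: ChannelIdentity(old_reader_actor_id(uri, worker), worker)
    for port in (0, 1)
}
channels_collide = channel_for_port[0] == channel_for_port[1]

# A receiver that keeps FIFO state keyed by ChannelIdentity ends up with a
# single entry, so the two readers' sequence numbers share one counter.
next_expected_seq = {}
for port in (0, 1):
    next_expected_seq[channel_for_port[port]] = 0
tracked_channels = len(next_expected_seq)

print(channels_collide, tracked_channels)
```

Both ports map to one key, which is why an end-of-channel marker from one reader cannot be told apart from the other's.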
                                                                                
                                                                                
                                                                       
     **The fix**
     - Mix the destination `PortIdentity` into the actor name: 
`MATERIALIZATION_READER_<uri>_port<n>[i]_<workerActorId>`.
     - Thread `toPortId` through the three callers of 
`getFromActorIdForInputPortStorage`:
       - `ResourceAllocator` → `globalPortId.portId`
       - `AssignPortHandler` → `msg.portId`
       - `InputManager` → `InputPortMaterializationReaderThread` (new ctor 
field)
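
As a rough sketch of the naming scheme (in Python, not the PR's actual implementation), the helper below follows the format string quoted above. The function name `reader_actor_id`, the URI, and the worker ID are hypothetical, and the `[i]` part of the format is left out of the sketch.

```python
def reader_actor_id(uri: str, port_id: int, worker_actor_id: str) -> str:
    # Hypothetical helper: mixes the destination port into the name so that
    # two readers of the same URI on the same worker get distinct IDs.
    return f"MATERIALIZATION_READER_{uri}_port{port_id}_{worker_actor_id}"


uri = "vfs:///materialized/csv-output"  # made-up URI
worker = "Worker:Difference-0"          # made-up worker actor ID

# Same (uri, worker), different destination ports.
ids = [reader_actor_id(uri, port_id, worker) for port_id in (0, 1)]
ids_are_distinct = ids[0] != ids[1]

print(ids_are_distinct)
```

With the port in the name, each reader thread gets its own channel key, so sequence numbers and end-of-channel markers stay on the port they belong to.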
                                                                                
                                                                                
                                                                       
     ### Any related issues, documentation, or discussions?
                                                                                
                                                                                
                                                                       
    Closes: #2588
   
     ### How was this PR tested?
   
     - `VirtualIdentityUtilsSpec` — checks the new ID format and asserts 
distinct IDs for the same `(uri, worker)` but different port IDs.               
                                                                                
  
     - `ExpansionGreedyScheduleGeneratorSpec` — builds `csv → difference` with 
both inputs from the same csv; asserts `levelSets.size > 1`, proving 
materialization happens and the schedule isn't a deadlocked single-level region.
     - Manual: ran a workflow with one CSV connected to both Difference ports — 
previously hung, now completes.                                                 
                                                                       
                                                               
     ### Was this PR authored or co-authored using generative AI tooling?       
                                                                                
                                                                       
                                                               
     Co-authored with: Claude Opus 4.7 in compliance with ASF


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
