This is an automated email from the ASF dual-hosted git repository.

bobbai00 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 8383e19bdf fix(execution-api): fix multi-input operator's execution termination condition (#4615)
8383e19bdf is described below

commit 8383e19bdf2d819abf649d65e3ca5d1322bcc6a7
Author: Jiadong Bai <[email protected]>
AuthorDate: Fri May 1 15:55:22 2026 -0700

    fix(execution-api): fix multi-input operator's execution termination condition (#4615)
    
    ### What changes were proposed in this PR?
    
    This PR fixes a race in `SyncExecutionResource.allTargetsCompleted` that
    causes the sync execution API (`POST /api/execution/{wid}/{cuid}/run`)
    to terminate before a HashJoin's probe phase produces output, returning
    an empty result.
    
    **Root cause.** `HashJoinOpDesc.getPhysicalPlan` produces two
    PhysicalOps (`build`, `probe`) sharing one logical id, separated by a
    blocking edge. The scheduler places them in two regions and runs them
    sequentially. `WorkflowExecution.getAllRegionExecutionsStats` aggregates
    per-logical-op state by `groupBy(_._1.logicalOpId.id)` over only the
    *registered* `RegionExecution`s. Between "build region completed" and
    "probe region instantiated," only the build PhysicalOp is registered, so
    `aggregateStates(Iterable(COMPLETED))` returns `COMPLETED`. The sync
    resource then takes the `TargetResultsReady` branch, calls
    `killExecution`, and reads the probe's still-empty Iceberg output. The
    same shape applies to any logical operator whose physical plan contains
    multiple PhysicalOps separated by a blocking edge (e.g., `Aggregate`).
    It does not surface in the regular WebSocket-driven frontend execution
    because the frontend waits for full workflow termination.
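    
    The root cause can be illustrated with a toy model. In the sketch below, `PhysicalOpExec`, `logicalStates`, and the `aggregateStates` rule (COMPLETED only when every registered physical op is COMPLETED) are simplified assumptions for illustration, not Texera's actual `WorkflowExecution` code. Grouping by logical id over only the registered physical ops reports HashJoin as COMPLETED while its probe op has not been registered yet.
    
    ```scala
    // Toy model with hypothetical types: a logical operator's state is aggregated
    // only over the physical ops whose regions are currently registered.
    sealed trait OpState
    case object Running extends OpState
    case object Completed extends OpState
    
    final case class PhysicalOpExec(logicalOpId: String, physicalOpId: String, state: OpState)
    
    // Assumed aggregation rule: COMPLETED only if every registered physical op
    // is COMPLETED, otherwise RUNNING.
    def aggregateStates(states: Iterable[OpState]): OpState =
      if (states.nonEmpty && states.forall(_ == Completed)) Completed else Running
    
    // Group the registered physical ops by logical id, then aggregate each group.
    def logicalStates(registered: Seq[PhysicalOpExec]): Map[String, OpState] =
      registered
        .groupBy(_.logicalOpId)
        .map { case (id, execs) => id -> aggregateStates(execs.map(_.state)) }
    
    // Between "build region completed" and "probe region instantiated":
    val buildOnly = Seq(PhysicalOpExec("HashJoin", "build", Completed))
    // After the probe region is registered and running:
    val buildAndProbe = buildOnly :+ PhysicalOpExec("HashJoin", "probe", Running)
    
    println(logicalStates(buildOnly)("HashJoin"))     // Completed (premature)
    println(logicalStates(buildAndProbe)("HashJoin")) // Running
    ```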
    
    **Fix.** Strengthen `allTargetsCompleted` to require, in addition to
    `operatorState == COMPLETED`, that every declared external input port of
    the target is already present in
    `OperatorMetrics.operatorStatistics.inputMetrics`. Port-1 metrics only
    appear after the probe actually consumes data, which closes the race
    window. Internal ports (e.g., HashJoin's build→probe internal edge) are
    filtered out on both sides of the comparison so the predicate matches
    what `aggregateMetrics` already exposes. Source operators (zero declared
    inputs) and single-input operators are unaffected; for empty-input edge
    cases, `terminalStateObservable` continues to provide the fallback
    signal.
    
    ```scala
    val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators
      .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id))
      .map(op =>
        op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal)
      )
      .toMap
    
    def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
      request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId =>
        stats.operatorInfo.get(opId).exists { metrics =>
          val externalInputPortsReporting =
            metrics.operatorStatistics.inputMetrics.count(!_.portId.internal)
          val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0)
          metrics.operatorState == COMPLETED &&
          externalInputPortsReporting >= expectedExternalInputs
        }
      }
    }
    ```
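    
    The strengthened predicate can be exercised in isolation with stand-in types. The sketch below is a simplified restatement of the snippet above, assuming hypothetical `PortId`, `PortMetrics`, `OperatorMetrics`, and `LogicalOp` case classes in place of Texera's real plan and stats types, with states as plain strings. It shows a build-only snapshot being rejected, a build+probe snapshot being accepted, and a zero-input source being unaffected.
    
    ```scala
    // Hypothetical, simplified stand-ins; not Texera's real port or metrics types.
    final case class PortId(id: Int, internal: Boolean)
    final case class PortMetrics(portId: PortId)
    final case class OperatorMetrics(state: String, inputMetrics: Seq[PortMetrics])
    final case class LogicalOp(id: String, inputPorts: Seq[PortId])
    
    val COMPLETED = "COMPLETED"
    
    val plan = Seq(
      LogicalOp("Source", Seq.empty),
      LogicalOp("HashJoin", Seq(PortId(0, internal = false), PortId(1, internal = false)))
    )
    
    // Mirrors targetExpectedExternalInputs: count only non-internal declared ports.
    val expectedExternalInputs: Map[String, Int] =
      plan.map(op => op.id -> op.inputPorts.count(!_.internal)).toMap
    
    def allTargetsCompleted(targets: Seq[String], stats: Map[String, OperatorMetrics]): Boolean =
      targets.nonEmpty && targets.forall { opId =>
        stats.get(opId).exists { m =>
          // Internal ports are excluded on this side of the comparison as well.
          val reporting = m.inputMetrics.count(!_.portId.internal)
          m.state == COMPLETED && reporting >= expectedExternalInputs.getOrElse(opId, 0)
        }
      }
    
    // Race window: the state already reads COMPLETED, but only port 0 has reported.
    val buildOnly = Map(
      "HashJoin" -> OperatorMetrics(COMPLETED, Seq(PortMetrics(PortId(0, internal = false))))
    )
    // After the probe has consumed port 1.
    val buildAndProbe = Map(
      "HashJoin" -> OperatorMetrics(
        COMPLETED,
        Seq(PortMetrics(PortId(0, internal = false)), PortMetrics(PortId(1, internal = false)))
      )
    )
    val sourceDone = Map("Source" -> OperatorMetrics(COMPLETED, Seq.empty))
    
    println(allTargetsCompleted(Seq("HashJoin"), buildOnly))     // false
    println(allTargetsCompleted(Seq("HashJoin"), buildAndProbe)) // true
    println(allTargetsCompleted(Seq("Source"), sourceDone))      // true
    ```
    
    Because the expected count comes from the declared ports of the logical plan rather than from the currently registered regions, the check does not depend on when the probe region is instantiated.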
    
    ### Any related issues, documentation, discussions?
    
    Closes #4576
    
    ### How was this PR tested?
    
    Manually reproduced and verified end-to-end against
    `ComputingUnitMaster` on port 8085 with a 3-operator DAG (CSVFileScan
    movies + CSVFileScan ratings → HashJoin on `movieId`) executed via
    `POST /api/execution/{wid}/{cuid}/run` with
    `targetOperatorIds = [HashJoinId]`. Inputs: `movies.csv` (1,000 rows)
    and `ratings.csv` (10,311 rows).
    
    Steps to reproduce / verify:
    
    ```
    # 1. Start the master
    sbt "project WorkflowExecutionService" compile
    java ... org.apache.texera.web.ComputingUnitMaster   # listens on :8085
    
    # 2. Get a JWT
    curl -s -X POST http://localhost:8080/api/auth/login \
      -H "Content-Type: application/json" \
      -d '{"username":"<user>","password":"<pw>"}'
    
    # 3. POST the request (CSV → CSV → HashJoin, target = HashJoin)
    curl -s -X POST http://localhost:8085/api/execution/<wid>/<cuid>/run \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer <token>" \
      --data @sync-exec-request.json
    ```
    
    Existing tests pass, and `sbt "project WorkflowExecutionService" compile`
    succeeds. No new unit test was added because the failure is a timing
    race between the controller's region-registration sequence and the
    sync resource's observable. Reproducing it deterministically in a unit
    test would require either mocking `ExecutionStatsStore` to emit a
    build-only snapshot followed by a build+probe snapshot, or driving the
    full controller actor system; both are out of scope for this targeted
    fix. Manual reproduction is reliable on every run because the race
    window is several hundred milliseconds wide, and before this fix
    `Observable.amb` consistently selected the (incorrect) target-completion
    signal first.
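    
    The deterministic test described above as out of scope could be approximated without the actor system by replacing `ExecutionStatsStore` and its observable with a plain, ordered `Seq` of snapshots. The sketch below assumes a hypothetical `Snapshot` type (logical state plus the set of external input ports that have reported) and an emission order inferred from the root-cause description; it reports which snapshot each predicate would fire on.
    
    ```scala
    // Hypothetical snapshot: aggregated logical state plus the external input
    // ports that have reported metrics so far.
    final case class Snapshot(state: String, reportingExternalPorts: Set[Int])
    
    val expectedExternalInputs = 2 // HashJoin: ports 0 and 1
    
    def oldPredicate(s: Snapshot): Boolean = s.state == "COMPLETED"
    def newPredicate(s: Snapshot): Boolean =
      s.state == "COMPLETED" && s.reportingExternalPorts.size >= expectedExternalInputs
    
    // Assumed emission order for a HashJoin target:
    val snapshots = Seq(
      Snapshot("RUNNING", Set(0)),     // build region running
      Snapshot("COMPLETED", Set(0)),   // build done, probe region not yet registered
      Snapshot("RUNNING", Set(0, 1)),  // probe region registered and consuming
      Snapshot("COMPLETED", Set(0, 1)) // probe done
    )
    
    // Index of the first snapshot that would trigger result collection, if any.
    def firstFiring(p: Snapshot => Boolean): Option[Int] = {
      val i = snapshots.indexWhere(p)
      if (i >= 0) Some(i) else None
    }
    
    println(firstFiring(oldPredicate)) // Some(1): fires inside the race window
    println(firstFiring(newPredicate)) // Some(3): waits for the probe
    ```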
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Xinyuan Lin <[email protected]>
---
 .../texera/web/resource/SyncExecutionResource.scala     | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala
index 489c9016d5..d3047db580 100644
--- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala
+++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala
@@ -191,12 +191,27 @@ class SyncExecutionResource extends LazyLogging {
       val currentConsoleState = executionService.executionStateStore.consoleStore.getState
       val currentStatsState = executionService.executionStateStore.statsStore.getState
 
+      // Multi-region operators (e.g., HashJoin: build region then probe region) report their
+      // aggregated logical state as COMPLETED for a brief window after the first region
+      // terminates and before the second region's workers are added to regionExecutions.
+      // Guard against firing during that window by also requiring every declared external
+      // input port to be present in the operator's input metrics — port-1 stats only appear
+      // once probe actually starts consuming, which closes the race.
+      val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators
+        .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id))
+        .map(op => op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal))
+        .toMap
+
       // Require COMPLETED, not just "has output", so upstream operators finish flushing
       // their data downstream before we tear the execution down.
       def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
         request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId =>
           stats.operatorInfo.get(opId).exists { metrics =>
-            metrics.operatorState == COMPLETED
+            val externalInputPortsReporting =
+              metrics.operatorStatistics.inputMetrics.count(!_.portId.internal)
+            val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0)
+            metrics.operatorState == COMPLETED &&
+            externalInputPortsReporting >= expectedExternalInputs
           }
         }
       }
