nathanb9 opened a new pull request, #22791:
URL: https://github.com/apache/datafusion/pull/22791

   ## Which issue does this PR close?
   
   Related to #22641 (same user-visible symptom, **different code path**; see
   below). Not a duplicate.
   
   ## Rationale for this change
   
   `NestedLoopJoinExec` can return **incorrect, non-deterministic** results for
   `LEFT`/`RIGHT`/`FULL` (and other left-final-emission) joins when the probe
   side has **more than one partition**, i.e. essentially any non-equi join
   under the default multi-partition config. Some unmatched-left rows are
   emitted **twice**: once correctly and once as a spurious `NULL`-padded
   "unmatched" row.
   
   ### Root cause
   
   The probe streams share a `probe_threads_counter`. Each stream is supposed to
   decrement it exactly once (when its right input is exhausted); the stream
   that drives the counter to `0` is the one that emits the unmatched-left rows,
   *after* all partitions have finished probing.
   
   But in `handle_emit_left_unmatched`, after `process_left_unmatched` returns
   `Ok(false)`, `maybe_flush_ready_batch()` can `return` early with a ready
   batch **before the state advances to `Done`**. The stream is still in
   `EmitLeftUnmatched`, so the next poll re-enters `process_left_unmatched` with
   `left_emit_idx == 0` and the gate
   `self.left_emit_idx == 0 && !left_data.report_probe_completed()`
   **decrements the shared counter a second time**. The counter then hits `0`
   *before* every partition has finished probing, so a partition emits
   unmatched-left rows early, including left rows that match in a
   not-yet-drained partition. (In traces the counter visibly underflows
   `0 → usize::MAX`.) Because it depends on partition scheduling, the result is
   non-deterministic.
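
   As a rough illustration of the double decrement, the sequence for two probe
   partitions can be replayed in isolation. This is a simplified model, not
   DataFusion's actual code: `SharedLeft` and `replay_double_report` are
   hypothetical names, and the decrement is assumed to be an atomic `fetch_sub`
   whose previous value is compared against `1`.

   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};

   /// Hypothetical stand-in for the shared left-side state (assumption, not
   /// the real DataFusion type).
   struct SharedLeft {
       probe_threads_counter: AtomicUsize,
   }

   impl SharedLeft {
       fn new(probe_partitions: usize) -> Self {
           Self {
               probe_threads_counter: AtomicUsize::new(probe_partitions),
           }
       }

       /// Decrements the counter; returns true only for the caller that
       /// brought it from 1 to 0 (i.e. the "last" stream to finish).
       fn report_probe_completed(&self) -> bool {
           self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
       }
   }

   /// Replays the problematic order: stream A reports twice before stream B
   /// has finished probing. Returns (A's first answer, A's second answer,
   /// B's answer, final counter value).
   fn replay_double_report() -> (bool, bool, bool, usize) {
       let shared = SharedLeft::new(2);
       // A finishes probing: 2 -> 1, A is not the last stream.
       let a_first = shared.report_probe_completed();
       // A is polled again in the same state: 1 -> 0, A now looks "last"
       // even though B is still probing.
       let a_second = shared.report_probe_completed();
       // B finishes for real: 0 wraps around to usize::MAX.
       let b = shared.report_probe_completed();
       let counter = shared.probe_threads_counter.load(Ordering::Relaxed);
       (a_first, a_second, b, counter)
   }
   ```

   In this model the second report from stream A is what makes it look like the
   last stream to finish, and the later report from stream B wraps the counter.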
   
   This is **distinct from #22641 / #22671**, which address the *memory-limited
   spill fallback* path. This bug reproduces with unbounded memory and with the
   disk manager disabled (spill path unreachable), i.e. it is in the normal
   single-pass path introduced by the NLJ rewrite (#16996).
   
   ## What changes are included in this PR?
   
   Decrement the shared `probe_threads_counter` **exactly once per probe
   stream** by guarding it with a new `probe_completed_reported: bool` on
   `NestedLoopJoinStream` (reset per chunk in the memory-limited path).
   One file, +39/−5.
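
   A minimal sketch of a once-only guard of this kind, assuming a simplified
   stream type. `ProbeStream`, `is_last`, and `report_probe_completed_once` are
   hypothetical; only the `probe_completed_reported` flag name comes from this
   PR, and the actual change may differ in detail.

   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::Arc;

   /// Hypothetical, simplified probe stream (not the real NestedLoopJoinStream).
   struct ProbeStream {
       shared_counter: Arc<AtomicUsize>,
       /// Set once this stream has decremented the shared counter.
       probe_completed_reported: bool,
       /// Cached answer from the single decrement (a choice of this sketch).
       is_last: bool,
   }

   impl ProbeStream {
       fn new(shared_counter: Arc<AtomicUsize>) -> Self {
           Self {
               shared_counter,
               probe_completed_reported: false,
               is_last: false,
           }
       }

       /// Safe to call on every poll: the shared counter is decremented at
       /// most once per stream, and later calls return the cached answer.
       fn report_probe_completed_once(&mut self) -> bool {
           if !self.probe_completed_reported {
               self.probe_completed_reported = true;
               self.is_last =
                   self.shared_counter.fetch_sub(1, Ordering::Relaxed) == 1;
           }
           self.is_last
       }
   }
   ```

   With two streams sharing a counter initialised to `2`, repeated calls on the
   first stream leave the counter at `1`, and only the second stream's call
   brings it to `0`.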
   
   ## Are these changes tested?
   
   - All existing `nested_loop_join` unit tests pass (42/42).
   - Verified end-to-end with a standalone harness that runs the failing query
     in a loop on a multi-threaded runtime (the bug is a scheduling race, so a
     single run often passes even when broken; a tight loop exposes it
     reliably):
   
     | run | before fix | after fix |
     |---|---|---|
     | 60× LEFT join, `target_partitions=4`, `batch_size=4` | `{6:42, 7:7, 8:4, 9:7}` | **`{6:60}`** |
     | sweep of `target_partitions × batch_size` | wrong at every `tp ≥ 2` | **always correct** |
   
   > A deterministic unit test was attempted but the race does not reliably
   > trigger under the test harness's runtime, so it is intentionally omitted
   > rather than shipping a test that passes on the buggy code. Reviewers
   > wanting to reproduce can use the snippet below in a loop.
   
   ### Reproduction (datafusion-cli)
   
   The result must always be `6`. Because it is a scheduling race it is
   intermittent per invocation; loop it to observe divergence on an unpatched
   build:
   
   ```sql
   set datafusion.execution.target_partitions = 4;
   set datafusion.execution.batch_size = 4;
   
   create table t1(d date, b boolean, i int, v bigint) as values
    (date '1982-10-10', false, null, 4),
    (date '2064-11-25', true,  null, -72),
    (date '2064-10-06', true,  -95,  -41),
    (date '2013-01-20', false, 98,   null),
    (date '2011-04-30', false, 42,   -91);
   
   create table t2(s1 varchar, s2 varchar, w bigint) as values
    ('C.:~$wZ.bY@U|lN$fip1>N mZ', 'gUP0', 49),
    ('Dw8l4N(*Z<s#,Z', 'tS', 58),
    ('<Y-xI/8zG:a47tBp#vo%ah', 'I=Ieh', 83),
    ('Z]^6fijx3$', 'm0vc', 3),
    ('Id*B', '[3<_', -76);
   
   -- Deterministic correct answer (verify with target_partitions = 1): 6 rows.
   select count(*) as left_join_rows
   from t1 left join t2 on (t1.v < t2.w and t2.s2 <= 'a');
   ```
   
   - **Before:** intermittently returns `7`, `8`, or `9` (extra NULL-padded
     rows).
   - **After:** always returns `6`.
   
   ## Are there any user-facing changes?
   
   Bug fix only; no API changes. Outer / semi / anti / mark nested-loop joins
   with multiple probe partitions now return correct, deterministic results.
   

