mithuncy opened a new issue, #22065:
URL: https://github.com/apache/datafusion/issues/22065
## Reproducible test case
Verified on `datafusion = "53.1.0"` + `datafusion-proto = "53.1.0"`. The bug
reproduces deterministically: a `LogicalPlan` with `null_aware = true` returns
0 rows when executed in-process, but returns 2 rows after a `to_proto` ->
`from_proto` round-trip.
### Setup
`Cargo.toml`:
```toml
[package]
name = "df_repro"
version = "0.1.0"
edition = "2021"
[dependencies]
datafusion = "53.1.0"
datafusion-proto = "53.1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```
`src/main.rs`:
```rust
use datafusion::execution::context::SessionContext;
use datafusion_proto::bytes::{
    logical_plan_from_bytes_with_extension_codec,
    logical_plan_to_bytes_with_extension_codec,
};
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Inline VALUES so the leaves are LogicalPlan::Values, which the default
    // codec knows how to serialize. The data shape matches the SQL intent:
    //   t1       = {1, 2, 3}
    //   excludes = {1, NULL}   <-- NULL on the right side of NOT IN
    let sql = "
        SELECT id
        FROM (VALUES (1), (2), (3)) AS t1(id)
        WHERE id NOT IN (
            SELECT bad_id
            FROM (VALUES (CAST(1 AS INT)), (CAST(NULL AS INT))) AS excludes(bad_id)
        )
    ";

    // The unoptimized plan still has Expr::InSubquery, which the default codec
    // rejects. We must run the optimizer first; that lowers NOT IN into
    // LogicalPlan::Join { join_type: LeftAnti, null_aware: true, .. }, which is
    // the exact shape any real consumer (e.g. parallel IPC) serializes.
    let df = ctx.sql(sql).await?;
    let plan = ctx.state().optimize(df.logical_plan())?;

    // (a) Direct execution -- correct, returns 0 rows.
    let direct = ctx.execute_logical_plan(plan.clone()).await?.collect().await?;
    let direct_rows: usize = direct.iter().map(|b| b.num_rows()).sum();
    println!("direct rows: {direct_rows}");

    // (b) Round-trip through datafusion-proto -- wrong, returns 2 rows.
    let codec = DefaultLogicalExtensionCodec {};
    let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
    let task_ctx = ctx.task_ctx();
    let plan_rt = logical_plan_from_bytes_with_extension_codec(&bytes, &task_ctx, &codec)?;
    let rt = ctx.execute_logical_plan(plan_rt).await?.collect().await?;
    let rt_rows: usize = rt.iter().map(|b| b.num_rows()).sum();
    println!("round-trip rows: {rt_rows}");

    assert_eq!(direct_rows, 0, "direct execution should return 0 rows");
    assert_eq!(rt_rows, 0, "round-trip should also return 0 rows -- BUG");
    Ok(())
}
```
### Observed output
```
direct rows: 0
round-trip rows: 2
```
| Path | Result |
|---|---|
| Direct execution of optimized `LogicalPlan::Join` (`null_aware=true`) | 0 rows (correct) |
| `to_proto` -> `from_proto` -> execute | 2 rows (wrong -- behaves as plain `LeftAnti`) |
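To see why the two row counts differ, here is a stdlib-only sketch of SQL `NOT IN` three-valued logic next to a plain anti join, run on the reproducer's data. The functions `not_in`, `null_aware_anti`, and `plain_anti` are hypothetical illustrations, not DataFusion APIs; `Option<i32>` stands in for a nullable `INT` column, and `plain_anti` assumes NULL keys never compare equal.

```rust
/// Hypothetical helper: SQL `id NOT IN (excludes)` under three-valued logic.
/// Some(true) = TRUE, Some(false) = FALSE, None = UNKNOWN (NULL).
fn not_in(id: Option<i32>, excludes: &[Option<i32>]) -> Option<bool> {
    let id = match id {
        Some(v) => v,
        // NULL NOT IN (empty set) is TRUE; against a non-empty set it is UNKNOWN.
        None => return if excludes.is_empty() { Some(true) } else { None },
    };
    let mut saw_null = false;
    for e in excludes {
        match e {
            Some(v) if *v == id => return Some(false), // definite match
            Some(_) => {}
            None => saw_null = true, // `id <> NULL` is UNKNOWN
        }
    }
    if saw_null { None } else { Some(true) }
}

/// Hypothetical null-aware anti join: keep a row only if NOT IN is TRUE.
fn null_aware_anti(left: &[Option<i32>], right: &[Option<i32>]) -> Vec<Option<i32>> {
    left.iter().copied().filter(|l| not_in(*l, right) == Some(true)).collect()
}

/// Hypothetical plain LeftAnti on `=` keys: keep a row if no non-NULL key equals it.
fn plain_anti(left: &[Option<i32>], right: &[Option<i32>]) -> Vec<Option<i32>> {
    left.iter()
        .copied()
        .filter(|l| match l {
            Some(v) => !right.contains(&Some(*v)),
            None => true, // NULL never matches, so it is never excluded
        })
        .collect()
}

fn main() {
    let t1 = [Some(1), Some(2), Some(3)];
    let excludes = [Some(1), None];
    println!("null-aware rows: {}", null_aware_anti(&t1, &excludes).len());
    println!("plain LeftAnti rows: {}", plain_anti(&t1, &excludes).len());
}
```

With `excludes = {1, NULL}`, `1` is a definite match and every other `id` compares as UNKNOWN against the NULL, so the null-aware variant keeps nothing while the plain anti join keeps `2` and `3`: the same 0 vs 2 split as the observed output.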
## Root cause
`null_aware` is present on the **physical** `HashJoinExecNode` proto (added
in #19635) but missing from the **logical** `JoinNode` proto. The logical
encoder also drops it during destructuring, and the logical decoder cannot
restore it because the proto carries no field.
1. `datafusion/proto/proto/datafusion.proto` -- `message JoinNode` has 8
fields, no `null_aware`:
```protobuf
message JoinNode {
  LogicalPlanNode left = 1;
  LogicalPlanNode right = 2;
  datafusion_common.JoinType join_type = 3;
  datafusion_common.JoinConstraint join_constraint = 4;
  repeated LogicalExprNode left_join_key = 5;
  repeated LogicalExprNode right_join_key = 6;
  datafusion_common.NullEquality null_equality = 7;
  LogicalExprNode filter = 8;
  // bool null_aware = 9; // MISSING
}
```
2. `datafusion/proto/src/logical_plan/mod.rs:1469` -- `to_proto`
destructures with `..`, so `null_aware` falls into "the rest" and is silently
discarded:
```rust
LogicalPlan::Join(Join {
    left, right, on, filter, join_type, join_constraint,
    null_equality,
    .. // <- null_aware is here, silently dropped
}) => { /* builds protobuf::JoinNode without null_aware */ }
```
3. `datafusion/proto/src/logical_plan/mod.rs:830` -- `from_proto` rebuilds
the join via `LogicalPlanBuilder::join_with_expr_keys` / `join_using`, neither
of which carries a `null_aware` parameter. So even if the proto had the field,
the current decoder would still drop it on the way back in.
Net effect: any consumer of
`datafusion_proto::bytes::logical_plan_to_bytes_with_extension_codec`
(parallel-execution IPC, plan caching, distributed scheduling, etc.) silently
loses null-aware NOT IN semantics across the round-trip.
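The encoder/decoder asymmetry can be reduced to a few lines. In this hypothetical model, `Join`, `JoinNode`, `to_proto`, and `from_proto` are simplified stand-ins, not the real `datafusion-proto` types: the encoder destructures with `..`, the wire struct has no slot for the flag, and the decoder has nothing to restore it from.

```rust
// Hypothetical, simplified stand-ins; not the real datafusion / datafusion-proto types.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    LeftAnti,
}

#[derive(Debug, Clone, PartialEq)]
struct Join {
    join_type: JoinType,
    null_aware: bool,
}

/// Models the current `message JoinNode`: there is no `null_aware` field.
#[derive(Debug, Clone, PartialEq)]
struct JoinNode {
    join_type: JoinType,
}

fn to_proto(join: &Join) -> JoinNode {
    // `..` swallows `null_aware`, mirroring the encoder's destructure.
    let Join { join_type, .. } = join;
    JoinNode { join_type: *join_type }
}

fn from_proto(node: &JoinNode) -> Join {
    // Nothing on the wire to read, so the flag can only come back as `false`.
    Join { join_type: node.join_type, null_aware: false }
}

fn main() {
    let plan = Join { join_type: JoinType::LeftAnti, null_aware: true };
    let round_tripped = from_proto(&to_proto(&plan));
    println!("before: null_aware = {}", plan.null_aware);
    println!("after:  null_aware = {}", round_tripped.null_aware);
}
```

The join type survives but the flag flips from `true` to `false`, which is why the decoded plan executes as a plain `LeftAnti`.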
## Fix
1. **Proto:** add `bool null_aware = 9;` to `message JoinNode` in
`datafusion/proto/proto/datafusion.proto`. Because proto3 decodes an absent
`bool` as `false`, plans written by older writers still decode (as
non-null-aware joins), so wire compatibility is preserved.
2. **`to_proto` (`logical_plan/mod.rs:1469`):** pull `null_aware` out of the
destructure and write it to the proto:
```rust
LogicalPlan::Join(Join {
    left, right, on, filter, join_type, join_constraint,
    null_equality, null_aware, ..
}) => {
    // ...
    protobuf::JoinNode { /* existing fields */ null_aware: *null_aware }
}
```
3. **`from_proto` (`logical_plan/mod.rs:830`):** stop using the builder
paths that ignore `null_aware`. Construct the `Join` directly via
`Join::try_new(left, right, on, filter, join_type, join_constraint,
null_equality, null_aware)`; its signature already accepts `Expr`-keyed `on`
pairs and `null_aware: bool`, so it fits the decoder's data without a new
builder API. Alternatively, add an expr-keyed `LogicalPlanBuilder` variant that
carries `null_equality` + `null_aware`.
4. **Tests** in `datafusion/proto/tests/cases/roundtrip_logical_plan.rs`:
- **Round-trip equality:** build a `LogicalPlan::Join` with
`null_aware=true`, serialize, deserialize, assert the resulting plan's
`null_aware` is still `true`.
- **Execution regression:** run the `t1` / `excludes` reproducer above
through `to_proto` -> `from_proto` -> `execute`, and assert the result is 0
rows.
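A minimal model of the fixed encoder/decoder pair and the round-trip equality check, under the same simplifications as before. `Join`, `JoinNode`, `to_proto`, and `from_proto` are hypothetical stand-ins, and `Option<bool>` is an assumption used only to model whether field 9 is present on the wire; generated proto3 code would use a plain `bool` that reads as `false` when absent.

```rust
// Hypothetical model of the proposed fix; not the real datafusion-proto code.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Join {
    null_aware: bool,
}

/// Models the patched `JoinNode`. `None` means field 9 was never written,
/// e.g. by a writer that predates the field.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct JoinNode {
    null_aware: Option<bool>,
}

fn to_proto(join: &Join) -> JoinNode {
    // Bind the field by name instead of letting `..` drop it.
    let Join { null_aware } = join;
    JoinNode { null_aware: Some(*null_aware) }
}

fn from_proto(node: &JoinNode) -> Join {
    // An absent field decodes as the proto3 default, `false`.
    Join { null_aware: node.null_aware.unwrap_or(false) }
}

fn main() {
    // Round-trip equality: the flag survives serialization.
    let plan = Join { null_aware: true };
    assert_eq!(from_proto(&to_proto(&plan)), plan);

    // Wire compatibility: a message without field 9 decodes as `false`.
    let old_writer = JoinNode::default();
    assert!(!from_proto(&old_writer).null_aware);
    println!("null_aware preserved; legacy messages decode as false");
}
```

One design option beyond the proposed patch: binding every `Join` field by name in the encoder (with intentionally unserialized ones bound as `_`) instead of using `..` would make the compiler flag any future field addition, rather than letting it be dropped silently.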