zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2948434379
I've updated the reproducer case in the latest PR and left it commented out,
because neither our rule nor the built-in yielding fixes it yet.
```rust
#[tokio::test]
async fn test_filter_reject_all_batches_cancel() -> Result<(), Box<dyn
Error>> {
// 1) Create a Session, Schema, and an 8K-row RecordBatch
let session_ctx = SessionContext::new();
let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
"value",
DataType::Int64,
false,
)]));
// Build a batch with values 0..8191
let mut builder = Int64Array::builder(8_192);
for v in 0..8_192 {
builder.append_value(v);
}
let batch = Arc::new(RecordBatch::try_new(
schema.clone(),
vec![Arc::new(builder.finish())],
)?);
// 2a) Wrap this batch in an InfiniteExec
let infinite = Arc::new(InfiniteExec::new(&batch));
// 2b) Construct a FilterExec that is always false: “value > 10000” (no
rows pass)
let false_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Arc::new(Column::new_with_schema("value", &schema)?),
Gt,
Arc::new(Literal::new(ScalarValue::Int64(Some(10_000)))),
));
let filtered: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(false_predicate, infinite.clone())?);
// 2c) Use CoalesceBatchesExec to guarantee each Filter pull always
yields an 8192-row batch
let coalesced: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(filtered.clone(), 8_192));
// 2d) Hash-repartition into 1 partition (so that a later global
aggregation would run on a single partition)
let exprs: Vec<Arc<dyn PhysicalExpr>> =
vec![Arc::new(Column::new_with_schema("value", &schema)?)];
let part = Partitioning::Hash(exprs.clone(), 1);
let hashed: Arc<dyn ExecutionPlan> =
Arc::new(RepartitionExec::try_new(coalesced.clone(), part.clone())?);
// 4) WrapLeaves to insert YieldExec—so that the InfiniteExec yields
control between batches
let config = ConfigOptions::new();
let optimized = WrapLeaves::new().optimize(hashed, &config)?;
// 5) Execute with a 1-second timeout. Because Filter discards all 8192
rows each time
// without ever producing output, no batch will arrive within 1
second. And since
// emission type is not Final, we never see an end‐of‐stream marker.
let mut stream = physical_plan::execute_stream(optimized,
session_ctx.task_ctx())?;
const TIMEOUT: u64 = 1;
let result = select! {
batch_opt = stream.next() => batch_opt,
_ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) =>
{
None
}
};
assert!(
result.is_none(),
"Expected no output for infinite + filter(all-false) + aggregate,
but got a batch"
);
Ok(())
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]