zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2922745280
> After my previous comment I started thinking about how our Java code
handles this kind of thing. Typically when we do a `Future#cancel(mayInterrupt:
true)` we have `Thread#isInterrupted` checks in the async code and throw
`InterruptedException` in response. Then we let the exception bubble up the
stack and we're done.
>
> Would that maybe be an option? Have a cancellation token that rather than
triggering `return Pending` triggers a `return
Ready(Err(DataFusionError::ExecutionJoin))` or something similar. Error
handling logic is already in place throughout the codebase and errors propagate
nicely up the call chain. That might be a way to avoid having to sprinkle these
checks all over the place in the codebase.
@pepijnve I agree that this is a possible solution. It could also be combined
with pushing the cancellation check down to a low level, something like:
### Option A: Wrap in `DataSourceExec.open()`
```rust
impl DataSource for FileScanConfig {
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // 1. build your raw stream
        let raw_stream = Box::pin(FileStream::new(…)?);
        // 2. wrap it in a generic cancellation stream
        let cancellable =
            CancellationStream::new(context.clone(), raw_stream);
        // 3. return the wrapped stream
        Ok(Box::pin(cancellable))
    }
}
```
```rust
...
if ctx.is_cancelled() {
    return Poll::Ready(Some(Err(DataFusionError::Execution(
        "Query cancelled".to_string(),
    ))));
}
...
```
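To make the idea concrete, here is a minimal, std-only sketch of such a wrapper. Everything in it is hypothetical: `BatchStream` stands in for `futures::Stream`, an `Arc<AtomicBool>` stands in for whatever cancellation state the context would expose (`is_cancelled()` does not exist today), and a `String` stands in for `DataFusionError`. It only shows the shape of the check, not how it would be wired into DataFusion.

```rust
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for `futures::Stream`, so the sketch needs no external crates.
pub trait BatchStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical wrapper: checks a shared flag before delegating to `inner`.
pub struct CancellationStream<S> {
    cancelled: Arc<AtomicBool>,
    finished: bool,
    inner: S,
}

impl<S> CancellationStream<S> {
    pub fn new(cancelled: Arc<AtomicBool>, inner: S) -> Self {
        Self { cancelled, finished: false, inner }
    }
}

impl<S, T> BatchStream for CancellationStream<S>
where
    S: BatchStream<Item = Result<T, String>> + Unpin,
{
    type Item = Result<T, String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // After the cancellation error has been emitted, the stream is over.
        if this.finished {
            return Poll::Ready(None);
        }
        // Return an error instead of `Pending`, so normal error handling applies.
        if this.cancelled.load(Ordering::Relaxed) {
            this.finished = true;
            return Poll::Ready(Some(Err("Query cancelled".to_string())));
        }
        Pin::new(&mut this.inner).poll_next(cx)
    }
}

/// Toy input stream that yields `Ok(next)` until `end` is reached.
pub struct Counter {
    pub next: u32,
    pub end: u32,
}

impl BatchStream for Counter {
    type Item = Result<u32, String>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.next >= this.end {
            return Poll::Ready(None);
        }
        let value = this.next;
        this.next += 1;
        Poll::Ready(Some(Ok(value)))
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a stream once with a waker that does nothing.
pub fn poll_once<S: BatchStream + Unpin>(stream: &mut S) -> Poll<Option<S::Item>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(stream).poll_next(&mut cx)
}
```

Note that the check only runs when the wrapper is polled, so it does not by itself help an operator that never yields back to the runtime.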
But the logic for updating the cancellation state seems to make things more
complex. Currently our ctx looks like this:
```rust
#[derive(Clone)]
pub struct SessionContext {
    /// UUID for the session
    session_id: String,
    /// Session start time
    session_start_time: DateTime<Utc>,
    /// Shared session state for the session
    state: Arc<RwLock<SessionState>>,
}
```
I can try this. I think the benefit shows up when the consumer is consuming
several different types of streaming execs (all built on the data source
stream): if one of them receives the cancel signal, it returns the
`Query cancelled` error, so the consumer will stop polling the other streaming
execs?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]