alamb commented on a change in pull request #1112:
URL: https://github.com/apache/arrow-datafusion/pull/1112#discussion_r728302645
##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -365,56 +382,98 @@ impl RepartitionExec {
         Ok(())
     }
+}
+
+#[derive(Debug)]

Review comment:
The `AbortOnDrop` structure nicely encapsulates RAII style, aborting a bunch of `JoinHandle`s on drop. What would you think about using that same structure in all of the operators that need this treatment (e.g. `SortExec`, etc.) rather than using `PinnedDrop` and raw `JoinHandle`s for the others?

You might even be able to encapsulate the `JoinHandle` creation into `AbortOnDrop` so it would be easier to ensure that all tasks spawned in physical plans are doing the right thing (i.e. we could avoid calling `tokio::task::spawn` directly). Having a controlled wrapper around task spawning might be useful for other activities too (like being able to control which thread pool is used to run the task).
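For concreteness, one possible shape for such a wrapper is sketched below. Only the struct definition and the `Drop` impl mirror what this PR adds; the `spawn` helper is illustrative and not part of the PR.

```rust
use std::future::Future;

use tokio::task::JoinHandle;

/// The PR's `AbortOnDrop` guard, extended with a hypothetical `spawn`
/// helper so that all task creation in physical plans could go through it.
#[derive(Debug, Default)]
struct AbortOnDrop(Vec<JoinHandle<()>>);

impl AbortOnDrop {
    /// Spawn `fut` on the current tokio runtime and retain its handle so the
    /// task is aborted when this guard is dropped. Centralizing spawning here
    /// would also give a single place to later pick a dedicated thread pool.
    fn spawn<F>(&mut self, fut: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.0.push(tokio::task::spawn(fut));
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Abort (rather than detach) every spawned task when the guard goes away.
        for handle in &self.0 {
            handle.abort();
        }
    }
}
```

An operator such as `RepartitionExec` could then hold one of these and call its `spawn` method wherever it currently calls `tokio::task::spawn` directly.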
##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -853,4 +919,26 @@ mod tests {
         let schema = batch1.schema();
         BarrierExec::new(vec![vec![batch1, batch2], vec![batch3, batch4]], schema)
     }
+
+    #[tokio::test]
+    async fn test_drop_cancel() -> Result<()> {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
+
+        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
+        let refs = blocking_exec.refs();
+        let sort_exec = Arc::new(RepartitionExec::try_new(
+            blocking_exec,
+            Partitioning::UnknownPartitioning(1),
+        )?);
+
+        let fut = collect(sort_exec);
+        let mut fut = fut.boxed();
+
+        assert_is_pending(&mut fut);

Review comment:
👍

##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -55,11 +56,14 @@ pub struct RepartitionExec {
     /// Partitioning scheme to use
     partitioning: Partitioning,
     /// Channels for sending batches from input partitions to output partitions.
-    /// Key is the partition number
-    channels: Arc<
-        Mutex<
+    /// Key is the partition number.
+    ///
+    /// Stored alongside is an abort marker that will kill the background job once it's no longer needed.
+    channels_and_abort_helper: Arc<

Review comment:
What would you think about using a named struct (that can have additional documentation on it) rather than a tuple? That is, changing

```rust
(
    HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
    Arc<AbortOnDrop>,
)
```

to

```rust
struct ChannelsAndAbortHelper {
    channels: HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
    abort: Arc<AbortOnDrop>,
}
```

You could then rename `channels_and_abort_helper` to `state` or something shorter and have the inner field names add semantic value. We can do that as a follow-on refactor, but I wanted to get your opinion.

##########
File path: datafusion/src/physical_plan/windows/window_agg_exec.rs
##########
@@ -240,16 +248,19 @@ impl WindowAggStream {
         let (tx, rx) = futures::channel::oneshot::channel();
         let schema_clone = schema.clone();
         let elapsed_compute = baseline_metrics.elapsed_compute().clone();
-        tokio::spawn(async move {
+        let join_handle = tokio::spawn(async move {
            let schema = schema_clone.clone();
            let result =
                WindowAggStream::process(input, window_expr, schema, elapsed_compute)
                    .await;
-            tx.send(result)
+
+            // failing here is OK, the receiver is gone and does not care about the result

Review comment:
👍

##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -365,56 +382,98 @@ impl RepartitionExec {
         Ok(())
     }
+}
+
+#[derive(Debug)]
+struct AbortOnDrop(Vec<JoinHandle<()>>);
+
+impl Drop for AbortOnDrop {
+    fn drop(&mut self) {
+        for join_handle in &self.0 {
+            join_handle.abort();
+        }
+    }
+}
+pin_project! {
     /// Waits for `input_task` which is consuming one of the inputs to
     /// complete. Upon each successful completion, sends a `None` to
     /// each of the output tx channels to signal one of the inputs is
     /// complete. Upon error, propagates the errors to all output tx
     /// channels.
-    async fn wait_for_task(
+    struct WaitForTask {
+        #[pin]
         input_task: JoinHandle<Result<()>>,
         txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
-    ) {
+    }
+
+    impl PinnedDrop for WaitForTask {

Review comment:
As above, I wonder if this could have a single `abort_on_drop` field instead of `input_task` and this `PinnedDrop` implementation.

##########
File path: datafusion/src/physical_plan/windows/mod.rs
##########
@@ -258,4 +260,35 @@ mod tests {
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_drop_cancel() -> Result<()> {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
+
+        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
+        let refs = blocking_exec.refs();
+        let sort_exec = Arc::new(WindowAggExec::try_new(

Review comment:
```suggestion
        let window_agg_exec = Arc::new(WindowAggExec::try_new(
```
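Referring back to the `WaitForTask` comment above, a rough sketch of the "single `abort_on_drop` field" idea is shown below. The `AbortOnDropSingle` name and the `Future` impl are illustrative assumptions, not code from this PR; the sketch relies on `tokio::task::JoinHandle` being `Unpin`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::task::{JoinError, JoinHandle};

/// Hypothetical single-task variant of the PR's `AbortOnDrop` guard: it owns
/// one `JoinHandle`, aborts the task when dropped, and can itself be awaited.
struct AbortOnDropSingle<T>(JoinHandle<T>);

impl<T> Drop for AbortOnDropSingle<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

impl<T> Future for AbortOnDropSingle<T> {
    type Output = Result<T, JoinError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `JoinHandle` is `Unpin`, so the inner handle can be polled without
        // any pin projection machinery.
        Pin::new(&mut self.get_mut().0).poll(cx)
    }
}
```

With something along these lines, `WaitForTask` could store `abort_on_drop: AbortOnDropSingle<Result<()>>` and await it directly, which would make the hand-written `PinnedDrop` implementation unnecessary.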