zhuqi-lucas commented on issue #16193: URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916774947
@pepijnve It works for me; the changed code is here: ```rust tokio = { workspace = true, features = ["macros", "signal"]} ``` ```rust use arrow::array::{Int64Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::functions_aggregate::sum; use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use datafusion::{common, physical_plan}; use futures::{Stream, StreamExt}; use std::any::Any; use std::error::Error; use std::fmt::Formatter; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::signal::ctrl_c; use datafusion::prelude::SessionContext; struct InfiniteStream { batch: RecordBatch, poll_count: usize, } impl RecordBatchStream for InfiniteStream { fn schema(&self) -> SchemaRef { self.batch.schema() } } impl Stream for InfiniteStream { type Item = common::Result<RecordBatch>; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { self.poll_count += 1; if self.poll_count % 10_000 == 0 { println!("InfiniteStream::poll_next {} times", self.poll_count); } Poll::Ready(Some(Ok(self.batch.clone()))) } } #[derive(Debug)] struct InfiniteExec { batch: RecordBatch, properties: PlanProperties, } impl InfiniteExec { fn new(batch: &RecordBatch) -> Self { InfiniteExec { batch: batch.clone(), properties: PlanProperties::new( EquivalenceProperties::new(batch.schema().clone()), Partitioning::UnknownPartitioning(1), EmissionType::Incremental, Boundedness::Unbounded { requires_infinite_memory: false, }, ), } } } impl DisplayAs for InfiniteExec { fn 
fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { write!(f, "infinite") } } impl ExecutionPlan for InfiniteExec { fn name(&self) -> &str { "infinite" } fn as_any(&self) -> &dyn Any { self } fn schema(&self) -> SchemaRef { self.batch.schema() } fn properties(&self) -> &PlanProperties { &self.properties } fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] } fn with_new_children( self: Arc<Self>, _children: Vec<Arc<dyn ExecutionPlan>>, ) -> common::Result<Arc<dyn ExecutionPlan>> { Ok(self.clone()) } fn execute( &self, _partition: usize, _context: Arc<TaskContext>, ) -> common::Result<SendableRecordBatchStream> { Ok(Box::pin(InfiniteStream { batch: self.batch.clone(), poll_count: 0, })) } } #[tokio::test] async fn main() -> Result<(), Box<dyn Error>> { // 1) build session & schema & sample batch let session_ctx = SessionContext::new(); let schema = Arc::new(Schema::new(Fields::try_from(vec![ Field::new("value", DataType::Int64, false), ])?)); let mut builder = Int64Array::builder(8192); for v in 0..8192 { builder.append_value(v); } let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?; // 2) set up the infinite source + aggregation let inf = Arc::new(InfiniteExec::new(&batch)); let aggr = Arc::new(AggregateExec::try_new( AggregateMode::Single, PhysicalGroupBy::new(vec![], vec![], vec![]), vec![Arc::new( AggregateExprBuilder::new(sum::sum_udaf(), vec![Arc::new( datafusion::physical_expr::expressions::Column::new_with_schema( "value", &schema, )? )]) .schema(inf.schema()) .alias("sum") .build()?, )], vec![None], inf.clone(), inf.schema(), )?); // 3) get the stream let mut stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?; println!("Running query; press Ctrl-C to cancel"); // 4) drive the stream inline in select! let result = tokio::select! 
{ batch_opt = async { loop { if let Some(item) = stream.next().await { break Some(item); } else { break None; } } } => batch_opt, _ = ctrl_c() => { println!("Cancellation received!"); None } }; // 5) handle the outcome match result { None => println!("No result (cancelled or empty)"), Some(Ok(batch)) => println!("Got batch with {} rows", batch.num_rows()), Some(Err(e)) => eprintln!("Error: {}", e), } println!("Exiting, stream will be dropped now"); Ok(()) } ``` Test result: Ctrl-C works well: ```rust Running query; press Ctrl-C to cancel InfiniteStream::poll_next 10000 times InfiniteStream::poll_next 20000 times InfiniteStream::poll_next 30000 times InfiniteStream::poll_next 40000 times InfiniteStream::poll_next 50000 times Cancellation received! No result (cancelled or empty) Exiting, stream will be dropped now ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org