zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916774947

   @pepijnve It works for me. Here is the changed code:
   
   ```toml
   tokio = { workspace = true, features = ["macros", "signal"]}
   ```
   
   
   ```rust
   use arrow::array::{Int64Array, RecordBatch};
   use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
   use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, 
TaskContext};
   use datafusion::functions_aggregate::sum;
   use datafusion::physical_expr::aggregate::AggregateExprBuilder;
   use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
   use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
   use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
   use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, 
PlanProperties};
   use datafusion::{common, physical_plan};
   use futures::{Stream, StreamExt};
   use std::any::Any;
   use std::error::Error;
   use std::fmt::Formatter;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{Context, Poll};
   use tokio::signal::ctrl_c;
   use datafusion::prelude::SessionContext;
   
   /// A record-batch stream that never terminates.
   ///
   /// Every poll yields a clone of `batch`, so the stream is always ready and
   /// never returns `Poll::Pending`.
   struct InfiniteStream {
       /// The batch handed out (cloned) on every poll.
       batch: RecordBatch,
       /// Number of times `poll_next` has been called; used only for progress
       /// logging.
       poll_count: usize,
   }
   
   impl RecordBatchStream for InfiniteStream {
       /// Returns the schema of the batch this stream keeps emitting.
       fn schema(&self) -> SchemaRef {
           self.batch.schema()
       }
   }
   
   impl Stream for InfiniteStream {
       type Item = common::Result<RecordBatch>;

       /// Always returns `Poll::Ready` with a fresh clone of the stored batch.
       ///
       /// The waker in `_cx` is never registered because the stream never
       /// reports `Pending`. A progress line is printed every 10 000 polls.
       fn poll_next(
           self: Pin<&mut Self>,
           _cx: &mut Context<'_>,
       ) -> Poll<Option<Self::Item>> {
           // The original mutates through `Pin`'s `DerefMut`, which already
           // requires `Self: Unpin`, so a plain `&mut` projection is equivalent.
           let this = self.get_mut();
           this.poll_count += 1;

           let polls = this.poll_count;
           if polls % 10_000 == 0 {
               println!("InfiniteStream::poll_next {} times", polls);
           }

           let next_batch = this.batch.clone();
           Poll::Ready(Some(Ok(next_batch)))
       }
   }
   
   /// Leaf execution plan whose `execute` produces an `InfiniteStream`.
   #[derive(Debug)]
   struct InfiniteExec {
       /// Template batch that every produced stream will emit repeatedly.
       batch: RecordBatch,
       /// Plan properties computed once in `InfiniteExec::new`.
       properties: PlanProperties,
   }
   
   impl InfiniteExec {
       /// Builds an `InfiniteExec` that will emit clones of `batch` forever.
       ///
       /// The plan is declared with a single unknown partition, incremental
       /// emission, and as unbounded without requiring infinite memory.
       fn new(batch: &RecordBatch) -> Self {
           InfiniteExec {
               batch: batch.clone(),
               properties: PlanProperties::new(
                   // `schema()` already returns an owned `SchemaRef`; cloning it
                   // again would only bump the refcount for nothing.
                   EquivalenceProperties::new(batch.schema()),
                   Partitioning::UnknownPartitioning(1),
                   EmissionType::Incremental,
                   Boundedness::Unbounded {
                       requires_infinite_memory: false,
                   },
               ),
           }
       }
   }
   
   impl DisplayAs for InfiniteExec {
       /// Renders this node as the fixed label `infinite`, regardless of the
       /// requested display format.
       fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
           f.write_str("infinite")
       }
   }
   
   impl ExecutionPlan for InfiniteExec {
       /// Static name of this plan node.
       fn name(&self) -> &str {
           "infinite"
       }

       /// Exposes `self` for downcasting.
       fn as_any(&self) -> &dyn Any {
           self
       }

       /// Schema of the template batch.
       fn schema(&self) -> SchemaRef {
           self.batch.schema()
       }

       /// Properties computed at construction time.
       fn properties(&self) -> &PlanProperties {
           &self.properties
       }

       /// This is a leaf node, so it has no children.
       fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
           vec![]
       }

       /// Ignores `_children` (a leaf has none) and returns the same plan.
       fn with_new_children(
           self: Arc<Self>,
           _children: Vec<Arc<dyn ExecutionPlan>>,
       ) -> common::Result<Arc<dyn ExecutionPlan>> {
           // `self` is already an owned `Arc`; hand it back instead of cloning
           // it and dropping the original.
           Ok(self)
       }

       /// Starts a fresh `InfiniteStream` with its poll counter at zero.
       ///
       /// The partition index and task context are not used.
       fn execute(
           &self,
           _partition: usize,
           _context: Arc<TaskContext>,
       ) -> common::Result<SendableRecordBatchStream> {
           Ok(Box::pin(InfiniteStream {
               batch: self.batch.clone(),
               poll_count: 0,
           }))
       }
   }
   
   /// Runs a `SUM` aggregation over `InfiniteExec` and races the first output
   /// batch against Ctrl-C.
   ///
   /// Whichever completes first decides the outcome: a batch (or error) from
   /// the aggregation stream, or `None` when the interrupt signal arrives. The
   /// stream is dropped when the function returns.
   #[tokio::test]
   async fn main() -> Result<(), Box<dyn Error>> {
       // 1) build session & schema & sample batch
       let session_ctx = SessionContext::new();
       // `Vec<Field>` -> `Fields` is infallible, so use `from` rather than
       // `try_from(..)?`.
       let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
           "value",
           DataType::Int64,
           false,
       )])));
       let mut builder = Int64Array::builder(8192);
       for v in 0..8192 {
           builder.append_value(v);
       }
       let batch =
           RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;

       // 2) set up the infinite source + aggregation
       let inf = Arc::new(InfiniteExec::new(&batch));
       let aggr = Arc::new(AggregateExec::try_new(
           AggregateMode::Single,
           PhysicalGroupBy::new(vec![], vec![], vec![]),
           vec![Arc::new(
               AggregateExprBuilder::new(
                   sum::sum_udaf(),
                   vec![Arc::new(
                       datafusion::physical_expr::expressions::Column::new_with_schema(
                           "value", &schema,
                       )?,
                   )],
               )
               .schema(inf.schema())
               .alias("sum")
               .build()?,
           )],
           vec![None],
           inf.clone(),
           inf.schema(),
       )?);

       // 3) get the stream
       let mut stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?;

       println!("Running query; press Ctrl-C to cancel");
       // 4) drive the stream inline in select!
       // Only the first item is awaited; the previous `loop` broke out on its
       // first iteration in both branches, so it is just `stream.next()`.
       let result = tokio::select! {
           batch_opt = stream.next() => batch_opt,
           _ = ctrl_c() => {
               println!("Cancellation received!");
               None
           }
       };

       // 5) handle the outcome
       match result {
           None => println!("No result (cancelled or empty)"),
           Some(Ok(batch)) => println!("Got batch with {} rows", batch.num_rows()),
           Some(Err(e)) => eprintln!("Error: {}", e),
       }

       println!("Exiting, stream will be dropped now");
       Ok(())
   }
   
   ```
   
   
   Test result: Ctrl-C works well:
   ```text
   Running query; press Ctrl-C to cancel
   InfiniteStream::poll_next 10000 times
   InfiniteStream::poll_next 20000 times
   InfiniteStream::poll_next 30000 times
   InfiniteStream::poll_next 40000 times
   InfiniteStream::poll_next 50000 times
   Cancellation received!
   No result (cancelled or empty)
   Exiting, stream will be dropped now
   
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to