ACking-you commented on issue #7604:
URL: https://github.com/apache/arrow-datafusion/issues/7604#issuecomment-1729309581

   > @ACking-you - this looks like a real issue. Why did you close the issue?
   
   When I run the following code with version 30.0.0 it gives me an error, but with 31.0.0 it behaves fine, which is why I closed the issue:
   ```rust
   use arrow::array::{Array, ArrayRef, TimestampMillisecondArray};
   use arrow_schema::TimeUnit;
   use async_trait::async_trait;
   use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
   use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::datasource::{TableProvider, TableType};
   use datafusion::error::Result;
   use datafusion::execution::context::{SessionState, TaskContext};
   use datafusion::physical_plan::expressions::PhysicalSortExpr;
   use datafusion::physical_plan::memory::MemoryStream;
   use datafusion::physical_plan::{
       project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
       SendableRecordBatchStream, Statistics,
   };
   use datafusion::prelude::*;
   use datafusion_expr::Expr;
   use std::any::Any;
   use std::collections::{BTreeMap, HashMap};
   use std::fmt::{Debug, Formatter};
   use std::sync::{Arc, Mutex};
   
   #[tokio::main]
   async fn main() -> Result<()> {
        // create our custom datasource and add some users
       let db = CustomDataSource::default();
       db.populate_users();
       let ctx = SessionContext::new();
       ctx.register_table("test_table", Arc::new(db))?;
       let df = ctx
           .sql("select min(date),b from test_table group by b")
           .await?;
       df.show().await?;
       Ok(())
   }
   
   /// A User, with an id and a bank account
   #[derive(Clone, Debug)]
   struct User {
       id: u8,
       bank_account: u64,
       date: i64,
   }
   
   /// A custom datasource, used to represent a datastore with a single index
   #[derive(Clone)]
   pub struct CustomDataSource {
       inner: Arc<Mutex<CustomDataSourceInner>>,
   }
   
   struct CustomDataSourceInner {
       data: HashMap<u8, User>,
       bank_account_index: BTreeMap<u64, u8>,
   }
   
   impl Debug for CustomDataSource {
       fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
           f.write_str("custom_db")
       }
   }
   
   impl CustomDataSource {
       pub(crate) async fn create_physical_plan(
           &self,
           projections: Option<&Vec<usize>>,
           schema: SchemaRef,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
       }
   
       pub(crate) fn populate_users(&self) {
           self.add_user(User {
               id: 1,
               bank_account: 9_000,
               date: 0,
           });
           self.add_user(User {
               id: 2,
               bank_account: 100,
               date: 1,
           });
           self.add_user(User {
               id: 3,
               bank_account: 100,
               date: 2,
           });
       }
   
       fn add_user(&self, user: User) {
           let mut inner = self.inner.lock().unwrap();
           inner.bank_account_index.insert(user.bank_account, user.id);
           inner.data.insert(user.id, user);
       }
   }
   
   impl Default for CustomDataSource {
       fn default() -> Self {
           CustomDataSource {
               inner: Arc::new(Mutex::new(CustomDataSourceInner {
                   data: Default::default(),
                   bank_account_index: Default::default(),
               })),
           }
       }
   }
   
   #[async_trait]
   impl TableProvider for CustomDataSource {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn schema(&self) -> SchemaRef {
           SchemaRef::new(Schema::new(vec![
               Field::new("a", DataType::UInt8, false),
               Field::new("b", DataType::UInt64, true),
               Field::new(
                   "date",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("+08:00".into())),
                   false,
               ),
           ]))
       }
   
       fn table_type(&self) -> TableType {
           TableType::Base
       }
   
       async fn scan(
           &self,
           _state: &SessionState,
           projection: Option<&Vec<usize>>,
            // filters and limit can be used here to inject some push-down operations if needed
           _filters: &[Expr],
           _limit: Option<usize>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           return self.create_physical_plan(projection, self.schema()).await;
       }
   }
   
   #[derive(Debug, Clone)]
   struct CustomExec {
       db: CustomDataSource,
       projected_schema: SchemaRef,
   }
   
   impl CustomExec {
       fn new(
           projections: Option<&Vec<usize>>,
           schema: SchemaRef,
           db: CustomDataSource,
       ) -> Self {
           let projected_schema = project_schema(&schema, projections).unwrap();
           Self {
               db,
               projected_schema,
           }
       }
   }
   
   impl DisplayAs for CustomExec {
        fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
            // minimal implementation so that displaying the plan does not panic
            write!(f, "CustomExec")
        }
   }
   
   impl ExecutionPlan for CustomExec {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn schema(&self) -> SchemaRef {
           self.projected_schema.clone()
       }
   
        fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
           datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
       }
   
       fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
           None
       }
   
       fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
           vec![]
       }
   
       fn with_new_children(
           self: Arc<Self>,
           _: Vec<Arc<dyn ExecutionPlan>>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           Ok(self)
       }
   
       fn execute(
           &self,
           _partition: usize,
           _context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream> {
           let users: Vec<User> = {
               let db = self.db.inner.lock().unwrap();
               db.data.values().cloned().collect()
           };
           let mut id_array = UInt8Builder::new();
           let mut account_array = UInt64Builder::new();
           let mut columns: Vec<ArrayRef> = vec![];
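            // build one Arrow array per projected field, matching on its data type,
            // so the column order follows the projected schema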
           for field in &self.projected_schema.fields {
               match field.data_type() {
                   DataType::UInt8 => {
                       for u in users.iter() {
                           id_array.append_value(u.id);
                       }
                       columns.push(Arc::new(id_array.finish()));
                   }
                    DataType::Timestamp(TimeUnit::Millisecond, _) => {
                       let mut values = Vec::new();
                       for u in users.iter() {
                           values.push(u.date);
                       }
                       columns.push(Arc::new(
                           TimestampMillisecondArray::from_iter_values(values)
                               .with_timezone("+08:00"),
                       ));
                   },
                   DataType::UInt64 => {
                       for u in users.iter() {
                           account_array.append_value(u.bank_account);
                       }
                       columns.push(Arc::new(account_array.finish()));
                   }
                   _ => {
                        unreachable!("unexpected data type in the projected schema")
                   }
               }
           }
   
           Ok(Box::pin(MemoryStream::try_new(
               vec![RecordBatch::try_new(
                   self.projected_schema.clone(),
                   columns,
               )?],
               self.schema(),
               None,
           )?))
       }
   
       fn statistics(&self) -> Statistics {
           Statistics::default()
       }
   }
   ```
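
    For comparison, here is a minimal sketch that runs the same `min(date) ... group by b` query through a plain `MemTable` instead of the custom `TableProvider` (the table name `t` and the sample values are made up for illustration, and the exact compatible crate versions may differ). If this variant also errors on 30.0.0 but works on 31.0.0, the custom provider itself is probably not the cause:
    ```rust
    use std::sync::Arc;

    use datafusion::arrow::array::{ArrayRef, TimestampMillisecondArray, UInt64Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // same column layout as the custom provider: a nullable UInt64 "b"
        // and a non-null millisecond timestamp "date" with a "+08:00" timezone
        let schema = Arc::new(Schema::new(vec![
            Field::new("b", DataType::UInt64, true),
            Field::new(
                "date",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+08:00".into())),
                false,
            ),
        ]));

        let b: ArrayRef = Arc::new(UInt64Array::from(vec![9_000, 100, 100]));
        let date: ArrayRef = Arc::new(
            TimestampMillisecondArray::from_iter_values(vec![0, 1, 2]).with_timezone("+08:00"),
        );
        let batch = RecordBatch::try_new(schema.clone(), vec![b, date])?;

        // register the data through a MemTable instead of the custom TableProvider
        let table = MemTable::try_new(schema, vec![vec![batch]])?;
        let ctx = SessionContext::new();
        ctx.register_table("t", Arc::new(table))?;

        ctx.sql("select min(date), b from t group by b")
            .await?
            .show()
            .await?;
        Ok(())
    }
    ```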

