This is an automated email from the ASF dual-hosted git repository. kszucs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new cc5d687 ARROW-4467: [Rust] [DataFusion] Create a REPL & Dockerfile for DataFusion cc5d687 is described below commit cc5d6876206e457e562820e054864c2ec6464064 Author: Zhiyuan Zheng <zhiyuan.zh...@yandex.com> AuthorDate: Mon Apr 15 13:23:11 2019 +0200 ARROW-4467: [Rust] [DataFusion] Create a REPL & Dockerfile for DataFusion This PR contains a REPL implementation of DataFusion and creates a Dockerfile for it, which enables the following workflow without writing any code: [https://gist.github.com/zhzy0077/4fd32795691ae7725a323d9a61e55c9d](https://gist.github.com/zhzy0077/4fd32795691ae7725a323d9a61e55c9d) Known issues: 1. `unsigned` data types are not supported, since `sqlparser-rs` does not support them yet. 2. I don't know how to push Docker images to `apache/arrow-datafusion`. Help wanted here @andygrove Author: Zhiyuan Zheng <zhiyuan.zh...@yandex.com> Closes #4147 from zhzy0077/master and squashes the following commits: deb83a75f <Zhiyuan Zheng> add dockerfile & update readme.md 1ca46d7d0 <Zhiyuan Zheng> Remove debug infos & add tests for create external table. 
babecc6cd <Zhiyuan Zheng> add repl for DataFusion --- rust/.gitignore | 2 + rust/datafusion/Cargo.toml | 8 +- rust/datafusion/Dockerfile | 24 +++ rust/datafusion/README.md | 31 ++++ rust/datafusion/src/bin/repl.rs | 203 +++++++++++++++++++++ rust/datafusion/src/execution/context.rs | 103 ++++++++++- rust/datafusion/src/execution/mod.rs | 1 + rust/datafusion/src/execution/scalar_relation.rs | 65 +++++++ rust/datafusion/src/logicalplan.rs | 18 ++ .../src/optimizer/projection_push_down.rs | 13 ++ rust/datafusion/src/sql/parser.rs | 24 ++- rust/datafusion/tests/sql.rs | 53 ++++++ 12 files changed, 524 insertions(+), 21 deletions(-) diff --git a/rust/.gitignore b/rust/.gitignore index fa8d85a..6580b8e 100644 --- a/rust/.gitignore +++ b/rust/.gitignore @@ -1,2 +1,4 @@ Cargo.lock target + +.history \ No newline at end of file diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index 8dcb657..202a98b 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -34,15 +34,21 @@ edition = "2018" name = "datafusion" path = "src/lib.rs" +[[bin]] +name = "datafusion-cli" +path = "src/bin/repl.rs" + [dependencies] fnv = "1.0.3" arrow = { path = "../arrow" } parquet = { path = "../parquet" } -datafusion-rustyline = "2.0.0-alpha-20180628" serde = { version = "1.0.80", features = ["rc"] } serde_derive = "1.0.80" serde_json = "1.0.33" sqlparser = "0.2.0" +clap = "2.33.0" +rustyline = "3.0.0" +prettytable-rs = "0.8.0" [dev-dependencies] criterion = "0.2.0" diff --git a/rust/datafusion/Dockerfile b/rust/datafusion/Dockerfile new file mode 100644 index 0000000..8b16313 --- /dev/null +++ b/rust/datafusion/Dockerfile @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM rustlang/rust:nightly + +COPY rust /arrow/rust/ +WORKDIR /arrow/rust/datafusion +RUN cargo install --bin datafusion-cli --path . + +CMD ["datafusion-cli", "--data-path", "/data"] diff --git a/rust/datafusion/README.md b/rust/datafusion/README.md index 925a0d2..654f5ba 100644 --- a/rust/datafusion/README.md +++ b/rust/datafusion/README.md @@ -23,6 +23,8 @@ DataFusion is an in-memory query engine that uses Apache Arrow as the memory mod ## Usage + +#### Use as a lib Add this to your Cargo.toml: ```toml @@ -30,6 +32,35 @@ Add this to your Cargo.toml: datafusion = "0.14.0-SNAPSHOT" ``` +#### Use as a bin +##### Build your own bin(requires rust toolchains) +```sh +git clone https://github/apache/arrow +cd arrow/rust/datafusion +cargo run --bin datafusion-cli +``` +##### Use Dockerfile +```sh +git clone https://github/apache/arrow +cd arrow +docker build -f rust/datafusion/Dockerfile . 
--tag datafusion-cli +docker run -it -v $(your_data_location):/data datafusion-cli +``` + +``` +USAGE: + datafusion-cli [OPTIONS] + +FLAGS: + -h, --help Prints help information + -V, --version Prints version information + +OPTIONS: + -c, --batch-size <batch-size> The batch size of each query, default value is 1048576 + -p, --data-path <data-path> Path to your data, default to current directory +``` + + # Status ## General diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs new file mode 100644 index 0000000..ac73b26 --- /dev/null +++ b/rust/datafusion/src/bin/repl.rs @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#[macro_use] +extern crate clap; + +use arrow::array::*; +use arrow::datatypes::{DataType, TimeUnit}; +use clap::{App, Arg}; +use datafusion::error::{ExecutionError, Result}; +use datafusion::execution::context::ExecutionContext; +use datafusion::execution::relation::Relation; +use prettytable::{Cell, Row, Table}; +use rustyline::Editor; +use std::cell::RefMut; +use std::env; +use std::path::Path; + +fn main() { + let matches = App::new("DataFusion") + .version(crate_version!()) + .about( + "DataFusion is an in-memory query engine that uses Apache Arrow \ + as the memory model. It supports executing SQL queries against CSV and \ + Parquet files as well as querying directly against in-memory data.", + ) + .arg( + Arg::with_name("data-path") + .help("Path to your data, default to current directory") + .short("p") + .long("data-path") + .takes_value(true), + ) + .arg( + Arg::with_name("batch-size") + .help("The batch size of each query, default value is 1048576") + .short("c") + .long("batch-size") + .takes_value(true), + ) + .get_matches(); + + if let Some(path) = matches.value_of("data-path") { + let p = Path::new(path); + env::set_current_dir(&p).unwrap(); + }; + + let batch_size = matches + .value_of("batch-size") + .map(|size| size.parse::<usize>().unwrap()) + .unwrap_or(1_048_576); + + let mut ctx = ExecutionContext::new(); + + let mut rl = Editor::<()>::new(); + rl.load_history(".history").ok(); + + let mut query = "".to_owned(); + loop { + let readline = rl.readline("> "); + match readline { + Ok(ref line) if line.trim_end().ends_with(';') => { + query.push_str(line.trim_end()); + rl.add_history_entry(query.clone()); + match exec_and_print(&mut ctx, query, batch_size) { + Ok(_) => {} + Err(err) => println!("{:?}", err), + } + query = "".to_owned(); + } + Ok(ref line) => { + query.push_str(line); + query.push_str(" "); + } + Err(_) => { + break; + } + } + } + + rl.save_history(".history").ok(); +} + +fn exec_and_print( + ctx: &mut ExecutionContext, + sql: 
String, + batch_size: usize, +) -> Result<()> { + let relation = ctx.sql(&sql, batch_size)?; + print_result(relation.borrow_mut())?; + + Ok(()) +} + +fn print_result(mut results: RefMut<Relation>) -> Result<()> { + let mut row_count = 0; + let mut table = Table::new(); + let schema = results.schema(); + + let mut header = Vec::new(); + for field in schema.fields() { + header.push(Cell::new(&field.name())); + } + table.add_row(Row::new(header)); + + while let Some(batch) = results.next().unwrap() { + row_count += batch.num_rows(); + + for row in 0..batch.num_rows() { + let mut cells = Vec::new(); + for col in 0..batch.num_columns() { + let column = batch.column(col); + cells.push(Cell::new(&str_value(column.clone(), row)?)); + } + table.add_row(Row::new(cells)); + } + } + table.printstd(); + + if row_count > 1 { + println!("{} rows in set.", row_count); + } else { + println!("{} row in set.", row_count); + } + + Ok(()) +} + +macro_rules! make_string { + ($array_type:ty, $column: ident, $row: ident) => {{ + Ok($column + .as_any() + .downcast_ref::<$array_type>() + .unwrap() + .value($row) + .to_string()) + }}; +} + +fn str_value(column: ArrayRef, row: usize) -> Result<String> { + match column.data_type() { + DataType::Utf8 => Ok(column + .as_any() + .downcast_ref::<BinaryArray>() + .unwrap() + .get_string(row)), + DataType::Boolean => make_string!(BooleanArray, column, row), + DataType::Int16 => make_string!(Int16Array, column, row), + DataType::Int32 => make_string!(Int32Array, column, row), + DataType::Int64 => make_string!(Int64Array, column, row), + DataType::UInt8 => make_string!(UInt8Array, column, row), + DataType::UInt16 => make_string!(UInt16Array, column, row), + DataType::UInt32 => make_string!(UInt32Array, column, row), + DataType::UInt64 => make_string!(UInt64Array, column, row), + DataType::Float16 => make_string!(Float32Array, column, row), + DataType::Float32 => make_string!(Float32Array, column, row), + DataType::Float64 => make_string!(Float64Array, 
column, row), + DataType::Timestamp(unit) if *unit == TimeUnit::Second => { + make_string!(TimestampSecondArray, column, row) + } + DataType::Timestamp(unit) if *unit == TimeUnit::Millisecond => { + make_string!(TimestampMillisecondArray, column, row) + } + DataType::Timestamp(unit) if *unit == TimeUnit::Microsecond => { + make_string!(TimestampMicrosecondArray, column, row) + } + DataType::Timestamp(unit) if *unit == TimeUnit::Nanosecond => { + make_string!(TimestampNanosecondArray, column, row) + } + DataType::Date32(_) => make_string!(Date32Array, column, row), + DataType::Date64(_) => make_string!(Date64Array, column, row), + DataType::Time32(unit) if *unit == TimeUnit::Second => { + make_string!(Time32SecondArray, column, row) + } + DataType::Time32(unit) if *unit == TimeUnit::Millisecond => { + make_string!(Time32MillisecondArray, column, row) + } + DataType::Time32(unit) if *unit == TimeUnit::Microsecond => { + make_string!(Time64MicrosecondArray, column, row) + } + DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => { + make_string!(Time64NanosecondArray, column, row) + } + _ => Err(ExecutionError::ExecutionError(format!( + "Unsupported {:?} type for repl.", + column.data_type() + ))), + } +} diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 1618e6a..1bebdb5 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -25,6 +25,8 @@ use std::sync::Arc; use arrow::datatypes::*; +use crate::arrow::array::ArrayRef; +use crate::arrow::builder::BooleanBuilder; use crate::datasource::csv::CsvFile; use crate::datasource::TableProvider; use crate::error::{ExecutionError, Result}; @@ -34,15 +36,18 @@ use crate::execution::filter::FilterRelation; use crate::execution::limit::LimitRelation; use crate::execution::projection::ProjectRelation; use crate::execution::relation::{DataSourceRelation, Relation}; +use crate::execution::scalar_relation::ScalarRelation; use 
crate::execution::table_impl::TableImpl; use crate::logicalplan::*; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::optimizer::type_coercion::TypeCoercionRule; use crate::optimizer::utils; +use crate::sql::parser::FileType; use crate::sql::parser::{DFASTNode, DFParser}; use crate::sql::planner::{SchemaProvider, SqlToRel}; use crate::table::Table; +use sqlparser::sqlast::{SQLColumnDef, SQLType}; /// Execution context for registering data sources and executing queries pub struct ExecutionContext { @@ -50,7 +55,7 @@ pub struct ExecutionContext { } impl ExecutionContext { - /// Create a new excution context for in-memory queries + /// Create a new execution context for in-memory queries pub fn new() -> Self { Self { datasources: Rc::new(RefCell::new(HashMap::new())), @@ -83,9 +88,61 @@ impl ExecutionContext { Ok(self.optimize(&plan)?) } - other => Err(ExecutionError::General(format!( - "Cannot create logical plan from {:?}", - other + DFASTNode::CreateExternalTable { + name, + columns, + file_type, + header_row, + location, + } => { + let schema = Arc::new(self.build_schema(columns)?); + + Ok(Arc::new(LogicalPlan::CreateExternalTable { + schema, + name, + location, + file_type, + header_row, + })) + } + } + } + + fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> { + let mut fields = Vec::new(); + + for column in columns { + let data_type = self.make_data_type(column.data_type)?; + fields.push(Field::new(&column.name, data_type, column.allow_null)); + } + + Ok(Schema::new(fields)) + } + + fn make_data_type(&self, sql_type: SQLType) -> Result<DataType> { + match sql_type { + SQLType::BigInt => Ok(DataType::Int64), + SQLType::Int => Ok(DataType::Int32), + SQLType::SmallInt => Ok(DataType::Int16), + SQLType::Char(_) | SQLType::Varchar(_) | SQLType::Text => Ok(DataType::Utf8), + SQLType::Decimal(_, _) => Ok(DataType::Float64), + SQLType::Float(_) => Ok(DataType::Float32), + 
SQLType::Real | SQLType::Double => Ok(DataType::Float64), + SQLType::Boolean => Ok(DataType::Boolean), + SQLType::Date => Ok(DataType::Date64(DateUnit::Day)), + SQLType::Time => Ok(DataType::Time64(TimeUnit::Millisecond)), + SQLType::Timestamp => Ok(DataType::Date64(DateUnit::Millisecond)), + SQLType::Uuid + | SQLType::Clob(_) + | SQLType::Binary(_) + | SQLType::Varbinary(_) + | SQLType::Blob(_) + | SQLType::Regclass + | SQLType::Bytea + | SQLType::Custom(_) + | SQLType::Array(_) => Err(ExecutionError::General(format!( + "Unsupported data type: {:?}.", + sql_type ))), } } @@ -110,7 +167,7 @@ impl ExecutionContext { /// Get a table by name pub fn table(&mut self, table_name: &str) -> Result<Arc<Table>> { - match self.datasources.borrow().get(table_name) { + match (*self.datasources).borrow().get(table_name) { Some(provider) => { Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::TableScan { schema_name: "".to_string(), @@ -151,7 +208,7 @@ impl ExecutionContext { ref table_name, ref projection, .. 
- } => match self.datasources.borrow().get(table_name) { + } => match (*self.datasources).borrow().get(table_name) { Some(provider) => { let ds = provider.scan(projection, batch_size)?; if ds.len() == 1 { @@ -277,6 +334,38 @@ impl ExecutionContext { } } + LogicalPlan::CreateExternalTable { + ref schema, + ref name, + ref location, + ref file_type, + ref header_row, + } => { + match file_type { + FileType::CSV => { + self.register_csv(name, location, schema, *header_row) + } + _ => { + return Err(ExecutionError::ExecutionError(format!( + "Unsupported file type {:?}.", + file_type + ))); + } + } + let mut builder = BooleanBuilder::new(1); + builder.append_value(true)?; + + let columns = vec![Arc::new(builder.finish()) as ArrayRef]; + Ok(Rc::new(RefCell::new(ScalarRelation::new( + Arc::new(Schema::new(vec![Field::new( + "result", + DataType::Boolean, + false, + )])), + columns, + )))) + } + _ => Err(ExecutionError::NotImplemented( "Unsupported logical plan for execution".to_string(), )), @@ -289,7 +378,7 @@ struct ExecutionContextSchemaProvider { } impl SchemaProvider for ExecutionContextSchemaProvider { fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>> { - match self.datasources.borrow().get(name) { + match (*self.datasources).borrow().get(name) { Some(ds) => Some(ds.schema().clone()), None => None, } diff --git a/rust/datafusion/src/execution/mod.rs b/rust/datafusion/src/execution/mod.rs index cfd748a..d4f57a7 100644 --- a/rust/datafusion/src/execution/mod.rs +++ b/rust/datafusion/src/execution/mod.rs @@ -24,4 +24,5 @@ pub mod filter; pub mod limit; pub mod projection; pub mod relation; +pub mod scalar_relation; pub mod table_impl; diff --git a/rust/datafusion/src/execution/scalar_relation.rs b/rust/datafusion/src/execution/scalar_relation.rs new file mode 100644 index 0000000..96e3118 --- /dev/null +++ b/rust/datafusion/src/execution/scalar_relation.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Scalar relation, emit one fixed scalar value. + +use crate::error::Result; +use crate::execution::relation::Relation; +use arrow::array::ArrayRef; +use arrow::datatypes::Schema; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +/// A relation emit single scalar array; +pub(super) struct ScalarRelation { + /// The schema for the limit relation, which is always the same as the schema of the input relation + schema: Arc<Schema>, + + value: Vec<ArrayRef>, + + /// The number of rows that have been returned so far + emitted: bool, +} + +impl ScalarRelation { + pub fn new(schema: Arc<Schema>, value: Vec<ArrayRef>) -> Self { + Self { + schema, + value, + emitted: false, + } + } +} + +impl Relation for ScalarRelation { + fn next(&mut self) -> Result<Option<RecordBatch>> { + if self.emitted { + return Ok(None); + } + + self.emitted = true; + + Ok(Some(RecordBatch::try_new( + self.schema().clone(), + self.value.clone(), + )?)) + } + + fn schema(&self) -> &Arc<Schema> { + &self.schema + } +} diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs index c2ac37b..8e69056 100644 --- a/rust/datafusion/src/logicalplan.rs +++ b/rust/datafusion/src/logicalplan.rs @@ -24,6 +24,7 @@ use arrow::datatypes::{DataType, Field, 
Schema}; use crate::error::{ExecutionError, Result}; use crate::optimizer::utils; +use crate::sql::parser::FileType; /// Enumeration of supported function types (Scalar and Aggregate) #[derive(Serialize, Deserialize, Debug, Clone)] @@ -441,6 +442,19 @@ pub enum LogicalPlan { /// The schema description schema: Arc<Schema>, }, + /// Represents a create external table expression. + CreateExternalTable { + /// The table schema + schema: Arc<Schema>, + /// The table name + name: String, + /// The physical location + location: String, + /// The file type of physical file + file_type: FileType, + /// Whether the CSV file contains a header + header_row: bool, + }, } impl LogicalPlan { @@ -454,6 +468,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { schema, .. } => &schema, LogicalPlan::Sort { schema, .. } => &schema, LogicalPlan::Limit { schema, .. } => &schema, + LogicalPlan::CreateExternalTable { schema, .. } => &schema, } } } @@ -530,6 +545,9 @@ impl LogicalPlan { write!(f, "Limit: {:?}", expr)?; input.fmt_with_indent(f, indent + 1) } + LogicalPlan::CreateExternalTable { ref name, .. 
} => { + write!(f, "CreateExternalTable: {:?}", name) + } } } } diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs index a75982f..52ac440 100644 --- a/rust/datafusion/src/optimizer/projection_push_down.rs +++ b/rust/datafusion/src/optimizer/projection_push_down.rs @@ -191,6 +191,19 @@ impl ProjectionPushDown { input: input.clone(), schema: schema.clone(), })), + LogicalPlan::CreateExternalTable { + schema, + name, + location, + file_type, + header_row, + } => Ok(Arc::new(LogicalPlan::CreateExternalTable { + schema: schema.clone(), + name: name.to_string(), + location: location.to_string(), + file_type: file_type.clone(), + header_row: *header_row, + })), } } diff --git a/rust/datafusion/src/sql/parser.rs b/rust/datafusion/src/sql/parser.rs index dc46ce0..74ae4cf 100644 --- a/rust/datafusion/src/sql/parser.rs +++ b/rust/datafusion/src/sql/parser.rs @@ -32,7 +32,7 @@ macro_rules! parser_err { } /// Types of files to parse as DataFrames -#[derive(Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub enum FileType { /// Newline-delimited JSON NdJson, @@ -134,18 +134,16 @@ impl DFParser { true }; - match self.parser.peek_token() { - Some(Token::Comma) => { - self.parser.next_token(); - columns.push(SQLColumnDef { - name: column_name, - data_type: data_type, - allow_null, - default: None, - is_primary: false, - is_unique: false, - }); - } + columns.push(SQLColumnDef { + name: column_name, + data_type: data_type, + allow_null, + default: None, + is_primary: false, + is_unique: false, + }); + match self.parser.next_token() { + Some(Token::Comma) => continue, Some(Token::RParen) => break, _ => { return parser_err!( diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 81c52c9..566fecf 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -241,6 +241,26 @@ fn csv_query_limit_zero() { assert_eq!(expected, actual); } +#[test] +fn 
csv_query_create_external_table() { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv_by_sql(&mut ctx); + let sql = "SELECT c1, c2, c3, c4, c5, c6, c7, c8, c9, 10, c11, c12, c13 FROM aggregate_test_100 LIMIT 1"; + let actual = execute(&mut ctx, sql); + let expected = "\"c\"\t2\t1\t18109\t2033001162\t-6513304855495910254\t25\t43062\t1491205016\t10\t0.110830784\t0.9294097332465232\t\"6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW\"\n".to_string(); + assert_eq!(expected, actual); +} + +#[test] +fn csv_query_external_table_count() { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv_by_sql(&mut ctx); + let sql = "SELECT COUNT(c12) FROM aggregate_test_100"; + let actual = execute(&mut ctx, sql); + let expected = "100\n".to_string(); + assert_eq!(expected, actual); +} + //TODO Uncomment the following test when ORDER BY is implemented to be able to test ORDER // BY + LIMIT /* @@ -273,6 +293,39 @@ fn aggr_test_schema() -> Arc<Schema> { ])) } +fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + + // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once unsigned is supported. + ctx.sql( + &format!( + " + CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 INT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INT NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) + STORED AS CSV + WITH HEADER ROW + LOCATION '{}/csv/aggregate_test_100.csv' + ", + testdata + ), + 1024, + ) + .unwrap(); +} + fn register_aggregate_csv(ctx: &mut ExecutionContext) { let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); let schema = aggr_test_schema();