[arrow-rs] branch master updated: Implement the Iterator trait for the json Reader. (#451)
This is an automated email from the ASF dual-hosted git repository. nevime pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new e5cda31 Implement the Iterator trait for the json Reader. (#451) e5cda31 is described below commit e5cda312b697c3d610637b28c58b6f1b104b41cc Author: Laurent Mazare AuthorDate: Sun Jun 13 08:22:38 2021 +0800 Implement the Iterator trait for the json Reader. (#451) * Implement the Iterator trait for the json Reader. * Use transpose. --- arrow/src/json/reader.rs | 39 +++ 1 file changed, 39 insertions(+) diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs index d0b9c19..9235142 100644 --- a/arrow/src/json/reader.rs +++ b/arrow/src/json/reader.rs @@ -1569,6 +1569,14 @@ impl ReaderBuilder { } } +impl<R: Read> Iterator for Reader<R> { +type Item = Result<RecordBatch>; + +fn next(&mut self) -> Option<Self::Item> { +self.next().transpose() +} +} + #[cfg(test)] mod tests { use crate::{ @@ -2946,4 +2954,35 @@ mod tests { assert_eq!(batch.num_columns(), 1); assert_eq!(batch.num_rows(), 3); } + +#[test] +fn test_json_iterator() { +let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(5); +let reader: Reader<File> = builder +.build::<File>(File::open("test/data/basic.json").unwrap()) +.unwrap(); +let schema = reader.schema(); +let (col_a_index, _) = schema.column_with_name("a").unwrap(); + +let mut sum_num_rows = 0; +let mut num_batches = 0; +let mut sum_a = 0; +for batch in reader { +let batch = batch.unwrap(); +assert_eq!(4, batch.num_columns()); +sum_num_rows += batch.num_rows(); +num_batches += 1; +let batch_schema = batch.schema(); +assert_eq!(schema, batch_schema); +let a_array = batch +.column(col_a_index) +.as_any() +.downcast_ref::<Int64Array>() +.unwrap(); +sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>(); +} +assert_eq!(12, sum_num_rows); +assert_eq!(3, num_batches); +assert_eq!(111, sum_a); +} }
[arrow-rs] branch master updated: Add Decimal to CsvWriter and improve debug display (#406)
This is an automated email from the ASF dual-hosted git repository. nevime pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new fb45112 Add Decimal to CsvWriter and improve debug display (#406) fb45112 is described below commit fb451125c4ed49a425de10afb6f42af0d9723a19 Author: Ádám Lippai AuthorDate: Sun Jun 13 02:20:08 2021 +0200 Add Decimal to CsvWriter and improve debug display (#406) * Add Decimal to CsvWriter and improve debug display * Measure CSV writer instead of file and data creation * Re-use decimal formatting --- arrow/benches/csv_writer.rs | 19 ++- arrow/src/array/array_binary.rs | 36 arrow/src/csv/writer.rs | 23 --- arrow/src/util/display.rs | 27 --- 4 files changed, 62 insertions(+), 43 deletions(-) diff --git a/arrow/benches/csv_writer.rs b/arrow/benches/csv_writer.rs index 50b94d6..62c5da9 100644 --- a/arrow/benches/csv_writer.rs +++ b/arrow/benches/csv_writer.rs @@ -28,14 +28,14 @@ use arrow::record_batch::RecordBatch; use std::fs::File; use std::sync::Arc; -fn record_batches_to_csv() { +fn criterion_benchmark(c: Criterion) { #[cfg(feature = "csv")] { let schema = Schema::new(vec![ Field::new("c1", DataType::Utf8, false), Field::new("c2", DataType::Float64, true), Field::new("c3", DataType::UInt32, false), -Field::new("c3", DataType::Boolean, true), +Field::new("c4", DataType::Boolean, true), ]); let c1 = StringArray::from(vec![ @@ -59,16 +59,17 @@ fn record_batches_to_csv() { let file = File::create("target/bench_write_csv.csv").unwrap(); let mut writer = csv::Writer::new(file); let batches = vec![, , , , , , , , , , ]; -#[allow(clippy::unit_arg)] -criterion::black_box(for batch in batches { -writer.write(batch).unwrap() + +c.bench_function("record_batches_to_csv", |b| { +b.iter(|| { +#[allow(clippy::unit_arg)] +criterion::black_box(for batch in { +writer.write(batch).unwrap() +}); +}); }); } } -fn criterion_benchmark(c: Criterion) 
{ -c.bench_function("record_batches_to_csv", |b| b.iter(record_batches_to_csv)); -} - criterion_group!(benches, criterion_benchmark); criterion_main!(benches); diff --git a/arrow/src/array/array_binary.rs b/arrow/src/array/array_binary.rs index 0cb4db4..0b374db 100644 --- a/arrow/src/array/array_binary.rs +++ b/arrow/src/array/array_binary.rs @@ -666,6 +666,17 @@ impl DecimalArray { self.length * i as i32 } +#[inline] +pub fn value_as_string(, row: usize) -> String { +let decimal_string = self.value(row).to_string(); +if self.scale == 0 { +decimal_string +} else { +let splits = decimal_string.split_at(decimal_string.len() - self.scale); +format!("{}.{}", splits.0, splits.1) +} +} + pub fn from_fixed_size_list_array( v: FixedSizeListArray, precision: usize, @@ -729,7 +740,9 @@ impl fmt::Debug for DecimalArray { fn fmt(, f: fmt::Formatter) -> fmt::Result { write!(f, "DecimalArray<{}, {}>\n[\n", self.precision, self.scale)?; print_long_array(self, f, |array, index, f| { -fmt::Debug::fmt((index), f) +let formatted_decimal = array.value_as_string(index); + +write!(f, "{}", formatted_decimal) })?; write!(f, "]") } @@ -758,7 +771,7 @@ impl Array for DecimalArray { #[cfg(test)] mod tests { use crate::{ -array::{LargeListArray, ListArray}, +array::{DecimalBuilder, LargeListArray, ListArray}, datatypes::Field, }; @@ -1163,17 +1176,16 @@ mod tests { #[test] fn test_decimal_array_fmt_debug() { -let values: [u8; 32] = [ -192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 36, 75, 238, 253, -255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, -]; -let array_data = ArrayData::builder(DataType::Decimal(23, 6)) -.len(2) -.add_buffer(Buffer::from([..])) -.build(); -let arr = DecimalArray::from(array_data); +let values: Vec = vec![888700, -888700]; +let mut decimal_builder = DecimalBuilder::new(3, 23, 6); + +values.iter().for_each(|| { +decimal_builder.append_value(value).unwrap(); +}); +decimal_builder.append_null().unwrap(); +let arr = decimal_builder.finish(); 
assert_eq!( -"DecimalArray<23, 6>\n[\n 888700,\n -888700,\n]", +"DecimalArray<23, 6>\n[\n
[arrow-rs] branch master updated: remove unnecessary wraps in sortk (#445)
This is an automated email from the ASF dual-hosted git repository. nevime pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new efe86cd remove unnecessary wraps in sortk (#445) efe86cd is described below commit efe86cdf329ec4bfad3b72bd23ee6558340fa297 Author: Jiayu Liu AuthorDate: Sun Jun 13 08:00:35 2021 +0800 remove unnecessary wraps in sortk (#445) --- arrow/src/compute/kernels/sort.rs | 96 +-- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/arrow/src/compute/kernels/sort.rs b/arrow/src/compute/kernels/sort.rs index dff5695..b0eecb9 100644 --- a/arrow/src/compute/kernels/sort.rs +++ b/arrow/src/compute/kernels/sort.rs @@ -163,7 +163,7 @@ pub fn sort_to_indices( let (v, n) = partition_validity(values); -match values.data_type() { +Ok(match values.data_type() { DataType::Boolean => sort_boolean(values, v, n, , limit), DataType::Int8 => { sort_primitive::(values, v, n, cmp, , limit) @@ -278,10 +278,12 @@ pub fn sort_to_indices( DataType::Float64 => { sort_list::(values, v, n, , limit) } -t => Err(ArrowError::ComputeError(format!( -"Sort not supported for list type {:?}", -t -))), +t => { +return Err(ArrowError::ComputeError(format!( +"Sort not supported for list type {:?}", +t +))) +} }, DataType::LargeList(field) => match field.data_type() { DataType::Int8 => sort_list::(values, v, n, , limit), @@ -304,10 +306,12 @@ pub fn sort_to_indices( DataType::Float64 => { sort_list::(values, v, n, , limit) } -t => Err(ArrowError::ComputeError(format!( -"Sort not supported for list type {:?}", -t -))), +t => { +return Err(ArrowError::ComputeError(format!( +"Sort not supported for list type {:?}", +t +))) +} }, DataType::FixedSizeList(field, _) => match field.data_type() { DataType::Int8 => sort_list::(values, v, n, , limit), @@ -330,10 +334,12 @@ pub fn sort_to_indices( DataType::Float64 => { sort_list::(values, v, n, , limit) } -t => 
Err(ArrowError::ComputeError(format!( -"Sort not supported for list type {:?}", -t -))), +t => { +return Err(ArrowError::ComputeError(format!( +"Sort not supported for list type {:?}", +t +))) +} }, DataType::Dictionary(key_type, value_type) if *value_type.as_ref() == DataType::Utf8 => @@ -363,17 +369,21 @@ pub fn sort_to_indices( DataType::UInt64 => { sort_string_dictionary::(values, v, n, , limit) } -t => Err(ArrowError::ComputeError(format!( -"Sort not supported for dictionary key type {:?}", -t -))), +t => { +return Err(ArrowError::ComputeError(format!( +"Sort not supported for dictionary key type {:?}", +t +))) +} } } -t => Err(ArrowError::ComputeError(format!( -"Sort not supported for data type {:?}", -t -))), -} +t => { +return Err(ArrowError::ComputeError(format!( +"Sort not supported for data type {:?}", +t +))) +} +}) } /// Options that define how sort kernels should behave @@ -396,14 +406,13 @@ impl Default for SortOptions { } /// Sort primitive values -#[allow(clippy::unnecessary_wraps)] fn sort_boolean( values: , value_indices: Vec, null_indices: Vec, options: , limit: Option, -) -> Result { +) -> UInt32Array { let values = values .as_any() .downcast_ref::() @@ -469,11 +478,10 @@ fn sort_boolean( vec![], ); -Ok(UInt32Array::from(result_data)) +UInt32Array::from(result_data) } /// Sort primitive values -#[allow(clippy::unnecessary_wraps)] fn sort_primitive( values: , value_indices: Vec, @@ -481,7 +489,7 @@ fn sort_primitive( cmp: F, options: , limit: Option, -) -> Result +) -> UInt32Array where T: ArrowPrimitiveType, T::Native: std::cmp::PartialOrd, @@ -549,7 +557,7 @@ where vec![], ); -Ok(UInt32Array::from(result_data)) +
[arrow] branch master updated (0e9285b -> 27d89a9)
This is an automated email from the ASF dual-hosted git repository. kou pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 0e9285b ARROW-13053: [Python] Fix build issue with Homebrewed arrow library add 27d89a9 ARROW-12962: [GLib][Ruby] Add Arrow::Scalar No new revisions were added by this update. Summary of changes: c_glib/arrow-glib/arrow-glib.h |1 + c_glib/arrow-glib/arrow-glib.hpp |1 + c_glib/arrow-glib/basic-array.cpp | 19 +- c_glib/arrow-glib/basic-array.h|3 + c_glib/arrow-glib/datum.cpp| 151 ++ c_glib/arrow-glib/datum.h | 26 +- c_glib/arrow-glib/datum.hpp|3 + c_glib/arrow-glib/meson.build |3 + c_glib/arrow-glib/scalar.cpp | 2382 c_glib/arrow-glib/scalar.h | 678 ++ c_glib/arrow-glib/{datum.hpp => scalar.hpp}| 31 +- .../arrow-dataset-glib/arrow-dataset-glib-docs.xml |4 + c_glib/doc/arrow-glib/arrow-glib-docs.xml |4 + c_glib/test/run-test.rb|4 + c_glib/test/test-array-datum.rb| 12 + .../buffer.rb => c_glib/test/test-binary-scalar.rb | 34 +- ...{test-array-datum.rb => test-boolean-scalar.rb} | 44 +- .../buffer.rb => c_glib/test/test-date32-scalar.rb | 33 +- .../buffer.rb => c_glib/test/test-date64-scalar.rb | 33 +- ...st-array-datum.rb => test-decimal128-scalar.rb} | 40 +- ...st-array-datum.rb => test-decimal256-scalar.rb} | 40 +- ...t-array-datum.rb => test-dense-union-scalar.rb} | 44 +- .../buffer.rb => c_glib/test/test-double-scalar.rb | 35 +- ...y-datum.rb => test-fixed-size-binary-scalar.rb} | 41 +- .../buffer.rb => c_glib/test/test-float-scalar.rb | 35 +- c_glib/test/test-function.rb | 19 + .../buffer.rb => c_glib/test/test-int16-scalar.rb | 32 +- .../buffer.rb => c_glib/test/test-int32-scalar.rb | 32 +- .../buffer.rb => c_glib/test/test-int64-scalar.rb | 32 +- .../buffer.rb => c_glib/test/test-int8-scalar.rb | 32 +- .../test/test-large-binary-scalar.rb | 34 +- .../test/test-large-string-scalar.rb | 34 +- .../{test-array-datum.rb => test-list-scalar.rb} | 38 +- c_glib/test/test-map-scalar.rb | 65 + 
.../buffer.rb => c_glib/test/test-null-scalar.rb | 28 +- .../{test-array-datum.rb => test-scalar-datum.rb} | 33 +- ...-array-datum.rb => test-sparse-union-scalar.rb} | 44 +- .../{test-array-datum.rb => test-string-scalar.rb} | 47 +- .../{test-array-datum.rb => test-struct-scalar.rb} | 47 +- .../buffer.rb => c_glib/test/test-time32-scalar.rb | 34 +- .../buffer.rb => c_glib/test/test-time64-scalar.rb | 34 +- ...est-array-datum.rb => test-timestamp-scalar.rb} | 40 +- .../buffer.rb => c_glib/test/test-uint16-scalar.rb | 32 +- .../buffer.rb => c_glib/test/test-uint32-scalar.rb | 32 +- .../buffer.rb => c_glib/test/test-uint64-scalar.rb | 32 +- .../buffer.rb => c_glib/test/test-uint8-scalar.rb | 32 +- ruby/red-arrow/lib/arrow/buffer.rb | 16 +- ...er.rb => constructor-arguments-gc-guardable.rb} | 11 +- ruby/red-arrow/lib/arrow/datum.rb | 98 + ruby/red-arrow/lib/arrow/loader.rb | 30 + ruby/red-arrow/lib/arrow/{buffer.rb => scalar.rb} | 18 +- .../buffer.rb => test/test-boolean-scalar.rb} | 14 +- .../red-arrow/test/test-float-scalar.rb| 56 +- ruby/red-arrow/test/test-function.rb | 176 ++ 54 files changed, 4400 insertions(+), 473 deletions(-) create mode 100644 c_glib/arrow-glib/scalar.cpp create mode 100644 c_glib/arrow-glib/scalar.h copy c_glib/arrow-glib/{datum.hpp => scalar.hpp} (55%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-binary-scalar.rb (58%) copy c_glib/test/{test-array-datum.rb => test-boolean-scalar.rb} (56%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-date32-scalar.rb (60%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-date64-scalar.rb (59%) copy c_glib/test/{test-array-datum.rb => test-decimal128-scalar.rb} (56%) copy c_glib/test/{test-array-datum.rb => test-decimal256-scalar.rb} (56%) copy c_glib/test/{test-array-datum.rb => test-dense-union-scalar.rb} (56%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-double-scalar.rb (58%) copy c_glib/test/{test-array-datum.rb => test-fixed-size-binary-scalar.rb} 
(56%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-float-scalar.rb (58%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-int16-scalar.rb (60%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-int32-scalar.rb (60%) copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-int64-scalar.rb
[arrow] branch master updated (7339bd5 -> 0e9285b)
This is an automated email from the ASF dual-hosted git repository. kou pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 7339bd5 [GitHub] Add shorter GitHub repository description to .asf.yaml add 0e9285b ARROW-13053: [Python] Fix build issue with Homebrewed arrow library No new revisions were added by this update. Summary of changes: python/CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
[arrow] branch master updated (9162954 -> 7339bd5)
This is an automated email from the ASF dual-hosted git repository. wesm pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git. from 9162954 ARROW-13065: [Packaging][RPM] Add missing required LZ4 version information add 7339bd5 [GitHub] Add shorter GitHub repository description to .asf.yaml No new revisions were added by this update. Summary of changes: .asf.yaml | 4 1 file changed, 4 insertions(+)
[arrow-rs] branch master updated: remove clippy unnecessary wraps (#449)
This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git The following commit(s) were added to refs/heads/master by this push: new dc5507a remove clippy unnecessary wraps (#449) dc5507a is described below commit dc5507a996e197ba4856aaf00783e62be7bb4222 Author: Jiayu Liu AuthorDate: Sat Jun 12 20:59:35 2021 +0800 remove clippy unnecessary wraps (#449) --- arrow/src/compute/kernels/cast.rs | 8 1 file changed, 8 deletions(-) diff --git a/arrow/src/compute/kernels/cast.rs b/arrow/src/compute/kernels/cast.rs index 150f1f6..d755e1a 100644 --- a/arrow/src/compute/kernels/cast.rs +++ b/arrow/src/compute/kernels/cast.rs @@ -956,7 +956,6 @@ const EPOCH_DAYS_FROM_CE: i32 = 719_163; /// Arrays should have the same primitive data type, otherwise this should fail. /// We do not perform this check on primitive data types as we only use this /// function internally, where it is guaranteed to be infallible. 
-#[allow(clippy::unnecessary_wraps)] fn cast_array_data(array: , to_type: DataType) -> Result where TO: ArrowNumericType, @@ -974,7 +973,6 @@ where } /// Convert Array into a PrimitiveArray of type, and apply numeric cast -#[allow(clippy::unnecessary_wraps)] fn cast_numeric_arrays(from: ) -> Result where FROM: ArrowNumericType, @@ -1006,7 +1004,6 @@ where } /// Cast numeric types to Utf8 -#[allow(clippy::unnecessary_wraps)] fn cast_numeric_to_string(array: ) -> Result where FROM: ArrowNumericType, @@ -1035,7 +1032,6 @@ where } /// Cast numeric types to Utf8 -#[allow(clippy::unnecessary_wraps)] fn cast_string_to_numeric( from: , cast_options: , @@ -1101,7 +1097,6 @@ where } /// Casts generic string arrays to Date32Array -#[allow(clippy::unnecessary_wraps)] fn cast_string_to_date32( array: Array, cast_options: , @@ -1164,7 +1159,6 @@ fn cast_string_to_date32( } /// Casts generic string arrays to Date64Array -#[allow(clippy::unnecessary_wraps)] fn cast_string_to_date64( array: Array, cast_options: , @@ -1226,7 +1220,6 @@ fn cast_string_to_date64( } /// Casts generic string arrays to TimeStampNanosecondArray -#[allow(clippy::unnecessary_wraps)] fn cast_string_to_timestamp_ns( array: Array, cast_options: , @@ -1308,7 +1301,6 @@ where /// Cast Boolean types to numeric /// /// `false` returns 0 while `true` returns 1 -#[allow(clippy::unnecessary_wraps)] fn cast_bool_to_numeric( from: , cast_options: ,
[arrow-rs] branch master updated (71e9d78 -> f624153)
This is an automated email from the ASF dual-hosted git repository. alamb pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git. from 71e9d78 Implement faster arrow array reader (#384) add f624153 Remove DictionaryArray::keys_array method and replace usages by the keys method (#419) No new revisions were added by this update. Summary of changes: arrow/src/array/array_dictionary.rs | 21 +++-- arrow/src/array/builder.rs | 2 +- arrow/src/array/ord.rs | 4 ++-- arrow/src/compute/kernels/cast.rs | 6 -- arrow/src/compute/kernels/sort.rs | 4 ++-- arrow/src/compute/kernels/take.rs | 2 +- arrow/src/util/display.rs | 2 +- 7 files changed, 14 insertions(+), 27 deletions(-)
[arrow-datafusion] branch master updated: ShuffleReaderExec now supports multiple locations per partition (#541)
This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git The following commit(s) were added to refs/heads/master by this push: new 8f4078d ShuffleReaderExec now supports multiple locations per partition (#541) 8f4078d is described below commit 8f4078d83f7ea0348fa43906d26156bf8a95de4c Author: Andy Grove AuthorDate: Sat Jun 12 06:45:06 2021 -0600 ShuffleReaderExec now supports multiple locations per partition (#541) * ShuffleReaderExec now supports multiple locations per partition * Remove TODO * avoid clone --- ballista/rust/client/src/context.rs| 39 ++--- ballista/rust/core/proto/ballista.proto| 7 +- .../core/src/execution_plans/shuffle_reader.rs | 94 +- .../core/src/serde/physical_plan/from_proto.rs | 12 ++- .../rust/core/src/serde/physical_plan/to_proto.rs | 18 +++-- ballista/rust/core/src/utils.rs| 40 - ballista/rust/scheduler/src/planner.rs | 2 +- ballista/rust/scheduler/src/state/mod.rs | 6 +- 8 files changed, 130 insertions(+), 88 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 4e5cc1a..695045d 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -29,21 +29,18 @@ use ballista_core::serde::protobuf::{ execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, }; +use ballista_core::utils::WrappedStream; use ballista_core::{ client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context, }; use datafusion::arrow::datatypes::Schema; -use datafusion::arrow::datatypes::SchemaRef; -use datafusion::arrow::error::Result as ArrowResult; -use datafusion::arrow::record_batch::RecordBatch; use datafusion::catalog::TableReference; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::csv::CsvReadOptions; use 
datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream}; use futures::future; -use futures::Stream; use futures::StreamExt; use log::{error, info}; @@ -74,32 +71,6 @@ impl BallistaContextState { } } -struct WrappedStream { -stream: Pin> + Send + Sync>>, -schema: SchemaRef, -} - -impl RecordBatchStream for WrappedStream { -fn schema() -> SchemaRef { -self.schema.clone() -} -} - -impl Stream for WrappedStream { -type Item = ArrowResult; - -fn poll_next( -mut self: Pin< Self>, -cx: std::task::Context<'_>, -) -> std::task::Poll> { -self.stream.poll_next_unpin(cx) -} - -fn size_hint() -> (usize, Option) { -self.stream.size_hint() -} -} - #[allow(dead_code)] pub struct BallistaContext { @@ -287,10 +258,10 @@ impl BallistaContext { .into_iter() .collect::>>()?; -let result = WrappedStream { -stream: Box::pin(futures::stream::iter(result).flatten()), -schema: Arc::new(schema), -}; +let result = WrappedStream::new( +Box::pin(futures::stream::iter(result).flatten()), +Arc::new(schema), +); break Ok(Box::pin(result)); } }; diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 85af902..5aafd00 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -489,10 +489,15 @@ message HashAggregateExecNode { } message ShuffleReaderExecNode { - repeated PartitionLocation partition_location = 1; + repeated ShuffleReaderPartition partition = 1; Schema schema = 2; } +message ShuffleReaderPartition { + // each partition of a shuffle read can read data from multiple locations + repeated PartitionLocation location = 1; +} + message GlobalLimitExecNode { PhysicalPlanNode input = 1; uint32 limit = 2; diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index db29cf1..3a7f795 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -15,6 
+15,7 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::Formatter; use std::sync::Arc; use std::{any::Any, pin::Pin}; @@ -22,35 +23,35 @@ use crate::client::BallistaClient; use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionLocation; +use crate::utils::WrappedStream; use async_trait::async_trait; use