[arrow-rs] branch master updated: Implement the Iterator trait for the json Reader. (#451)

2021-06-12 Thread nevime
This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
 new e5cda31  Implement the Iterator trait for the json Reader. (#451)
e5cda31 is described below

commit e5cda312b697c3d610637b28c58b6f1b104b41cc
Author: Laurent Mazare 
AuthorDate: Sun Jun 13 08:22:38 2021 +0800

Implement the Iterator trait for the json Reader. (#451)

* Implement the Iterator trait for the json Reader.

* Use transpose.
---
 arrow/src/json/reader.rs | 39 +++
 1 file changed, 39 insertions(+)

diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs
index d0b9c19..9235142 100644
--- a/arrow/src/json/reader.rs
+++ b/arrow/src/json/reader.rs
@@ -1569,6 +1569,14 @@ impl ReaderBuilder {
 }
 }
 
+impl<R: Read> Iterator for Reader<R> {
+    type Item = Result<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next().transpose()
+    }
+}
+
 #[cfg(test)]
 mod tests {
 use crate::{
@@ -2946,4 +2954,35 @@ mod tests {
 assert_eq!(batch.num_columns(), 1);
 assert_eq!(batch.num_rows(), 3);
 }
+
+#[test]
+fn test_json_iterator() {
+let builder = 
ReaderBuilder::new().infer_schema(None).with_batch_size(5);
+let reader: Reader<File> = builder
+    .build::<File>(File::open("test/data/basic.json").unwrap())
+    .unwrap();
+let schema = reader.schema();
+let (col_a_index, _) = schema.column_with_name("a").unwrap();
+
+let mut sum_num_rows = 0;
+let mut num_batches = 0;
+let mut sum_a = 0;
+for batch in reader {
+let batch = batch.unwrap();
+assert_eq!(4, batch.num_columns());
+sum_num_rows += batch.num_rows();
+num_batches += 1;
+let batch_schema = batch.schema();
+assert_eq!(schema, batch_schema);
+let a_array = batch
+.column(col_a_index)
+.as_any()
+.downcast_ref::()
+.unwrap();
+sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::();
+}
+assert_eq!(12, sum_num_rows);
+assert_eq!(3, num_batches);
+assert_eq!(111, sum_a);
+}
 }


[arrow-rs] branch master updated: Add Decimal to CsvWriter and improve debug display (#406)

2021-06-12 Thread nevime
This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
 new fb45112  Add Decimal to CsvWriter and improve debug display (#406)
fb45112 is described below

commit fb451125c4ed49a425de10afb6f42af0d9723a19
Author: Ádám Lippai 
AuthorDate: Sun Jun 13 02:20:08 2021 +0200

Add Decimal to CsvWriter and improve debug display (#406)

* Add Decimal to CsvWriter and improve debug display

* Measure CSV writer instead of file and data creation

* Re-use decimal formatting
---
 arrow/benches/csv_writer.rs | 19 ++-
 arrow/src/array/array_binary.rs | 36 
 arrow/src/csv/writer.rs | 23 ---
 arrow/src/util/display.rs   | 27 ---
 4 files changed, 62 insertions(+), 43 deletions(-)

diff --git a/arrow/benches/csv_writer.rs b/arrow/benches/csv_writer.rs
index 50b94d6..62c5da9 100644
--- a/arrow/benches/csv_writer.rs
+++ b/arrow/benches/csv_writer.rs
@@ -28,14 +28,14 @@ use arrow::record_batch::RecordBatch;
 use std::fs::File;
 use std::sync::Arc;
 
-fn record_batches_to_csv() {
+fn criterion_benchmark(c: &mut Criterion) {
 #[cfg(feature = "csv")]
 {
 let schema = Schema::new(vec![
 Field::new("c1", DataType::Utf8, false),
 Field::new("c2", DataType::Float64, true),
 Field::new("c3", DataType::UInt32, false),
-Field::new("c3", DataType::Boolean, true),
+Field::new("c4", DataType::Boolean, true),
 ]);
 
 let c1 = StringArray::from(vec![
@@ -59,16 +59,17 @@ fn record_batches_to_csv() {
 let file = File::create("target/bench_write_csv.csv").unwrap();
 let mut writer = csv::Writer::new(file);
 let batches = vec![&batch, &batch, &batch, &batch, &batch, &batch, &batch, &batch, &batch, &batch, &batch];
-#[allow(clippy::unit_arg)]
-criterion::black_box(for batch in batches {
-writer.write(batch).unwrap()
+
+c.bench_function("record_batches_to_csv", |b| {
+b.iter(|| {
+#[allow(clippy::unit_arg)]
+criterion::black_box(for batch in  {
+writer.write(batch).unwrap()
+});
+});
 });
 }
 }
 
-fn criterion_benchmark(c:  Criterion) {
-c.bench_function("record_batches_to_csv", |b| 
b.iter(record_batches_to_csv));
-}
-
 criterion_group!(benches, criterion_benchmark);
 criterion_main!(benches);
diff --git a/arrow/src/array/array_binary.rs b/arrow/src/array/array_binary.rs
index 0cb4db4..0b374db 100644
--- a/arrow/src/array/array_binary.rs
+++ b/arrow/src/array/array_binary.rs
@@ -666,6 +666,17 @@ impl DecimalArray {
 self.length * i as i32
 }
 
+#[inline]
+pub fn value_as_string(&self, row: usize) -> String {
+let decimal_string = self.value(row).to_string();
+if self.scale == 0 {
+decimal_string
+} else {
+let splits = decimal_string.split_at(decimal_string.len() - 
self.scale);
+format!("{}.{}", splits.0, splits.1)
+}
+}
+
 pub fn from_fixed_size_list_array(
 v: FixedSizeListArray,
 precision: usize,
@@ -729,7 +740,9 @@ impl fmt::Debug for DecimalArray {
 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
 write!(f, "DecimalArray<{}, {}>\n[\n", self.precision, self.scale)?;
 print_long_array(self, f, |array, index, f| {
-fmt::Debug::fmt((index), f)
+let formatted_decimal = array.value_as_string(index);
+
+write!(f, "{}", formatted_decimal)
 })?;
 write!(f, "]")
 }
@@ -758,7 +771,7 @@ impl Array for DecimalArray {
 #[cfg(test)]
 mod tests {
 use crate::{
-array::{LargeListArray, ListArray},
+array::{DecimalBuilder, LargeListArray, ListArray},
 datatypes::Field,
 };
 
@@ -1163,17 +1176,16 @@ mod tests {
 
 #[test]
 fn test_decimal_array_fmt_debug() {
-let values: [u8; 32] = [
-192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 36, 75, 
238, 253,
-255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
-];
-let array_data = ArrayData::builder(DataType::Decimal(23, 6))
-.len(2)
-.add_buffer(Buffer::from([..]))
-.build();
-let arr = DecimalArray::from(array_data);
+let values: Vec = vec![888700, -888700];
+let mut decimal_builder = DecimalBuilder::new(3, 23, 6);
+
+values.iter().for_each(|| {
+decimal_builder.append_value(value).unwrap();
+});
+decimal_builder.append_null().unwrap();
+let arr = decimal_builder.finish();
 assert_eq!(
-"DecimalArray<23, 6>\n[\n  888700,\n  -888700,\n]",
+"DecimalArray<23, 6>\n[\n  

[arrow-rs] branch master updated: remove unnecessary wraps in sortk (#445)

2021-06-12 Thread nevime
This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
 new efe86cd  remove unnecessary wraps in sortk (#445)
efe86cd is described below

commit efe86cdf329ec4bfad3b72bd23ee6558340fa297
Author: Jiayu Liu 
AuthorDate: Sun Jun 13 08:00:35 2021 +0800

remove unnecessary wraps in sortk (#445)
---
 arrow/src/compute/kernels/sort.rs | 96 +--
 1 file changed, 51 insertions(+), 45 deletions(-)

diff --git a/arrow/src/compute/kernels/sort.rs 
b/arrow/src/compute/kernels/sort.rs
index dff5695..b0eecb9 100644
--- a/arrow/src/compute/kernels/sort.rs
+++ b/arrow/src/compute/kernels/sort.rs
@@ -163,7 +163,7 @@ pub fn sort_to_indices(
 
 let (v, n) = partition_validity(values);
 
-match values.data_type() {
+Ok(match values.data_type() {
 DataType::Boolean => sort_boolean(values, v, n, , limit),
 DataType::Int8 => {
 sort_primitive::(values, v, n, cmp, , limit)
@@ -278,10 +278,12 @@ pub fn sort_to_indices(
 DataType::Float64 => {
 sort_list::(values, v, n, , limit)
 }
-t => Err(ArrowError::ComputeError(format!(
-"Sort not supported for list type {:?}",
-t
-))),
+t => {
+return Err(ArrowError::ComputeError(format!(
+"Sort not supported for list type {:?}",
+t
+)))
+}
 },
 DataType::LargeList(field) => match field.data_type() {
 DataType::Int8 => sort_list::(values, v, n, 
, limit),
@@ -304,10 +306,12 @@ pub fn sort_to_indices(
 DataType::Float64 => {
 sort_list::(values, v, n, , limit)
 }
-t => Err(ArrowError::ComputeError(format!(
-"Sort not supported for list type {:?}",
-t
-))),
+t => {
+return Err(ArrowError::ComputeError(format!(
+"Sort not supported for list type {:?}",
+t
+)))
+}
 },
 DataType::FixedSizeList(field, _) => match field.data_type() {
 DataType::Int8 => sort_list::(values, v, n, 
, limit),
@@ -330,10 +334,12 @@ pub fn sort_to_indices(
 DataType::Float64 => {
 sort_list::(values, v, n, , limit)
 }
-t => Err(ArrowError::ComputeError(format!(
-"Sort not supported for list type {:?}",
-t
-))),
+t => {
+return Err(ArrowError::ComputeError(format!(
+"Sort not supported for list type {:?}",
+t
+)))
+}
 },
 DataType::Dictionary(key_type, value_type)
 if *value_type.as_ref() == DataType::Utf8 =>
@@ -363,17 +369,21 @@ pub fn sort_to_indices(
 DataType::UInt64 => {
 sort_string_dictionary::(values, v, n, 
, limit)
 }
-t => Err(ArrowError::ComputeError(format!(
-"Sort not supported for dictionary key type {:?}",
-t
-))),
+t => {
+return Err(ArrowError::ComputeError(format!(
+"Sort not supported for dictionary key type {:?}",
+t
+)))
+}
 }
 }
-t => Err(ArrowError::ComputeError(format!(
-"Sort not supported for data type {:?}",
-t
-))),
-}
+t => {
+return Err(ArrowError::ComputeError(format!(
+"Sort not supported for data type {:?}",
+t
+)))
+}
+})
 }
 
 /// Options that define how sort kernels should behave
@@ -396,14 +406,13 @@ impl Default for SortOptions {
 }
 
 /// Sort primitive values
-#[allow(clippy::unnecessary_wraps)]
 fn sort_boolean(
 values: ,
 value_indices: Vec,
 null_indices: Vec,
 options: ,
 limit: Option,
-) -> Result {
+) -> UInt32Array {
 let values = values
 .as_any()
 .downcast_ref::()
@@ -469,11 +478,10 @@ fn sort_boolean(
 vec![],
 );
 
-Ok(UInt32Array::from(result_data))
+UInt32Array::from(result_data)
 }
 
 /// Sort primitive values
-#[allow(clippy::unnecessary_wraps)]
 fn sort_primitive(
 values: ,
 value_indices: Vec,
@@ -481,7 +489,7 @@ fn sort_primitive(
 cmp: F,
 options: ,
 limit: Option,
-) -> Result
+) -> UInt32Array
 where
 T: ArrowPrimitiveType,
 T::Native: std::cmp::PartialOrd,
@@ -549,7 +557,7 @@ where
 vec![],
 );
 
-Ok(UInt32Array::from(result_data))
+

[arrow] branch master updated (0e9285b -> 27d89a9)

2021-06-12 Thread kou
This is an automated email from the ASF dual-hosted git repository.

kou pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.


from 0e9285b  ARROW-13053: [Python] Fix build issue with Homebrewed arrow 
library
 add 27d89a9  ARROW-12962: [GLib][Ruby] Add Arrow::Scalar

No new revisions were added by this update.

Summary of changes:
 c_glib/arrow-glib/arrow-glib.h |1 +
 c_glib/arrow-glib/arrow-glib.hpp   |1 +
 c_glib/arrow-glib/basic-array.cpp  |   19 +-
 c_glib/arrow-glib/basic-array.h|3 +
 c_glib/arrow-glib/datum.cpp|  151 ++
 c_glib/arrow-glib/datum.h  |   26 +-
 c_glib/arrow-glib/datum.hpp|3 +
 c_glib/arrow-glib/meson.build  |3 +
 c_glib/arrow-glib/scalar.cpp   | 2382 
 c_glib/arrow-glib/scalar.h |  678 ++
 c_glib/arrow-glib/{datum.hpp => scalar.hpp}|   31 +-
 .../arrow-dataset-glib/arrow-dataset-glib-docs.xml |4 +
 c_glib/doc/arrow-glib/arrow-glib-docs.xml  |4 +
 c_glib/test/run-test.rb|4 +
 c_glib/test/test-array-datum.rb|   12 +
 .../buffer.rb => c_glib/test/test-binary-scalar.rb |   34 +-
 ...{test-array-datum.rb => test-boolean-scalar.rb} |   44 +-
 .../buffer.rb => c_glib/test/test-date32-scalar.rb |   33 +-
 .../buffer.rb => c_glib/test/test-date64-scalar.rb |   33 +-
 ...st-array-datum.rb => test-decimal128-scalar.rb} |   40 +-
 ...st-array-datum.rb => test-decimal256-scalar.rb} |   40 +-
 ...t-array-datum.rb => test-dense-union-scalar.rb} |   44 +-
 .../buffer.rb => c_glib/test/test-double-scalar.rb |   35 +-
 ...y-datum.rb => test-fixed-size-binary-scalar.rb} |   41 +-
 .../buffer.rb => c_glib/test/test-float-scalar.rb  |   35 +-
 c_glib/test/test-function.rb   |   19 +
 .../buffer.rb => c_glib/test/test-int16-scalar.rb  |   32 +-
 .../buffer.rb => c_glib/test/test-int32-scalar.rb  |   32 +-
 .../buffer.rb => c_glib/test/test-int64-scalar.rb  |   32 +-
 .../buffer.rb => c_glib/test/test-int8-scalar.rb   |   32 +-
 .../test/test-large-binary-scalar.rb   |   34 +-
 .../test/test-large-string-scalar.rb   |   34 +-
 .../{test-array-datum.rb => test-list-scalar.rb}   |   38 +-
 c_glib/test/test-map-scalar.rb |   65 +
 .../buffer.rb => c_glib/test/test-null-scalar.rb   |   28 +-
 .../{test-array-datum.rb => test-scalar-datum.rb}  |   33 +-
 ...-array-datum.rb => test-sparse-union-scalar.rb} |   44 +-
 .../{test-array-datum.rb => test-string-scalar.rb} |   47 +-
 .../{test-array-datum.rb => test-struct-scalar.rb} |   47 +-
 .../buffer.rb => c_glib/test/test-time32-scalar.rb |   34 +-
 .../buffer.rb => c_glib/test/test-time64-scalar.rb |   34 +-
 ...est-array-datum.rb => test-timestamp-scalar.rb} |   40 +-
 .../buffer.rb => c_glib/test/test-uint16-scalar.rb |   32 +-
 .../buffer.rb => c_glib/test/test-uint32-scalar.rb |   32 +-
 .../buffer.rb => c_glib/test/test-uint64-scalar.rb |   32 +-
 .../buffer.rb => c_glib/test/test-uint8-scalar.rb  |   32 +-
 ruby/red-arrow/lib/arrow/buffer.rb |   16 +-
 ...er.rb => constructor-arguments-gc-guardable.rb} |   11 +-
 ruby/red-arrow/lib/arrow/datum.rb  |   98 +
 ruby/red-arrow/lib/arrow/loader.rb |   30 +
 ruby/red-arrow/lib/arrow/{buffer.rb => scalar.rb}  |   18 +-
 .../buffer.rb => test/test-boolean-scalar.rb}  |   14 +-
 .../red-arrow/test/test-float-scalar.rb|   56 +-
 ruby/red-arrow/test/test-function.rb   |  176 ++
 54 files changed, 4400 insertions(+), 473 deletions(-)
 create mode 100644 c_glib/arrow-glib/scalar.cpp
 create mode 100644 c_glib/arrow-glib/scalar.h
 copy c_glib/arrow-glib/{datum.hpp => scalar.hpp} (55%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-binary-scalar.rb 
(58%)
 copy c_glib/test/{test-array-datum.rb => test-boolean-scalar.rb} (56%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-date32-scalar.rb 
(60%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-date64-scalar.rb 
(59%)
 copy c_glib/test/{test-array-datum.rb => test-decimal128-scalar.rb} (56%)
 copy c_glib/test/{test-array-datum.rb => test-decimal256-scalar.rb} (56%)
 copy c_glib/test/{test-array-datum.rb => test-dense-union-scalar.rb} (56%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-double-scalar.rb 
(58%)
 copy c_glib/test/{test-array-datum.rb => test-fixed-size-binary-scalar.rb} 
(56%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-float-scalar.rb 
(58%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-int16-scalar.rb 
(60%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-int32-scalar.rb 
(60%)
 copy ruby/red-arrow/lib/arrow/buffer.rb => c_glib/test/test-int64-scalar.rb 

[arrow] branch master updated (7339bd5 -> 0e9285b)

2021-06-12 Thread kou
This is an automated email from the ASF dual-hosted git repository.

kou pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.


from 7339bd5  [GitHub] Add shorter GitHub repository description to 
.asf.yaml
 add 0e9285b  ARROW-13053: [Python] Fix build issue with Homebrewed arrow 
library

No new revisions were added by this update.

Summary of changes:
 python/CMakeLists.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)


[arrow] branch master updated (9162954 -> 7339bd5)

2021-06-12 Thread wesm
This is an automated email from the ASF dual-hosted git repository.

wesm pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git.


from 9162954  ARROW-13065: [Packaging][RPM] Add missing required LZ4 
version information
 add 7339bd5  [GitHub] Add shorter GitHub repository description to 
.asf.yaml

No new revisions were added by this update.

Summary of changes:
 .asf.yaml | 4 
 1 file changed, 4 insertions(+)


[arrow-rs] branch master updated: remove clippy unnecessary wraps (#449)

2021-06-12 Thread dheres
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
 new dc5507a  remove clippy unnecessary wraps (#449)
dc5507a is described below

commit dc5507a996e197ba4856aaf00783e62be7bb4222
Author: Jiayu Liu 
AuthorDate: Sat Jun 12 20:59:35 2021 +0800

remove clippy unnecessary wraps (#449)
---
 arrow/src/compute/kernels/cast.rs | 8 
 1 file changed, 8 deletions(-)

diff --git a/arrow/src/compute/kernels/cast.rs 
b/arrow/src/compute/kernels/cast.rs
index 150f1f6..d755e1a 100644
--- a/arrow/src/compute/kernels/cast.rs
+++ b/arrow/src/compute/kernels/cast.rs
@@ -956,7 +956,6 @@ const EPOCH_DAYS_FROM_CE: i32 = 719_163;
 /// Arrays should have the same primitive data type, otherwise this should 
fail.
 /// We do not perform this check on primitive data types as we only use this
 /// function internally, where it is guaranteed to be infallible.
-#[allow(clippy::unnecessary_wraps)]
 fn cast_array_data(array: , to_type: DataType) -> Result
 where
 TO: ArrowNumericType,
@@ -974,7 +973,6 @@ where
 }
 
 /// Convert Array into a PrimitiveArray of type, and apply numeric cast
-#[allow(clippy::unnecessary_wraps)]
 fn cast_numeric_arrays(from: ) -> Result
 where
 FROM: ArrowNumericType,
@@ -1006,7 +1004,6 @@ where
 }
 
 /// Cast numeric types to Utf8
-#[allow(clippy::unnecessary_wraps)]
 fn cast_numeric_to_string(array: ) -> 
Result
 where
 FROM: ArrowNumericType,
@@ -1035,7 +1032,6 @@ where
 }
 
 /// Cast numeric types to Utf8
-#[allow(clippy::unnecessary_wraps)]
 fn cast_string_to_numeric(
 from: ,
 cast_options: ,
@@ -1101,7 +1097,6 @@ where
 }
 
 /// Casts generic string arrays to Date32Array
-#[allow(clippy::unnecessary_wraps)]
 fn cast_string_to_date32(
 array:  Array,
 cast_options: ,
@@ -1164,7 +1159,6 @@ fn cast_string_to_date32(
 }
 
 /// Casts generic string arrays to Date64Array
-#[allow(clippy::unnecessary_wraps)]
 fn cast_string_to_date64(
 array:  Array,
 cast_options: ,
@@ -1226,7 +1220,6 @@ fn cast_string_to_date64(
 }
 
 /// Casts generic string arrays to TimeStampNanosecondArray
-#[allow(clippy::unnecessary_wraps)]
 fn cast_string_to_timestamp_ns(
 array:  Array,
 cast_options: ,
@@ -1308,7 +1301,6 @@ where
 /// Cast Boolean types to numeric
 ///
 /// `false` returns 0 while `true` returns 1
-#[allow(clippy::unnecessary_wraps)]
 fn cast_bool_to_numeric(
 from: ,
 cast_options: ,


[arrow-rs] branch master updated (71e9d78 -> f624153)

2021-06-12 Thread alamb
This is an automated email from the ASF dual-hosted git repository.

alamb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git.


from 71e9d78  Implement faster arrow array reader (#384)
 add f624153  Remove DictionaryArray::keys_array method and replace usages 
by the keys method (#419)

No new revisions were added by this update.

Summary of changes:
 arrow/src/array/array_dictionary.rs | 21 +++--
 arrow/src/array/builder.rs  |  2 +-
 arrow/src/array/ord.rs  |  4 ++--
 arrow/src/compute/kernels/cast.rs   |  6 --
 arrow/src/compute/kernels/sort.rs   |  4 ++--
 arrow/src/compute/kernels/take.rs   |  2 +-
 arrow/src/util/display.rs   |  2 +-
 7 files changed, 14 insertions(+), 27 deletions(-)


[arrow-datafusion] branch master updated: ShuffleReaderExec now supports multiple locations per partition (#541)

2021-06-12 Thread alamb
This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
 new 8f4078d  ShuffleReaderExec now supports multiple locations per 
partition (#541)
8f4078d is described below

commit 8f4078d83f7ea0348fa43906d26156bf8a95de4c
Author: Andy Grove 
AuthorDate: Sat Jun 12 06:45:06 2021 -0600

ShuffleReaderExec now supports multiple locations per partition (#541)

* ShuffleReaderExec now supports multiple locations per partition

* Remove TODO

* avoid clone
---
 ballista/rust/client/src/context.rs| 39 ++---
 ballista/rust/core/proto/ballista.proto|  7 +-
 .../core/src/execution_plans/shuffle_reader.rs | 94 +-
 .../core/src/serde/physical_plan/from_proto.rs | 12 ++-
 .../rust/core/src/serde/physical_plan/to_proto.rs  | 18 +++--
 ballista/rust/core/src/utils.rs| 40 -
 ballista/rust/scheduler/src/planner.rs |  2 +-
 ballista/rust/scheduler/src/state/mod.rs   |  6 +-
 8 files changed, 130 insertions(+), 88 deletions(-)

diff --git a/ballista/rust/client/src/context.rs 
b/ballista/rust/client/src/context.rs
index 4e5cc1a..695045d 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -29,21 +29,18 @@ use ballista_core::serde::protobuf::{
 execute_query_params::Query, job_status, ExecuteQueryParams, 
GetJobStatusParams,
 GetJobStatusResult,
 };
+use ballista_core::utils::WrappedStream;
 use ballista_core::{
 client::BallistaClient, datasource::DfTableAdapter, 
utils::create_datafusion_context,
 };
 
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::Result as ArrowResult;
-use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::catalog::TableReference;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::csv::CsvReadOptions;
 use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
 use futures::future;
-use futures::Stream;
 use futures::StreamExt;
 use log::{error, info};
 
@@ -74,32 +71,6 @@ impl BallistaContextState {
 }
 }
 
-struct WrappedStream {
-stream: Pin> + Send + 
Sync>>,
-schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
-fn schema() -> SchemaRef {
-self.schema.clone()
-}
-}
-
-impl Stream for WrappedStream {
-type Item = ArrowResult;
-
-fn poll_next(
-mut self: Pin< Self>,
-cx:  std::task::Context<'_>,
-) -> std::task::Poll> {
-self.stream.poll_next_unpin(cx)
-}
-
-fn size_hint() -> (usize, Option) {
-self.stream.size_hint()
-}
-}
-
 #[allow(dead_code)]
 
 pub struct BallistaContext {
@@ -287,10 +258,10 @@ impl BallistaContext {
 .into_iter()
 .collect::>>()?;
 
-let result = WrappedStream {
-stream: 
Box::pin(futures::stream::iter(result).flatten()),
-schema: Arc::new(schema),
-};
+let result = WrappedStream::new(
+Box::pin(futures::stream::iter(result).flatten()),
+Arc::new(schema),
+);
 break Ok(Box::pin(result));
 }
 };
diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 85af902..5aafd00 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -489,10 +489,15 @@ message HashAggregateExecNode {
 }
 
 message ShuffleReaderExecNode {
-  repeated PartitionLocation partition_location = 1;
+  repeated ShuffleReaderPartition partition = 1;
   Schema schema = 2;
 }
 
+message ShuffleReaderPartition {
+  // each partition of a shuffle read can read data from multiple locations
+  repeated PartitionLocation location = 1;
+}
+
 message GlobalLimitExecNode {
   PhysicalPlanNode input = 1;
   uint32 limit = 2;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs 
b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index db29cf1..3a7f795 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::Formatter;
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
@@ -22,35 +23,35 @@ use crate::client::BallistaClient;
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionLocation;
 
+use crate::utils::WrappedStream;
 use async_trait::async_trait;
 use