This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c494f88 ARROW-4540: [Rust] Basic JSON reader
c494f88 is described below
commit c494f88fc432d9de7bcb5aac86033f9f57d15d83
Author: Neville Dipale <[email protected]>
AuthorDate: Mon Feb 18 11:16:51 2019 -0700
ARROW-4540: [Rust] Basic JSON reader
Replaces #3624
___
An initial JSON reader for Rust, which reads JSON line-delimited files into
record batches.
Supports:
* schema inference and user-supplied schema
* projection by column names
* reading lists of scalar values
* reading columns that mix lists and scalars, converting scalars to single-value lists
Excluded in this PR are:
* Struct arrays
* Nesting in lists: a list or struct nested inside a list is replaced with null for now
* Extremely mixed types, e.g. [int, bool, float, list(_)]
Author: Neville Dipale <[email protected]>
Closes #3685 from nevi-me/rust/json-reader2 and squashes the following
commits:
594d056 <Neville Dipale> ARROW-4540: Basic JSON reader
---
rust/arrow/src/csv/reader.rs | 2 +-
rust/arrow/src/datatypes.rs | 10 +-
rust/arrow/src/error.rs | 1 +
rust/arrow/src/{lib.rs => json/mod.rs} | 28 +-
rust/arrow/src/json/reader.rs | 1063 ++++++++++++++++++++++++++++++++
rust/arrow/src/lib.rs | 3 +
rust/arrow/test/data/arrays.json | 3 +
rust/arrow/test/data/basic.json | 12 +
rust/arrow/test/data/basic_nulls.json | 12 +
rust/arrow/test/data/mixed_arrays.json | 4 +
10 files changed, 1108 insertions(+), 30 deletions(-)
diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index 7d3b309..1be7fff 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -401,7 +401,7 @@ impl Default for ReaderBuilder {
impl ReaderBuilder {
/// Create a new builder for configuring CSV parsing options.
///
- /// To convert a builder into a reader, call `Reader::from_builder`
+ /// To convert a builder into a reader, call `ReaderBuilder::build`
///
/// # Example
///
diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index 3baa5b9..e5a0d0d 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -44,7 +44,7 @@ use crate::error::{ArrowError, Result};
/// Nested types can themselves be nested within other arrays.
/// For more information on these types please see
/// [here](https://arrow.apache.org/docs/memory_layout.html).
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
PartialOrd, Ord)]
pub enum DataType {
Boolean,
Int8,
@@ -68,13 +68,13 @@ pub enum DataType {
Struct(Vec<Field>),
}
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
PartialOrd, Ord)]
pub enum DateUnit {
Day,
Millisecond,
}
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
PartialOrd, Ord)]
pub enum TimeUnit {
Second,
Millisecond,
@@ -82,7 +82,7 @@ pub enum TimeUnit {
Nanosecond,
}
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
PartialOrd, Ord)]
pub enum IntervalUnit {
YearMonth,
DayTime,
@@ -91,7 +91,7 @@ pub enum IntervalUnit {
/// Contains the meta-data for a single relative type.
///
/// The `Schema` object is an ordered collection of `Field` objects.
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
PartialOrd, Ord)]
pub struct Field {
name: String,
data_type: DataType,
diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs
index 58204a3..a981478 100644
--- a/rust/arrow/src/error.rs
+++ b/rust/arrow/src/error.rs
@@ -28,6 +28,7 @@ pub enum ArrowError {
ComputeError(String),
DivideByZero,
CsvError(String),
+ JsonError(String),
IoError(String),
}
diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/json/mod.rs
similarity index 54%
copy from rust/arrow/src/lib.rs
copy to rust/arrow/src/json/mod.rs
index ca06fc1..6a4dfc1 100644
--- a/rust/arrow/src/lib.rs
+++ b/rust/arrow/src/json/mod.rs
@@ -15,29 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-//! A native Rust implementation of [Apache Arrow](https://arrow.apache.org),
a cross-language
-//! development platform for in-memory data.
-//!
-//! Currently the project is developed and tested against nightly Rust. To
learn more
-//! about the status of Arrow in Rust, see `README.md`.
+//! Transfer data between the Arrow memory format and JSON line-delimited
records.
-#![feature(type_ascription)]
-#![feature(rustc_private)]
-#![feature(specialization)]
-#![feature(try_from)]
-#![allow(dead_code)]
-#![allow(non_camel_case_types)]
+pub mod reader;
-pub mod array;
-pub mod array_data;
-pub mod bitmap;
-pub mod buffer;
-pub mod builder;
-pub mod compute;
-pub mod csv;
-pub mod datatypes;
-pub mod error;
-pub mod memory;
-pub mod record_batch;
-pub mod tensor;
-pub mod util;
+pub use self::reader::Reader;
+pub use self::reader::ReaderBuilder;
diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs
new file mode 100644
index 0000000..ab2f42f
--- /dev/null
+++ b/rust/arrow/src/json/reader.rs
@@ -0,0 +1,1063 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! JSON Reader
+//!
+//! This JSON reader allows JSON line-delimited files to be read into the
Arrow memory
+//! model. Records are loaded in batches and are then converted from row-based
data to
+//! columnar data.
+//!
+//! Example:
+//!
+//! ```
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use arrow::json;
+//! use std::fs::File;
+//! use std::io::BufReader;
+//! use std::sync::Arc;
+//!
+//! let schema = Schema::new(vec![
+//! Field::new("a", DataType::Float64, false),
+//! Field::new("b", DataType::Float64, false),
+//! Field::new("c", DataType::Float64, false),
+//! ]);
+//!
+//! let file = File::open("test/data/basic.json").unwrap();
+//!
+//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema),
1024, None);
+//! let batch = json.next().unwrap().unwrap();
+//! ```
+
+use std::collections::{HashMap, HashSet};
+use std::fs::File;
+use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+use serde_json::Value;
+
+use crate::array::*;
+use crate::builder::*;
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+use crate::record_batch::RecordBatch;
+
+/// Coerce the data types observed for one JSON field into a single type.
+///
+/// The number of candidate types decides the outcome:
+///
+/// * a single type is returned unchanged
+/// * `Int64` together with `Float64` becomes `Float64`
+/// * a scalar together with a list of `Float64`, `Int64`, `Boolean` or `Utf8`
+///   becomes a list whose item type is the coercion of the scalar with the
+///   list's item type
+/// * any other pair becomes `Utf8`
+/// * any other number of candidates (none, or three and more) becomes
+///   `List(Utf8)`
+///
+/// # Errors
+///
+/// Returns `ArrowError::JsonError` when the pair contains one of the supported
+/// list types but, once sorted, its second element is not such a list.
+fn coerce_data_type(dt: Vec<&DataType>) -> Result<DataType> {
+    match dt.len() {
+        1 => Ok(dt[0].clone()),
+        2 => {
+            // there can be a case where a list and scalar both exist
+            if dt.contains(&&DataType::List(box DataType::Float64))
+                || dt.contains(&&DataType::List(box DataType::Int64))
+                || dt.contains(&&DataType::List(box DataType::Boolean))
+                || dt.contains(&&DataType::List(box DataType::Utf8))
+            {
+                // we have a list and scalars, so we should get the values and coerce them
+                let mut dt = dt;
+                // sorting guarantees that the list will be the second value
+                dt.sort();
+                match (dt[0], dt[1]) {
+                    (t1, DataType::List(box DataType::Float64)) => {
+                        if t1 == &DataType::Float64 {
+                            Ok(DataType::List(box DataType::Float64))
+                        } else {
+                            Ok(DataType::List(box coerce_data_type(vec![
+                                t1,
+                                &DataType::Float64,
+                            ])?))
+                        }
+                    }
+                    (t1, DataType::List(box DataType::Int64)) => {
+                        if t1 == &DataType::Int64 {
+                            Ok(DataType::List(box DataType::Int64))
+                        } else {
+                            Ok(DataType::List(box coerce_data_type(vec![
+                                t1,
+                                &DataType::Int64,
+                            ])?))
+                        }
+                    }
+                    (t1, DataType::List(box DataType::Boolean)) => {
+                        if t1 == &DataType::Boolean {
+                            Ok(DataType::List(box DataType::Boolean))
+                        } else {
+                            Ok(DataType::List(box coerce_data_type(vec![
+                                t1,
+                                &DataType::Boolean,
+                            ])?))
+                        }
+                    }
+                    (t1, DataType::List(box DataType::Utf8)) => {
+                        if t1 == &DataType::Utf8 {
+                            Ok(DataType::List(box DataType::Utf8))
+                        } else {
+                            Ok(DataType::List(box coerce_data_type(vec![
+                                t1,
+                                &DataType::Utf8,
+                            ])?))
+                        }
+                    }
+                    (t1, t2) => Err(ArrowError::JsonError(format!(
+                        "Cannot coerce data types for {:?} and {:?}",
+                        t1, t2
+                    ))),
+                }
+            } else if dt.contains(&&DataType::Float64) && dt.contains(&&DataType::Int64) {
+                Ok(DataType::Float64)
+            } else {
+                Ok(DataType::Utf8)
+            }
+        }
+        _ => {
+            // TODO(nevi_me) It's possible to have [float, int, list(float)], which should
+            // return list(float). Will hash this out later
+            Ok(DataType::List(box DataType::Utf8))
+        }
+    }
+}
+
+/// Build a schema from the data types collected for each JSON field name.
+///
+/// The candidate types of every field are reduced to one type with
+/// `coerce_data_type`, and each resulting field is marked as nullable. The
+/// first coercion failure is returned as the error.
+fn generate_schema(spec: HashMap<String, HashSet<DataType>>) -> Result<Arc<Schema>> {
+    let mut fields: Vec<Field> = Vec::new();
+    for (name, candidates) in spec.iter() {
+        let candidates: Vec<&DataType> = candidates.iter().collect();
+        let data_type = coerce_data_type(candidates)?;
+        fields.push(Field::new(name, data_type, true));
+    }
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+/// Infer the fields of a JSON file by reading the first n records of the
file, with
+/// `max_read_records` controlling the maximum number of records to read.
+///
+/// If `max_read_records` is not set, the whole file is read to infer its
field types.
+fn infer_json_schema(file: File, max_read_records: Option<usize>) ->
Result<Arc<Schema>> {
+ let mut values: HashMap<String, HashSet<DataType>> = HashMap::new();
+ let mut reader = BufReader::new(file.try_clone()?);
+
+ let mut line = String::new();
+ for _ in 0..max_read_records.unwrap_or(std::usize::MAX) {
+ &reader.read_line(&mut line)?;
+ if line.is_empty() {
+ break;
+ }
+ let record: Value = serde_json::from_str(&line.trim()).expect("Not
valid JSON");
+
+ line = String::new();
+
+ match record {
+ Value::Object(map) => {
+ let res = map
+ .iter()
+ .map(|(k, v)| {
+ match v {
+ Value::Array(a) => {
+ // collect the data types in array
+ let types: Result<Vec<Option<&DataType>>> = a
+ .iter()
+ .map(|a| match a {
+ Value::Null => Ok(None),
+ Value::Number(n) => {
+ if n.is_i64() {
+ Ok(Some(&DataType::Int64))
+ } else {
+ Ok(Some(&DataType::Float64))
+ }
+ }
+ Value::Bool(_) =>
Ok(Some(&DataType::Boolean)),
+ Value::String(_) =>
Ok(Some(&DataType::Utf8)),
+ Value::Array(_) | Value::Object(_) => {
+ Err(ArrowError::JsonError(
+ "Nested lists and structs not
supported"
+ .to_string(),
+ ))
+ }
+ })
+ .collect();
+ match types {
+ Ok(types) => {
+ // unwrap the Option and discard None
values (from
+ // JSON nulls)
+ let mut types: Vec<&DataType> =
+ types.into_iter().filter_map(|t|
t).collect();
+ types.dedup();
+ // if a record contains only nulls, it
is not
+ // added to values
+ if !types.is_empty() {
+ let dt = coerce_data_type(types)?;
+
+ if values.contains_key(k) {
+ let x =
values.get_mut(k).unwrap();
+ x.insert(DataType::List(box
dt));
+ } else {
+ // create hashset and add
value type
+ let mut hs = HashSet::new();
+ hs.insert(DataType::List(box
dt));
+ values.insert(k.to_string(),
hs);
+ }
+ }
+ Ok(())
+ }
+ Err(e) => Err(e),
+ }
+ }
+ Value::Bool(_) => {
+ if values.contains_key(k) {
+ let x = values.get_mut(k).unwrap();
+ x.insert(DataType::Boolean);
+ } else {
+ // create hashset and add value type
+ let mut hs = HashSet::new();
+ hs.insert(DataType::Boolean);
+ values.insert(k.to_string(), hs);
+ }
+ Ok(())
+ }
+ Value::Null => {
+ // do nothing, we treat json as nullable by
default when
+ // inferring
+ Ok(())
+ }
+ Value::Number(n) => {
+ if n.is_f64() {
+ if values.contains_key(k) {
+ let x = values.get_mut(k).unwrap();
+ x.insert(DataType::Float64);
+ } else {
+ // create hashset and add value type
+ let mut hs = HashSet::new();
+ hs.insert(DataType::Float64);
+ values.insert(k.to_string(), hs);
+ }
+ } else {
+ // default to i64
+ if values.contains_key(k) {
+ let x = values.get_mut(k).unwrap();
+ x.insert(DataType::Int64);
+ } else {
+ // create hashset and add value type
+ let mut hs = HashSet::new();
+ hs.insert(DataType::Int64);
+ values.insert(k.to_string(), hs);
+ }
+ }
+ Ok(())
+ }
+ Value::String(_) => {
+ if values.contains_key(k) {
+ let x = values.get_mut(k).unwrap();
+ x.insert(DataType::Utf8);
+ } else {
+ // create hashset and add value type
+ let mut hs = HashSet::new();
+ hs.insert(DataType::Utf8);
+ values.insert(k.to_string(), hs);
+ }
+ Ok(())
+ }
+ Value::Object(_) => Err(ArrowError::JsonError(
+ "Reading nested JSON structes currently not
supported"
+ .to_string(),
+ )),
+ }
+ })
+ .collect();
+ match res {
+ Ok(()) => {}
+ Err(e) => return Err(e),
+ }
+ }
+ t @ _ => {
+ return Err(ArrowError::JsonError(format!(
+ "Expected JSON record to be an object, found {:?}",
+ t
+ )));
+ }
+ };
+ }
+
+ let schema = generate_schema(values)?;
+
+ // return the reader seek back to the start
+ &reader.into_inner().seek(SeekFrom::Start(0))?;
+
+ Ok(schema)
+}
+
+/// Reader that converts line-delimited JSON read from `R` into record batches
+pub struct Reader<R: Read> {
+ /// Schema naming the fields, and their data types, to extract from each record
+ schema: Arc<Schema>,
+ /// Optional projection for which columns to load (case-sensitive names)
+ projection: Option<Vec<String>>,
+ /// Buffered source that the JSON lines are read from
+ reader: BufReader<R>,
+ /// Batch size (maximum number of lines read by each call to `next`)
+ batch_size: usize,
+}
+
+impl<R: Read> Reader<R> {
+    /// Construct a JSON `Reader` over any buffered source that implements `Read`.
+    ///
+    /// When reading from a `File`, `ReaderBuilder` can be used instead to
+    /// customise the reader, for example to enable schema inference.
+    pub fn new(
+        reader: BufReader<R>,
+        schema: Arc<Schema>,
+        batch_size: usize,
+        projection: Option<Vec<String>>,
+    ) -> Self {
+        Reader {
+            reader,
+            batch_size,
+            schema,
+            projection,
+        }
+    }
+
+ /// Read the next batch of records
+ pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+ let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
+ let mut line = String::new();
+ for _ in 0..self.batch_size {
+ self.reader.read_line(&mut line)?;
+ if !line.is_empty() {
+ rows.push(serde_json::from_str(&line).expect("Not valid
JSON"));
+ line = String::new();
+ } else {
+ break;
+ }
+ }
+
+ let rows = &rows[..];
+ let projection = self.projection.clone().unwrap_or(vec![]);
+ let arrays: Result<Vec<ArrayRef>> = self
+ .schema
+ .clone()
+ .fields()
+ .iter()
+ .filter(|field| {
+ if projection.is_empty() {
+ return true;
+ }
+ projection.contains(field.name())
+ })
+ .map(|field| {
+ match field.data_type().clone() {
+ DataType::Boolean => self.build_boolean_array(rows,
field.name()),
+ DataType::Float64 => {
+ self.build_primitive_array::<Float64Type>(rows,
field.name())
+ }
+ DataType::Float32 => {
+ self.build_primitive_array::<Float32Type>(rows,
field.name())
+ }
+ DataType::Int64 =>
self.build_primitive_array::<Int64Type>(rows, field.name()),
+ DataType::Int32 =>
self.build_primitive_array::<Int32Type>(rows, field.name()),
+ DataType::Int16 =>
self.build_primitive_array::<Int16Type>(rows, field.name()),
+ DataType::Int8 =>
self.build_primitive_array::<Int8Type>(rows, field.name()),
+ DataType::UInt64 => {
+ self.build_primitive_array::<UInt64Type>(rows,
field.name())
+ }
+ DataType::UInt32 => {
+ self.build_primitive_array::<UInt32Type>(rows,
field.name())
+ }
+ DataType::UInt16 => {
+ self.build_primitive_array::<UInt16Type>(rows,
field.name())
+ }
+ DataType::UInt8 =>
self.build_primitive_array::<UInt8Type>(rows, field.name()),
+ DataType::Utf8 => {
+ let mut builder = BinaryBuilder::new(rows.len());
+ for row_index in 0..rows.len() {
+ match rows[row_index].get(field.name()) {
+ Some(value) => {
+ match value.as_str() {
+ Some(v) => builder.append_string(v)?,
+ // TODO: value might exist as
something else, coerce so we don't lose it
+ None => builder.append(false)?,
+ }
+ }
+ None => builder.append(false)?,
+ }
+ }
+ Ok(Arc::new(builder.finish()) as ArrayRef)
+ }
+ DataType::List(ref t) => match t {
+ box DataType::Int8 =>
self.build_list_array::<Int8Type>(rows, field.name()),
+ box DataType::Int16 =>
self.build_list_array::<Int16Type>(rows, field.name()),
+ box DataType::Int32 =>
self.build_list_array::<Int32Type>(rows, field.name()),
+ box DataType::Int64 =>
self.build_list_array::<Int64Type>(rows, field.name()),
+ box DataType::UInt8 =>
self.build_list_array::<UInt8Type>(rows, field.name()),
+ box DataType::UInt16 =>
self.build_list_array::<UInt16Type>(rows, field.name()),
+ box DataType::UInt32 =>
self.build_list_array::<UInt32Type>(rows, field.name()),
+ box DataType::UInt64 =>
self.build_list_array::<UInt64Type>(rows, field.name()),
+ box DataType::Float32 =>
self.build_list_array::<Float32Type>(rows, field.name()),
+ box DataType::Float64 =>
self.build_list_array::<Float64Type>(rows, field.name()),
+ box DataType::Boolean =>
self.build_boolean_list_array(rows, field.name()),
+ box DataType::Utf8 => {
+ let values_builder = BinaryBuilder::new(rows.len()
* 5);
+ let mut builder = ListBuilder::new(values_builder);
+ for row_index in 0..rows.len() {
+ match rows[row_index].get(field.name()) {
+ Some(value) => {
+ // value can be an array or a scalar
+ let vals: Vec<Option<String>> = if let
Value::String(v) = value {
+ vec![Some(v.to_string())]
+ } else if let Value::Array(n) = value {
+ n.iter().map(|v: &Value| {
+ if v.is_string() {
+
Some(v.as_str().unwrap().to_string())
+ } else if v.is_array() ||
v.is_object() {
+ // implicitly drop nested
values
+ // TODO support
deep-nesting
+ None
+ } else {
+ Some(v.to_string())
+ }
+ }).collect()
+ } else if let Value::Null = value {
+ vec![None]
+ } else {
+ if !value.is_object() {
+ vec![Some(value.to_string())]
+ } else {
+ return
Err(ArrowError::JsonError("1Only scalars are currently supported in JSON
arrays".to_string()))
+ }
+ };
+ for i in 0..vals.len() {
+ match &vals[i] {
+ Some(v) =>
builder.values().append_string(&v)?,
+ None =>
builder.values().append_null()?,
+ };
+ }
+ }
+ None => {}
+ }
+ builder.append(true)?
+ }
+ Ok(Arc::new(builder.finish()) as ArrayRef)
+ }
+ _ => return Err(ArrowError::JsonError("Data type is
currently not supported in a list".to_string())),
+ },
+ _ => return Err(ArrowError::JsonError("struct types are
not yet supported".to_string())),
+ }
+ })
+ .collect();
+
+ match arrays {
+ Ok(arr) => Ok(Some(RecordBatch::new(self.schema.clone(), arr))),
+ Err(e) => Err(e),
+ }
+ }
+
+    /// Build a boolean array from the `col_name` entry of every row.
+    ///
+    /// A row contributes a null when the key is missing or when its value is
+    /// not a JSON boolean.
+    fn build_boolean_array(&self, rows: &[Value], col_name: &str) -> Result<ArrayRef> {
+        let mut builder = BooleanBuilder::new(rows.len());
+        for row in rows.iter() {
+            let flag = row.get(col_name).and_then(|value| value.as_bool());
+            if let Some(v) = flag {
+                builder.append_value(v)?;
+            } else {
+                builder.append_null()?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    /// Build a list-of-boolean array from the `col_name` entry of every row.
+    ///
+    /// A boolean scalar becomes a single-value list, a JSON null becomes a list
+    /// holding one null, and a missing key becomes an empty list. Array items
+    /// that are not booleans are stored as nulls. Any other value is an error.
+    fn build_boolean_list_array(
+        &self,
+        rows: &[Value],
+        col_name: &str,
+    ) -> Result<ArrayRef> {
+        let mut builder = ListBuilder::new(BooleanBuilder::new(rows.len() * 5));
+        for row in rows.iter() {
+            if let Some(value) = row.get(col_name) {
+                // value can be an array or a scalar
+                let items: Vec<Option<bool>> = match value {
+                    Value::Bool(v) => vec![Some(*v)],
+                    Value::Array(n) => n.iter().map(|v: &Value| v.as_bool()).collect(),
+                    Value::Null => vec![None],
+                    _ => {
+                        return Err(ArrowError::JsonError(
+                            "2Only scalars are currently supported in JSON arrays".to_string(),
+                        ));
+                    }
+                };
+                for item in items {
+                    match item {
+                        Some(v) => builder.values().append_value(v)?,
+                        None => builder.values().append_null()?,
+                    };
+                }
+            }
+            // close the list slot for this row
+            builder.append(true)?;
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    /// Build a numeric array of type `T` from the `col_name` entry of every row.
+    ///
+    /// Integers are converted from their exact `i64`/`u64` representation, so
+    /// 64-bit values beyond 2^53 are not rounded through `f64`; all other
+    /// numbers are converted from `f64`. A row contributes a null when the key
+    /// is missing, when the value is not a number, or when the number cannot be
+    /// represented in `T::Native`.
+    fn build_primitive_array<T: ArrowPrimitiveType>(
+        &self,
+        rows: &[Value],
+        col_name: &str,
+    ) -> Result<ArrayRef>
+    where
+        T: ArrowNumericType,
+        T::Native: num::NumCast,
+    {
+        let mut builder = PrimitiveBuilder::<T>::new(rows.len());
+        for row in rows.iter() {
+            let native: Option<T::Native> = match row.get(col_name) {
+                Some(value) => {
+                    if let Some(i) = value.as_i64() {
+                        num::cast::cast(i)
+                    } else if let Some(u) = value.as_u64() {
+                        // only reached for integers above `i64::MAX`
+                        num::cast::cast(u)
+                    } else {
+                        value.as_f64().and_then(|f| num::cast::cast(f))
+                    }
+                }
+                None => None,
+            };
+            match native {
+                Some(v) => builder.append_value(v)?,
+                None => builder.append_null()?,
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+
+    /// Build a list array with items of type `T` from the `col_name` entry of
+    /// every row.
+    ///
+    /// A numeric scalar becomes a single-value list, a JSON null becomes a list
+    /// holding one null, and a missing key becomes an empty list. Integers are
+    /// converted from their exact `i64`/`u64` representation rather than being
+    /// rounded through `f64`. Items that are not numbers, or that cannot be
+    /// represented in `T::Native`, are stored as nulls.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ArrowError::JsonError` when the value is neither a number, an
+    /// array nor null, or when a builder reports an error.
+    fn build_list_array<T: ArrowPrimitiveType>(
+        &self,
+        rows: &[Value],
+        col_name: &str,
+    ) -> Result<ArrayRef>
+    where
+        T::Native: num::NumCast,
+    {
+        // convert a JSON value to the native type, `None` when it does not fit
+        let to_native = |v: &Value| -> Option<T::Native> {
+            if let Some(i) = v.as_i64() {
+                num::cast::cast(i)
+            } else if let Some(u) = v.as_u64() {
+                num::cast::cast(u)
+            } else {
+                v.as_f64().and_then(|f| num::cast::cast(f))
+            }
+        };
+
+        let values_builder: PrimitiveBuilder<T> = PrimitiveBuilder::new(rows.len());
+        let mut builder = ListBuilder::new(values_builder);
+        for row in rows.iter() {
+            if let Some(value) = row.get(col_name) {
+                // value can be an array or a scalar
+                let items: Vec<Option<T::Native>> = match value {
+                    Value::Number(_) => vec![to_native(value)],
+                    Value::Array(n) => n.iter().map(|v: &Value| to_native(v)).collect(),
+                    Value::Null => vec![None],
+                    _ => {
+                        return Err(ArrowError::JsonError(
+                            "3Only scalars are currently supported in JSON arrays".to_string(),
+                        ));
+                    }
+                };
+                for item in items {
+                    match item {
+                        Some(v) => builder.values().append_value(v)?,
+                        None => builder.values().append_null()?,
+                    };
+                }
+            }
+            // close the list slot for this row
+            builder.append(true)?;
+        }
+        Ok(Arc::new(builder.finish()))
+    }
+}
+
+/// JSON file reader builder
+pub struct ReaderBuilder {
+ /// Optional schema for the JSON file
+ ///
+ /// If the schema is not supplied, the reader will try to infer the schema
+ /// based on the JSON structure.
+ schema: Option<Arc<Schema>>,
+ /// Optional maximum number of records to read during schema inference
+ ///
+ /// If a number is not provided, all the records are read.
+ max_records: Option<usize>,
+ /// Batch size (number of records to load each time)
+ ///
+ /// The default batch size when using the `ReaderBuilder` is 1024 records
+ batch_size: usize,
+ /// Optional projection for which columns to load (case-sensitive names)
+ projection: Option<Vec<String>>,
+}
+
+impl Default for ReaderBuilder {
+    /// A builder with no schema, no limit on the records read for inference,
+    /// no projection and a batch size of 1024 records.
+    fn default() -> Self {
+        ReaderBuilder {
+            batch_size: 1024,
+            schema: None,
+            max_records: None,
+            projection: None,
+        }
+    }
+}
+
+impl ReaderBuilder {
+ /// Create a new builder for configuring JSON parsing options.
+ ///
+ /// To convert a builder into a reader, call `ReaderBuilder::build`
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// extern crate arrow;
+ ///
+ /// use arrow::json;
+ /// use std::fs::File;
+ ///
+ /// fn example() -> json::Reader<File> {
+ /// let file = File::open("test/data/basic.json").unwrap();
+ ///
+ /// // create a builder, inferring the schema with the first 100
records
+ /// let builder = json::ReaderBuilder::new().infer_schema(Some(100));
+ ///
+ /// let reader = builder.build::<File>(file).unwrap();
+ ///
+ /// reader
+ /// }
+ /// ```
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+    /// Use `schema` for the JSON file instead of inferring one.
+    pub fn with_schema(self, schema: Arc<Schema>) -> Self {
+        ReaderBuilder {
+            schema: Some(schema),
+            ..self
+        }
+    }
+
+    /// Make the reader infer the schema of the file, reading at most
+    /// `max_records` records to do so. Any schema set earlier is discarded.
+    pub fn infer_schema(self, max_records: Option<usize>) -> Self {
+        ReaderBuilder {
+            schema: None,
+            max_records,
+            ..self
+        }
+    }
+
+    /// Set how many records are loaded at one time.
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        ReaderBuilder { batch_size, ..self }
+    }
+
+    /// Restrict the reader to the columns named in `projection`.
+    pub fn with_projection(self, projection: Vec<String>) -> Self {
+        ReaderBuilder {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Turn the builder into a `Reader` over `file`.
+    ///
+    /// When no schema was supplied, it is inferred from `file` first, reading
+    /// at most the configured number of records.
+    pub fn build<R: Read>(self, file: File) -> Result<Reader<File>> {
+        let ReaderBuilder {
+            schema,
+            max_records,
+            batch_size,
+            projection,
+        } = self;
+        // check if schema should be inferred
+        let schema = match schema {
+            Some(schema) => schema,
+            None => infer_json_schema(file.try_clone()?, max_records)?,
+        };
+        Ok(Reader::new(
+            BufReader::new(file),
+            schema,
+            batch_size,
+            projection,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ // Infers the schema of basic.json and checks the inferred column types and
+ // a few values of every column.
+ #[test]
+ fn test_json_basic() {
+ let builder =
ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+ let mut reader: Reader<File> = builder
+ .build::<File>(File::open("test/data/basic.json").unwrap())
+ .unwrap();
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(4, batch.num_columns());
+ assert_eq!(12, batch.num_rows());
+
+ let schema = batch.schema();
+
+ let a = schema.column_with_name("a").unwrap();
+ assert_eq!(&DataType::Int64, a.1.data_type());
+ let b = schema.column_with_name("b").unwrap();
+ assert_eq!(&DataType::Float64, b.1.data_type());
+ let c = schema.column_with_name("c").unwrap();
+ assert_eq!(&DataType::Boolean, c.1.data_type());
+ let d = schema.column_with_name("d").unwrap();
+ assert_eq!(&DataType::Utf8, d.1.data_type());
+
+ let aa = batch
+ .column(a.0)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(1, aa.value(0));
+ assert_eq!(-10, aa.value(1));
+ let bb = batch
+ .column(b.0)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ assert_eq!(2.0, bb.value(0));
+ assert_eq!(-3.5, bb.value(1));
+ let cc = batch
+ .column(c.0)
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
+ assert_eq!(false, cc.value(0));
+ assert_eq!(true, cc.value(10));
+ let dd = batch
+ .column(d.0)
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .unwrap();
+ assert_eq!("4", String::from_utf8(dd.value(0).to_vec()).unwrap());
+ assert_eq!("text", String::from_utf8(dd.value(8).to_vec()).unwrap());
+ }
+
+ // Infers the schema of basic_nulls.json and checks that explicit nulls and
+ // missing keys end up as null slots in every column.
+ #[test]
+ fn test_json_basic_with_nulls() {
+ let builder =
ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+ let mut reader: Reader<File> = builder
+ .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
+ .unwrap();
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(4, batch.num_columns());
+ assert_eq!(12, batch.num_rows());
+
+ let schema = batch.schema();
+
+ let a = schema.column_with_name("a").unwrap();
+ assert_eq!(&DataType::Int64, a.1.data_type());
+ let b = schema.column_with_name("b").unwrap();
+ assert_eq!(&DataType::Float64, b.1.data_type());
+ let c = schema.column_with_name("c").unwrap();
+ assert_eq!(&DataType::Boolean, c.1.data_type());
+ let d = schema.column_with_name("d").unwrap();
+ assert_eq!(&DataType::Utf8, d.1.data_type());
+
+ let aa = batch
+ .column(a.0)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(true, aa.is_valid(0));
+ assert_eq!(false, aa.is_valid(1));
+ assert_eq!(false, aa.is_valid(11));
+ let bb = batch
+ .column(b.0)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap();
+ assert_eq!(true, bb.is_valid(0));
+ assert_eq!(false, bb.is_valid(2));
+ assert_eq!(false, bb.is_valid(11));
+ let cc = batch
+ .column(c.0)
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
+ assert_eq!(true, cc.is_valid(0));
+ assert_eq!(false, cc.is_valid(4));
+ assert_eq!(false, cc.is_valid(11));
+ let dd = batch
+ .column(d.0)
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .unwrap();
+ assert_eq!(false, dd.is_valid(0));
+ assert_eq!(true, dd.is_valid(1));
+ assert_eq!(false, dd.is_valid(4));
+ assert_eq!(false, dd.is_valid(11));
+ }
+
+ // Reads basic.json with a user-supplied schema that uses narrower numeric
+ // types than inference would pick.
+ #[test]
+ fn test_json_basic_schema() {
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Float32, false),
+ Field::new("c", DataType::Boolean, false),
+ Field::new("d", DataType::Utf8, false),
+ ]);
+
+ let mut reader: Reader<File> = Reader::new(
+ BufReader::new(File::open("test/data/basic.json").unwrap()),
+ Arc::new(schema),
+ 1024,
+ None,
+ );
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(4, batch.num_columns());
+ assert_eq!(12, batch.num_rows());
+
+ let schema = batch.schema();
+
+ let a = schema.column_with_name("a").unwrap();
+ assert_eq!(&DataType::Int32, a.1.data_type());
+ let b = schema.column_with_name("b").unwrap();
+ assert_eq!(&DataType::Float32, b.1.data_type());
+ let c = schema.column_with_name("c").unwrap();
+ assert_eq!(&DataType::Boolean, c.1.data_type());
+ let d = schema.column_with_name("d").unwrap();
+ assert_eq!(&DataType::Utf8, d.1.data_type());
+
+ let aa = batch
+ .column(a.0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(1, aa.value(0));
+ // test that a 64bit value is returned as null due to overflowing
+ assert_eq!(false, aa.is_valid(11));
+ let bb = batch
+ .column(b.0)
+ .as_any()
+ .downcast_ref::<Float32Array>()
+ .unwrap();
+ assert_eq!(2.0, bb.value(0));
+ assert_eq!(-3.5, bb.value(1));
+ }
+
+ // Reads basic.json with a three-field schema and a projection on "a" and
+ // "c", expecting a two-column batch.
+ #[test]
+ fn test_json_basic_schema_projection() {
+ // We test implicit and explicit projection:
+ // Implicit: omitting fields from a schema
+ // Explicit: supplying a vec of fields to take
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Float32, false),
+ Field::new("c", DataType::Boolean, false),
+ ]);
+
+ let mut reader: Reader<File> = Reader::new(
+ BufReader::new(File::open("test/data/basic.json").unwrap()),
+ Arc::new(schema),
+ 1024,
+ Some(vec!["a".to_string(), "c".to_string()]),
+ );
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(2, batch.num_columns());
+ assert_eq!(12, batch.num_rows());
+
+ let schema = batch.schema();
+
+ let a = schema.column_with_name("a").unwrap();
+ assert_eq!(&DataType::Int32, a.1.data_type());
+ let c = schema.column_with_name("c").unwrap();
+ assert_eq!(&DataType::Boolean, c.1.data_type());
+ }
+
+ // Infers the schema of arrays.json, where "b" and "c" hold arrays, and
+ // checks the flattened child values of both list columns.
+ #[test]
+ fn test_json_arrays() {
+ let builder =
ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+ let mut reader: Reader<File> = builder
+ .build::<File>(File::open("test/data/arrays.json").unwrap())
+ .unwrap();
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(4, batch.num_columns());
+ assert_eq!(3, batch.num_rows());
+
+ let schema = batch.schema();
+
+ let a = schema.column_with_name("a").unwrap();
+ assert_eq!(&DataType::Int64, a.1.data_type());
+ let b = schema.column_with_name("b").unwrap();
+ assert_eq!(
+ &DataType::List(Box::new(DataType::Float64)),
+ b.1.data_type()
+ );
+ let c = schema.column_with_name("c").unwrap();
+ assert_eq!(
+ &DataType::List(Box::new(DataType::Boolean)),
+ c.1.data_type()
+ );
+ let d = schema.column_with_name("d").unwrap();
+ assert_eq!(&DataType::Utf8, d.1.data_type());
+
+ let aa = batch
+ .column(a.0)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(1, aa.value(0));
+ assert_eq!(-10, aa.value(1));
+ let bb = batch
+ .column(b.0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ let bb = bb.values();
+ let bb = bb.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(9, bb.len());
+ assert_eq!(2.0, bb.value(0));
+ assert_eq!(-6.1, bb.value(5));
+ assert_eq!(false, bb.is_valid(7));
+
+ let cc = batch
+ .column(c.0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ let cc = cc.values();
+ let cc = cc.as_any().downcast_ref::<BooleanArray>().unwrap();
+ assert_eq!(6, cc.len());
+ assert_eq!(false, cc.value(0));
+ assert_eq!(false, cc.value(4));
+ assert_eq!(false, cc.is_valid(5));
+ }
+
+ // Pointing the JSON reader at a CSV file is expected to panic with
+ // "Not valid JSON".
+ #[test]
+ #[should_panic(expected = "Not valid JSON")]
+ fn test_invalid_file() {
+ let builder =
ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+ let mut reader: Reader<File> = builder
+
.build::<File>(File::open("test/data/uk_cities_with_headers.csv").unwrap())
+ .unwrap();
+ let _batch = reader.next().unwrap().unwrap();
+ }
+
+ // Coercing a scalar type with a list type must yield a list of the
+ // coerced item type.
+ #[test]
+ fn test_coersion_scalar_and_list() {
+ use crate::datatypes::DataType::*;
+
+ assert_eq!(
+ List(box Float64),
+ coerce_data_type(vec![&Float64, &List(box Float64)]).unwrap()
+ );
+ assert_eq!(
+ List(box Float64),
+ coerce_data_type(vec![&Float64, &List(box Int64)]).unwrap()
+ );
+ assert_eq!(
+ List(box Int64),
+ coerce_data_type(vec![&Int64, &List(box Int64)]).unwrap()
+ );
+ // boolean and number are incompatible, return utf8
+ assert_eq!(
+ List(box Utf8),
+ coerce_data_type(vec![&Boolean, &List(box Float64)]).unwrap()
+ );
+ }
+
+ // Infers the schema of mixed_arrays.json and checks that columns "b", "c"
+ // and "d" are read as lists, with the values of "d" kept as strings.
+ #[test]
+ fn test_mixed_json_arrays() {
+ let builder =
ReaderBuilder::new().infer_schema(None).with_batch_size(64);
+ let mut reader: Reader<File> = builder
+ .build::<File>(File::open("test/data/mixed_arrays.json").unwrap())
+ .unwrap();
+ let batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(4, batch.num_columns());
+ assert_eq!(4, batch.num_rows());
+
+ let schema = batch.schema();
+
+ let a = schema.column_with_name("a").unwrap();
+ assert_eq!(&DataType::Int64, a.1.data_type());
+ let b = schema.column_with_name("b").unwrap();
+ assert_eq!(
+ &DataType::List(Box::new(DataType::Float64)),
+ b.1.data_type()
+ );
+ let c = schema.column_with_name("c").unwrap();
+ assert_eq!(
+ &DataType::List(Box::new(DataType::Boolean)),
+ c.1.data_type()
+ );
+ let d = schema.column_with_name("d").unwrap();
+ assert_eq!(&DataType::List(box DataType::Utf8), d.1.data_type());
+
+ let bb = batch
+ .column(b.0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ let bb = bb.values();
+ let bb = bb.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(10, bb.len());
+ assert_eq!(4.0, bb.value(9));
+
+ let cc = batch
+ .column(c.0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ let cc = cc.values();
+ let cc = cc.as_any().downcast_ref::<BooleanArray>().unwrap();
+ assert_eq!(6, cc.len());
+ assert_eq!(false, cc.value(0));
+ assert_eq!(false, cc.value(3));
+ assert_eq!(false, cc.is_valid(2));
+ assert_eq!(false, cc.is_valid(4));
+
+ let dd = batch
+ .column(d.0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ let dd = dd.values();
+ let dd = dd.as_any().downcast_ref::<BinaryArray>().unwrap();
+ assert_eq!(7, dd.len());
+ assert_eq!(false, dd.is_valid(1));
+ assert_eq!("text", &String::from_utf8(dd.value(2).to_vec()).unwrap());
+ assert_eq!("1", &String::from_utf8(dd.value(3).to_vec()).unwrap());
+ assert_eq!("false", &String::from_utf8(dd.value(4).to_vec()).unwrap());
+ assert_eq!("array", &String::from_utf8(dd.value(5).to_vec()).unwrap());
+ assert_eq!("2.4", &String::from_utf8(dd.value(6).to_vec()).unwrap());
+ }
+}
diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs
index ca06fc1..01e01d8 100644
--- a/rust/arrow/src/lib.rs
+++ b/rust/arrow/src/lib.rs
@@ -25,6 +25,8 @@
#![feature(rustc_private)]
#![feature(specialization)]
#![feature(try_from)]
+// box_syntax/box_patterns: needed to build and match boxed `DataType::List` item types
+#![feature(box_syntax, box_patterns)]
#![allow(dead_code)]
#![allow(non_camel_case_types)]
@@ -37,6 +39,7 @@ pub mod compute;
pub mod csv;
pub mod datatypes;
pub mod error;
+pub mod json;
pub mod memory;
pub mod record_batch;
pub mod tensor;
diff --git a/rust/arrow/test/data/arrays.json b/rust/arrow/test/data/arrays.json
new file mode 100644
index 0000000..5dbdd19
--- /dev/null
+++ b/rust/arrow/test/data/arrays.json
@@ -0,0 +1,3 @@
+{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"}
+{"a":-10, "b":[2.0, 1.3, -6.1], "c":[true, true], "d":"4"}
+{"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"}
diff --git a/rust/arrow/test/data/basic.json b/rust/arrow/test/data/basic.json
new file mode 100644
index 0000000..dafd2dd
--- /dev/null
+++ b/rust/arrow/test/data/basic.json
@@ -0,0 +1,12 @@
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":-10, "b":-3.5, "c":true, "d":"4"}
+{"a":2, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":7, "b":-3.5, "c":true, "d":"4"}
+{"a":1, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":5, "b":-3.5, "c":true, "d":"4"}
+{"a":1, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":1, "b":-3.5, "c":true, "d":"4"}
+{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
\ No newline at end of file
diff --git a/rust/arrow/test/data/basic_nulls.json b/rust/arrow/test/data/basic_nulls.json
new file mode 100644
index 0000000..1451df7
--- /dev/null
+++ b/rust/arrow/test/data/basic_nulls.json
@@ -0,0 +1,12 @@
+{"a":1, "b":2.0, "c":false}
+{"a":null, "b":-3.5, "c":true, "d":"4"}
+{"c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":7, "b":-3.5, "c":null, "d":null}
+{"a":1, "b":0.6, "c":false}
+{"a":1, "b":2.0, "d":"4"}
+{"a":5, "c":true}
+{"a":1, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":1, "b":-3.5, "c":true, "d":"4"}
+{}
\ No newline at end of file
diff --git a/rust/arrow/test/data/mixed_arrays.json b/rust/arrow/test/data/mixed_arrays.json
new file mode 100644
index 0000000..1898728
--- /dev/null
+++ b/rust/arrow/test/data/mixed_arrays.json
@@ -0,0 +1,4 @@
+{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1}
+{"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null}
+{"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"}
+{"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]}