scovich commented on code in PR #7884: URL: https://github.com/apache/arrow-rs/pull/7884#discussion_r2195451441
########## parquet-variant-compute/src/from_json.rs: ##########
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module for transforming a batch of JSON strings into a batch of Variants represented as
+//! STRUCT<metadata: BINARY, value: BINARY>
+
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray};
+use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{DataType, Field};
+use arrow_schema::ArrowError;
+use parquet_variant::{json_to_variant, VariantBuilder};
+
+fn variant_arrow_repr() -> DataType {
+    // The subfields are expected to be non-nullable according to the parquet variant spec.
+    let metadata_field = Field::new("metadata", DataType::Binary, false);
+    let value_field = Field::new("value", DataType::Binary, false);
+    let fields = vec![metadata_field, value_field];
+    DataType::Struct(fields.into())
+}
+
+/// Parse a batch of JSON strings into a batch of Variants represented as
+/// STRUCT<metadata: BINARY, value: BINARY> where nulls are preserved. The JSON strings in the input
+/// must be valid.
+pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<StructArray, ArrowError> {
+    let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
+        Some(string_array) => Ok(string_array),
+        None => Err(ArrowError::CastError(
+            "Expected reference to StringArray as input".into(),
+        )),
+    }?;
+
+    // Zero-copy builders
+    let mut metadata_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);

Review Comment:
Where does the 128 scale factor come from? It seems a stretch to assume that a batch of `N` JSON strings will produce `128*N` bytes worth of unique field names, especially as `N` grows and the over-allocation becomes more and more dangerous.

If we really feel it's important to pre-allocate capacity, I'd recommend capping it at e.g. 1MB. But honestly, I'd just allocate a normal vec and let it grow normally, unless/until we have some proof that the guaranteed O(n) cost of appending n bytes to a vec isn't good enough, and that pre-allocation actually helps in a wide variety of situations.
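For illustration only, a capped pre-allocation could look like the sketch below; the cap and the per-row estimate are made-up numbers, not anything the PR or the crate defines:

```rust
/// Illustrative helper: guess a starting capacity from the row count, but cap
/// the guess so a huge batch can't trigger an unbounded over-allocation.
fn initial_capacity(num_rows: usize) -> usize {
    const CAP_BYTES: usize = 1024 * 1024; // e.g. 1 MiB, as suggested above
    const ESTIMATED_BYTES_PER_ROW: usize = 128; // the PR's current guess
    num_rows.saturating_mul(ESTIMATED_BYTES_PER_ROW).min(CAP_BYTES)
}

fn main() {
    // A 10M-row batch now reserves at most 1 MiB up front instead of ~1.2 GiB;
    // the vec still grows normally (amortized O(n)) past the cap.
    let buffer: Vec<u8> = Vec::with_capacity(initial_capacity(10_000_000));
    assert!(buffer.capacity() >= 1024 * 1024);
}
```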
########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the lines this comment targets)
+    let mut validity = BooleanBufferBuilder::new(input.len());
+    for i in 0..input.len() {
+        if input.is_null(i) {
+            // The subfields are expected to be non-nullable according to the parquet variant spec.

Review Comment:
We already know they're non-nullable... maybe the comment can explain that we're pushing valid-but-empty subfields, to maintain proper positioning?

Also: could we create nullable sub-field arrays even though the schema says they're non-nullable, and rely on nested null masks? Does that save space in case the variant column has a lot of null entries?
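To make the two layouts concrete, a self-contained sketch using arrow's buffer APIs (the byte values are invented for illustration):

```rust
use arrow::array::{Array, BinaryArray};
use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};

fn main() {
    // Three rows: b"ab", a "missing" row 1, b"c". The missing row still needs
    // an offset entry; repeating the previous offset gives it zero length.
    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 3]));
    let data = Buffer::from_vec(b"abc".to_vec());

    // What the PR does: every subfield row is valid, row 1 is just empty.
    let valid_but_empty = BinaryArray::new(offsets.clone(), data.clone(), None);
    assert_eq!(valid_but_empty.value(1).len(), 0);

    // What the comment asks about: a nested null mask on the subfield instead.
    let nulls = NullBuffer::from(vec![true, false, true]);
    let nullable = BinaryArray::new(offsets, data, Some(nulls));
    assert!(nullable.is_null(1));
}
```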
########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the line this comment targets)
+    let mut value_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);

Review Comment:
As above... I'm not sure how helpful it really is to pre-allocate capacity based only on input length. Some variants will be very small, and an unbounded over-allocation could hog a lot of memory. Others will be much (much) larger than 128B each, so the vec will end up making multiple capacity increases along the way anyway.
########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the lines this comment targets)
+fn variant_arrow_repr() -> DataType {
+    // The subfields are expected to be non-nullable according to the parquet variant spec.

Review Comment:
The shredding spec makes `value` optional, with the possibility -- but not the requirement -- to have a `typed_value` instead when `value` is missing. Should we recognize that situation and throw a suitable "shredded variants not supported" error, instead of blowing up with an obscure schema mismatch error?
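For what that could look like, a hedged sketch; the helper name and the choice of `NotYetImplemented` are inventions here, not part of the PR:

```rust
use arrow::datatypes::{DataType, Field, Fields};
use arrow_schema::ArrowError;

/// Hypothetical guard: reject shredded variant schemas (a `typed_value`
/// subfield, and/or a missing `value` subfield) with a clear error rather
/// than an obscure schema mismatch further down.
fn check_not_shredded(fields: &Fields) -> Result<(), ArrowError> {
    let has_value = fields.iter().any(|f| f.name() == "value");
    let has_typed_value = fields.iter().any(|f| f.name() == "typed_value");
    if has_typed_value || !has_value {
        return Err(ArrowError::NotYetImplemented(
            "shredded variants are not supported".into(),
        ));
    }
    Ok(())
}

fn main() {
    let shredded: Fields = vec![
        Field::new("metadata", DataType::Binary, false),
        Field::new("typed_value", DataType::Int64, true),
    ]
    .into();
    assert!(check_not_shredded(&shredded).is_err());
}
```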
########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the lines this comment targets)
+        if input.is_null(i) {
+            // The subfields are expected to be non-nullable according to the parquet variant spec.

Review Comment:
But I guess we still have to push _something_ to the offset arrays, to maintain proper positioning... so valid-but-empty is probably the best we can do?

########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the lines this comment targets)
+fn variant_arrow_repr() -> DataType {
+    // The subfields are expected to be non-nullable according to the parquet variant spec.

Review Comment:
... but now that I think about it, that would be an issue when reading from parquet, which has nothing to do with this PR.
########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the lines this comment targets)
+    let struct_fields: Vec<ArrayRef> = vec![Arc::new(metadata_array), Arc::new(value_array)];
+    let variant_fields = match variant_arrow_repr() {
+        DataType::Struct(fields) => fields,
+        _ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"),
+    };

Review Comment:
Should `variant_arrow_repr` just return `Fields` instead of `DataType`? Then we don't have to unpack it. It's also a 4-line method with a single call site (3 lines if we don't convert it to a `DataType`), so we might also consider removing the method entirely.

Could also make the relationship with the `StructArray` more explicit by something like:
```suggestion
    let metadata_field = Field::new("metadata", metadata_array.data_type().clone(), false);
    let value_field = Field::new("value", value_array.data_type().clone(), false);
    let variant_fields = vec![metadata_field, value_field].into();
```
(`data_type()` returns `&DataType`, so the `.clone()` is needed for `Field::new`.)
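For comparison, the first alternative (returning `Fields` directly) might look like this sketch:

```rust
use arrow::datatypes::{DataType, Field, Fields};

/// Sketch: return `Fields` so the call site never has to unpack a
/// `DataType::Struct`; `DataType::Struct(variant_arrow_fields())` still
/// works anywhere the `DataType` itself is needed.
fn variant_arrow_fields() -> Fields {
    vec![
        Field::new("metadata", DataType::Binary, false),
        Field::new("value", DataType::Binary, false),
    ]
    .into()
}

fn main() {
    assert_eq!(variant_arrow_fields().len(), 2);
}
```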
########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the test lines this comment targets)
+        assert_eq!(metadata_array.value(3), &[1, 0, 0]);
+        assert_eq!(value_array.value(3), &[0]);
+
+        // Ensure that the subfields are not actually nullable

Review Comment:
I'm not sure this is a useful thing to enforce?
I'm pretty sure the arrow spec forbids making any assumptions about the values of child arrays at positions an ancestor has marked invalid?

########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the lines this comment targets)
+    let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
+        Some(string_array) => Ok(string_array),
+        None => Err(ArrowError::CastError(
+            "Expected reference to StringArray as input".into(),
+        )),
+    }?;

Review Comment:
Why not use `ok_or_else`?
```suggestion
    let input_string_array = input
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| ArrowError::CastError("Expected a StringArray as input".into()))?;
```
########## parquet-variant-compute/src/from_json.rs: ##########
(same hunk as above, trimmed to the test input this comment targets)
+        let input = StringArray::from(vec![
+            Some("1"),
+            None,
+            Some("{\"a\": 32}"),
+            Some("null"),
+            None,
+        ]);

Review Comment:
> default values

Not quite sure what you mean by "default values", or how an engine's NULL handling relates to string (JSON) -> variant parsing?

> `"{{{{true: false}, "-1": "+1"}, 0: 1}}"`

I'm pretty sure JSON objects require string field names?
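That matches RFC 8259, which requires object member names to be strings. A quick check, shown here with serde_json; whether the crate's JSON-to-variant path behaves identically is an assumption:

```rust
fn main() {
    // RFC 8259 object member names must be strings, so a bare `true` (or `0`)
    // as a key is a parse error.
    let result: Result<serde_json::Value, _> = serde_json::from_str("{true: false}");
    assert!(result.is_err());
}
```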
########## parquet-variant-compute/src/from_json.rs: ##########
@@ -0,0 +1,136 @@
+// (Apache license header elided)
+
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StringArray, StructArray,
+};
+use arrow::buffer::NullBuffer;
+use arrow::datatypes::{DataType, Field};
+use arrow_schema::ArrowError;
+use parquet_variant::{json_to_variant, VariantBuilder};
+
+fn variant_arrow_repr() -> DataType {
+    let metadata_field = Field::new("metadata", DataType::Binary, true);
+    let value_field = Field::new("value", DataType::Binary, true);
+    let fields = vec![metadata_field, value_field];
+    DataType::Struct(fields.into())
+}
+
+pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<StructArray, ArrowError> {

Review Comment:
I imagine we'll eventually want a top-level `cast_to_variant` that converts strong types to variant? And passing a string array to such a function should produce a variant column full of string (or short-string) variant values? This method here is json-parsing strings and then casting the result to variant, not casting strings directly to variant.
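To illustrate the distinction, a hedged sketch; it assumes `VariantBuilder::append_value` accepts a `&str`, which should be double-checked against the crate:

```rust
use parquet_variant::{json_to_variant, VariantBuilder};

fn main() -> Result<(), arrow_schema::ArrowError> {
    // Casting the string directly to variant (what a future `cast_to_variant`
    // might do for a StringArray) would keep it as a variant string...
    let mut direct = VariantBuilder::new();
    direct.append_value("1");
    let (_, direct_value) = direct.finish();

    // ...whereas batch_json_string_to_variant parses it as JSON first,
    // producing a variant integer.
    let mut parsed = VariantBuilder::new();
    json_to_variant("1", &mut parsed)?;
    let (_, parsed_value) = parsed.finish();

    // Same input string, different variant encodings.
    assert_ne!(direct_value, parsed_value);
    Ok(())
}
```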