alamb commented on code in PR #7893:
URL: https://github.com/apache/arrow-datafusion/pull/7893#discussion_r1478462579
##########
datafusion/core/tests/user_defined/user_defined_aggregates.rs:
##########
@@ -589,29 +589,25 @@ impl FirstSelector {
}
/// Convert to a set of ScalarValues
- fn to_state(&self) -> Vec<ScalarValue> {
- vec![
- ScalarValue::Float64(Some(self.value)),
- ScalarValue::TimestampNanosecond(Some(self.time), None),
- ]
- }
+ fn to_state(&self) -> Result<ScalarValue> {
+ let f64arr = Arc::new(Float64Array::from(vec![self.value])) as ArrayRef;
+ let timearr =
+ Arc::new(TimestampNanosecondArray::from(vec![self.time])) as ArrayRef;
- /// return this selector as a single scalar (struct) value
- fn to_scalar(&self) -> ScalarValue {
- ScalarValue::Struct(Some(self.to_state()), Self::fields())
+ let struct_arr =
+ StructArray::try_new(Self::fields(), vec![f64arr, timearr], None)?;
+ Ok(ScalarValue::Struct(Arc::new(struct_arr)))
Review Comment:
Given this is now returning a struct as the intermediate datatype, I think
we also need to update `state_datatypes` to return a `DataType::Struct` to be
consistent
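For example (untested sketch -- the method name follows this comment, and the exact signature depends on the trait being implemented):
```rust
// Sketch: the intermediate state is now a single struct-typed column,
// built from the same fields the accumulator serializes in to_state.
fn state_datatypes(&self) -> Result<Vec<DataType>> {
    Ok(vec![DataType::Struct(FirstSelector::fields())])
}
```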
##########
datafusion/common/src/scalar.rs:
##########
@@ -2297,14 +2326,20 @@ impl ScalarValue {
Self::Dictionary(key_type.clone(), Box::new(value))
}
DataType::Struct(fields) => {
- let array = as_struct_array(array)?;
+ let struct_arr = as_struct_array(array)?;
Review Comment:
I think this could be implemented using
https://docs.rs/arrow/latest/arrow/array/trait.Array.html#tymethod.slice
So something like:
```rust
// slice is zero-copy; `as_struct` comes from arrow's `AsArray` trait
let one_element_struct_array = array.slice(index, 1);
let struct_arr = one_element_struct_array.as_struct();
```
which would be both less code and more efficient (no data copies needed).
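Spelled out as a compiling sketch (helper name is illustrative):
```rust
use arrow::array::{Array, ArrayRef, AsArray, StructArray};

// slice(offset, len) returns a view over the same underlying buffers,
// so extracting one row of a StructArray copies no data.
fn one_row_struct(array: &ArrayRef, index: usize) -> StructArray {
    array.slice(index, 1).as_struct().clone()
}
```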
##########
datafusion/proto/src/logical_plan/to_proto.rs:
##########
@@ -1709,7 +1681,9 @@ fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
Ok(protobuf::ScalarValue { value: Some(value) })
}
-fn encode_scalar_list_value(
+// ScalarValue::List / FixedSizeList / LargeList / Struct are serialized using
+// Arrow IPC messages as a single column RecordBatch
Review Comment:
❤️
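For anyone following along, here is a rough sketch of the scheme this comment describes (helper name and details are illustrative, not the PR's actual code):
```rust
use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

// Wrap the array in a single-column RecordBatch and serialize it with
// the Arrow IPC stream format.
fn encode_array_as_ipc(array: ArrayRef) -> Result<Vec<u8>, ArrowError> {
    let field = Field::new("item", array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema.clone(), vec![array])?;

    let mut writer = StreamWriter::try_new(Vec::new(), &schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    Ok(writer.into_inner()?)
}
```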
##########
datafusion/common/src/scalar.rs:
##########
@@ -2744,6 +2766,25 @@ impl From<Option<&str>> for ScalarValue {
}
}
+impl From<Vec<(&str, ScalarValue)>> for ScalarValue {
Review Comment:
Could we please add a comment explaining what this does?
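Perhaps something along these lines (wording is only a suggestion):
```rust
/// Creates a single-row `ScalarValue::Struct` from `(field name, value)`
/// pairs, inferring each field's type from the value's data type.
///
/// For example, `ScalarValue::from(vec![("a", ScalarValue::from(1.0_f64))])`
/// produces a struct scalar with one non-nullable Float64 field named "a".
impl From<Vec<(&str, ScalarValue)>> for ScalarValue {
```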
##########
datafusion/common/src/scalar.rs:
##########
@@ -1377,6 +1402,70 @@ impl ScalarValue {
}};
}
+ fn build_struct_array(
+ scalars: impl IntoIterator<Item = ScalarValue>,
+ ) -> Result<ArrayRef> {
+ let arrays = scalars
+ .into_iter()
+ .map(|s| s.to_array())
+ .collect::<Result<Vec<_>>>()?;
+
+ let first_struct = arrays[0].as_struct_opt();
+ if first_struct.is_none() {
+ return _internal_err!(
+ "Inconsistent types in ScalarValue::iter_to_array. \
+ Expected ScalarValue::Struct, got {:?}",
+ arrays[0].clone()
+ );
+ }
+
+ let mut valid = BooleanBufferBuilder::new(arrays.len());
Review Comment:
What do you think about using
https://docs.rs/arrow/latest/arrow/compute/kernels/concat/index.html here? I
think you should be able to simply concat the arrays together without any
special handling (and if `concat` doesn't support `StructArray`, we can file a
ticket upstream in arrow-rs)
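To make that concrete, an untested sketch of the `concat` based version (assuming `concat` accepts `StructArray` inputs):
```rust
use arrow::array::{Array, ArrayRef};
use arrow::compute::concat;
use arrow::error::ArrowError;

// Concatenate the one-row struct arrays instead of hand-building the
// validity buffer and child columns; concat also verifies that every
// input shares the same DataType, which covers the mismatch error case.
fn build_struct_array(arrays: &[ArrayRef]) -> Result<ArrayRef, ArrowError> {
    let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect();
    concat(&refs)
}
```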
##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -693,32 +694,6 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
}
}
-#[test]
-fn scalar_values_error_serialization() {
- let should_fail_on_seralize: Vec<ScalarValue> = vec![
- // Should fail due to empty values
- ScalarValue::Struct(
- Some(vec![]),
Review Comment:
I agree that there is no need to test serializing an empty array, as it isn't
a valid input anyway
##########
datafusion/common/src/scalar.rs:
##########
@@ -3152,6 +3226,28 @@ impl fmt::Debug for ScalarValue {
ScalarValue::FixedSizeList(_) => write!(f, "FixedSizeList({self})"),
ScalarValue::List(_) => write!(f, "List({self})"),
ScalarValue::LargeList(_) => write!(f, "LargeList({self})"),
+ ScalarValue::Struct(struct_arr) => {
Review Comment:
Likewise here -- perhaps we can use
https://docs.rs/arrow/latest/arrow/util/display/fn.array_value_to_string.html
again
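For instance (a sketch that relies on the one-row invariant of `ScalarValue::Struct`):
```rust
use arrow::array::StructArray;
use arrow::error::ArrowError;
use arrow::util::display::array_value_to_string;

// Let arrow render the single row instead of iterating the struct's
// columns by hand.
fn debug_struct_scalar(struct_arr: &StructArray) -> Result<String, ArrowError> {
    // ScalarValue::Struct always holds exactly one row, so row 0 is the value
    Ok(format!("Struct({})", array_value_to_string(struct_arr, 0)?))
}
```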
##########
datafusion/common/src/scalar.rs:
##########
@@ -3078,18 +3132,38 @@ impl fmt::Display for ScalarValue {
ScalarValue::DurationMillisecond(e) => format_option!(f, e)?,
ScalarValue::DurationMicrosecond(e) => format_option!(f, e)?,
ScalarValue::DurationNanosecond(e) => format_option!(f, e)?,
- ScalarValue::Struct(e, fields) => match e {
- Some(l) => write!(
+ ScalarValue::Struct(struct_arr) => {
+ // ScalarValue Struct should always have a single element
+ assert_eq!(struct_arr.len(), 1);
+
+ let columns = struct_arr.columns();
+ let fields = struct_arr.fields();
+ let nulls = struct_arr.nulls();
+
+ write!(
Review Comment:
I think you can use the arrow display code here instead:
https://docs.rs/arrow/latest/arrow/util/display/fn.array_value_to_string.html
Using ArrayFormatter directly would save a copy:
https://docs.rs/arrow-cast/50.0.0/src/arrow_cast/display.rs.html#930
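As an untested sketch of the zero-copy variant (function name illustrative):
```rust
use arrow::array::StructArray;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use std::fmt;

// ArrayFormatter writes the value straight into the Formatter, skipping
// the intermediate String that array_value_to_string would allocate.
fn fmt_struct(f: &mut fmt::Formatter<'_>, struct_arr: &StructArray) -> fmt::Result {
    let options = FormatOptions::default();
    let formatter =
        ArrayFormatter::try_new(struct_arr, &options).map_err(|_| fmt::Error)?;
    // exactly one row, per the ScalarValue::Struct invariant
    write!(f, "{}", formatter.value(0))
}
```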
##########
datafusion/common/src/scalar.rs:
##########
@@ -2695,14 +2730,22 @@ impl From<String> for ScalarValue {
}
}
-impl From<Vec<(&str, ScalarValue)>> for ScalarValue {
- fn from(value: Vec<(&str, ScalarValue)>) -> Self {
- let (fields, scalars): (SchemaBuilder, Vec<_>) = value
- .into_iter()
- .map(|(name, scalar)| (Field::new(name, scalar.data_type(), false), scalar))
- .unzip();
+// Wrapper for ScalarValue::Struct that checks the length of the arrays, without nulls
Review Comment:
Are these still TODO?
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -323,20 +335,32 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
impl OrderSensitiveArrayAggAccumulator {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
Review Comment:
Maybe we can file a follow-on ticket to track this idea
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]