This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 96664ce1dc Remove physical expr of ListIndex and ListRange, convert to `array_element` and `array_slice` functions (#9492)
96664ce1dc is described below
commit 96664ce1dc2e5890b33e05282d81df34a3ded870
Author: Jay Zhan <[email protected]>
AuthorDate: Sun Mar 10 19:30:10 2024 +0800
Remove physical expr of ListIndex and ListRange, convert to `array_element` and `array_slice` functions (#9492)
* remove physical range and index
Signed-off-by: jayzhan211 <[email protected]>
* remove proto
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
---
datafusion/core/src/physical_planner.rs | 20 +-
datafusion/optimizer/src/analyzer/rewrite_expr.rs | 43 ++-
.../src/expressions/get_indexed_field.rs | 314 +--------------------
datafusion/physical-expr/src/planner.rs | 24 +-
datafusion/proto/proto/datafusion.proto | 14 +-
datafusion/proto/src/generated/pbjson.rs | 244 ----------------
datafusion/proto/src/generated/prost.rs | 24 +-
datafusion/proto/src/physical_plan/from_proto.rs | 27 --
datafusion/proto/src/physical_plan/to_proto.rs | 14 -
.../proto/tests/cases/roundtrip_physical_plan.rs | 60 ----
10 files changed, 71 insertions(+), 713 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index c9192f5790..6d49287deb 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -211,19 +211,19 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
let expr = create_physical_name(expr, false)?;
let name = match field {
GetFieldAccess::NamedStructField { name } =>
format!("{expr}[{name}]"),
- GetFieldAccess::ListIndex { key } => {
- let key = create_physical_name(key, false)?;
- format!("{expr}[{key}]")
+ GetFieldAccess::ListIndex { key: _ } => {
+ unreachable!(
+ "ListIndex should have been rewritten in
OperatorToFunction"
+ )
}
GetFieldAccess::ListRange {
- start,
- stop,
- stride,
+ start: _,
+ stop: _,
+ stride: _,
} => {
- let start = create_physical_name(start, false)?;
- let stop = create_physical_name(stop, false)?;
- let stride = create_physical_name(stride, false)?;
- format!("{expr}[{start}:{stop}:{stride}]")
+ unreachable!(
+ "ListIndex should have been rewritten in
OperatorToFunction"
+ )
}
};
diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
index 3ea5596b67..6f856fa9bd 100644
--- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs
+++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
@@ -28,10 +28,12 @@ use datafusion_common::{DFSchema, DFSchemaRef, Result};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr_rewriter::rewrite_preserving_name;
use datafusion_expr::utils::merge_schema;
-use datafusion_expr::{
- BinaryExpr, BuiltinScalarFunction, Expr, LogicalPlan, Operator,
- ScalarFunctionDefinition,
-};
+use datafusion_expr::BuiltinScalarFunction;
+use datafusion_expr::GetFieldAccess;
+use datafusion_expr::GetIndexedField;
+use datafusion_expr::Operator;
+use datafusion_expr::ScalarFunctionDefinition;
+use datafusion_expr::{BinaryExpr, Expr, LogicalPlan};
#[derive(Default)]
pub struct OperatorToFunction {}
@@ -126,6 +128,39 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter {
return Ok(Transformed::yes(expr));
}
}
+
+ if let Expr::GetIndexedField(GetIndexedField {
+ ref expr,
+ ref field,
+ }) = expr
+ {
+ match field {
+ GetFieldAccess::ListIndex { ref key } => {
+ let expr = *expr.clone();
+ let key = *key.clone();
+ let args = vec![expr, key];
+ return Ok(Transformed::yes(Expr::ScalarFunction(
+
ScalarFunction::new(BuiltinScalarFunction::ArrayElement, args),
+ )));
+ }
+ GetFieldAccess::ListRange {
+ start,
+ stop,
+ stride,
+ } => {
+ let expr = *expr.clone();
+ let start = *start.clone();
+ let stop = *stop.clone();
+ let stride = *stride.clone();
+ let args = vec![expr, start, stop, stride];
+ return Ok(Transformed::yes(Expr::ScalarFunction(
+ ScalarFunction::new(BuiltinScalarFunction::ArraySlice,
args),
+ )));
+ }
+ _ => {}
+ }
+ }
+
Ok(Transformed::no(expr))
}
}
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index c93090c494..99b2279ba5 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -20,8 +20,6 @@
use crate::PhysicalExpr;
use datafusion_common::exec_err;
-use crate::array_expressions::{array_element, array_slice};
-use crate::expressions::Literal;
use crate::physical_expr::down_cast_any_ref;
use arrow::{
array::{Array, Scalar, StringArray},
@@ -42,61 +40,25 @@ use std::{any::Any, sync::Arc};
pub enum GetFieldAccessExpr {
/// Named field, For example `struct["name"]`
NamedStructField { name: ScalarValue },
- /// Single list index, for example: `list[i]`
- ListIndex { key: Arc<dyn PhysicalExpr> },
- /// List stride, for example `list[i:j:k]`
- ListRange {
- start: Arc<dyn PhysicalExpr>,
- stop: Arc<dyn PhysicalExpr>,
- stride: Arc<dyn PhysicalExpr>,
- },
}
impl std::fmt::Display for GetFieldAccessExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
GetFieldAccessExpr::NamedStructField { name } => write!(f, "[{}]",
name),
- GetFieldAccessExpr::ListIndex { key } => write!(f, "[{}]", key),
- GetFieldAccessExpr::ListRange {
- start,
- stop,
- stride,
- } => {
- write!(f, "[{}:{}:{}]", start, stop, stride)
- }
}
}
}
impl PartialEq<dyn Any> for GetFieldAccessExpr {
fn eq(&self, other: &dyn Any) -> bool {
- use GetFieldAccessExpr::{ListIndex, ListRange, NamedStructField};
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| match (self, x) {
- (NamedStructField { name: lhs }, NamedStructField { name: rhs
}) => {
- lhs.eq(rhs)
- }
- (ListIndex { key: lhs }, ListIndex { key: rhs }) =>
lhs.eq(rhs),
(
- ListRange {
- start: start_lhs,
- stop: stop_lhs,
- stride: stride_lhs,
- },
- ListRange {
- start: start_rhs,
- stop: stop_rhs,
- stride: stride_rhs,
- },
- ) => {
- start_lhs.eq(start_rhs)
- && stop_lhs.eq(stop_rhs)
- && stride_lhs.eq(stride_rhs)
- }
- (NamedStructField { .. }, ListIndex { .. } | ListRange { .. })
=> false,
- (ListIndex { .. }, NamedStructField { .. } | ListRange { .. })
=> false,
- (ListRange { .. }, NamedStructField { .. } | ListIndex { .. })
=> false,
+ GetFieldAccessExpr::NamedStructField { name: lhs },
+ GetFieldAccessExpr::NamedStructField { name: rhs },
+ ) => lhs.eq(rhs),
})
.unwrap_or(false)
}
@@ -127,45 +89,6 @@ impl GetIndexedFieldExpr {
)
}
- /// Create a new [`GetIndexedFieldExpr`] for accessing the specified index
- pub fn new_index(arg: Arc<dyn PhysicalExpr>, key: Arc<dyn PhysicalExpr>)
-> Self {
- Self::new(arg, GetFieldAccessExpr::ListIndex { key })
- }
-
- /// Create a new [`GetIndexedFieldExpr`] for accessing the range
- pub fn new_range(
- arg: Arc<dyn PhysicalExpr>,
- start: Arc<dyn PhysicalExpr>,
- stop: Arc<dyn PhysicalExpr>,
- ) -> Self {
- Self::new(
- arg,
- GetFieldAccessExpr::ListRange {
- start,
- stop,
- stride: Arc::new(Literal::new(ScalarValue::Int64(Some(1))))
- as Arc<dyn PhysicalExpr>,
- },
- )
- }
-
- /// Create a new [`GetIndexedFieldExpr`] for accessing the stride
- pub fn new_stride(
- arg: Arc<dyn PhysicalExpr>,
- start: Arc<dyn PhysicalExpr>,
- stop: Arc<dyn PhysicalExpr>,
- stride: Arc<dyn PhysicalExpr>,
- ) -> Self {
- Self::new(
- arg,
- GetFieldAccessExpr::ListRange {
- start,
- stop,
- stride,
- },
- )
- }
-
/// Get the description of what field should be accessed
pub fn field(&self) -> &GetFieldAccessExpr {
&self.field
@@ -176,23 +99,11 @@ impl GetIndexedFieldExpr {
&self.arg
}
- fn schema_access(&self, input_schema: &Schema) ->
Result<GetFieldAccessSchema> {
+ fn schema_access(&self, _input_schema: &Schema) ->
Result<GetFieldAccessSchema> {
Ok(match &self.field {
GetFieldAccessExpr::NamedStructField { name } => {
GetFieldAccessSchema::NamedStructField { name: name.clone() }
}
- GetFieldAccessExpr::ListIndex { key } =>
GetFieldAccessSchema::ListIndex {
- key_dt: key.data_type(input_schema)?,
- },
- GetFieldAccessExpr::ListRange {
- start,
- stop,
- stride,
- } => GetFieldAccessSchema::ListRange {
- start_dt: start.data_type(input_schema)?,
- stop_dt: stop.data_type(input_schema)?,
- stride_dt: stride.data_type(input_schema)?,
- },
})
}
}
@@ -249,40 +160,6 @@ impl PhysicalExpr for GetIndexedFieldExpr {
"get indexed field is only possible on lists
with int64 indexes or struct \
with utf8 indexes. Tried {dt:?} with
{name:?} index"),
},
- GetFieldAccessExpr::ListIndex{key} => {
- let key = key.evaluate(batch)?.into_array(batch.num_rows())?;
- match (array.data_type(), key.data_type()) {
- (DataType::List(_), DataType::Int64) |
(DataType::LargeList(_), DataType::Int64) =>
Ok(ColumnarValue::Array(array_element(&[
- array, key
- ])?)),
- (DataType::List(_), key) | (DataType::LargeList(_), key) =>
exec_err!(
- "get indexed field is only possible on
List/LargeList with int64 indexes. \
- Tried with {key:?} index"),
- (dt, key) => exec_err!(
- "get indexed field is only possible on
List/LargeList with int64 indexes or struct \
- with utf8 indexes. Tried
{dt:?} with {key:?} index"),
- }
- },
- GetFieldAccessExpr::ListRange { start, stop, stride } => {
- let start =
start.evaluate(batch)?.into_array(batch.num_rows())?;
- let stop = stop.evaluate(batch)?.into_array(batch.num_rows())?;
- let stride =
stride.evaluate(batch)?.into_array(batch.num_rows())?;
- match (array.data_type(), start.data_type(), stop.data_type(),
stride.data_type()) {
- (DataType::List(_), DataType::Int64, DataType::Int64,
DataType::Int64) |
- (DataType::LargeList(_), DataType::Int64, DataType::Int64,
DataType::Int64)=> {
- Ok(ColumnarValue::Array((array_slice(&[
- array, start, stop, stride
- ]))?))
- },
- (DataType::List(_), start, stop, stride) |
- (DataType::LargeList(_), start, stop, stride)=> exec_err!(
- "get indexed field is only possible on List/LargeList
with int64 indexes. \
- Tried with {start:?}, {stop:?} and {stride:?}
indices"),
- (dt, start, stop, stride) => exec_err!(
- "get indexed field is only possible on List/LargeList
with int64 indexes or struct \
- with utf8 indexes. Tried {dt:?} with
{start:?}, {stop:?} and {stride:?} indices"),
- }
- }
}
}
@@ -319,39 +196,13 @@ impl PartialEq<dyn Any> for GetIndexedFieldExpr {
mod tests {
use super::*;
use crate::expressions::col;
- use arrow::array::new_empty_array;
- use arrow::array::{ArrayRef, GenericListArray};
- use arrow::array::{
- BooleanArray, Int64Array, ListBuilder, StringBuilder, StructArray,
- };
+ use arrow::array::ArrayRef;
+ use arrow::array::{BooleanArray, Int64Array, StructArray};
+ use arrow::datatypes::Field;
use arrow::datatypes::Fields;
- use arrow::{array::StringArray, datatypes::Field};
- use datafusion_common::cast::{as_boolean_array, as_list_array,
as_string_array};
+ use datafusion_common::cast::as_boolean_array;
use datafusion_common::Result;
- fn build_list_arguments(
- list_of_lists: Vec<Vec<Option<&str>>>,
- list_of_start_indices: Vec<Option<i64>>,
- list_of_stop_indices: Vec<Option<i64>>,
- ) -> (GenericListArray<i32>, Int64Array, Int64Array) {
- let builder = StringBuilder::with_capacity(list_of_lists.len(), 1024);
- let mut list_builder = ListBuilder::new(builder);
- for values in list_of_lists {
- let builder = list_builder.values();
- for value in values {
- match value {
- None => builder.append_null(),
- Some(v) => builder.append_value(v),
- }
- }
- list_builder.append(true);
- }
-
- let start_array = Int64Array::from(list_of_start_indices);
- let stop_array = Int64Array::from(list_of_stop_indices);
- (list_builder.finish(), start_array, stop_array)
- }
-
#[test]
fn get_indexed_field_named_struct_field() -> Result<()> {
let schema = struct_schema();
@@ -391,153 +242,4 @@ mod tests {
true,
)])
}
-
- fn list_schema(cols: &[&str]) -> Schema {
- if cols.len() == 2 {
- Schema::new(vec![
- Field::new_list(cols[0], Field::new("item", DataType::Utf8,
true), true),
- Field::new(cols[1], DataType::Int64, true),
- ])
- } else {
- Schema::new(vec![
- Field::new_list(cols[0], Field::new("item", DataType::Utf8,
true), true),
- Field::new(cols[1], DataType::Int64, true),
- Field::new(cols[2], DataType::Int64, true),
- ])
- }
- }
-
- #[test]
- fn get_indexed_field_list_index() -> Result<()> {
- let list_of_lists = vec![
- vec![Some("a"), Some("b"), None],
- vec![None, Some("c"), Some("d")],
- vec![Some("e"), None, Some("f")],
- ];
- let list_of_start_indices = vec![Some(1), Some(2), None];
- let list_of_stop_indices = vec![None];
- let expected_list = vec![Some("a"), Some("c"), None];
-
- let schema = list_schema(&["list", "key"]);
- let (list_col, key_col, _) = build_list_arguments(
- list_of_lists,
- list_of_start_indices,
- list_of_stop_indices,
- );
- let expr = col("list", &schema).unwrap();
- let key = col("key", &schema).unwrap();
- let batch = RecordBatch::try_new(
- Arc::new(schema),
- vec![Arc::new(list_col), Arc::new(key_col)],
- )?;
- let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
- let result = expr
- .evaluate(&batch)?
- .into_array(1)
- .expect("Failed to convert to array");
- let result = as_string_array(&result).expect("failed to downcast to
ListArray");
- let expected = StringArray::from(expected_list);
- assert_eq!(expected, result.clone());
- Ok(())
- }
-
- #[test]
- fn get_indexed_field_list_range() -> Result<()> {
- let list_of_lists = vec![
- vec![Some("a"), Some("b"), None],
- vec![None, Some("c"), Some("d")],
- vec![Some("e"), None, Some("f")],
- ];
- let list_of_start_indices = vec![Some(1), Some(2), None];
- let list_of_stop_indices = vec![Some(2), None, Some(3)];
- let expected_list = vec![
- vec![Some("a"), Some("b")],
- vec![Some("c"), Some("d")],
- vec![Some("e"), None, Some("f")],
- ];
-
- let schema = list_schema(&["list", "start", "stop"]);
- let (list_col, start_col, stop_col) = build_list_arguments(
- list_of_lists,
- list_of_start_indices,
- list_of_stop_indices,
- );
- let expr = col("list", &schema).unwrap();
- let start = col("start", &schema).unwrap();
- let stop = col("stop", &schema).unwrap();
- let batch = RecordBatch::try_new(
- Arc::new(schema),
- vec![Arc::new(list_col), Arc::new(start_col), Arc::new(stop_col)],
- )?;
- let expr = Arc::new(GetIndexedFieldExpr::new_range(expr, start, stop));
- let result = expr
- .evaluate(&batch)?
- .into_array(1)
- .expect("Failed to convert to array");
- let result = as_list_array(&result).expect("failed to downcast to
ListArray");
- let (expected, _, _) =
- build_list_arguments(expected_list, vec![None], vec![None]);
- assert_eq!(expected, result.clone());
- Ok(())
- }
-
- #[test]
- fn get_indexed_field_empty_list() -> Result<()> {
- let schema = list_schema(&["list", "key"]);
- let builder = StringBuilder::new();
- let mut list_builder = ListBuilder::new(builder);
- let key_array = new_empty_array(&DataType::Int64);
- let expr = col("list", &schema).unwrap();
- let key = col("key", &schema).unwrap();
- let batch = RecordBatch::try_new(
- Arc::new(schema),
- vec![Arc::new(list_builder.finish()), key_array],
- )?;
- let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
- let result = expr
- .evaluate(&batch)?
- .into_array(batch.num_rows())
- .expect("Failed to convert to array");
- assert!(result.is_empty());
- Ok(())
- }
-
- #[test]
- fn get_indexed_field_invalid_list_index() -> Result<()> {
- let schema = list_schema(&["list", "error"]);
- let expr = col("list", &schema).unwrap();
- let key = col("error", &schema).unwrap();
- let builder = StringBuilder::with_capacity(3, 1024);
- let mut list_builder = ListBuilder::new(builder);
- list_builder.values().append_value("hello");
- list_builder.append(true);
-
- let key_array = Int64Array::from(vec![Some(3)]);
- let batch = RecordBatch::try_new(
- Arc::new(schema),
- vec![Arc::new(list_builder.finish()), Arc::new(key_array)],
- )?;
- let expr = Arc::new(GetIndexedFieldExpr::new_index(expr, key));
- let result = expr
- .evaluate(&batch)?
- .into_array(1)
- .expect("Failed to convert to array");
- assert!(result.is_null(0));
- Ok(())
- }
-
- #[test]
- fn get_indexed_field_eq() -> Result<()> {
- let schema = list_schema(&["list", "error"]);
- let expr = col("list", &schema).unwrap();
- let key = col("error", &schema).unwrap();
- let indexed_field =
- Arc::new(GetIndexedFieldExpr::new_index(expr.clone(), key.clone()))
- as Arc<dyn PhysicalExpr>;
- let indexed_field_other =
- Arc::new(GetIndexedFieldExpr::new_index(key, expr)) as Arc<dyn
PhysicalExpr>;
- assert!(indexed_field.eq(&indexed_field));
- assert!(!indexed_field.eq(&indexed_field_other));
- Ok(())
- }
}
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 858dbd30c1..8001f989a2 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -233,22 +233,16 @@ pub fn create_physical_expr(
GetFieldAccess::NamedStructField { name } => {
GetFieldAccessExpr::NamedStructField { name: name.clone() }
}
- GetFieldAccess::ListIndex { key } =>
GetFieldAccessExpr::ListIndex {
- key: create_physical_expr(key, input_dfschema,
execution_props)?,
- },
+ GetFieldAccess::ListIndex { key: _ } => {
+ unreachable!("ListIndex should be rewritten in
OperatorToFunction")
+ }
GetFieldAccess::ListRange {
- start,
- stop,
- stride,
- } => GetFieldAccessExpr::ListRange {
- start: create_physical_expr(start, input_dfschema,
execution_props)?,
- stop: create_physical_expr(stop, input_dfschema,
execution_props)?,
- stride: create_physical_expr(
- stride,
- input_dfschema,
- execution_props,
- )?,
- },
+ start: _,
+ stop: _,
+ stride: _,
+ } => {
+ unreachable!("ListRange should be rewritten in
OperatorToFunction")
+ }
};
Ok(Arc::new(GetIndexedFieldExpr::new(
create_physical_expr(expr, input_dfschema, execution_props)?,
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index a5287e2a57..c068b253ce 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1775,21 +1775,11 @@ message NamedStructFieldExpr {
ScalarValue name = 1;
}
-message ListIndexExpr {
- PhysicalExprNode key = 1;
-}
-
-message ListRangeExpr {
- PhysicalExprNode start = 1;
- PhysicalExprNode stop = 2;
- PhysicalExprNode stride = 3;
-}
-
message PhysicalGetIndexedFieldExprNode {
PhysicalExprNode arg = 1;
oneof field {
NamedStructFieldExpr named_struct_field_expr = 2;
- ListIndexExpr list_index_expr = 3;
- ListRangeExpr list_range_expr = 4;
+ // 3 was list_index_expr
+ // 4 was list_range_expr
}
}
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 787cd570b2..7a366c08ad 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -12439,97 +12439,6 @@ impl<'de> serde::Deserialize<'de> for ListIndex {
deserializer.deserialize_struct("datafusion.ListIndex", FIELDS,
GeneratedVisitor)
}
}
-impl serde::Serialize for ListIndexExpr {
- #[allow(deprecated)]
- fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
- where
- S: serde::Serializer,
- {
- use serde::ser::SerializeStruct;
- let mut len = 0;
- if self.key.is_some() {
- len += 1;
- }
- let mut struct_ser =
serializer.serialize_struct("datafusion.ListIndexExpr", len)?;
- if let Some(v) = self.key.as_ref() {
- struct_ser.serialize_field("key", v)?;
- }
- struct_ser.end()
- }
-}
-impl<'de> serde::Deserialize<'de> for ListIndexExpr {
- #[allow(deprecated)]
- fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- const FIELDS: &[&str] = &[
- "key",
- ];
-
- #[allow(clippy::enum_variant_names)]
- enum GeneratedField {
- Key,
- }
- impl<'de> serde::Deserialize<'de> for GeneratedField {
- fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct GeneratedVisitor;
-
- impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
- type Value = GeneratedField;
-
- fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(formatter, "expected one of: {:?}", &FIELDS)
- }
-
- #[allow(unused_variables)]
- fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
- where
- E: serde::de::Error,
- {
- match value {
- "key" => Ok(GeneratedField::Key),
- _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
- }
- }
- }
- deserializer.deserialize_identifier(GeneratedVisitor)
- }
- }
- struct GeneratedVisitor;
- impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
- type Value = ListIndexExpr;
-
- fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
- formatter.write_str("struct datafusion.ListIndexExpr")
- }
-
- fn visit_map<V>(self, mut map_: V) ->
std::result::Result<ListIndexExpr, V::Error>
- where
- V: serde::de::MapAccess<'de>,
- {
- let mut key__ = None;
- while let Some(k) = map_.next_key()? {
- match k {
- GeneratedField::Key => {
- if key__.is_some() {
- return
Err(serde::de::Error::duplicate_field("key"));
- }
- key__ = map_.next_value()?;
- }
- }
- }
- Ok(ListIndexExpr {
- key: key__,
- })
- }
- }
- deserializer.deserialize_struct("datafusion.ListIndexExpr", FIELDS,
GeneratedVisitor)
- }
-}
impl serde::Serialize for ListRange {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
@@ -12655,131 +12564,6 @@ impl<'de> serde::Deserialize<'de> for ListRange {
deserializer.deserialize_struct("datafusion.ListRange", FIELDS,
GeneratedVisitor)
}
}
-impl serde::Serialize for ListRangeExpr {
- #[allow(deprecated)]
- fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
- where
- S: serde::Serializer,
- {
- use serde::ser::SerializeStruct;
- let mut len = 0;
- if self.start.is_some() {
- len += 1;
- }
- if self.stop.is_some() {
- len += 1;
- }
- if self.stride.is_some() {
- len += 1;
- }
- let mut struct_ser =
serializer.serialize_struct("datafusion.ListRangeExpr", len)?;
- if let Some(v) = self.start.as_ref() {
- struct_ser.serialize_field("start", v)?;
- }
- if let Some(v) = self.stop.as_ref() {
- struct_ser.serialize_field("stop", v)?;
- }
- if let Some(v) = self.stride.as_ref() {
- struct_ser.serialize_field("stride", v)?;
- }
- struct_ser.end()
- }
-}
-impl<'de> serde::Deserialize<'de> for ListRangeExpr {
- #[allow(deprecated)]
- fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- const FIELDS: &[&str] = &[
- "start",
- "stop",
- "stride",
- ];
-
- #[allow(clippy::enum_variant_names)]
- enum GeneratedField {
- Start,
- Stop,
- Stride,
- }
- impl<'de> serde::Deserialize<'de> for GeneratedField {
- fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct GeneratedVisitor;
-
- impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
- type Value = GeneratedField;
-
- fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(formatter, "expected one of: {:?}", &FIELDS)
- }
-
- #[allow(unused_variables)]
- fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
- where
- E: serde::de::Error,
- {
- match value {
- "start" => Ok(GeneratedField::Start),
- "stop" => Ok(GeneratedField::Stop),
- "stride" => Ok(GeneratedField::Stride),
- _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
- }
- }
- }
- deserializer.deserialize_identifier(GeneratedVisitor)
- }
- }
- struct GeneratedVisitor;
- impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
- type Value = ListRangeExpr;
-
- fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
- formatter.write_str("struct datafusion.ListRangeExpr")
- }
-
- fn visit_map<V>(self, mut map_: V) ->
std::result::Result<ListRangeExpr, V::Error>
- where
- V: serde::de::MapAccess<'de>,
- {
- let mut start__ = None;
- let mut stop__ = None;
- let mut stride__ = None;
- while let Some(k) = map_.next_key()? {
- match k {
- GeneratedField::Start => {
- if start__.is_some() {
- return
Err(serde::de::Error::duplicate_field("start"));
- }
- start__ = map_.next_value()?;
- }
- GeneratedField::Stop => {
- if stop__.is_some() {
- return
Err(serde::de::Error::duplicate_field("stop"));
- }
- stop__ = map_.next_value()?;
- }
- GeneratedField::Stride => {
- if stride__.is_some() {
- return
Err(serde::de::Error::duplicate_field("stride"));
- }
- stride__ = map_.next_value()?;
- }
- }
- }
- Ok(ListRangeExpr {
- start: start__,
- stop: stop__,
- stride: stride__,
- })
- }
- }
- deserializer.deserialize_struct("datafusion.ListRangeExpr", FIELDS,
GeneratedVisitor)
- }
-}
impl serde::Serialize for ListingTableScanNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
@@ -18156,12 +17940,6 @@ impl serde::Serialize for PhysicalGetIndexedFieldExprNode {
physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(v) => {
struct_ser.serialize_field("namedStructFieldExpr", v)?;
}
- physical_get_indexed_field_expr_node::Field::ListIndexExpr(v)
=> {
- struct_ser.serialize_field("listIndexExpr", v)?;
- }
- physical_get_indexed_field_expr_node::Field::ListRangeExpr(v)
=> {
- struct_ser.serialize_field("listRangeExpr", v)?;
- }
}
}
struct_ser.end()
@@ -18177,18 +17955,12 @@ impl<'de> serde::Deserialize<'de> for PhysicalGetIndexedFieldExprNode {
"arg",
"named_struct_field_expr",
"namedStructFieldExpr",
- "list_index_expr",
- "listIndexExpr",
- "list_range_expr",
- "listRangeExpr",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Arg,
NamedStructFieldExpr,
- ListIndexExpr,
- ListRangeExpr,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -18212,8 +17984,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalGetIndexedFieldExprNode {
match value {
"arg" => Ok(GeneratedField::Arg),
"namedStructFieldExpr" | "named_struct_field_expr"
=> Ok(GeneratedField::NamedStructFieldExpr),
- "listIndexExpr" | "list_index_expr" =>
Ok(GeneratedField::ListIndexExpr),
- "listRangeExpr" | "list_range_expr" =>
Ok(GeneratedField::ListRangeExpr),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -18248,20 +18018,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalGetIndexedFieldExprNode {
return
Err(serde::de::Error::duplicate_field("namedStructFieldExpr"));
}
field__ =
map_.next_value::<::std::option::Option<_>>()?.map(physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr)
-;
- }
- GeneratedField::ListIndexExpr => {
- if field__.is_some() {
- return
Err(serde::de::Error::duplicate_field("listIndexExpr"));
- }
- field__ =
map_.next_value::<::std::option::Option<_>>()?.map(physical_get_indexed_field_expr_node::Field::ListIndexExpr)
-;
- }
- GeneratedField::ListRangeExpr => {
- if field__.is_some() {
- return
Err(serde::de::Error::duplicate_field("listRangeExpr"));
- }
- field__ =
map_.next_value::<::std::option::Option<_>>()?.map(physical_get_indexed_field_expr_node::Field::ListRangeExpr)
;
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 971775e24b..79decc1252 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2529,26 +2529,10 @@ pub struct NamedStructFieldExpr {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ListIndexExpr {
- #[prost(message, optional, boxed, tag = "1")]
- pub key:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ListRangeExpr {
- #[prost(message, optional, boxed, tag = "1")]
- pub start:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, optional, boxed, tag = "2")]
- pub stop:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(message, optional, boxed, tag = "3")]
- pub stride:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PhysicalGetIndexedFieldExprNode {
#[prost(message, optional, boxed, tag = "1")]
pub arg:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
- #[prost(oneof = "physical_get_indexed_field_expr_node::Field", tags = "2,
3, 4")]
+ #[prost(oneof = "physical_get_indexed_field_expr_node::Field", tags = "2")]
pub field:
::core::option::Option<physical_get_indexed_field_expr_node::Field>,
}
/// Nested message and enum types in `PhysicalGetIndexedFieldExprNode`.
@@ -2556,12 +2540,10 @@ pub mod physical_get_indexed_field_expr_node {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Field {
+ /// 3 was list_index_expr
+ /// 4 was list_range_expr
#[prost(message, tag = "2")]
NamedStructFieldExpr(super::NamedStructFieldExpr),
- #[prost(message, tag = "3")]
- ListIndexExpr(::prost::alloc::boxed::Box<super::ListIndexExpr>),
- #[prost(message, tag = "4")]
- ListRangeExpr(::prost::alloc::boxed::Box<super::ListRangeExpr>),
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord,
::prost::Enumeration)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index af0aa485c3..d3b41f114f 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -389,33 +389,6 @@ pub fn parse_physical_expr(
Some(protobuf::physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(named_struct_field_expr))
=> GetFieldAccessExpr::NamedStructField{
name: convert_required!(named_struct_field_expr.name)?,
},
-
Some(protobuf::physical_get_indexed_field_expr_node::Field::ListIndexExpr(list_index_expr))
=> GetFieldAccessExpr::ListIndex{
- key: parse_required_physical_expr(
- list_index_expr.key.as_deref(),
- registry,
- "key",
- input_schema,
- )?},
-
Some(protobuf::physical_get_indexed_field_expr_node::Field::ListRangeExpr(list_range_expr))
=> GetFieldAccessExpr::ListRange{
- start: parse_required_physical_expr(
- list_range_expr.start.as_deref(),
- registry,
- "start",
- input_schema,
- )?,
- stop: parse_required_physical_expr(
- list_range_expr.stop.as_deref(),
- registry,
- "stop",
- input_schema
- )?,
- stride: parse_required_physical_expr(
- list_range_expr.stride.as_deref(),
- registry,
- "stride",
- input_schema
- )?,
- },
None =>
return Err(proto_error(
"Field must not be None",
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index c464571893..da4e87b7a8 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -559,20 +559,6 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
name: Some(ScalarValue::try_from(name)?)
})
),
- GetFieldAccessExpr::ListIndex{key} => Some(
-
protobuf::physical_get_indexed_field_expr_node::Field::ListIndexExpr(Box::new(protobuf::ListIndexExpr
{
- key: Some(Box::new(key.to_owned().try_into()?))
- }))
- ),
- GetFieldAccessExpr::ListRange { start, stop, stride } => {
- Some(
-
protobuf::physical_get_indexed_field_expr_node::Field::ListRangeExpr(Box::new(protobuf::ListRangeExpr
{
- start:
Some(Box::new(start.to_owned().try_into()?)),
- stop: Some(Box::new(stop.to_owned().try_into()?)),
- stride:
Some(Box::new(stride.to_owned().try_into()?)),
- }))
- )
- }
};
Ok(protobuf::PhysicalExprNode {
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index ff71ee750c..a3c0b3eccd 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -33,7 +33,6 @@ use datafusion::logical_expr::{
create_udf, BuiltinScalarFunction, JoinType, Operator, Volatility,
};
use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::physical_expr::expressions::Literal;
use datafusion::physical_expr::expressions::NthValueAgg;
use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr};
@@ -736,65 +735,6 @@ fn roundtrip_get_indexed_field_named_struct_field() -> Result<()> {
roundtrip_test(plan)
}
-#[test]
-fn roundtrip_get_indexed_field_list_index() -> Result<()> {
- let fields = vec![
- Field::new("id", DataType::Int64, true),
- Field::new_list("arg", Field::new("item", DataType::Float64, true),
true),
- Field::new("key", DataType::Int64, true),
- ];
-
- let schema = Schema::new(fields);
- let input = Arc::new(PlaceholderRowExec::new(Arc::new(schema.clone())));
-
- let col_arg = col("arg", &schema)?;
- let col_key = col("key", &schema)?;
- let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(
- col_arg,
- GetFieldAccessExpr::ListIndex { key: col_key },
- ));
-
- let plan = Arc::new(ProjectionExec::try_new(
- vec![(get_indexed_field_expr, "result".to_string())],
- input,
- )?);
-
- roundtrip_test(plan)
-}
-
-#[test]
-fn roundtrip_get_indexed_field_list_range() -> Result<()> {
- let fields = vec![
- Field::new("id", DataType::Int64, true),
- Field::new_list("arg", Field::new("item", DataType::Float64, true),
true),
- Field::new("start", DataType::Int64, true),
- Field::new("stop", DataType::Int64, true),
- ];
-
- let schema = Schema::new(fields);
- let input = Arc::new(EmptyExec::new(Arc::new(schema.clone())));
-
- let col_arg = col("arg", &schema)?;
- let col_start = col("start", &schema)?;
- let col_stop = col("stop", &schema)?;
- let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(
- col_arg,
- GetFieldAccessExpr::ListRange {
- start: col_start,
- stop: col_stop,
- stride: Arc::new(Literal::new(ScalarValue::Int64(Some(1))))
- as Arc<dyn PhysicalExpr>,
- },
- ));
-
- let plan = Arc::new(ProjectionExec::try_new(
- vec![(get_indexed_field_expr, "result".to_string())],
- input,
- )?);
-
- roundtrip_test(plan)
-}
-
#[test]
fn roundtrip_analyze() -> Result<()> {
let field_a = Field::new("plan_type", DataType::Utf8, false);