This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b6c4fed14e improve the ergonomics of creating field and list array accesses (#7215)
b6c4fed14e is described below

commit b6c4fed14e3e3335a99d14bec164b953982e92f5
Author: Igor Izvekov <izvei...@gmail.com>
AuthorDate: Mon Aug 7 19:31:55 2023 +0300

    improve the ergonomics of creating field and list array accesses (#7215)
    
    * refactor: improve the ergonomics of creating field and list array accesses
    
    * fix: clippy
    
    * fix: roundtrip_get_indexed_field_named_struct_field
---
 datafusion/core/src/physical_planner.rs            |  37 +-
 datafusion/expr/src/expr.rs                        | 102 +--
 datafusion/expr/src/expr_schema.rs                 |  76 +-
 datafusion/expr/src/field_util.rs                  |  72 +-
 datafusion/expr/src/lib.rs                         |   4 +-
 datafusion/expr/src/tree_node/expr.rs              |  15 +-
 .../src/expressions/get_indexed_field.rs           | 351 ++++-----
 datafusion/physical-expr/src/expressions/mod.rs    |   2 +-
 datafusion/physical-expr/src/planner.rs            |  60 +-
 datafusion/proto/proto/datafusion.proto            |  37 +-
 datafusion/proto/src/generated/pbjson.rs           | 798 ++++++++++++++++++---
 datafusion/proto/src/generated/prost.rs            |  76 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |  46 +-
 datafusion/proto/src/logical_plan/to_proto.rs      |  60 +-
 datafusion/proto/src/physical_plan/from_proto.rs   |  43 +-
 datafusion/proto/src/physical_plan/mod.rs          |  82 ++-
 datafusion/proto/src/physical_plan/to_proto.rs     |  33 +-
 datafusion/sql/src/expr/identifier.rs              |  10 +-
 datafusion/sql/src/expr/mod.rs                     |  58 +-
 datafusion/sql/src/utils.rs                        |  15 +-
 20 files changed, 1387 insertions(+), 590 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index e0dba6fe14..172304165a 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -65,7 +65,8 @@ use async_trait::async_trait;
 use datafusion_common::{plan_err, DFSchema, ScalarValue};
 use datafusion_expr::expr::{
     self, AggregateFunction, AggregateUDF, Alias, Between, BinaryExpr, Cast,
-    GetIndexedField, GroupingSet, InList, Like, ScalarUDF, TryCast, 
WindowFunction,
+    GetFieldAccess, GetIndexedField, GroupingSet, InList, Like, ScalarUDF, 
TryCast,
+    WindowFunction,
 };
 use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols};
 use 
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
@@ -181,28 +182,22 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
             let expr = create_physical_name(expr, false)?;
             Ok(format!("{expr} IS NOT UNKNOWN"))
         }
-        Expr::GetIndexedField(GetIndexedField {
-            key,
-            extra_key,
-            expr,
-        }) => {
+        Expr::GetIndexedField(GetIndexedField { expr, field }) => {
             let expr = create_physical_name(expr, false)?;
+            let name = match field {
+                GetFieldAccess::NamedStructField { name } => 
format!("{expr}[{name}]"),
+                GetFieldAccess::ListIndex { key } => {
+                    let key = create_physical_name(key, false)?;
+                    format!("{expr}[{key}]")
+                }
+                GetFieldAccess::ListRange { start, stop } => {
+                    let start = create_physical_name(start, false)?;
+                    let stop = create_physical_name(stop, false)?;
+                    format!("{expr}[{start}:{stop}]")
+                }
+            };
 
-            if let (Some(list_key), Some(extra_key)) = (&key.list_key, 
extra_key) {
-                let key = create_physical_name(list_key, false)?;
-                let extra_key = create_physical_name(extra_key, false)?;
-                Ok(format!("{expr}[{key}:{extra_key}]"))
-            } else {
-                let key = if let Some(list_key) = &key.list_key {
-                    create_physical_name(list_key, false)?
-                } else if let Some(ScalarValue::Utf8(Some(struct_key))) = 
&key.struct_key
-                {
-                    struct_key.to_string()
-                } else {
-                    String::from("")
-                };
-                Ok(format!("{expr}[{key}]"))
-            }
+            Ok(name)
         }
         Expr::ScalarFunction(func) => {
             create_function_physical_name(&func.fun.to_string(), false, 
&func.args)
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 4adeb0653f..2fb65472c2 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -359,31 +359,15 @@ impl ScalarUDF {
     }
 }
 
-/// Key of `GetIndexedFieldKey`.
-/// This structure is needed to separate the responsibilities of the key for 
`DataType::List` and `DataType::Struct`.
-/// If we use index with `DataType::List`, then we use the `list_key` argument 
with `struct_key` equal to `None`.
-/// If we use index with `DataType::Struct`, then we use the `struct_key` 
argument with `list_key` equal to `None`.
-/// `list_key` can be any expression, unlike `struct_key` which can only be 
`ScalarValue::Utf8`.
 #[derive(Clone, PartialEq, Eq, Hash, Debug)]
-pub struct GetIndexedFieldKey {
-    /// The key expression for `DataType::List`
-    pub list_key: Option<Expr>,
-    /// The key expression for `DataType::Struct`
-    pub struct_key: Option<ScalarValue>,
-}
-
-impl GetIndexedFieldKey {
-    /// Create a new GetIndexedFieldKey expression
-    pub fn new(list_key: Option<Expr>, struct_key: Option<ScalarValue>) -> 
Self {
-        // value must be either `list_key` or `struct_key`
-        assert_ne!(list_key.is_some(), struct_key.is_some());
-        assert_ne!(list_key.is_none(), struct_key.is_none());
-
-        Self {
-            list_key,
-            struct_key,
-        }
-    }
+pub enum GetFieldAccess {
+    /// returns the field `struct[field]`. For example `struct["name"]`
+    NamedStructField { name: ScalarValue },
+    /// single list index
+    // list[i]
+    ListIndex { key: Box<Expr> },
+    /// list range `list[i:j]`
+    ListRange { start: Box<Expr>, stop: Box<Expr> },
 }
 
 /// Returns the field of a [`arrow::array::ListArray`] or 
[`arrow::array::StructArray`] by `key`.
@@ -393,23 +377,13 @@ pub struct GetIndexedField {
     /// The expression to take the field from
     pub expr: Box<Expr>,
     /// The name of the field to take
-    pub key: Box<GetIndexedFieldKey>,
-    /// The right border of the field to take
-    pub extra_key: Option<Box<Expr>>,
+    pub field: GetFieldAccess,
 }
 
 impl GetIndexedField {
     /// Create a new GetIndexedField expression
-    pub fn new(
-        expr: Box<Expr>,
-        key: Box<GetIndexedFieldKey>,
-        extra_key: Option<Box<Expr>>,
-    ) -> Self {
-        Self {
-            expr,
-            key,
-            extra_key,
-        }
+    pub fn new(expr: Box<Expr>, field: GetFieldAccess) -> Self {
+        Self { expr, field }
     }
 }
 
@@ -1178,22 +1152,15 @@ impl fmt::Display for Expr {
             }
             Expr::Wildcard => write!(f, "*"),
             Expr::QualifiedWildcard { qualifier } => write!(f, 
"{qualifier}.*"),
-            Expr::GetIndexedField(GetIndexedField {
-                key,
-                extra_key,
-                expr,
-            }) => {
-                let key = if let Some(list_key) = &key.list_key {
-                    format!("{list_key}")
-                } else {
-                    format!("{0}", key.struct_key.clone().unwrap())
-                };
-                if let Some(extra_key) = extra_key {
-                    write!(f, "({expr})[{key}:{extra_key}]")
-                } else {
-                    write!(f, "({expr})[{key}]")
+            Expr::GetIndexedField(GetIndexedField { field, expr }) => match 
field {
+                GetFieldAccess::NamedStructField { name } => {
+                    write!(f, "({expr})[{name}]")
                 }
-            }
+                GetFieldAccess::ListIndex { key } => write!(f, 
"({expr})[{key}]"),
+                GetFieldAccess::ListRange { start, stop } => {
+                    write!(f, "({expr})[{start}:{stop}]")
+                }
+            },
             Expr::GroupingSet(grouping_sets) => match grouping_sets {
                 GroupingSet::Rollup(exprs) => {
                     // ROLLUP (c0, c1, c2)
@@ -1382,24 +1349,21 @@ fn create_name(e: &Expr) -> Result<String> {
         Expr::ScalarSubquery(subquery) => {
             Ok(subquery.subquery.schema().field(0).name().clone())
         }
-        Expr::GetIndexedField(GetIndexedField {
-            key,
-            extra_key,
-            expr,
-        }) => {
+        Expr::GetIndexedField(GetIndexedField { expr, field }) => {
             let expr = create_name(expr)?;
-            let key = if let Some(list_key) = &key.list_key {
-                create_name(list_key)?
-            } else if let Some(ScalarValue::Utf8(Some(struct_key))) = 
&key.struct_key {
-                struct_key.to_string()
-            } else {
-                String::new()
-            };
-            if let Some(extra_key) = extra_key {
-                let extra_key = create_name(extra_key)?;
-                Ok(format!("{expr}[{key}:{extra_key}]"))
-            } else {
-                Ok(format!("{expr}[{key}]"))
+            match field {
+                GetFieldAccess::NamedStructField { name } => {
+                    Ok(format!("{expr}[{name}]"))
+                }
+                GetFieldAccess::ListIndex { key } => {
+                    let key = create_name(key)?;
+                    Ok(format!("{expr}[{key}]"))
+                }
+                GetFieldAccess::ListRange { start, stop } => {
+                    let start = create_name(start)?;
+                    let stop = create_name(stop)?;
+                    Ok(format!("{expr}[{start}:{stop}]"))
+                }
             }
         }
         Expr::ScalarFunction(func) => {
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 6ca32c4377..89e3217f73 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -17,10 +17,11 @@
 
 use super::{Between, Expr, Like};
 use crate::expr::{
-    AggregateFunction, AggregateUDF, Alias, BinaryExpr, Cast, GetIndexedField, 
InList,
-    InSubquery, Placeholder, ScalarFunction, ScalarUDF, Sort, TryCast, 
WindowFunction,
+    AggregateFunction, AggregateUDF, Alias, BinaryExpr, Cast, GetFieldAccess,
+    GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction, 
ScalarUDF, Sort,
+    TryCast, WindowFunction,
 };
-use crate::field_util::get_indexed_field;
+use crate::field_util::{get_indexed_field, GetFieldAccessCharacteristic};
 use crate::type_coercion::binary::get_result_type;
 use crate::{LogicalPlan, Projection, Subquery};
 use arrow::compute::can_cast_types;
@@ -155,24 +156,27 @@ impl ExprSchemable for Expr {
                 // grouping sets do not really have a type and do not appear 
in projections
                 Ok(DataType::Null)
             }
-            Expr::GetIndexedField(GetIndexedField {
-                key,
-                extra_key,
-                expr,
-            }) => {
+            Expr::GetIndexedField(GetIndexedField { expr, field }) => {
                 let expr_dt = expr.get_type(schema)?;
-                let key = if let Some(list_key) = &key.list_key {
-                    (Some(list_key.get_type(schema)?), None)
-                } else {
-                    (None, key.struct_key.clone())
-                };
-                let extra_key_dt = if let Some(extra_key) = extra_key {
-                    Some(extra_key.get_type(schema)?)
-                } else {
-                    None
+                let field_ch = match field {
+                    GetFieldAccess::NamedStructField { name } => {
+                        GetFieldAccessCharacteristic::NamedStructField {
+                            name: name.clone(),
+                        }
+                    }
+                    GetFieldAccess::ListIndex { key } => {
+                        GetFieldAccessCharacteristic::ListIndex {
+                            key_dt: key.get_type(schema)?,
+                        }
+                    }
+                    GetFieldAccess::ListRange { start, stop } => {
+                        GetFieldAccessCharacteristic::ListRange {
+                            start_dt: start.get_type(schema)?,
+                            stop_dt: stop.get_type(schema)?,
+                        }
+                    }
                 };
-                get_indexed_field(&expr_dt, &key, &extra_key_dt)
-                    .map(|x| x.data_type().clone())
+                get_indexed_field(&expr_dt, &field_ch).map(|x| 
x.data_type().clone())
             }
         }
     }
@@ -280,23 +284,27 @@ impl ExprSchemable for Expr {
                 "QualifiedWildcard expressions are not valid in a logical 
query plan"
                     .to_owned(),
             )),
-            Expr::GetIndexedField(GetIndexedField {
-                key,
-                extra_key,
-                expr,
-            }) => {
+            Expr::GetIndexedField(GetIndexedField { expr, field }) => {
                 let expr_dt = expr.get_type(input_schema)?;
-                let key = if let Some(list_key) = &key.list_key {
-                    (Some(list_key.get_type(input_schema)?), None)
-                } else {
-                    (None, key.struct_key.clone())
-                };
-                let extra_key_dt = if let Some(extra_key) = extra_key {
-                    Some(extra_key.get_type(input_schema)?)
-                } else {
-                    None
+                let field_ch = match field {
+                    GetFieldAccess::NamedStructField { name } => {
+                        GetFieldAccessCharacteristic::NamedStructField {
+                            name: name.clone(),
+                        }
+                    }
+                    GetFieldAccess::ListIndex { key } => {
+                        GetFieldAccessCharacteristic::ListIndex {
+                            key_dt: key.get_type(input_schema)?,
+                        }
+                    }
+                    GetFieldAccess::ListRange { start, stop } => {
+                        GetFieldAccessCharacteristic::ListRange {
+                            start_dt: start.get_type(input_schema)?,
+                            stop_dt: stop.get_type(input_schema)?,
+                        }
+                    }
                 };
-                get_indexed_field(&expr_dt, &key, &extra_key_dt).map(|x| 
x.is_nullable())
+                get_indexed_field(&expr_dt, &field_ch).map(|x| x.is_nullable())
             }
             Expr::GroupingSet(_) => {
                 // grouping sets do not really have the concept of nullable 
and do not appear
diff --git a/datafusion/expr/src/field_util.rs b/datafusion/expr/src/field_util.rs
index b490a57cb5..5b08e9c7d9 100644
--- a/datafusion/expr/src/field_util.rs
+++ b/datafusion/expr/src/field_util.rs
@@ -20,6 +20,19 @@
 use arrow::datatypes::{DataType, Field};
 use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
 
+pub enum GetFieldAccessCharacteristic {
+    /// returns the field `struct[field]`. For example `struct["name"]`
+    NamedStructField { name: ScalarValue },
+    /// single list index
+    // list[i]
+    ListIndex { key_dt: DataType },
+    /// list range `list[i:j]`
+    ListRange {
+        start_dt: DataType,
+        stop_dt: DataType,
+    },
+}
+
 /// Returns the field access indexed by `key` and/or `extra_key` from a 
[`DataType::List`] or [`DataType::Struct`]
 /// # Error
 /// Errors if
@@ -28,35 +41,44 @@ use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
 /// * there is no field key is not of the required index type
 pub fn get_indexed_field(
     data_type: &DataType,
-    key: &(Option<DataType>, Option<ScalarValue>),
-    extra_key: &Option<DataType>,
+    field_characteristic: &GetFieldAccessCharacteristic,
 ) -> Result<Field> {
-    match (data_type, key) {
-        (DataType::List(lt), (Some(DataType::Int64), None)) => {
-            match extra_key {
-                Some(DataType::Int64) => Ok(Field::new("list", 
data_type.clone(), true)),
-                None => Ok(Field::new("list", lt.data_type().clone(), true)),
-                _ => Err(DataFusionError::Plan(
-                    "Only ints are valid as an indexed field in a 
list".to_string(),
-                )),
+    match field_characteristic {
+        GetFieldAccessCharacteristic::NamedStructField{ name } => {
+            match (data_type, name) {
+                (DataType::Struct(fields), ScalarValue::Utf8(Some(s))) => {
+                    if s.is_empty() {
+                        plan_err!(
+                            "Struct based indexed access requires a non empty 
string"
+                        )
+                    } else {
+                        let field = fields.iter().find(|f| f.name() == s);
+                        field.ok_or(DataFusionError::Plan(format!("Field {s} 
not found in struct"))).map(|f| f.as_ref().clone())
+                    }
+                }
+                (DataType::Struct(_), _) => plan_err!(
+                    "Only utf8 strings are valid as an indexed field in a 
struct"
+                ),
+                (other, _) => plan_err!("The expression to get an indexed 
field is only valid for `List` or `Struct` types, got {other}"),
+            }
+        }
+        GetFieldAccessCharacteristic::ListIndex{ key_dt } => {
+            match (data_type, key_dt) {
+                (DataType::List(lt), DataType::Int64) => Ok(Field::new("list", 
lt.data_type().clone(), true)),
+                (DataType::List(_), _) => plan_err!(
+                    "Only ints are valid as an indexed field in a list"
+                ),
+                (other, _) => plan_err!("The expression to get an indexed 
field is only valid for `List` or `Struct` types, got {other}"),
             }
         }
-        (DataType::Struct(fields), (None, Some(ScalarValue::Utf8(Some(s))))) 
=> {
-            if s.is_empty() {
-                plan_err!(
-                    "Struct based indexed access requires a non empty string"
-                )
-            } else {
-                let field = fields.iter().find(|f| f.name() == s);
-                field.ok_or(DataFusionError::Plan(format!("Field {s} not found 
in struct"))).map(|f| f.as_ref().clone())
+        GetFieldAccessCharacteristic::ListRange{ start_dt, stop_dt } => {
+            match (data_type, start_dt, stop_dt) {
+                (DataType::List(_), DataType::Int64, DataType::Int64) => 
Ok(Field::new("list", data_type.clone(), true)),
+                (DataType::List(_), _, _) => plan_err!(
+                    "Only ints are valid as an indexed field in a list"
+                ),
+                (other, _, _) => plan_err!("The expression to get an indexed 
field is only valid for `List` or `Struct` types, got {other}"),
             }
         }
-        (DataType::Struct(_), _) => plan_err!(
-            "Only utf8 strings are valid as an indexed field in a struct"
-        ),
-        (DataType::List(_), _) => plan_err!(
-            "Only ints are valid as an indexed field in a list"
-        ),
-        (other, _) => plan_err!("The expression to get an indexed field is 
only valid for `List` or `Struct` types, got {other}"),
     }
 }
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 8a4eae74fe..d35233bc39 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -60,8 +60,8 @@ pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
 pub use columnar_value::ColumnarValue;
 pub use expr::{
-    Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GetIndexedFieldKey,
-    GroupingSet, Like, TryCast,
+    Between, BinaryExpr, Case, Cast, Expr, GetFieldAccess, GetIndexedField, 
GroupingSet,
+    Like, TryCast,
 };
 pub use expr_fn::*;
 pub use expr_schema::ExprSchemable;
diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs
index d057260fa5..f74cc164a7 100644
--- a/datafusion/expr/src/tree_node/expr.rs
+++ b/datafusion/expr/src/tree_node/expr.rs
@@ -342,15 +342,12 @@ impl TreeNode for Expr {
             Expr::QualifiedWildcard { qualifier } => {
                 Expr::QualifiedWildcard { qualifier }
             }
-            Expr::GetIndexedField(GetIndexedField {
-                key,
-                extra_key,
-                expr,
-            }) => Expr::GetIndexedField(GetIndexedField::new(
-                transform_boxed(expr, &mut transform)?,
-                key,
-                extra_key,
-            )),
+            Expr::GetIndexedField(GetIndexedField { expr, field }) => {
+                Expr::GetIndexedField(GetIndexedField::new(
+                    transform_boxed(expr, &mut transform)?,
+                    field,
+                ))
+            }
             Expr::Placeholder(Placeholder { id, data_type }) => {
                 Expr::Placeholder(Placeholder { id, data_type })
             }
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index 4386913d36..e414e594f9 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -28,72 +28,46 @@ use arrow::{
 };
 use datafusion_common::{cast::as_struct_array, DataFusionError, Result, 
ScalarValue};
 use datafusion_expr::{
-    field_util::get_indexed_field as get_data_type_field, ColumnarValue,
+    field_util::{
+        get_indexed_field as get_data_type_field, GetFieldAccessCharacteristic,
+    },
+    ColumnarValue,
 };
 use std::fmt::Debug;
 use std::hash::{Hash, Hasher};
 use std::{any::Any, sync::Arc};
 
-/// Key of `GetIndexedFieldExpr`.
-/// This structure is needed to separate the responsibilities of the key for 
`DataType::List` and `DataType::Struct`.
-/// If we use index with `DataType::List`, then we use the `list_key` argument 
with `struct_key` equal to `None`.
-/// If we use index with `DataType::Struct`, then we use the `struct_key` 
argument with `list_key` equal to `None`.
-/// `list_key` can be any expression, unlike `struct_key` which can only be 
`ScalarValue::Utf8`.
 #[derive(Clone, Hash, Debug)]
-pub struct GetIndexedFieldExprKey {
-    /// The key expression for `DataType::List`
-    list_key: Option<Arc<dyn PhysicalExpr>>,
-    /// The key expression for `DataType::Struct`
-    struct_key: Option<ScalarValue>,
+pub enum GetFieldAccessExpr {
+    /// returns the field `struct[field]`. For example `struct["name"]`
+    NamedStructField { name: ScalarValue },
+    /// single list index
+    // list[i]
+    ListIndex { key: Arc<dyn PhysicalExpr> },
+    /// list range `list[i:j]`
+    ListRange {
+        start: Arc<dyn PhysicalExpr>,
+        stop: Arc<dyn PhysicalExpr>,
+    },
 }
 
-impl GetIndexedFieldExprKey {
-    /// Create new get field expression key
-    pub fn new(
-        list_key: Option<Arc<dyn PhysicalExpr>>,
-        struct_key: Option<ScalarValue>,
-    ) -> Self {
-        Self {
-            list_key,
-            struct_key,
-        }
-    }
-
-    /// Get the key expression for `DataType::List`
-    pub fn list_key(&self) -> &Option<Arc<dyn PhysicalExpr>> {
-        &self.list_key
-    }
-
-    /// Get the key expression for `DataType::Struct`
-    pub fn struct_key(&self) -> &Option<ScalarValue> {
-        &self.struct_key
-    }
-}
-
-impl std::fmt::Display for GetIndexedFieldExprKey {
+impl std::fmt::Display for GetFieldAccessExpr {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        if let Some(list_key) = &self.list_key {
-            write!(f, "{}", list_key)
-        } else {
-            write!(f, "{}", self.struct_key.clone().unwrap())
+        match self {
+            GetFieldAccessExpr::NamedStructField { name } => write!(f, "[{}]", 
name),
+            GetFieldAccessExpr::ListIndex { key } => write!(f, "[{}]", key),
+            GetFieldAccessExpr::ListRange { start, stop } => {
+                write!(f, "[{}:{}]", start, stop)
+            }
         }
     }
 }
 
-impl PartialEq<dyn Any> for GetIndexedFieldExprKey {
+impl PartialEq<dyn Any> for GetFieldAccessExpr {
     fn eq(&self, other: &dyn Any) -> bool {
         down_cast_any_ref(other)
             .downcast_ref::<Self>()
-            .map(|x| {
-                if let Some(list_key) = &self.list_key {
-                    list_key.eq(&x.list_key.clone().unwrap())
-                } else {
-                    self.struct_key
-                        .clone()
-                        .unwrap()
-                        .eq(&x.struct_key.clone().unwrap())
-                }
-            })
+            .map(|x| self.eq(x))
             .unwrap_or(false)
     }
 }
@@ -104,33 +78,18 @@ pub struct GetIndexedFieldExpr {
     /// The expression to find
     arg: Arc<dyn PhysicalExpr>,
     /// The key statement
-    key: GetIndexedFieldExprKey,
-    /// The extra key (it can be used only with `DataType::List`)
-    extra_key: Option<Arc<dyn PhysicalExpr>>,
+    field: GetFieldAccessExpr,
 }
 
 impl GetIndexedFieldExpr {
     /// Create new get field expression
-    pub fn new(
-        arg: Arc<dyn PhysicalExpr>,
-        key: GetIndexedFieldExprKey,
-        extra_key: Option<Arc<dyn PhysicalExpr>>,
-    ) -> Self {
-        Self {
-            arg,
-            key,
-            extra_key,
-        }
-    }
-
-    /// Get the input key
-    pub fn key(&self) -> &GetIndexedFieldExprKey {
-        &self.key
+    pub fn new(arg: Arc<dyn PhysicalExpr>, field: GetFieldAccessExpr) -> Self {
+        Self { arg, field }
     }
 
-    /// Get the input extra key
-    pub fn extra_key(&self) -> &Option<Arc<dyn PhysicalExpr>> {
-        &self.extra_key
+    /// Get the input field
+    pub fn field(&self) -> &GetFieldAccessExpr {
+        &self.field
     }
 
     /// Get the input expression
@@ -141,11 +100,7 @@ impl GetIndexedFieldExpr {
 
 impl std::fmt::Display for GetIndexedFieldExpr {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        if let Some(extra_key) = &self.extra_key {
-            write!(f, "({}).[{}:{}]", self.arg, self.key, extra_key)
-        } else {
-            write!(f, "({}).[{}]", self.arg, self.key)
-        }
+        write!(f, "({}).{}", self.arg, self.field)
     }
 }
 
@@ -156,87 +111,94 @@ impl PhysicalExpr for GetIndexedFieldExpr {
 
     fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
         let arg_dt = self.arg.data_type(input_schema)?;
-        let key = if let Some(list_key) = &self.key.list_key {
-            (Some(list_key.data_type(input_schema)?), None)
-        } else {
-            (None, self.key.struct_key.clone())
-        };
-        let extra_key_dt = if let Some(extra_key) = &self.extra_key {
-            Some(extra_key.data_type(input_schema)?)
-        } else {
-            None
+        let field_ch = match &self.field {
+            GetFieldAccessExpr::NamedStructField { name } => {
+                GetFieldAccessCharacteristic::NamedStructField { name: 
name.clone() }
+            }
+            GetFieldAccessExpr::ListIndex { key } => {
+                GetFieldAccessCharacteristic::ListIndex {
+                    key_dt: key.data_type(input_schema)?,
+                }
+            }
+            GetFieldAccessExpr::ListRange { start, stop } => {
+                GetFieldAccessCharacteristic::ListRange {
+                    start_dt: start.data_type(input_schema)?,
+                    stop_dt: stop.data_type(input_schema)?,
+                }
+            }
         };
-        get_data_type_field(&arg_dt, &key, &extra_key_dt).map(|f| 
f.data_type().clone())
+        get_data_type_field(&arg_dt, &field_ch).map(|f| f.data_type().clone())
     }
 
     fn nullable(&self, input_schema: &Schema) -> Result<bool> {
         let arg_dt = self.arg.data_type(input_schema)?;
-        let key = if let Some(list_key) = &self.key.list_key {
-            (Some(list_key.data_type(input_schema)?), None)
-        } else {
-            (None, self.key.struct_key.clone())
-        };
-        let extra_key_dt = if let Some(extra_key) = &self.extra_key {
-            Some(extra_key.data_type(input_schema)?)
-        } else {
-            None
+        let field_ch = match &self.field {
+            GetFieldAccessExpr::NamedStructField { name } => {
+                GetFieldAccessCharacteristic::NamedStructField { name: 
name.clone() }
+            }
+            GetFieldAccessExpr::ListIndex { key } => {
+                GetFieldAccessCharacteristic::ListIndex {
+                    key_dt: key.data_type(input_schema)?,
+                }
+            }
+            GetFieldAccessExpr::ListRange { start, stop } => {
+                GetFieldAccessCharacteristic::ListRange {
+                    start_dt: start.data_type(input_schema)?,
+                    stop_dt: stop.data_type(input_schema)?,
+                }
+            }
         };
-        get_data_type_field(&arg_dt, &key, &extra_key_dt).map(|f| 
f.is_nullable())
+        get_data_type_field(&arg_dt, &field_ch).map(|f| f.is_nullable())
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let array = self.arg.evaluate(batch)?.into_array(batch.num_rows());
-        if let Some(extra_key) = &self.extra_key {
-            let list_key = self
-                .key
-                .list_key
-                .clone()
-                .unwrap()
-                .evaluate(batch)?
-                .into_array(batch.num_rows());
-            let extra_key = 
extra_key.evaluate(batch)?.into_array(batch.num_rows());
-            match (array.data_type(), list_key.data_type(), 
extra_key.data_type()) {
-                (DataType::List(_), DataType::Int64, DataType::Int64) => 
Ok(ColumnarValue::Array(array_slice(&[
-                    array, list_key, extra_key
+        match &self.field {
+            GetFieldAccessExpr::NamedStructField{name} => match 
(array.data_type(), name) {
+                (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => {
+                    let as_struct_array = as_struct_array(&array)?;
+                    match as_struct_array.column_by_name(k) {
+                        None => Err(DataFusionError::Execution(
+                            format!("get indexed field {k} not found in 
struct"))),
+                        Some(col) => Ok(ColumnarValue::Array(col.clone()))
+                    }
+                }
+                (DataType::Struct(_), name) => Err(DataFusionError::Execution(
+                    format!("get indexed field is only possible on struct with 
utf8 indexes. \
+                             Tried with {name:?} index"))),
+                (dt, name) => Err(DataFusionError::Execution(
+                                format!("get indexed field is only possible on 
lists with int64 indexes or struct \
+                                         with utf8 indexes. Tried {dt:?} with 
{name:?} index"))),
+            },
+            GetFieldAccessExpr::ListIndex{key} => {
+            let key = key.evaluate(batch)?.into_array(batch.num_rows());
+            match (array.data_type(), key.data_type()) {
+                (DataType::List(_), DataType::Int64) => 
Ok(ColumnarValue::Array(array_element(&[
+                    array, key
                 ])?)),
-                (DataType::List(_), key, extra_key) => 
Err(DataFusionError::Execution(
-                    format!("get indexed field is only possible on lists with 
int64 indexes. \
-                             Tried with {key:?} and {extra_key:?} indices"))),
-                (dt, key, extra_key) => Err(DataFusionError::Execution(
-                    format!("get indexed field is only possible on lists with 
int64 indexes or struct \
-                             with utf8 indexes. Tried {dt:?} with {key:?} and 
{extra_key:?} indices"))),
-            }
-        } else if let Some(list_key) = &self.key.list_key {
-            let list_key = 
list_key.evaluate(batch)?.into_array(batch.num_rows());
-            match (array.data_type(), list_key.data_type()) {
-                    (DataType::List(_), DataType::Int64) => 
Ok(ColumnarValue::Array(array_element(&[
-                        array, list_key
+                (DataType::List(_), key) => Err(DataFusionError::Execution(
+                                format!("get indexed field is only possible on 
lists with int64 indexes. \
+                                    Tried with {key:?} index"))),
+                            (dt, key) => Err(DataFusionError::Execution(
+                                        format!("get indexed field is only 
possible on lists with int64 indexes or struct \
+                                                 with utf8 indexes. Tried 
{dt:?} with {key:?} index"))),
+                        }
+                },
+            GetFieldAccessExpr::ListRange{start, stop} => {
+                let start = 
start.evaluate(batch)?.into_array(batch.num_rows());
+                let stop = stop.evaluate(batch)?.into_array(batch.num_rows());
+                match (array.data_type(), start.data_type(), stop.data_type()) 
{
+                    (DataType::List(_), DataType::Int64, DataType::Int64) => 
Ok(ColumnarValue::Array(array_slice(&[
+                        array, start, stop
                     ])?)),
-                    (DataType::List(_), key) => Err(DataFusionError::Execution(
+                    (DataType::List(_), start, stop) => 
Err(DataFusionError::Execution(
                         format!("get indexed field is only possible on lists 
with int64 indexes. \
-                            Tried with {key:?} index"))),
-                    (dt, key) => Err(DataFusionError::Execution(
-                                format!("get indexed field is only possible on 
lists with int64 indexes or struct \
-                                         with utf8 indexes. Tried {dt:?} with 
{key:?} index"))),
-                }
-        } else {
-            let struct_key = self.key.struct_key.clone().unwrap();
-            match (array.data_type(), struct_key) {
-                    (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => {
-                        let as_struct_array = as_struct_array(&array)?;
-                        match as_struct_array.column_by_name(&k) {
-                            None => Err(DataFusionError::Execution(
-                                format!("get indexed field {k} not found in 
struct"))),
-                            Some(col) => Ok(ColumnarValue::Array(col.clone()))
-                        }
-                    }
-                    (DataType::Struct(_), key) => 
Err(DataFusionError::Execution(
-                        format!("get indexed field is only possible on struct 
with utf8 indexes. \
-                                 Tried with {key:?} index"))),
-                    (dt, key) => Err(DataFusionError::Execution(
-                                    format!("get indexed field is only 
possible on lists with int64 indexes or struct \
-                                             with utf8 indexes. Tried {dt:?} 
with {key:?} index"))),
+                                 Tried with {start:?} and {stop:?} indices"))),
+                    (dt, start, stop) => Err(DataFusionError::Execution(
+                        format!("get indexed field is only possible on lists 
with int64 indexes or struct \
+                                 with utf8 indexes. Tried {dt:?} with 
{start:?} and {stop:?} indices"))),
                 }
+            },
         }
     }
 
@@ -250,8 +212,7 @@ impl PhysicalExpr for GetIndexedFieldExpr {
     ) -> Result<Arc<dyn PhysicalExpr>> {
         Ok(Arc::new(GetIndexedFieldExpr::new(
             children[0].clone(),
-            self.key.clone(),
-            self.extra_key.clone(),
+            self.field.clone(),
         )))
     }
 
@@ -265,15 +226,7 @@ impl PartialEq<dyn Any> for GetIndexedFieldExpr {
     fn eq(&self, other: &dyn Any) -> bool {
         down_cast_any_ref(other)
             .downcast_ref::<Self>()
-            .map(|x| {
-                if let Some(extra_key) = &self.extra_key {
-                    self.arg.eq(&x.arg)
-                        && self.key.eq(&x.key)
-                        && extra_key.eq(&x.extra_key)
-                } else {
-                    self.arg.eq(&x.arg) && self.key.eq(&x.key)
-                }
-            })
+            .map(|x| self.arg.eq(&x.arg) && self.field.eq(&x.field))
             .unwrap_or(false)
     }
 }
@@ -294,8 +247,8 @@ mod tests {
 
     fn build_list_arguments(
         list_of_lists: Vec<Vec<Option<&str>>>,
-        list_of_keys: Vec<Option<i64>>,
-        list_of_extra_keys: Vec<Option<i64>>,
+        list_of_start_indices: Vec<Option<i64>>,
+        list_of_stop_indices: Vec<Option<i64>>,
     ) -> (GenericListArray<i32>, Int64Array, Int64Array) {
         let builder = StringBuilder::with_capacity(list_of_lists.len(), 1024);
         let mut list_builder = ListBuilder::new(builder);
@@ -310,13 +263,13 @@ mod tests {
             list_builder.append(true);
         }
 
-        let key_array = Int64Array::from(list_of_keys);
-        let extra_key_array = Int64Array::from(list_of_extra_keys);
-        (list_builder.finish(), key_array, extra_key_array)
+        let start_array = Int64Array::from(list_of_start_indices);
+        let stop_array = Int64Array::from(list_of_stop_indices);
+        (list_builder.finish(), start_array, stop_array)
     }
 
     #[test]
-    fn get_indexed_field_struct() -> Result<()> {
+    fn get_indexed_field_named_struct_field() -> Result<()> {
         let schema = struct_schema();
         let boolean = BooleanArray::from(vec![false, false, true, true]);
         let int = Int64Array::from(vec![42, 28, 19, 31]);
@@ -335,11 +288,9 @@ mod tests {
         let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(struct_array)])?;
         let expr = Arc::new(GetIndexedFieldExpr::new(
             expr,
-            GetIndexedFieldExprKey::new(
-                None,
-                Some(ScalarValue::Utf8(Some(String::from("a")))),
-            ),
-            None,
+            GetFieldAccessExpr::NamedStructField {
+                name: ScalarValue::Utf8(Some(String::from("a"))),
+            },
         ));
         let result = expr.evaluate(&batch)?.into_array(1);
         let result =
@@ -375,29 +326,31 @@ mod tests {
     }
 
     #[test]
-    fn get_indexed_field_list_without_extra_key() -> Result<()> {
+    fn get_indexed_field_list_index() -> Result<()> {
         let list_of_lists = vec![
             vec![Some("a"), Some("b"), None],
             vec![None, Some("c"), Some("d")],
             vec![Some("e"), None, Some("f")],
         ];
-        let list_of_keys = vec![Some(1), Some(2), None];
-        let list_of_extra_keys = vec![None];
+        let list_of_start_indices = vec![Some(1), Some(2), None];
+        let list_of_stop_indices = vec![None];
         let expected_list = vec![Some("a"), Some("c"), None];
 
-        let schema = list_schema(&["l", "k"]);
-        let (list_col, key_col, _) =
-            build_list_arguments(list_of_lists, list_of_keys, 
list_of_extra_keys);
-        let expr = col("l", &schema).unwrap();
-        let key = col("k", &schema).unwrap();
+        let schema = list_schema(&["list", "key"]);
+        let (list_col, key_col, _) = build_list_arguments(
+            list_of_lists,
+            list_of_start_indices,
+            list_of_stop_indices,
+        );
+        let expr = col("list", &schema).unwrap();
+        let key = col("key", &schema).unwrap();
         let batch = RecordBatch::try_new(
             Arc::new(schema),
             vec![Arc::new(list_col), Arc::new(key_col)],
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new(
             expr,
-            GetIndexedFieldExprKey::new(Some(key), None),
-            None,
+            GetFieldAccessExpr::ListIndex { key },
         ));
         let result = expr.evaluate(&batch)?.into_array(1);
         let result = as_string_array(&result).expect("failed to downcast to 
ListArray");
@@ -407,38 +360,36 @@ mod tests {
     }
 
     #[test]
-    fn get_indexed_field_list_with_extra_key() -> Result<()> {
+    fn get_indexed_field_list_range() -> Result<()> {
         let list_of_lists = vec![
             vec![Some("a"), Some("b"), None],
             vec![None, Some("c"), Some("d")],
             vec![Some("e"), None, Some("f")],
         ];
-        let list_of_keys = vec![Some(1), Some(2), None];
-        let list_of_extra_keys = vec![Some(2), None, Some(3)];
+        let list_of_start_indices = vec![Some(1), Some(2), None];
+        let list_of_stop_indices = vec![Some(2), None, Some(3)];
         let expected_list = vec![
             vec![Some("a"), Some("b")],
             vec![Some("c"), Some("d")],
             vec![Some("e"), None, Some("f")],
         ];
 
-        let schema = list_schema(&["l", "k", "ek"]);
-        let (list_col, key_col, extra_key_col) =
-            build_list_arguments(list_of_lists, list_of_keys, 
list_of_extra_keys);
-        let expr = col("l", &schema).unwrap();
-        let key = col("k", &schema).unwrap();
-        let extra_key = col("ek", &schema).unwrap();
+        let schema = list_schema(&["list", "start", "stop"]);
+        let (list_col, start_col, stop_col) = build_list_arguments(
+            list_of_lists,
+            list_of_start_indices,
+            list_of_stop_indices,
+        );
+        let expr = col("list", &schema).unwrap();
+        let start = col("start", &schema).unwrap();
+        let stop = col("stop", &schema).unwrap();
         let batch = RecordBatch::try_new(
             Arc::new(schema),
-            vec![
-                Arc::new(list_col),
-                Arc::new(key_col),
-                Arc::new(extra_key_col),
-            ],
+            vec![Arc::new(list_col), Arc::new(start_col), Arc::new(stop_col)],
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new(
             expr,
-            GetIndexedFieldExprKey::new(Some(key), None),
-            Some(extra_key),
+            GetFieldAccessExpr::ListRange { start, stop },
         ));
         let result = expr.evaluate(&batch)?.into_array(1);
         let result = as_list_array(&result).expect("failed to downcast to 
ListArray");
@@ -450,20 +401,19 @@ mod tests {
 
     #[test]
     fn get_indexed_field_empty_list() -> Result<()> {
-        let schema = list_schema(&["l", "k"]);
+        let schema = list_schema(&["list", "key"]);
         let builder = StringBuilder::new();
         let mut list_builder = ListBuilder::new(builder);
         let key_array = new_empty_array(&DataType::Int64);
-        let expr = col("l", &schema).unwrap();
-        let key = col("k", &schema).unwrap();
+        let expr = col("list", &schema).unwrap();
+        let key = col("key", &schema).unwrap();
         let batch = RecordBatch::try_new(
             Arc::new(schema),
             vec![Arc::new(list_builder.finish()), key_array],
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new(
             expr,
-            GetIndexedFieldExprKey::new(Some(key), None),
-            None,
+            GetFieldAccessExpr::ListIndex { key },
         ));
         let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
         assert!(result.is_null(0));
@@ -472,9 +422,9 @@ mod tests {
 
     #[test]
     fn get_indexed_field_invalid_list_index() -> Result<()> {
-        let schema = list_schema(&["l", "e"]);
-        let expr = col("l", &schema).unwrap();
-        let key_expr = col("e", &schema).unwrap();
+        let schema = list_schema(&["list", "error"]);
+        let expr = col("list", &schema).unwrap();
+        let key = col("error", &schema).unwrap();
         let builder = StringBuilder::with_capacity(3, 1024);
         let mut list_builder = ListBuilder::new(builder);
         list_builder.values().append_value("hello");
@@ -487,8 +437,7 @@ mod tests {
         )?;
         let expr = Arc::new(GetIndexedFieldExpr::new(
             expr,
-            GetIndexedFieldExprKey::new(Some(key_expr), None),
-            None,
+            GetFieldAccessExpr::ListIndex { key },
         ));
         let result = expr.evaluate(&batch)?.into_array(1);
         assert!(result.is_null(0));
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 7f18f748aa..2dff1ecb5d 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -82,7 +82,7 @@ pub use case::{case, CaseExpr};
 pub use cast::{cast, cast_column, cast_with_options, CastExpr};
 pub use column::{col, Column, UnKnownColumn};
 pub use datetime::{date_time_interval_expr, DateTimeIntervalExpr};
-pub use get_indexed_field::{GetIndexedFieldExpr, GetIndexedFieldExprKey};
+pub use get_indexed_field::{GetFieldAccessExpr, GetIndexedFieldExpr};
 pub use in_list::{in_list, InListExpr};
 pub use is_not_null::{is_not_null, IsNotNullExpr};
 pub use is_null::{is_null, IsNullExpr};
diff --git a/datafusion/physical-expr/src/planner.rs 
b/datafusion/physical-expr/src/planner.rs
index 5de76ac741..f2211701fa 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -19,8 +19,8 @@ use crate::var_provider::is_system_variables;
 use crate::{
     execution_props::ExecutionProps,
     expressions::{
-        self, binary, date_time_interval_expr, like, Column, 
GetIndexedFieldExpr,
-        GetIndexedFieldExprKey, Literal,
+        self, binary, date_time_interval_expr, like, Column, 
GetFieldAccessExpr,
+        GetIndexedFieldExpr, Literal,
     },
     functions, udf,
     var_provider::VarType,
@@ -31,7 +31,8 @@ use datafusion_common::plan_err;
 use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
 use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction, ScalarUDF};
 use datafusion_expr::{
-    binary_expr, Between, BinaryExpr, Expr, GetIndexedField, Like, Operator, 
TryCast,
+    binary_expr, Between, BinaryExpr, Expr, GetFieldAccess, GetIndexedField, 
Like,
+    Operator, TryCast,
 };
 use std::sync::Arc;
 
@@ -339,33 +340,35 @@ pub fn create_physical_expr(
             input_schema,
             execution_props,
         )?),
-        Expr::GetIndexedField(GetIndexedField {
-            key,
-            extra_key,
-            expr,
-        }) => {
-            let extra_key_expr = if let Some(extra_key) = extra_key {
-                Some(create_physical_expr(
-                    extra_key,
-                    input_dfschema,
-                    input_schema,
-                    execution_props,
-                )?)
-            } else {
-                None
-            };
-            let key_expr = if let Some(list_key) = &key.list_key {
-                GetIndexedFieldExprKey::new(
-                    Some(create_physical_expr(
-                        list_key,
+        Expr::GetIndexedField(GetIndexedField { expr, field }) => {
+            let field = match field {
+                GetFieldAccess::NamedStructField { name } => {
+                    GetFieldAccessExpr::NamedStructField { name: name.clone() }
+                }
+                GetFieldAccess::ListIndex { key } => 
GetFieldAccessExpr::ListIndex {
+                    key: create_physical_expr(
+                        key,
                         input_dfschema,
                         input_schema,
                         execution_props,
-                    )?),
-                    None,
-                )
-            } else {
-                GetIndexedFieldExprKey::new(None, 
Some(key.struct_key.clone().unwrap()))
+                    )?,
+                },
+                GetFieldAccess::ListRange { start, stop } => {
+                    GetFieldAccessExpr::ListRange {
+                        start: create_physical_expr(
+                            start,
+                            input_dfschema,
+                            input_schema,
+                            execution_props,
+                        )?,
+                        stop: create_physical_expr(
+                            stop,
+                            input_dfschema,
+                            input_schema,
+                            execution_props,
+                        )?,
+                    }
+                }
             };
             Ok(Arc::new(GetIndexedFieldExpr::new(
                 create_physical_expr(
@@ -374,8 +377,7 @@ pub fn create_physical_expr(
                     input_schema,
                     execution_props,
                 )?,
-                key_expr,
-                extra_key_expr,
+                field,
             )))
         }
 
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index d118b3c1c9..26353d95fb 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -400,12 +400,26 @@ message RollupNode {
   repeated LogicalExprNode expr = 1;
 }
 
+message NamedStructField {
+  ScalarValue name = 1;
+}
+
+message ListIndex {
+  LogicalExprNode key = 1;
+}
 
+message ListRange {
+  LogicalExprNode start = 1;
+  LogicalExprNode stop = 2;
+}
 
 message GetIndexedField {
   LogicalExprNode expr = 1;
-  LogicalExprNode key = 2;
-  LogicalExprNode extra_key = 3;
+  oneof field {
+    NamedStructField named_struct_field = 2;
+    ListIndex list_index = 3;
+    ListRange list_range = 4;
+  }
 }
 
 message IsNull {
@@ -1489,7 +1503,24 @@ message ColumnStats {
   uint32 distinct_count = 4;
 }
 
+message NamedStructFieldExpr {
+  ScalarValue name = 1;
+}
+
+message ListIndexExpr {
+  PhysicalExprNode key = 1;
+}
+
+message ListRangeExpr {
+  PhysicalExprNode start = 1;
+  PhysicalExprNode stop = 2;
+}
+
 message PhysicalGetIndexedFieldExprNode {
   PhysicalExprNode arg = 1;
-  PhysicalExprNode key = 2;
+  oneof field {
+    NamedStructFieldExpr named_struct_field_expr = 2;
+    ListIndexExpr list_index_expr = 3;
+    ListRangeExpr list_range_expr = 4;
+  }
 }
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 9768dc6fdb..e87aa781b1 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7273,21 +7273,25 @@ impl serde::Serialize for GetIndexedField {
         if self.expr.is_some() {
             len += 1;
         }
-        if self.key.is_some() {
-            len += 1;
-        }
-        if self.extra_key.is_some() {
+        if self.field.is_some() {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.GetIndexedField", len)?;
         if let Some(v) = self.expr.as_ref() {
             struct_ser.serialize_field("expr", v)?;
         }
-        if let Some(v) = self.key.as_ref() {
-            struct_ser.serialize_field("key", v)?;
-        }
-        if let Some(v) = self.extra_key.as_ref() {
-            struct_ser.serialize_field("extraKey", v)?;
+        if let Some(v) = self.field.as_ref() {
+            match v {
+                get_indexed_field::Field::NamedStructField(v) => {
+                    struct_ser.serialize_field("namedStructField", v)?;
+                }
+                get_indexed_field::Field::ListIndex(v) => {
+                    struct_ser.serialize_field("listIndex", v)?;
+                }
+                get_indexed_field::Field::ListRange(v) => {
+                    struct_ser.serialize_field("listRange", v)?;
+                }
+            }
         }
         struct_ser.end()
     }
@@ -7300,16 +7304,20 @@ impl<'de> serde::Deserialize<'de> for GetIndexedField {
     {
         const FIELDS: &[&str] = &[
             "expr",
-            "key",
-            "extra_key",
-            "extraKey",
+            "named_struct_field",
+            "namedStructField",
+            "list_index",
+            "listIndex",
+            "list_range",
+            "listRange",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Expr,
-            Key,
-            ExtraKey,
+            NamedStructField,
+            ListIndex,
+            ListRange,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -7332,8 +7340,9 @@ impl<'de> serde::Deserialize<'de> for GetIndexedField {
                     {
                         match value {
                             "expr" => Ok(GeneratedField::Expr),
-                            "key" => Ok(GeneratedField::Key),
-                            "extraKey" | "extra_key" => 
Ok(GeneratedField::ExtraKey),
+                            "namedStructField" | "named_struct_field" => 
Ok(GeneratedField::NamedStructField),
+                            "listIndex" | "list_index" => 
Ok(GeneratedField::ListIndex),
+                            "listRange" | "list_range" => 
Ok(GeneratedField::ListRange),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -7354,8 +7363,7 @@ impl<'de> serde::Deserialize<'de> for GetIndexedField {
                     V: serde::de::MapAccess<'de>,
             {
                 let mut expr__ = None;
-                let mut key__ = None;
-                let mut extra_key__ = None;
+                let mut field__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::Expr => {
@@ -7364,24 +7372,32 @@ impl<'de> serde::Deserialize<'de> for GetIndexedField {
                             }
                             expr__ = map.next_value()?;
                         }
-                        GeneratedField::Key => {
-                            if key__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("key"));
+                        GeneratedField::NamedStructField => {
+                            if field__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("namedStructField"));
                             }
-                            key__ = map.next_value()?;
+                            field__ = 
map.next_value::<::std::option::Option<_>>()?.map(get_indexed_field::Field::NamedStructField)
+;
+                        }
+                        GeneratedField::ListIndex => {
+                            if field__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("listIndex"));
+                            }
+                            field__ = 
map.next_value::<::std::option::Option<_>>()?.map(get_indexed_field::Field::ListIndex)
+;
                         }
-                        GeneratedField::ExtraKey => {
-                            if extra_key__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("extraKey"));
+                        GeneratedField::ListRange => {
+                            if field__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("listRange"));
                             }
-                            extra_key__ = map.next_value()?;
+                            field__ = 
map.next_value::<::std::option::Option<_>>()?.map(get_indexed_field::Field::ListRange)
+;
                         }
                     }
                 }
                 Ok(GetIndexedField {
                     expr: expr__,
-                    key: key__,
-                    extra_key: extra_key__,
+                    field: field__,
                 })
             }
         }
@@ -10051,41 +10067,423 @@ impl<'de> serde::Deserialize<'de> for LimitNode {
                 let mut fetch__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
-                        GeneratedField::Input => {
-                            if input__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("input"));
-                            }
-                            input__ = map.next_value()?;
-                        }
-                        GeneratedField::Skip => {
-                            if skip__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("skip"));
+                        GeneratedField::Input => {
+                            if input__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("input"));
+                            }
+                            input__ = map.next_value()?;
+                        }
+                        GeneratedField::Skip => {
+                            if skip__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("skip"));
+                            }
+                            skip__ = 
+                                
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                            ;
+                        }
+                        GeneratedField::Fetch => {
+                            if fetch__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("fetch"));
+                            }
+                            fetch__ = 
+                                
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                            ;
+                        }
+                    }
+                }
+                Ok(LimitNode {
+                    input: input__,
+                    skip: skip__.unwrap_or_default(),
+                    fetch: fetch__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.LimitNode", FIELDS, 
GeneratedVisitor)
+    }
+}
+impl serde::Serialize for List {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.field_type.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.List", 
len)?;
+        if let Some(v) = self.field_type.as_ref() {
+            struct_ser.serialize_field("fieldType", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for List {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "field_type",
+            "fieldType",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            FieldType,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "fieldType" | "field_type" => 
Ok(GeneratedField::FieldType),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = List;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.List")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<List, 
V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut field_type__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::FieldType => {
+                            if field_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("fieldType"));
+                            }
+                            field_type__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(List {
+                    field_type: field_type__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.List", FIELDS, 
GeneratedVisitor)
+    }
+}
+impl serde::Serialize for ListIndex {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.key.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.ListIndex", len)?;
+        if let Some(v) = self.key.as_ref() {
+            struct_ser.serialize_field("key", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for ListIndex {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "key",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Key,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "key" => Ok(GeneratedField::Key),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = ListIndex;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.ListIndex")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<ListIndex, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut key__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Key => {
+                            if key__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("key"));
+                            }
+                            key__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(ListIndex {
+                    key: key__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.ListIndex", FIELDS, 
GeneratedVisitor)
+    }
+}
+impl serde::Serialize for ListIndexExpr {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.key.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.ListIndexExpr", len)?;
+        if let Some(v) = self.key.as_ref() {
+            struct_ser.serialize_field("key", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for ListIndexExpr {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "key",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Key,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "key" => Ok(GeneratedField::Key),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = ListIndexExpr;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.ListIndexExpr")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<ListIndexExpr, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut key__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Key => {
+                            if key__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("key"));
+                            }
+                            key__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(ListIndexExpr {
+                    key: key__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.ListIndexExpr", FIELDS, 
GeneratedVisitor)
+    }
+}
+impl serde::Serialize for ListRange {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.start.is_some() {
+            len += 1;
+        }
+        if self.stop.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.ListRange", len)?;
+        if let Some(v) = self.start.as_ref() {
+            struct_ser.serialize_field("start", v)?;
+        }
+        if let Some(v) = self.stop.as_ref() {
+            struct_ser.serialize_field("stop", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for ListRange {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "start",
+            "stop",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Start,
+            Stop,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "start" => Ok(GeneratedField::Start),
+                            "stop" => Ok(GeneratedField::Stop),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = ListRange;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.ListRange")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<ListRange, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut start__ = None;
+                let mut stop__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Start => {
+                            if start__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("start"));
                             }
-                            skip__ = 
-                                
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
-                            ;
+                            start__ = map.next_value()?;
                         }
-                        GeneratedField::Fetch => {
-                            if fetch__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("fetch"));
+                        GeneratedField::Stop => {
+                            if stop__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("stop"));
                             }
-                            fetch__ = 
-                                
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
-                            ;
+                            stop__ = map.next_value()?;
                         }
                     }
                 }
-                Ok(LimitNode {
-                    input: input__,
-                    skip: skip__.unwrap_or_default(),
-                    fetch: fetch__.unwrap_or_default(),
+                Ok(ListRange {
+                    start: start__,
+                    stop: stop__,
                 })
             }
         }
-        deserializer.deserialize_struct("datafusion.LimitNode", FIELDS, 
GeneratedVisitor)
+        deserializer.deserialize_struct("datafusion.ListRange", FIELDS, 
GeneratedVisitor)
     }
 }
-impl serde::Serialize for List {
+impl serde::Serialize for ListRangeExpr {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
     where
@@ -10093,30 +10491,37 @@ impl serde::Serialize for List {
     {
         use serde::ser::SerializeStruct;
         let mut len = 0;
-        if self.field_type.is_some() {
+        if self.start.is_some() {
             len += 1;
         }
-        let mut struct_ser = serializer.serialize_struct("datafusion.List", 
len)?;
-        if let Some(v) = self.field_type.as_ref() {
-            struct_ser.serialize_field("fieldType", v)?;
+        if self.stop.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.ListRangeExpr", len)?;
+        if let Some(v) = self.start.as_ref() {
+            struct_ser.serialize_field("start", v)?;
+        }
+        if let Some(v) = self.stop.as_ref() {
+            struct_ser.serialize_field("stop", v)?;
         }
         struct_ser.end()
     }
 }
-impl<'de> serde::Deserialize<'de> for List {
+impl<'de> serde::Deserialize<'de> for ListRangeExpr {
     #[allow(deprecated)]
     fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
     where
         D: serde::Deserializer<'de>,
     {
         const FIELDS: &[&str] = &[
-            "field_type",
-            "fieldType",
+            "start",
+            "stop",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
-            FieldType,
+            Start,
+            Stop,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -10138,7 +10543,8 @@ impl<'de> serde::Deserialize<'de> for List {
                         E: serde::de::Error,
                     {
                         match value {
-                            "fieldType" | "field_type" => 
Ok(GeneratedField::FieldType),
+                            "start" => Ok(GeneratedField::Start),
+                            "stop" => Ok(GeneratedField::Stop),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -10148,33 +10554,41 @@ impl<'de> serde::Deserialize<'de> for List {
         }
         struct GeneratedVisitor;
         impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
-            type Value = List;
+            type Value = ListRangeExpr;
 
             fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
-                formatter.write_str("struct datafusion.List")
+                formatter.write_str("struct datafusion.ListRangeExpr")
             }
 
-            fn visit_map<V>(self, mut map: V) -> std::result::Result<List, 
V::Error>
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<ListRangeExpr, V::Error>
                 where
                     V: serde::de::MapAccess<'de>,
             {
-                let mut field_type__ = None;
+                let mut start__ = None;
+                let mut stop__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
-                        GeneratedField::FieldType => {
-                            if field_type__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("fieldType"));
+                        GeneratedField::Start => {
+                            if start__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("start"));
                             }
-                            field_type__ = map.next_value()?;
+                            start__ = map.next_value()?;
+                        }
+                        GeneratedField::Stop => {
+                            if stop__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("stop"));
+                            }
+                            stop__ = map.next_value()?;
                         }
                     }
                 }
-                Ok(List {
-                    field_type: field_type__,
+                Ok(ListRangeExpr {
+                    start: start__,
+                    stop: stop__,
                 })
             }
         }
-        deserializer.deserialize_struct("datafusion.List", FIELDS, 
GeneratedVisitor)
+        deserializer.deserialize_struct("datafusion.ListRangeExpr", FIELDS, 
GeneratedVisitor)
     }
 }
 impl serde::Serialize for ListingTableScanNode {
@@ -12150,6 +12564,188 @@ impl<'de> serde::Deserialize<'de> for 
MaybePhysicalSortExprs {
         deserializer.deserialize_struct("datafusion.MaybePhysicalSortExprs", 
FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for NamedStructField {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.name.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.NamedStructField", len)?;
+        if let Some(v) = self.name.as_ref() {
+            struct_ser.serialize_field("name", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for NamedStructField {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "name",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Name,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "name" => Ok(GeneratedField::Name),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = NamedStructField;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.NamedStructField")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<NamedStructField, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut name__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Name => {
+                            if name__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("name"));
+                            }
+                            name__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(NamedStructField {
+                    name: name__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.NamedStructField", FIELDS, 
GeneratedVisitor)
+    }
+}
+impl serde::Serialize for NamedStructFieldExpr {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.name.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.NamedStructFieldExpr", len)?;
+        if let Some(v) = self.name.as_ref() {
+            struct_ser.serialize_field("name", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for NamedStructFieldExpr {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "name",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Name,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "name" => Ok(GeneratedField::Name),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = NamedStructFieldExpr;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.NamedStructFieldExpr")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<NamedStructFieldExpr, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut name__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::Name => {
+                            if name__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("name"));
+                            }
+                            name__ = map.next_value()?;
+                        }
+                    }
+                }
+                Ok(NamedStructFieldExpr {
+                    name: name__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.NamedStructFieldExpr", 
FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for NegativeNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
@@ -14796,15 +15392,25 @@ impl serde::Serialize for 
PhysicalGetIndexedFieldExprNode {
         if self.arg.is_some() {
             len += 1;
         }
-        if self.key.is_some() {
+        if self.field.is_some() {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.PhysicalGetIndexedFieldExprNode", len)?;
         if let Some(v) = self.arg.as_ref() {
             struct_ser.serialize_field("arg", v)?;
         }
-        if let Some(v) = self.key.as_ref() {
-            struct_ser.serialize_field("key", v)?;
+        if let Some(v) = self.field.as_ref() {
+            match v {
+                
physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(v) => {
+                    struct_ser.serialize_field("namedStructFieldExpr", v)?;
+                }
+                physical_get_indexed_field_expr_node::Field::ListIndexExpr(v) 
=> {
+                    struct_ser.serialize_field("listIndexExpr", v)?;
+                }
+                physical_get_indexed_field_expr_node::Field::ListRangeExpr(v) 
=> {
+                    struct_ser.serialize_field("listRangeExpr", v)?;
+                }
+            }
         }
         struct_ser.end()
     }
@@ -14817,13 +15423,20 @@ impl<'de> serde::Deserialize<'de> for 
PhysicalGetIndexedFieldExprNode {
     {
         const FIELDS: &[&str] = &[
             "arg",
-            "key",
+            "named_struct_field_expr",
+            "namedStructFieldExpr",
+            "list_index_expr",
+            "listIndexExpr",
+            "list_range_expr",
+            "listRangeExpr",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Arg,
-            Key,
+            NamedStructFieldExpr,
+            ListIndexExpr,
+            ListRangeExpr,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -14846,7 +15459,9 @@ impl<'de> serde::Deserialize<'de> for 
PhysicalGetIndexedFieldExprNode {
                     {
                         match value {
                             "arg" => Ok(GeneratedField::Arg),
-                            "key" => Ok(GeneratedField::Key),
+                            "namedStructFieldExpr" | "named_struct_field_expr" 
=> Ok(GeneratedField::NamedStructFieldExpr),
+                            "listIndexExpr" | "list_index_expr" => 
Ok(GeneratedField::ListIndexExpr),
+                            "listRangeExpr" | "list_range_expr" => 
Ok(GeneratedField::ListRangeExpr),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -14867,7 +15482,7 @@ impl<'de> serde::Deserialize<'de> for 
PhysicalGetIndexedFieldExprNode {
                     V: serde::de::MapAccess<'de>,
             {
                 let mut arg__ = None;
-                let mut key__ = None;
+                let mut field__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::Arg => {
@@ -14876,17 +15491,32 @@ impl<'de> serde::Deserialize<'de> for 
PhysicalGetIndexedFieldExprNode {
                             }
                             arg__ = map.next_value()?;
                         }
-                        GeneratedField::Key => {
-                            if key__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("key"));
+                        GeneratedField::NamedStructFieldExpr => {
+                            if field__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("namedStructFieldExpr"));
                             }
-                            key__ = map.next_value()?;
+                            field__ = 
map.next_value::<::std::option::Option<_>>()?.map(physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr)
+;
+                        }
+                        GeneratedField::ListIndexExpr => {
+                            if field__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("listIndexExpr"));
+                            }
+                            field__ = 
map.next_value::<::std::option::Option<_>>()?.map(physical_get_indexed_field_expr_node::Field::ListIndexExpr)
+;
+                        }
+                        GeneratedField::ListRangeExpr => {
+                            if field__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("listRangeExpr"));
+                            }
+                            field__ = 
map.next_value::<::std::option::Option<_>>()?.map(physical_get_indexed_field_expr_node::Field::ListRangeExpr)
+;
                         }
                     }
                 }
                 Ok(PhysicalGetIndexedFieldExprNode {
                     arg: arg__,
-                    key: key__,
+                    field: field__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index d57455a29f..2ec602eb36 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -611,13 +611,44 @@ pub struct RollupNode {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct NamedStructField {
+    #[prost(message, optional, tag = "1")]
+    pub name: ::core::option::Option<ScalarValue>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ListIndex {
+    #[prost(message, optional, boxed, tag = "1")]
+    pub key: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ListRange {
+    #[prost(message, optional, boxed, tag = "1")]
+    pub start: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+    #[prost(message, optional, boxed, tag = "2")]
+    pub stop: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetIndexedField {
     #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
-    #[prost(message, optional, boxed, tag = "2")]
-    pub key: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
-    #[prost(message, optional, boxed, tag = "3")]
-    pub extra_key: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+    #[prost(oneof = "get_indexed_field::Field", tags = "2, 3, 4")]
+    pub field: ::core::option::Option<get_indexed_field::Field>,
+}
+/// Nested message and enum types in `GetIndexedField`.
+pub mod get_indexed_field {
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum Field {
+        #[prost(message, tag = "2")]
+        NamedStructField(super::NamedStructField),
+        #[prost(message, tag = "3")]
+        ListIndex(::prost::alloc::boxed::Box<super::ListIndex>),
+        #[prost(message, tag = "4")]
+        ListRange(::prost::alloc::boxed::Box<super::ListRange>),
+    }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -2123,11 +2154,44 @@ pub struct ColumnStats {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct NamedStructFieldExpr {
+    #[prost(message, optional, tag = "1")]
+    pub name: ::core::option::Option<ScalarValue>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ListIndexExpr {
+    #[prost(message, optional, boxed, tag = "1")]
+    pub key: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ListRangeExpr {
+    #[prost(message, optional, boxed, tag = "1")]
+    pub start: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+    #[prost(message, optional, boxed, tag = "2")]
+    pub stop: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PhysicalGetIndexedFieldExprNode {
     #[prost(message, optional, boxed, tag = "1")]
     pub arg: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
-    #[prost(message, optional, boxed, tag = "2")]
-    pub key: 
::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+    #[prost(oneof = "physical_get_indexed_field_expr_node::Field", tags = "2, 
3, 4")]
+    pub field: 
::core::option::Option<physical_get_indexed_field_expr_node::Field>,
+}
+/// Nested message and enum types in `PhysicalGetIndexedFieldExprNode`.
+pub mod physical_get_indexed_field_expr_node {
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum Field {
+        #[prost(message, tag = "2")]
+        NamedStructFieldExpr(super::NamedStructFieldExpr),
+        #[prost(message, tag = "3")]
+        ListIndexExpr(::prost::alloc::boxed::Box<super::ListIndexExpr>),
+        #[prost(message, tag = "4")]
+        ListRangeExpr(::prost::alloc::boxed::Box<super::ListRangeExpr>),
+    }
 }
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, 
::prost::Enumeration)]
 #[repr(i32)]
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 9d7dd4e490..bc865922d3 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -54,7 +54,7 @@ use datafusion_expr::{
     to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, upper, 
uuid,
     window_frame::regularize,
     AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, 
BuiltinScalarFunction,
-    Case, Cast, Expr, GetIndexedField, GetIndexedFieldKey, GroupingSet,
+    Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet,
     GroupingSet::GroupingSets,
     JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, 
WindowFrameBound,
     WindowFrameUnits,
@@ -932,14 +932,48 @@ pub fn parse_expr(
                 })
                 .expect("Binary expression could not be reduced to a single 
expression."))
         }
-        ExprType::GetIndexedField(field) => {
-            let expr = parse_required_expr(field.expr.as_deref(), registry, 
"expr")?;
-            let key = parse_required_expr(field.key.as_deref(), registry, 
"key")?;
+        ExprType::GetIndexedField(get_indexed_field) => {
+            let expr =
+                parse_required_expr(get_indexed_field.expr.as_deref(), 
registry, "expr")?;
+            let field = match &get_indexed_field.field {
+                Some(protobuf::get_indexed_field::Field::NamedStructField(
+                    named_struct_field,
+                )) => GetFieldAccess::NamedStructField {
+                    name: named_struct_field
+                        .name
+                        .as_ref()
+                        .ok_or_else(|| Error::required("value"))?
+                        .try_into()?,
+                },
+                
Some(protobuf::get_indexed_field::Field::ListIndex(list_index)) => {
+                    GetFieldAccess::ListIndex {
+                        key: Box::new(parse_required_expr(
+                            list_index.key.as_deref(),
+                            registry,
+                            "key",
+                        )?),
+                    }
+                }
+                
Some(protobuf::get_indexed_field::Field::ListRange(list_range)) => {
+                    GetFieldAccess::ListRange {
+                        start: Box::new(parse_required_expr(
+                            list_range.start.as_deref(),
+                            registry,
+                            "start",
+                        )?),
+                        stop: Box::new(parse_required_expr(
+                            list_range.stop.as_deref(),
+                            registry,
+                            "stop",
+                        )?),
+                    }
+                }
+                None => return Err(proto_error("Field must not be None")),
+            };
 
             Ok(Expr::GetIndexedField(GetIndexedField::new(
                 Box::new(expr),
-                Box::new(GetIndexedFieldKey::new(Some(key), None)),
-                None,
+                field,
             )))
         }
         ExprType::Column(column) => Ok(Expr::Column(column.into())),
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 4df96c0c54..9c41dcf3c5 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -36,8 +36,8 @@ use arrow::datatypes::{
 };
 use datafusion_common::{Column, DFField, DFSchemaRef, OwnedTableReference, 
ScalarValue};
 use datafusion_expr::expr::{
-    self, Alias, Between, BinaryExpr, Cast, GetIndexedField, GroupingSet, 
InList, Like,
-    Placeholder, ScalarFunction, ScalarUDF, Sort,
+    self, Alias, Between, BinaryExpr, Cast, GetFieldAccess, GetIndexedField, 
GroupingSet,
+    InList, Like, Placeholder, ScalarFunction, ScalarUDF, Sort,
 };
 use datafusion_expr::{
     logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction,
@@ -955,31 +955,41 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
                 // see discussion in 
https://github.com/apache/arrow-datafusion/issues/2565
                 return Err(Error::General("Proto serialization error: 
Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | 
Exp:OuterReferenceColumn not supported".to_string()));
             }
-            Expr::GetIndexedField(GetIndexedField {
-                key,
-                expr,
-                extra_key,
-            }) => Self {
-                expr_type: Some(ExprType::GetIndexedField(Box::new(
-                    if let Some(extra_key) = extra_key {
-                        protobuf::GetIndexedField {
-                            extra_key: 
Some(Box::new(extra_key.as_ref().try_into()?)),
-                            key: Some(Box::new(
-                                
(&key.as_ref().list_key.clone().unwrap()).try_into()?,
-                            )),
-                            expr: Some(Box::new(expr.as_ref().try_into()?)),
-                        }
-                    } else {
+            Expr::GetIndexedField(GetIndexedField { expr, field }) => {
+                let field = match field {
+                    GetFieldAccess::NamedStructField { name } => {
+                        protobuf::get_indexed_field::Field::NamedStructField(
+                            protobuf::NamedStructField {
+                                name: Some(name.try_into()?),
+                            },
+                        )
+                    }
+                    GetFieldAccess::ListIndex { key } => {
+                        protobuf::get_indexed_field::Field::ListIndex(Box::new(
+                            protobuf::ListIndex {
+                                key: Some(Box::new(key.as_ref().try_into()?)),
+                            },
+                        ))
+                    }
+                    GetFieldAccess::ListRange { start, stop } => {
+                        protobuf::get_indexed_field::Field::ListRange(Box::new(
+                            protobuf::ListRange {
+                                start: 
Some(Box::new(start.as_ref().try_into()?)),
+                                stop: 
Some(Box::new(stop.as_ref().try_into()?)),
+                            },
+                        ))
+                    }
+                };
+
+                Self {
+                    expr_type: Some(ExprType::GetIndexedField(Box::new(
                         protobuf::GetIndexedField {
-                            extra_key: None,
-                            key: Some(Box::new(
-                                
(&key.as_ref().list_key.clone().unwrap()).try_into()?,
-                            )),
                             expr: Some(Box::new(expr.as_ref().try_into()?)),
-                        }
-                    },
-                ))),
-            },
+                            field: Some(field),
+                        },
+                    ))),
+                }
+            }
 
             Expr::GroupingSet(GroupingSet::Cube(exprs)) => Self {
                 expr_type: Some(ExprType::Cube(CubeNode {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index c4c897b13e..e084188ccd 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::window_function::WindowFunction;
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
-    date_time_interval_expr, GetIndexedFieldExpr, GetIndexedFieldExprKey,
+    date_time_interval_expr, GetFieldAccessExpr, GetIndexedFieldExpr,
 };
 use datafusion::physical_plan::expressions::{in_list, LikeExpr};
 use datafusion::physical_plan::{
@@ -310,6 +310,36 @@ pub fn parse_physical_expr(
             )?,
         )),
         ExprType::GetIndexedFieldExpr(get_indexed_field_expr) => {
+            let field = match &get_indexed_field_expr.field {
+                
Some(protobuf::physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(named_struct_field_expr))
 => GetFieldAccessExpr::NamedStructField{
+                    name: convert_required!(named_struct_field_expr.name)?,
+                },
+                
Some(protobuf::physical_get_indexed_field_expr_node::Field::ListIndexExpr(list_index_expr))
 => GetFieldAccessExpr::ListIndex{
+                    key: parse_required_physical_expr(
+                        list_index_expr.key.as_deref(),
+                        registry,
+                        "key",
+                        input_schema,
+                    )?},
+                
Some(protobuf::physical_get_indexed_field_expr_node::Field::ListRangeExpr(list_range_expr))
 => GetFieldAccessExpr::ListRange{
+                    start: parse_required_physical_expr(
+                        list_range_expr.start.as_deref(),
+                        registry,
+                        "start",
+                        input_schema,
+                    )?,
+                    stop: parse_required_physical_expr(
+                        list_range_expr.stop.as_deref(),
+                        registry,
+                        "stop",
+                        input_schema
+                    )?,
+                },
+                None =>                 return Err(proto_error(
+                    "Field must not be None",
+                )),
+            };
+
             Arc::new(GetIndexedFieldExpr::new(
                 parse_required_physical_expr(
                     get_indexed_field_expr.arg.as_deref(),
@@ -317,16 +347,7 @@ pub fn parse_physical_expr(
                     "arg",
                     input_schema,
                 )?,
-                GetIndexedFieldExprKey::new(
-                    Some(parse_required_physical_expr(
-                        get_indexed_field_expr.key.as_deref(),
-                        registry,
-                        "key",
-                        input_schema,
-                    )?),
-                    None,
-                ),
-                None,
+                field,
             ))
         }
     };
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 34d0d74741..fdb2ef88cb 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1400,8 +1400,8 @@ mod roundtrip_tests {
     use datafusion::physical_expr::ScalarFunctionExpr;
     use datafusion::physical_plan::aggregates::PhysicalGroupBy;
     use datafusion::physical_plan::expressions::{
-        date_time_interval_expr, like, BinaryExpr, GetIndexedFieldExpr,
-        GetIndexedFieldExprKey,
+        date_time_interval_expr, like, BinaryExpr, GetFieldAccessExpr,
+        GetIndexedFieldExpr,
     };
     use datafusion::physical_plan::functions::make_scalar_function;
     use datafusion::physical_plan::projection::ProjectionExec;
@@ -1409,7 +1409,7 @@ mod roundtrip_tests {
     use datafusion::{
         arrow::{
             compute::kernels::sort::SortOptions,
-            datatypes::{DataType, Field, Schema},
+            datatypes::{DataType, Field, Fields, Schema},
         },
         datasource::{
             listing::PartitionedFile,
@@ -1920,22 +1920,82 @@ mod roundtrip_tests {
     }
 
     #[test]
-    fn roundtrip_get_indexed_field() -> Result<()> {
+    fn roundtrip_get_indexed_field_named_struct_field() -> Result<()> {
         let fields = vec![
             Field::new("id", DataType::Int64, true),
-            Field::new_list("a", Field::new("item", DataType::Float64, true), 
true),
-            Field::new("b", DataType::Int64, true),
+            Field::new_struct(
+                "arg",
+                Fields::from(vec![Field::new("name", DataType::Float64, 
true)]),
+                true,
+            ),
         ];
 
         let schema = Schema::new(fields);
         let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
 
-        let col_a = col("a", &schema)?;
-        let col_b = col("b", &schema)?;
+        let col_arg = col("arg", &schema)?;
         let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(
-            col_a,
-            GetIndexedFieldExprKey::new(Some(col_b), None),
-            None,
+            col_arg,
+            GetFieldAccessExpr::NamedStructField {
+                name: ScalarValue::Utf8(Some(String::from("name"))),
+            },
+        ));
+
+        let plan = Arc::new(ProjectionExec::try_new(
+            vec![(get_indexed_field_expr, "result".to_string())],
+            input,
+        )?);
+
+        roundtrip_test(plan)
+    }
+
+    #[test]
+    fn roundtrip_get_indexed_field_list_index() -> Result<()> {
+        let fields = vec![
+            Field::new("id", DataType::Int64, true),
+            Field::new_list("arg", Field::new("item", DataType::Float64, 
true), true),
+            Field::new("key", DataType::Int64, true),
+        ];
+
+        let schema = Schema::new(fields);
+        let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+
+        let col_arg = col("arg", &schema)?;
+        let col_key = col("key", &schema)?;
+        let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(
+            col_arg,
+            GetFieldAccessExpr::ListIndex { key: col_key },
+        ));
+
+        let plan = Arc::new(ProjectionExec::try_new(
+            vec![(get_indexed_field_expr, "result".to_string())],
+            input,
+        )?);
+
+        roundtrip_test(plan)
+    }
+
+    #[test]
+    fn roundtrip_get_indexed_field_list_range() -> Result<()> {
+        let fields = vec![
+            Field::new("id", DataType::Int64, true),
+            Field::new_list("arg", Field::new("item", DataType::Float64, 
true), true),
+            Field::new("start", DataType::Int64, true),
+            Field::new("stop", DataType::Int64, true),
+        ];
+
+        let schema = Schema::new(fields);
+        let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+
+        let col_arg = col("arg", &schema)?;
+        let col_start = col("start", &schema)?;
+        let col_stop = col("stop", &schema)?;
+        let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(
+            col_arg,
+            GetFieldAccessExpr::ListRange {
+                start: col_start,
+                stop: col_stop,
+            },
         ));
 
         let plan = Arc::new(ProjectionExec::try_new(
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index f80f44a0e3..45c38b5ad5 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -45,9 +45,12 @@ use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 use crate::protobuf;
 use crate::protobuf::{
     physical_aggregate_expr_node, PhysicalSortExprNode, 
PhysicalSortExprNodeCollection,
+    ScalarValue,
 };
 use datafusion::logical_expr::BuiltinScalarFunction;
-use datafusion::physical_expr::expressions::{DateTimeIntervalExpr, 
GetIndexedFieldExpr};
+use datafusion::physical_expr::expressions::{
+    DateTimeIntervalExpr, GetFieldAccessExpr, GetIndexedFieldExpr,
+};
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion::physical_plan::joins::utils::JoinSide;
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
@@ -388,19 +391,31 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for 
protobuf::PhysicalExprNode {
                 )),
             })
         } else if let Some(expr) = expr.downcast_ref::<GetIndexedFieldExpr>() {
+            let field = match expr.field() {
+                GetFieldAccessExpr::NamedStructField{name} => Some(
+                    
protobuf::physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(protobuf::NamedStructFieldExpr
 {
+                        name: Some(ScalarValue::try_from(name)?)
+                    })
+                ),
+                GetFieldAccessExpr::ListIndex{key} => Some(
+                    
protobuf::physical_get_indexed_field_expr_node::Field::ListIndexExpr(Box::new(protobuf::ListIndexExpr
 {
+                        key: Some(Box::new(key.to_owned().try_into()?))
+                    }))
+                ),
+                GetFieldAccessExpr::ListRange{start, stop} => Some(
+                    
protobuf::physical_get_indexed_field_expr_node::Field::ListRangeExpr(Box::new(protobuf::ListRangeExpr
 {
+                        start: Some(Box::new(start.to_owned().try_into()?)),
+                        stop: Some(Box::new(stop.to_owned().try_into()?)),
+                    }))
+                ),
+            };
+
             Ok(protobuf::PhysicalExprNode {
                 expr_type: Some(
                     
protobuf::physical_expr_node::ExprType::GetIndexedFieldExpr(
                         Box::new(protobuf::PhysicalGetIndexedFieldExprNode {
                             arg: 
Some(Box::new(expr.arg().to_owned().try_into()?)),
-                            key: Some(Box::new(
-                                expr.key()
-                                    .list_key()
-                                    .clone()
-                                    .unwrap()
-                                    .to_owned()
-                                    .try_into()?,
-                            )),
+                            field,
                         }),
                     ),
                 ),
diff --git a/datafusion/sql/src/expr/identifier.rs 
b/datafusion/sql/src/expr/identifier.rs
index 084ee38ae3..82e4c959ed 100644
--- a/datafusion/sql/src/expr/identifier.rs
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -19,7 +19,7 @@ use crate::planner::{ContextProvider, PlannerContext, 
SqlToRel};
 use datafusion_common::{
     Column, DFField, DFSchema, DataFusionError, Result, ScalarValue, 
TableReference,
 };
-use datafusion_expr::{Case, Expr, GetIndexedField, GetIndexedFieldKey};
+use datafusion_expr::{Case, Expr, GetFieldAccess, GetIndexedField};
 use sqlparser::ast::{Expr as SQLExpr, Ident};
 
 impl<'a, S: ContextProvider> SqlToRel<'a, S> {
@@ -138,11 +138,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     let nested_name = nested_names[0].to_string();
                     Ok(Expr::GetIndexedField(GetIndexedField::new(
                         Box::new(Expr::Column(field.qualified_column())),
-                        Box::new(GetIndexedFieldKey::new(
-                            None,
-                            Some(ScalarValue::Utf8(Some(nested_name))),
-                        )),
-                        None,
+                        GetFieldAccess::NamedStructField {
+                            name: ScalarValue::Utf8(Some(nested_name)),
+                        },
                     )))
                 }
                 // found matching field with no spare identifier(s)
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 83aeb67dd1..aad9f770ff 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -36,7 +36,7 @@ use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::expr::{InList, Placeholder};
 use datafusion_expr::{
     col, expr, lit, AggregateFunction, Between, BinaryExpr, 
BuiltinScalarFunction, Cast,
-    Expr, ExprSchemable, GetIndexedField, GetIndexedFieldKey, Like, Operator, 
TryCast,
+    Expr, ExprSchemable, GetFieldAccess, GetIndexedField, Like, Operator, 
TryCast,
 };
 use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, JsonOperator, TrimWhereField, 
Value};
 use sqlparser::parser::ParserError::ParserError;
@@ -511,39 +511,41 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         expr: SQLExpr,
         schema: &DFSchema,
         planner_context: &mut PlannerContext,
-    ) -> Result<(Box<GetIndexedFieldKey>, Option<Box<Expr>>)> {
-        let (key, extra_key) = match expr.clone() {
+    ) -> Result<GetFieldAccess> {
+        let field = match expr.clone() {
+            SQLExpr::Value(
+                Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
+            ) => GetFieldAccess::NamedStructField {
+                name: ScalarValue::Utf8(Some(s)),
+            },
             SQLExpr::JsonAccess {
                 left,
                 operator: JsonOperator::Colon,
                 right,
             } => {
-                let left =
-                    self.sql_expr_to_logical_expr(*left, schema, 
planner_context)?;
-                let right =
-                    self.sql_expr_to_logical_expr(*right, schema, 
planner_context)?;
-
-                (
-                    GetIndexedFieldKey::new(Some(left), None),
-                    Some(Box::new(right)),
-                )
+                let start = Box::new(self.sql_expr_to_logical_expr(
+                    *left,
+                    schema,
+                    planner_context,
+                )?);
+                let stop = Box::new(self.sql_expr_to_logical_expr(
+                    *right,
+                    schema,
+                    planner_context,
+                )?);
+
+                GetFieldAccess::ListRange { start, stop }
             }
-            SQLExpr::Value(
-                Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
-            ) => (
-                GetIndexedFieldKey::new(None, 
Some(ScalarValue::Utf8(Some(s)))),
-                None,
-            ),
-            _ => (
-                GetIndexedFieldKey::new(
-                    Some(self.sql_expr_to_logical_expr(expr, schema, 
planner_context)?),
-                    None,
-                ),
-                None,
-            ),
+            _ => GetFieldAccess::ListIndex {
+                key: Box::new(self.sql_expr_to_logical_expr(
+                    expr,
+                    schema,
+                    planner_context,
+                )?),
+            },
         };
 
-        Ok((Box::new(key), extra_key))
+        Ok(field)
     }
 
     fn plan_indexed(
@@ -563,11 +565,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             expr
         };
 
-        let (key, extra_key) = self.plan_indices(indices, schema, 
planner_context)?;
         Ok(Expr::GetIndexedField(GetIndexedField::new(
             Box::new(expr),
-            key,
-            extra_key,
+            self.plan_indices(indices, schema, planner_context)?,
         )))
     }
 }
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 36503d70df..5a570f3903 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -377,15 +377,12 @@ where
             ))),
             Expr::Wildcard => Ok(Expr::Wildcard),
             Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
-            Expr::GetIndexedField(GetIndexedField {
-                key,
-                extra_key,
-                expr,
-            }) => Ok(Expr::GetIndexedField(GetIndexedField::new(
-                Box::new(clone_with_replacement(expr.as_ref(), 
replacement_fn)?),
-                key.clone(),
-                extra_key.clone(),
-            ))),
+            Expr::GetIndexedField(GetIndexedField { expr, field }) => {
+                Ok(Expr::GetIndexedField(GetIndexedField::new(
+                    Box::new(clone_with_replacement(expr.as_ref(), 
replacement_fn)?),
+                    field.clone(),
+                )))
+            }
             Expr::GroupingSet(set) => match set {
                 GroupingSet::Rollup(exprs) => 
Ok(Expr::GroupingSet(GroupingSet::Rollup(
                     exprs

Reply via email to