This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f6062e81b fix: Capture nullability in `Values` node planning (#14472)
4f6062e81b is described below

commit 4f6062e81b7201b58da2b20c63333a5e3ae3c8df
Author: Rohan Krishnaswamy <[email protected]>
AuthorDate: Fri Feb 7 05:44:50 2025 -0800

    fix: Capture nullability in `Values` node planning (#14472)
    
    * fix: Capture nullability in `Values` node planning
    
    * fix: Clippy errors
    
    * fix: Insert plan tests (push down casts to values node)
    
    * fix: Insert schema error messages
    
    * refactor: create values fields container
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/expr/src/logical_plan/builder.rs        | 53 ++++++++++++++--------
 datafusion/sql/src/statement.rs                    |  4 ++
 datafusion/sql/tests/sql_integration.rs            | 12 ++---
 datafusion/sqllogictest/test_files/insert.slt      |  6 +--
 .../sqllogictest/test_files/insert_to_external.slt |  6 +--
 5 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 268025c092..4fdfb84aea 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -242,9 +242,10 @@ impl LogicalPlanBuilder {
         schema: &DFSchema,
     ) -> Result<Self> {
         let n_cols = values[0].len();
-        let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
+        let mut fields = ValuesFields::new();
         for j in 0..n_cols {
             let field_type = schema.field(j).data_type();
+            let field_nullable = schema.field(j).is_nullable();
             for row in values.iter() {
                 let value = &row[j];
                 let data_type = value.get_type(schema)?;
@@ -260,17 +261,17 @@ impl LogicalPlanBuilder {
                     }
                 }
             }
-            field_types.push(field_type.to_owned());
+            fields.push(field_type.to_owned(), field_nullable);
         }
 
-        Self::infer_inner(values, &field_types, schema)
+        Self::infer_inner(values, fields, schema)
     }
 
     fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
         let n_cols = values[0].len();
         let schema = DFSchema::empty();
+        let mut fields = ValuesFields::new();
 
-        let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
         for j in 0..n_cols {
             let mut common_type: Option<DataType> = None;
             for (i, row) in values.iter().enumerate() {
@@ -293,20 +294,21 @@ impl LogicalPlanBuilder {
             }
             // assuming common_type was not set, and no error, therefore the type should be NULL
             // since the code loop skips NULL
-            field_types.push(common_type.unwrap_or(DataType::Null));
+            fields.push(common_type.unwrap_or(DataType::Null), true);
         }
 
-        Self::infer_inner(values, &field_types, &schema)
+        Self::infer_inner(values, fields, &schema)
     }
 
     fn infer_inner(
         mut values: Vec<Vec<Expr>>,
-        field_types: &[DataType],
+        fields: ValuesFields,
         schema: &DFSchema,
     ) -> Result<Self> {
+        let fields = fields.into_fields();
         // wrap cast if data type is not same as common type.
         for row in &mut values {
-            for (j, field_type) in field_types.iter().enumerate() {
+            for (j, field_type) in fields.iter().map(|f| 
f.data_type()).enumerate() {
                 if let Expr::Literal(ScalarValue::Null) = row[j] {
                     row[j] = Expr::Literal(ScalarValue::try_from(field_type)?);
                 } else {
@@ -314,16 +316,8 @@ impl LogicalPlanBuilder {
                 }
             }
         }
-        let fields = field_types
-            .iter()
-            .enumerate()
-            .map(|(j, data_type)| {
-                // naming is following convention https://www.postgresql.org/docs/current/queries-values.html
-                let name = &format!("column{}", j + 1);
-                Field::new(name, data_type.clone(), true)
-            })
-            .collect::<Vec<_>>();
-        let dfschema = DFSchema::from_unqualified_fields(fields.into(), 
HashMap::new())?;
+
+        let dfschema = DFSchema::from_unqualified_fields(fields, 
HashMap::new())?;
         let schema = DFSchemaRef::new(dfschema);
 
         Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
@@ -1320,6 +1314,29 @@ impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
     }
 }
 
+/// Container used when building fields for a `VALUES` node.
+#[derive(Default)]
+struct ValuesFields {
+    inner: Vec<Field>,
+}
+
+impl ValuesFields {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn push(&mut self, data_type: DataType, nullable: bool) {
+        // Naming follows the convention described here:
+        // https://www.postgresql.org/docs/current/queries-values.html
+        let name = format!("column{}", self.inner.len() + 1);
+        self.inner.push(Field::new(name, data_type, nullable));
+    }
+
+    pub fn into_fields(self) -> Fields {
+        self.inner.into()
+    }
+}
+
 pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
     let mut name_map = HashMap::new();
     fields
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 83c91ecde6..d48cc93ee3 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1897,6 +1897,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     let column_index = table_schema
                         .index_of_column_by_name(None, &c)
                         .ok_or_else(|| unqualified_field_not_found(&c, 
&table_schema))?;
+
                     if value_indices[column_index].is_some() {
                         return 
schema_err!(SchemaError::DuplicateUnqualifiedField {
                             name: c,
@@ -1937,6 +1938,9 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         // Projection
         let mut planner_context =
             
PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
+        planner_context.set_table_schema(Some(DFSchemaRef::new(
+            DFSchema::from_unqualified_fields(fields.clone(), 
Default::default())?,
+        )));
         let source = self.query_to_plan(*source, &mut planner_context)?;
         if fields.len() != source.schema().fields().len() {
             plan_err!("Column count doesn't match insert query!")?;
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index 0883f9a469..22af160f72 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -468,10 +468,10 @@ fn plan_insert() {
     let sql =
         "insert into person (id, first_name, last_name) values (1, 'Alan', 
'Turing')";
     let plan = "Dml: op=[Insert Into] table=[person]\
-                \n  Projection: CAST(column1 AS UInt32) AS id, column2 AS 
first_name, column3 AS last_name, \
+                \n  Projection: column1 AS id, column2 AS first_name, column3 
AS last_name, \
                         CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS 
state, CAST(NULL AS Float64) AS salary, \
                         CAST(NULL AS Timestamp(Nanosecond, None)) AS 
birth_date, CAST(NULL AS Int32) AS 😀\
-                \n    Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
+                \n    Values: (CAST(Int64(1) AS UInt32), Utf8(\"Alan\"), 
Utf8(\"Turing\"))";
     quick_test(sql, plan);
 }
 
@@ -480,8 +480,8 @@ fn plan_insert_no_target_columns() {
     let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
     let plan = r#"
 Dml: op=[Insert Into] table=[test_decimal]
-  Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) 
AS price
-    Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
+  Projection: column1 AS id, column2 AS price
+    Values: (CAST(Int64(1) AS Int32), CAST(Int64(2) AS Decimal128(10, 2))), 
(CAST(Int64(3) AS Int32), CAST(Int64(4) AS Decimal128(10, 2)))
     "#
     .trim();
     quick_test(sql, plan);
@@ -499,11 +499,11 @@ Dml: op=[Insert Into] table=[test_decimal]
 )]
 #[case::target_column_count_mismatch(
     "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
-    "Error during planning: Column count doesn't match insert query!"
+    "Error during planning: Inconsistent data length across values list: got 2 
values in row 0 but expected 3"
 )]
 #[case::source_column_count_mismatch(
     "INSERT INTO person VALUES ($1, $2)",
-    "Error during planning: Column count doesn't match insert query!"
+    "Error during planning: Inconsistent data length across values list: got 2 
values in row 0 but expected 8"
 )]
 #[case::extra_placeholder(
     "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index c1050ee032..cbc989841a 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -255,7 +255,7 @@ insert into table_without_values(id, id) values(3, 3);
 statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of 
Int64 type
 insert into table_without_values(name, id) values(4, 'zoo');
 
-statement error Error during planning: Column count doesn't match insert query!
+statement error Error during planning: Inconsistent data length across values 
list: got 2 values in row 0 but expected 1
 insert into table_without_values(id) values(4, 'zoo');
 
 # insert NULL values for the missing column (name)
@@ -299,10 +299,10 @@ insert into table_without_values(field1) values(3);
 statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
 insert into table_without_values(field2) values(300);
 
-statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as 
non-nullable but contains null values
 insert into table_without_values values(NULL, 300);
 
-statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as 
non-nullable but contains null values
 insert into table_without_values values(3, 300), (NULL, 400);
 
 query II rowsort
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index e3822cd920..c5fa2b4e1a 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -500,7 +500,7 @@ insert into table_without_values(id, id) values(3, 3);
 statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of 
Int64 type
 insert into table_without_values(name, id) values(4, 'zoo');
 
-statement error Error during planning: Column count doesn't match insert query!
+statement error Error during planning: Inconsistent data length across values 
list: got 2 values in row 0 but expected 1
 insert into table_without_values(id) values(4, 'zoo');
 
 # insert NULL values for the missing column (name)
@@ -546,10 +546,10 @@ insert into table_without_values(field1) values(3);
 statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
 insert into table_without_values(field2) values(300);
 
-statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as 
non-nullable but contains null values
 insert into table_without_values values(NULL, 300);
 
-statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as 
non-nullable but contains null values
 insert into table_without_values values(3, 300), (NULL, 400);
 
 query II rowsort


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to