This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4f6062e81b fix: Capture nullability in `Values` node planning (#14472)
4f6062e81b is described below
commit 4f6062e81b7201b58da2b20c63333a5e3ae3c8df
Author: Rohan Krishnaswamy <[email protected]>
AuthorDate: Fri Feb 7 05:44:50 2025 -0800
fix: Capture nullability in `Values` node planning (#14472)
* fix: Capture nullability in `Values` node planning
* fix: Clippy errors
* fix: Insert plan tests (push down casts to values node)
* fix: Insert schema error messages
* refactor: create values fields container
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/expr/src/logical_plan/builder.rs | 53 ++++++++++++++--------
datafusion/sql/src/statement.rs | 4 ++
datafusion/sql/tests/sql_integration.rs | 12 ++---
datafusion/sqllogictest/test_files/insert.slt | 6 +--
.../sqllogictest/test_files/insert_to_external.slt | 6 +--
5 files changed, 51 insertions(+), 30 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 268025c092..4fdfb84aea 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -242,9 +242,10 @@ impl LogicalPlanBuilder {
schema: &DFSchema,
) -> Result<Self> {
let n_cols = values[0].len();
- let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
+ let mut fields = ValuesFields::new();
for j in 0..n_cols {
let field_type = schema.field(j).data_type();
+ let field_nullable = schema.field(j).is_nullable();
for row in values.iter() {
let value = &row[j];
let data_type = value.get_type(schema)?;
@@ -260,17 +261,17 @@ impl LogicalPlanBuilder {
}
}
}
- field_types.push(field_type.to_owned());
+ fields.push(field_type.to_owned(), field_nullable);
}
- Self::infer_inner(values, &field_types, schema)
+ Self::infer_inner(values, fields, schema)
}
fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
let n_cols = values[0].len();
let schema = DFSchema::empty();
+ let mut fields = ValuesFields::new();
- let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
for j in 0..n_cols {
let mut common_type: Option<DataType> = None;
for (i, row) in values.iter().enumerate() {
@@ -293,20 +294,21 @@ impl LogicalPlanBuilder {
}
// assuming common_type was not set, and no error, therefore the type should be NULL
// since the code loop skips NULL
- field_types.push(common_type.unwrap_or(DataType::Null));
+ fields.push(common_type.unwrap_or(DataType::Null), true);
}
- Self::infer_inner(values, &field_types, &schema)
+ Self::infer_inner(values, fields, &schema)
}
fn infer_inner(
mut values: Vec<Vec<Expr>>,
- field_types: &[DataType],
+ fields: ValuesFields,
schema: &DFSchema,
) -> Result<Self> {
+ let fields = fields.into_fields();
// wrap cast if data type is not same as common type.
for row in &mut values {
- for (j, field_type) in field_types.iter().enumerate() {
+ for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
if let Expr::Literal(ScalarValue::Null) = row[j] {
row[j] = Expr::Literal(ScalarValue::try_from(field_type)?);
} else {
@@ -314,16 +316,8 @@ impl LogicalPlanBuilder {
}
}
}
- let fields = field_types
- .iter()
- .enumerate()
- .map(|(j, data_type)| {
- // naming is following convention https://www.postgresql.org/docs/current/queries-values.html
- let name = &format!("column{}", j + 1);
- Field::new(name, data_type.clone(), true)
- })
- .collect::<Vec<_>>();
- let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?;
+
+ let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
let schema = DFSchemaRef::new(dfschema);
Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
@@ -1320,6 +1314,29 @@ impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
}
}
+/// Container used when building fields for a `VALUES` node.
+#[derive(Default)]
+struct ValuesFields {
+ inner: Vec<Field>,
+}
+
+impl ValuesFields {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn push(&mut self, data_type: DataType, nullable: bool) {
+ // Naming follows the convention described here:
+ // https://www.postgresql.org/docs/current/queries-values.html
+ let name = format!("column{}", self.inner.len() + 1);
+ self.inner.push(Field::new(name, data_type, nullable));
+ }
+
+ pub fn into_fields(self) -> Fields {
+ self.inner.into()
+ }
+}
+
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
let mut name_map = HashMap::new();
fields
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 83c91ecde6..d48cc93ee3 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1897,6 +1897,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let column_index = table_schema
.index_of_column_by_name(None, &c)
.ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
+
if value_indices[column_index].is_some() {
return schema_err!(SchemaError::DuplicateUnqualifiedField {
name: c,
@@ -1937,6 +1938,9 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// Projection
let mut planner_context =
PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
+ planner_context.set_table_schema(Some(DFSchemaRef::new(
+ DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
+ )));
let source = self.query_to_plan(*source, &mut planner_context)?;
if fields.len() != source.schema().fields().len() {
plan_err!("Column count doesn't match insert query!")?;
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index 0883f9a469..22af160f72 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -468,10 +468,10 @@ fn plan_insert() {
let sql =
"insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
let plan = "Dml: op=[Insert Into] table=[person]\
- \n Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name, \
+ \n Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \
CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
- \n Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
+ \n Values: (CAST(Int64(1) AS UInt32), Utf8(\"Alan\"), Utf8(\"Turing\"))";
quick_test(sql, plan);
}
@@ -480,8 +480,8 @@ fn plan_insert_no_target_columns() {
let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
let plan = r#"
Dml: op=[Insert Into] table=[test_decimal]
- Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price
- Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
+ Projection: column1 AS id, column2 AS price
+ Values: (CAST(Int64(1) AS Int32), CAST(Int64(2) AS Decimal128(10, 2))), (CAST(Int64(3) AS Int32), CAST(Int64(4) AS Decimal128(10, 2)))
"#
.trim();
quick_test(sql, plan);
@@ -499,11 +499,11 @@ Dml: op=[Insert Into] table=[test_decimal]
)]
#[case::target_column_count_mismatch(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
- "Error during planning: Column count doesn't match insert query!"
+ "Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 3"
)]
#[case::source_column_count_mismatch(
"INSERT INTO person VALUES ($1, $2)",
- "Error during planning: Column count doesn't match insert query!"
+ "Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 8"
)]
#[case::extra_placeholder(
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index c1050ee032..cbc989841a 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -255,7 +255,7 @@ insert into table_without_values(id, id) values(3, 3);
statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
insert into table_without_values(name, id) values(4, 'zoo');
-statement error Error during planning: Column count doesn't match insert query!
+statement error Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 1
insert into table_without_values(id) values(4, 'zoo');
# insert NULL values for the missing column (name)
@@ -299,10 +299,10 @@ insert into table_without_values(field1) values(3);
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values(field2) values(300);
-statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);
-statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(3, 300), (NULL, 400);
query II rowsort
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index e3822cd920..c5fa2b4e1a 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -500,7 +500,7 @@ insert into table_without_values(id, id) values(3, 3);
statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
insert into table_without_values(name, id) values(4, 'zoo');
-statement error Error during planning: Column count doesn't match insert query!
+statement error Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 1
insert into table_without_values(id) values(4, 'zoo');
# insert NULL values for the missing column (name)
@@ -546,10 +546,10 @@ insert into table_without_values(field1) values(3);
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values(field2) values(300);
-statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);
-statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(3, 300), (NULL, 400);
query II rowsort
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]