This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1c73a23900 fix union serialisation order in proto (#13709)
1c73a23900 is described below
commit 1c73a23900ad27a0561ff79d840897916802966b
Author: Onur Satici <[email protected]>
AuthorDate: Thu Dec 12 21:16:11 2024 +0000
fix union serialisation order in proto (#13709)
* fix union serialisation order in proto
* clippy
* address comments
---
datafusion/proto/src/logical_plan/mod.rs | 19 +++++-------
.../proto/tests/cases/roundtrip_logical_plan.rs | 34 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 12 deletions(-)
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 50636048eb..addafeb762 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -737,23 +737,18 @@ impl AsLogicalPlan for LogicalPlanNode {
builder.build()
}
LogicalPlanType::Union(union) => {
- let mut input_plans: Vec<LogicalPlan> = union
- .inputs
- .iter()
- .map(|i| i.try_into_logical_plan(ctx, extension_codec))
- .collect::<Result<_>>()?;
-
- if input_plans.len() < 2 {
+ if union.inputs.len() < 2 {
return Err( DataFusionError::Internal(String::from(
"Protobuf deserialization error, Union was require at least two input.",
)));
}
+ let (first, rest) = union.inputs.split_first().unwrap();
+ let mut builder = LogicalPlanBuilder::from(
+ first.try_into_logical_plan(ctx, extension_codec)?,
+ );
- let first = input_plans.pop().ok_or_else(|| DataFusionError::Internal(String::from(
- "Protobuf deserialization error, Union was require at least two input.",
- )))?;
- let mut builder = LogicalPlanBuilder::from(first);
- for plan in input_plans {
+ for i in rest {
+ let plan = i.try_into_logical_plan(ctx, extension_codec)?;
builder = builder.union(plan)?;
}
builder.build()
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8c150b20dd..f793e96f61 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -25,6 +25,8 @@ use arrow::datatypes::{
};
use arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::file_format::json::JsonFormatFactory;
+use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
+use datafusion::optimizer::Optimizer;
use datafusion_common::parsers::CompressionTypeVariant;
use prost::Message;
use std::any::Any;
@@ -2555,3 +2557,35 @@ async fn roundtrip_recursive_query() {
format!("{}", pretty_format_batches(&output_round_trip).unwrap())
);
}
+
+#[tokio::test]
+async fn roundtrip_union_query() -> Result<()> {
+ let query = "SELECT a FROM t1
+ UNION (SELECT a from t1 UNION SELECT a from t2)";
+
+ let ctx = SessionContext::new();
+ ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default())
+ .await?;
+ ctx.register_csv("t2", "tests/testdata/test.csv", CsvReadOptions::default())
+ .await?;
+ let dataframe = ctx.sql(query).await?;
+ let plan = dataframe.into_optimized_plan()?;
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+
+ let ctx = SessionContext::new();
+ ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default())
+ .await?;
+ ctx.register_csv("t2", "tests/testdata/test.csv", CsvReadOptions::default())
+ .await?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ // proto deserialisation only supports 2-way union, hence this plan has nested unions
+ // apply the flatten unions optimizer rule to be able to compare
+ let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateNestedUnion::new())]);
+ let unnested = optimizer.optimize(logical_round_trip, &(ctx.state()), |_x, _y| {})?;
+ assert_eq!(
+ format!("{}", plan.display_indent_schema()),
+ format!("{}", unnested.display_indent_schema()),
+ );
+ Ok(())
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]