This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c73a23900 fix union serialisation order in proto (#13709)
1c73a23900 is described below

commit 1c73a23900ad27a0561ff79d840897916802966b
Author: Onur Satici <[email protected]>
AuthorDate: Thu Dec 12 21:16:11 2024 +0000

    fix union serialisation order in proto (#13709)
    
    * fix union serialisation order in proto
    
    * clippy
    
    * address comments
---
 datafusion/proto/src/logical_plan/mod.rs           | 19 +++++-------
 .../proto/tests/cases/roundtrip_logical_plan.rs    | 34 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 50636048eb..addafeb762 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -737,23 +737,18 @@ impl AsLogicalPlan for LogicalPlanNode {
                 builder.build()
             }
             LogicalPlanType::Union(union) => {
-                let mut input_plans: Vec<LogicalPlan> = union
-                    .inputs
-                    .iter()
-                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
-                    .collect::<Result<_>>()?;
-
-                if input_plans.len() < 2 {
+                if union.inputs.len() < 2 {
                     return  Err( DataFusionError::Internal(String::from(
                         "Protobuf deserialization error, Union was require at least two input.",
                     )));
                 }
+                let (first, rest) = union.inputs.split_first().unwrap();
+                let mut builder = LogicalPlanBuilder::from(
+                    first.try_into_logical_plan(ctx, extension_codec)?,
+                );
 
-                let first = input_plans.pop().ok_or_else(|| DataFusionError::Internal(String::from(
-                    "Protobuf deserialization error, Union was require at least two input.",
-                )))?;
-                let mut builder = LogicalPlanBuilder::from(first);
-                for plan in input_plans {
+                for i in rest {
+                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
                     builder = builder.union(plan)?;
                 }
                 builder.build()
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8c150b20dd..f793e96f61 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -25,6 +25,8 @@ use arrow::datatypes::{
 };
 use arrow::util::pretty::pretty_format_batches;
 use datafusion::datasource::file_format::json::JsonFormatFactory;
+use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
+use datafusion::optimizer::Optimizer;
 use datafusion_common::parsers::CompressionTypeVariant;
 use prost::Message;
 use std::any::Any;
@@ -2555,3 +2557,35 @@ async fn roundtrip_recursive_query() {
         format!("{}", pretty_format_batches(&output_round_trip).unwrap())
     );
 }
+
+#[tokio::test]
+async fn roundtrip_union_query() -> Result<()> {
+    let query = "SELECT a FROM t1
+        UNION (SELECT a from t1 UNION SELECT a from t2)";
+
+    let ctx = SessionContext::new();
+    ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default())
+        .await?;
+    ctx.register_csv("t2", "tests/testdata/test.csv", CsvReadOptions::default())
+        .await?;
+    let dataframe = ctx.sql(query).await?;
+    let plan = dataframe.into_optimized_plan()?;
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+
+    let ctx = SessionContext::new();
+    ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default())
+        .await?;
+    ctx.register_csv("t2", "tests/testdata/test.csv", CsvReadOptions::default())
+        .await?;
+    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    // proto deserialisation only supports 2-way union, hence this plan has nested unions
+    // apply the flatten unions optimizer rule to be able to compare
+    let optimizer = Optimizer::with_rules(vec![Arc::new(EliminateNestedUnion::new())]);
+    let unnested = optimizer.optimize(logical_round_trip, &(ctx.state()), |_x, _y| {})?;
+    assert_eq!(
+        format!("{}", plan.display_indent_schema()),
+        format!("{}", unnested.display_indent_schema()),
+    );
+    Ok(())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to