This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6de0796b4 Ensure the row count is preserved when coalescing over empty 
records (#3439)
6de0796b4 is described below

commit 6de0796b44766a8aef308b09706cddc9609801f0
Author: Batuhan Taskaya <[email protected]>
AuthorDate: Mon Sep 12 21:04:28 2022 +0300

    Ensure the row count is preserved when coalescing over empty records (#3439)
    
    * Ensure the row count is preserved when coalescing over empty records
    
    * Explain the reasoning for not using optimized_plan in tests
---
 datafusion/core/src/physical_plan/coalesce_batches.rs |  8 ++++++--
 datafusion/core/tests/sql/mod.rs                      | 14 +++++++++++---
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs 
b/datafusion/core/src/physical_plan/coalesce_batches.rs
index a257ccf09..f2edacd90 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -33,7 +33,7 @@ use crate::execution::context::TaskContext;
 use arrow::compute::kernels::concat::concat;
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
@@ -291,7 +291,11 @@ pub fn concat_batches(
         batches.len(),
         row_count
     );
-    RecordBatch::try_new(schema.clone(), arrays)
+
+    let mut options = RecordBatchOptions::default();
+    options.row_count = Some(row_count);
+
+    RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index c16386c5d..ff419f3f2 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -770,14 +770,22 @@ async fn execute_to_batches(ctx: &SessionContext, sql: 
&str) -> Vec<RecordBatch>
         .unwrap();
     let logical_schema = plan.schema();
 
+    // We are not really interested in the direct output of 
optimized_logical_plan
+    // since the physical plan construction already optimizes the given 
logical plan
+    // and we want to avoid double-optimization as a consequence. So we just 
construct
+    // it here to make sure that it doesn't fail at this step and get the 
optimized
+    // schema (to assert later that the logical and optimized schemas are the 
same).
     let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
-    let plan = ctx
+    let optimized_logical_plan = ctx
         .optimize(&plan)
         .map_err(|e| format!("{:?} at {}", e, msg))
         .unwrap();
-    let optimized_logical_schema = plan.schema();
+    let optimized_logical_schema = optimized_logical_plan.schema();
 
-    let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
+    let msg = format!(
+        "Creating physical plan for '{}': {:?}",
+        sql, optimized_logical_plan
+    );
     let plan = ctx
         .create_physical_plan(&plan)
         .await

Reply via email to