This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 53fc94f22f Datafusion-cli: Redesign the datafusion-cli execution and 
print, make it totally streaming printing without memory overhead (#14877)
53fc94f22f is described below

commit 53fc94f22fc1830a688b0cc474db578096bb72e7
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Feb 26 23:50:31 2025 +0800

    Datafusion-cli: Redesign the datafusion-cli execution and print, make it 
totally streaming printing without memory overhead (#14877)
    
    * Initial version for streaming batch preview
    
    * Add more reasonable code
    
    * polish code
    
    * polish code
    
    * Add unit test
    
    * fmt
    
    * remove println
---
 datafusion-cli/src/exec.rs          |  69 +++---
 datafusion-cli/src/print_format.rs  | 477 ++++++++++++++++++++++++++++++++++++
 datafusion-cli/src/print_options.rs | 201 ++++++++++++---
 3 files changed, 678 insertions(+), 69 deletions(-)

diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index a9e1e8efdf..4144e4f18c 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -26,12 +26,6 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
-use futures::StreamExt;
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-
 use datafusion::common::instant::Instant;
 use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
@@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
-use datafusion::sql::sqlparser::dialect::dialect_from_str;
-
-use datafusion::execution::memory_pool::MemoryConsumer;
-use datafusion::physical_plan::spill::get_record_batch_memory_size;
 use datafusion::sql::sqlparser;
+use datafusion::sql::sqlparser::dialect::dialect_from_str;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::sync::Arc;
 use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the 
given print options
@@ -230,18 +226,17 @@ pub(super) async fn exec_and_print(
     for statement in statements {
         let adjusted =
             
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
-
         let plan = create_plan(ctx, statement).await?;
         let adjusted = adjusted.with_plan(&plan);
 
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
-        // Track memory usage for the query result if it's bounded
-        let mut reservation =
-            
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
+        let is_unbounded = physical_plan.boundedness().is_unbounded();
+        let mut stream = execute_stream(Arc::clone(&physical_plan), 
task_ctx.clone())?;
 
-        if physical_plan.boundedness().is_unbounded() {
+        // Both bounded and unbounded streams are streaming prints
+        if is_unbounded {
             if physical_plan.pipeline_behavior() == EmissionType::Final {
                 return plan_err!(
                     "The given query can generate a valid result only once \
@@ -250,37 +245,43 @@ pub(super) async fn exec_and_print(
             }
             // As the input stream comes, we can generate results.
             // However, memory safety is not guaranteed.
-            let stream = execute_stream(physical_plan, task_ctx.clone())?;
-            print_options.print_stream(stream, now).await?;
+            print_options
+                .print_stream(MaxRows::Unlimited, stream, now)
+                .await?;
         } else {
             // Bounded stream; collected results size is limited by the 
maxrows option
             let schema = physical_plan.schema();
-            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
-            let mut results = vec![];
-            let mut row_count = 0_usize;
             let max_rows = match print_options.maxrows {
                 MaxRows::Unlimited => usize::MAX,
                 MaxRows::Limited(n) => n,
             };
-            while let Some(batch) = stream.next().await {
-                let batch = batch?;
-                let curr_num_rows = batch.num_rows();
-                // Stop collecting results if the number of rows exceeds the 
limit
-                // results batch should include the last batch that exceeds 
the limit
-                if row_count < max_rows + curr_num_rows {
-                    // Try to grow the reservation to accommodate the batch in 
memory
-                    
reservation.try_grow(get_record_batch_memory_size(&batch))?;
-                    results.push(batch);
-                }
-                row_count += curr_num_rows;
+            let stdout = std::io::stdout();
+            let mut writer = stdout.lock();
+
+            // If we don't want to print the table, we should use the 
streaming print same as above
+            if print_options.format != PrintFormat::Table
+                && print_options.format != PrintFormat::Automatic
+            {
+                print_options
+                    .print_stream(print_options.maxrows, stream, now)
+                    .await?;
+                continue;
             }
+
+            // into_inner will finalize the print options to table if it's 
automatic
             adjusted
                 .into_inner()
-                .print_batches(schema, &results, now, row_count)?;
-            reservation.free();
+                .print_table_batch(
+                    print_options,
+                    schema,
+                    &mut stream,
+                    max_rows,
+                    &mut writer,
+                    now,
+                )
+                .await?;
         }
     }
-
     Ok(())
 }
 
diff --git a/datafusion-cli/src/print_format.rs 
b/datafusion-cli/src/print_format.rs
index 1fc9495935..ed3f03781c 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -23,8 +23,10 @@ use crate::print_options::MaxRows;
 
 use arrow::csv::writer::WriterBuilder;
 use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
 use arrow::json::{ArrayWriter, LineDelimitedWriter};
 use arrow::record_batch::RecordBatch;
+use arrow::util::display::{ArrayFormatter, ValueFormatter};
 use arrow::util::pretty::pretty_format_batches_with_options;
 use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
 use datafusion::error::Result;
@@ -209,6 +211,145 @@ impl PrintFormat {
         }
         Ok(())
     }
+
+    /// Feed one batch into the streaming table printer.
+    ///
+    /// While `precomputed_widths` is `None`, the batch is buffered in
+    /// `preview_batches` and its rows are added to `preview_row_count`. Once at
+    /// least `preview_limit` rows are buffered, column widths are computed from
+    /// the buffered batches, stored in `precomputed_widths`, the header is
+    /// written, and the buffer is drained to `writer`. When widths are already
+    /// known, the batch is written directly (printing the header first if that
+    /// has not happened yet).
+    ///
+    /// Buffered batches are not written by this function if fewer than
+    /// `preview_limit` rows ever arrive; the caller must flush them.
+    // The streaming printer's state is threaded through as separate `&mut`
+    // arguments rather than a struct, hence the many parameters.
+    #[allow(clippy::too_many_arguments)]
+    pub fn process_batch(
+        &self,
+        batch: &RecordBatch,
+        schema: SchemaRef,
+        preview_batches: &mut Vec<RecordBatch>,
+        preview_row_count: &mut usize,
+        preview_limit: usize,
+        precomputed_widths: &mut Option<Vec<usize>>,
+        header_printed: &mut bool,
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        if let Some(widths) = precomputed_widths.as_ref() {
+            // Widths are fixed: stream this batch straight out.
+            if !*header_printed {
+                self.print_header(&schema, widths, writer)?;
+                *header_printed = true;
+            }
+            return self.print_batch_with_widths(batch, widths, writer);
+        }
+
+        // Still collecting the preview used to size the columns.
+        preview_batches.push(batch.clone());
+        *preview_row_count += batch.num_rows();
+        if *preview_row_count < preview_limit {
+            return Ok(());
+        }
+
+        let computed = self.compute_column_widths(preview_batches, schema.clone())?;
+        let widths = precomputed_widths.insert(computed);
+        self.print_header(&schema, widths, writer)?;
+        *header_printed = true;
+        for buffered in preview_batches.drain(..) {
+            self.print_batch_with_widths(&buffered, widths, writer)?;
+        }
+        Ok(())
+    }
+
+    /// Compute the display width of every column for table output.
+    ///
+    /// Each width is the maximum, over the column's header name and every cell
+    /// in `batches`, of the number of `char`s in its formatted text. Counting
+    /// `char`s rather than bytes matches the `{:<width$}` padding used when the
+    /// table is printed, so non-ASCII values do not produce over-wide columns.
+    ///
+    /// # Errors
+    /// Returns an error if a column formatter cannot be created or a cell
+    /// cannot be formatted.
+    pub fn compute_column_widths(
+        &self,
+        batches: &[RecordBatch],
+        schema: SchemaRef,
+    ) -> Result<Vec<usize>> {
+        let mut widths: Vec<usize> = schema
+            .fields()
+            .iter()
+            .map(|f| f.name().chars().count())
+            .collect();
+        for batch in batches {
+            let formatters = batch
+                .columns()
+                .iter()
+                .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS))
+                .collect::<Result<Vec<_>, ArrowError>>()?;
+            for row in 0..batch.num_rows() {
+                for (width, formatter) in widths.iter_mut().zip(&formatters) {
+                    // `try_to_string` surfaces formatting failures as errors;
+                    // `to_string()` on the Display impl would panic instead.
+                    let cell = formatter.value(row).try_to_string()?;
+                    *width = (*width).max(cell.chars().count());
+                }
+            }
+        }
+        Ok(widths)
+    }
+
+    /// Write the table header: a `+---+` border, the column names (each
+    /// left-aligned and padded to its width) between `|` separators, and a
+    /// second border beneath.
+    pub fn print_header(
+        &self,
+        schema: &SchemaRef,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        Self::print_border(widths, writer)?;
+
+        let mut names = Vec::with_capacity(widths.len());
+        for (idx, field) in schema.fields().iter().enumerate() {
+            names.push(Self::pad_cell(field.name(), widths[idx]));
+        }
+        writeln!(writer, "| {} |", names.join(" | "))?;
+
+        Self::print_border(widths, writer)?;
+        Ok(())
+    }
+
+    /// Write each row of `batch` as `| v1 | v2 | ... |`, left-aligning every
+    /// cell and padding it to the matching entry of `widths`. Values longer
+    /// than their width are written in full, not truncated.
+    pub fn print_batch_with_widths(
+        &self,
+        batch: &RecordBatch,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let mut formatters = Vec::with_capacity(batch.num_columns());
+        for column in batch.columns() {
+            let formatter =
+                ArrayFormatter::try_new(column.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS)?;
+            formatters.push(formatter);
+        }
+
+        for row in 0..batch.num_rows() {
+            let mut cells = Vec::with_capacity(formatters.len());
+            for (col, formatter) in formatters.iter().enumerate() {
+                cells.push(Self::pad_value(&formatter.value(row), widths[col]));
+            }
+            writeln!(writer, "| {} |", cells.join(" | "))?;
+        }
+        Ok(())
+    }
+
+    /// Write one `| . | . |` row, used to indicate omitted rows. Each column
+    /// holds a single `.`, left-aligned and padded to the column width.
+    pub fn print_dotted_line(
+        &self,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let mut dots = Vec::with_capacity(widths.len());
+        for &width in widths {
+            dots.push(format!(" {: <width$} ", "."));
+        }
+        writeln!(writer, "|{}|", dots.join("|"))?;
+        Ok(())
+    }
+
+    /// Write the closing `+---+` border line below the table body.
+    pub fn print_bottom_border(
+        &self,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        // The bottom border is byte-for-byte the same line as the top border.
+        Self::print_border(widths, writer)
+    }
+
+    /// Write a horizontal border line: for each column a run of `-` two
+    /// longer than its width (room for the padding spaces), joined and capped
+    /// by `+`.
+    fn print_border(widths: &[usize], writer: &mut dyn std::io::Write) -> Result<()> {
+        let segments: Vec<String> = widths.iter().map(|width| "-".repeat(width + 2)).collect();
+        writeln!(writer, "+{}+", segments.join("+"))?;
+        Ok(())
+    }
+
+    /// Left-align `cell`, padding it with spaces to at least `width` chars.
+    fn pad_cell(cell: &str, width: usize) -> String {
+        format!("{:<width$}", cell)
+    }
+
+    /// Render one cell value left-aligned and space-padded to `width` chars.
+    /// A value that fails to format is rendered as an empty cell.
+    fn pad_value(formatter: &ValueFormatter, width: usize) -> String {
+        let rendered: String = formatter.try_to_string().unwrap_or_default();
+        format!("{:<width$}", rendered)
+    }
 }
 
 #[cfg(test)]
@@ -539,6 +680,329 @@ mod tests {
             .run();
     }
 
+    // Each width is the longest of the column's header name and its formatted cells.
+    #[test]
+    fn test_compute_column_widths() {
+        let schema = three_column_schema();
+        let batches = vec![three_column_batch()];
+        let format = PrintFormat::Table;
+        let widths = format.compute_column_widths(&batches, schema).unwrap();
+        assert_eq!(widths, vec![1, 1, 1]);
+
+        let schema = one_column_schema();
+        let batches = vec![one_column_batch()];
+        let format = PrintFormat::Table;
+        let widths = format.compute_column_widths(&batches, schema).unwrap();
+        assert_eq!(widths, vec![1]);
+
+        // Cells longer than their header widen the column.
+        let schema = three_column_schema();
+        let batches = vec![three_column_batch_with_widths()];
+        let format = PrintFormat::Table;
+        let widths = format.compute_column_widths(&batches, schema).unwrap();
+        assert_eq!(widths, [7, 5, 6]);
+    }
+
+    // The header row is framed by a border line above and below.
+    #[test]
+    fn test_print_header() {
+        let schema = three_column_schema();
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format.print_header(&schema, &widths, &mut writer).unwrap();
+        let expected = &["+---+---+---+", "| a | b | c |", "+---+---+---+"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // Rows whose cells exactly fill their widths are printed as `| v | v | v |`.
+    #[test]
+    fn test_print_batch_with_same_widths() {
+        let batch = three_column_batch();
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format
+            .print_batch_with_widths(&batch, &widths, &mut writer)
+            .unwrap();
+        let expected = &["| 1 | 4 | 7 |", "| 2 | 5 | 8 |", "| 3 | 6 | 9 |"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // Shorter cells are left-aligned and space-padded to their column width.
+    #[test]
+    fn test_print_batch_with_different_widths() {
+        let batch = three_column_batch_with_widths();
+        let widths = vec![7, 5, 6];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format
+            .print_batch_with_widths(&batch, &widths, &mut writer)
+            .unwrap();
+        let expected = &[
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+        ];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // A dotted line places a single `.` in each column.
+    #[test]
+    fn test_print_dotted_line() {
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format.print_dotted_line(&widths, &mut writer).unwrap();
+        let expected = &["| . | . | . |"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // The bottom border has a `-` run two longer than each column width.
+    #[test]
+    fn test_print_bottom_border() {
+        let widths = vec![1, 1, 1];
+        let mut writer = Vec::new();
+        let format = PrintFormat::Table;
+        format.print_bottom_border(&widths, &mut writer).unwrap();
+        let expected = &["+---+---+---+"];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // For the table format, `print_batches` honours `MaxRows`: output beyond
+    // the limit is cut off and `.` marker rows are printed instead.
+    #[test]
+    fn test_print_batches_with_maxrows() {
+        let batch = one_column_batch();
+        let schema = one_column_schema();
+        let format = PrintFormat::Table;
+
+        // should print out entire output with no truncation if unlimited or
+        // the limit is greater than or equal to the number of rows in the batch
+        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
+            let mut writer = Vec::new();
+            format
+                .print_batches(
+                    &mut writer,
+                    schema.clone(),
+                    &[batch.clone()],
+                    max_rows,
+                    true,
+                )
+                .unwrap();
+            let expected = &[
+                "+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
+            ];
+            let binding = String::from_utf8(writer.clone()).unwrap();
+            let actual: Vec<_> = binding.trim_end().split('\n').collect();
+            assert_eq!(actual, expected);
+        }
+
+        // should truncate output if the limit is less than the number of rows
+        let mut writer = Vec::new();
+        format
+            .print_batches(
+                &mut writer,
+                schema.clone(),
+                &[batch.clone()],
+                MaxRows::Limited(1),
+                true,
+            )
+            .unwrap();
+        let expected = &[
+            "+---+", "| a |", "+---+", "| 1 |", "| . |", "| . |", "| . |", "+---+",
+        ];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // `process_batch` with a preview limit (2) smaller than the first batch
+    // (3 rows): that batch alone fixes the column widths, so the header and all
+    // of its rows are written immediately.
+    #[test]
+    fn test_print_batches_with_preview_count_less_than_first_batch() {
+        let batch = three_column_batch_with_widths();
+        let schema = three_column_schema();
+        let format = PrintFormat::Table;
+        let preview_limit = 2;
+        let mut preview_batches = Vec::new();
+        let mut preview_row_count = 0;
+        let mut precomputed_widths = None;
+        let mut header_printed = false;
+        let mut writer = Vec::new();
+
+        format
+            .process_batch(
+                &batch,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        let expected = &[
+            "+---------+-------+--------+",
+            "| a       | b     | c      |",
+            "+---------+-------+--------+",
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+        ];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // Widths are fixed by the preview: cells in later batches that are wider
+    // than the preview-derived widths are written in full and do not line up.
+    #[test]
+    fn test_print_batches_with_preview_and_later_batches() {
+        let batch1 = three_column_batch();
+        let batch2 = three_column_batch_with_widths();
+        let schema = three_column_schema();
+        let format = PrintFormat::Table;
+        // The preview limit is less than the first batch's row count, so the
+        // widths are taken from the first batch alone; the wider cells of the
+        // second batch are printed without padding to a common width.
+        let preview_limit = 2;
+        let mut preview_batches = Vec::new();
+        let mut preview_row_count = 0;
+        let mut precomputed_widths = None;
+        let mut header_printed = false;
+        let mut writer = Vec::new();
+
+        format
+            .process_batch(
+                &batch1,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        format
+            .process_batch(
+                &batch2,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        format
+            .process_batch(
+                &batch1,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        let expected = &[
+            "+---+---+---+",
+            "| a | b | c |",
+            "+---+---+---+",
+            "| 1 | 4 | 7 |",
+            "| 2 | 5 | 8 |",
+            "| 3 | 6 | 9 |",
+            "| 1 | 42222 | 7 |",
+            "| 2222222 | 5 | 8 |",
+            "| 3 | 6 | 922222 |",
+            "| 1 | 4 | 7 |",
+            "| 2 | 5 | 8 |",
+            "| 3 | 6 | 9 |",
+        ];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    // With a preview limit (4) larger than the first batch (3 rows), the first
+    // two batches are buffered and sized together, so every row lines up.
+    #[test]
+    fn test_print_batches_with_preview_cover_later_batches() {
+        let batch1 = three_column_batch();
+        let batch2 = three_column_batch_with_widths();
+        let schema = three_column_schema();
+        let format = PrintFormat::Table;
+        // preview limit is greater than the first batch
+        let preview_limit = 4;
+        let mut preview_batches = Vec::new();
+        let mut preview_row_count = 0;
+        let mut precomputed_widths = None;
+        let mut header_printed = false;
+        let mut writer = Vec::new();
+
+        format
+            .process_batch(
+                &batch1,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        format
+            .process_batch(
+                &batch2,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        format
+            .process_batch(
+                &batch1,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        let expected = &[
+            "+---------+-------+--------+",
+            "| a       | b     | c      |",
+            "+---------+-------+--------+",
+            "| 1       | 4     | 7      |",
+            "| 2       | 5     | 8      |",
+            "| 3       | 6     | 9      |",
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+            "| 1       | 4     | 7      |",
+            "| 2       | 5     | 8      |",
+            "| 3       | 6     | 9      |",
+        ];
+        let binding = String::from_utf8(writer.clone()).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
     #[derive(Debug)]
     struct PrintBatchesTest {
         format: PrintFormat,
@@ -672,6 +1136,19 @@ mod tests {
         .unwrap()
     }
 
+    /// Return a batch with three columns and three rows whose values have
+    /// differing display widths (longest values: 7, 5 and 6 chars per column).
+    fn three_column_batch_with_widths() -> RecordBatch {
+        RecordBatch::try_new(
+            three_column_schema(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2222222, 3])),
+                Arc::new(Int32Array::from(vec![42222, 5, 6])),
+                Arc::new(Int32Array::from(vec![7, 8, 922222])),
+            ],
+        )
+        .unwrap()
+    }
+
     /// Return a schema with one column
     fn one_column_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
diff --git a/datafusion-cli/src/print_options.rs 
b/datafusion-cli/src/print_options.rs
index 9557e783e8..092483faed 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -29,6 +29,7 @@ use datafusion::common::DataFusionError;
 use datafusion::error::Result;
 use datafusion::physical_plan::RecordBatchStream;
 
+use datafusion::execution::SendableRecordBatchStream;
 use futures::StreamExt;
 
 #[derive(Debug, Clone, PartialEq, Copy)]
@@ -74,27 +75,6 @@ pub struct PrintOptions {
     pub color: bool,
 }
 
-// Returns the query execution details formatted
-fn get_execution_details_formatted(
-    row_count: usize,
-    maxrows: MaxRows,
-    query_start_time: Instant,
-) -> String {
-    let nrows_shown_msg = match maxrows {
-        MaxRows::Limited(nrows) if nrows < row_count => {
-            format!("(First {nrows} displayed. Use --maxrows to adjust)")
-        }
-        _ => String::new(),
-    };
-
-    format!(
-        "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
-        row_count,
-        nrows_shown_msg,
-        query_start_time.elapsed().as_secs_f64()
-    )
-}
-
 impl PrintOptions {
     /// Print the batches to stdout using the specified format
     pub fn print_batches(
@@ -110,7 +90,7 @@ impl PrintOptions {
         self.format
             .print_batches(&mut writer, schema, batches, self.maxrows, true)?;
 
-        let formatted_exec_details = get_execution_details_formatted(
+        let formatted_exec_details = self.get_execution_details_formatted(
             row_count,
             if self.format == PrintFormat::Table {
                 self.maxrows
@@ -127,9 +107,119 @@ impl PrintOptions {
         Ok(())
     }
 
+    /// Print the result `stream` as an ASCII table, writing rows to `writer` as
+    /// batches arrive instead of collecting the whole result first.
+    ///
+    /// Column widths are computed from the first `preview_limit` (1000) rows,
+    /// which are buffered until then. Every later batch is written as soon as it
+    /// arrives using those widths; wider cells are not truncated, so they may not
+    /// line up. At most `max_rows` rows are printed. If the stream produces more,
+    /// three `.` rows are printed before the bottom border. Unless `quiet` is set,
+    /// the footer reports the total number of rows the stream produced.
+    ///
+    /// All formatting settings come from `print_options`; `self` is not used.
+    pub async fn print_table_batch(
+        &self,
+        print_options: &PrintOptions,
+        schema: SchemaRef,
+        stream: &mut SendableRecordBatchStream,
+        max_rows: usize,
+        writer: &mut dyn std::io::Write,
+        now: Instant,
+    ) -> Result<()> {
+        let print_format = &print_options.format;
+        let preview_limit: usize = 1000;
+        let mut preview_batches: Vec<RecordBatch> = vec![];
+        let mut preview_row_count = 0_usize;
+        let mut total_count = 0_usize;
+        let mut precomputed_widths: Option<Vec<usize>> = None;
+        let mut header_printed = false;
+        let mut max_rows_reached = false;
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let batch_rows = batch.num_rows();
+
+            // While `max_rows_reached` is false, `total_count <= max_rows` holds,
+            // so the subtraction below cannot underflow. Testing only
+            // `total_count + batch_rows > max_rows` (without also requiring
+            // `total_count < max_rows`) means rows arriving right after exactly
+            // `max_rows` rows still trigger the truncation marker.
+            if !max_rows_reached {
+                let truncate = total_count.saturating_add(batch_rows) > max_rows;
+                let batch_to_print = if truncate {
+                    batch.slice(0, max_rows - total_count)
+                } else {
+                    batch
+                };
+                print_format.process_batch(
+                    &batch_to_print,
+                    schema.clone(),
+                    &mut preview_batches,
+                    &mut preview_row_count,
+                    preview_limit,
+                    &mut precomputed_widths,
+                    &mut header_printed,
+                    writer,
+                )?;
+                if truncate {
+                    // No more rows will be shown: print anything still buffered
+                    // in the preview, then the truncation marker and border.
+                    Self::flush_table_preview(
+                        print_format,
+                        &schema,
+                        &mut preview_batches,
+                        &mut precomputed_widths,
+                        &mut header_printed,
+                        writer,
+                    )?;
+                    if let Some(widths) = precomputed_widths.as_ref() {
+                        for _ in 0..3 {
+                            print_format.print_dotted_line(widths, writer)?;
+                        }
+                        print_format.print_bottom_border(widths, writer)?;
+                    }
+                    max_rows_reached = true;
+                }
+            }
+
+            total_count += batch_rows;
+        }
+
+        if !max_rows_reached {
+            // The stream ended without truncation. If it produced fewer than
+            // `preview_limit` rows, those rows are still buffered and must be
+            // printed here, before the table is closed.
+            Self::flush_table_preview(
+                print_format,
+                &schema,
+                &mut preview_batches,
+                &mut precomputed_widths,
+                &mut header_printed,
+                writer,
+            )?;
+            if let Some(widths) = precomputed_widths.as_ref() {
+                print_format.print_bottom_border(widths, writer)?;
+            }
+        }
+
+        let formatted_exec_details = print_options.get_execution_details_formatted(
+            total_count,
+            print_options.maxrows,
+            now,
+        );
+        if !print_options.quiet {
+            writeln!(writer, "{}", formatted_exec_details)?;
+        }
+
+        Ok(())
+    }
+
+    /// Write out rows buffered by [`PrintFormat::process_batch`] that have not
+    /// been printed yet.
+    ///
+    /// If column widths are not known yet, they are computed from the buffered
+    /// batches. The header is printed if it has not been already, and the buffer
+    /// is then drained to `writer`. Does nothing when no widths are known and
+    /// nothing is buffered (the stream produced no batches).
+    fn flush_table_preview(
+        print_format: &PrintFormat,
+        schema: &SchemaRef,
+        preview_batches: &mut Vec<RecordBatch>,
+        precomputed_widths: &mut Option<Vec<usize>>,
+        header_printed: &mut bool,
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        if precomputed_widths.is_none() {
+            if preview_batches.is_empty() {
+                return Ok(());
+            }
+            let widths = print_format.compute_column_widths(preview_batches, schema.clone())?;
+            *precomputed_widths = Some(widths);
+        }
+        if let Some(widths) = precomputed_widths.as_ref() {
+            if !*header_printed {
+                print_format.print_header(schema, widths, writer)?;
+                *header_printed = true;
+            }
+            for buffered in preview_batches.drain(..) {
+                print_format.print_batch_with_widths(&buffered, widths, writer)?;
+            }
+        }
+        Ok(())
+    }
+
     /// Print the stream to stdout using the specified format
     pub async fn print_stream(
         &self,
+        max_rows: MaxRows,
         mut stream: Pin<Box<dyn RecordBatchStream>>,
         query_start_time: Instant,
     ) -> Result<()> {
@@ -139,30 +229,49 @@ impl PrintOptions {
             ));
         };
 
+        let max_count = match self.maxrows {
+            MaxRows::Unlimited => usize::MAX,
+            MaxRows::Limited(n) => n,
+        };
+
         let stdout = std::io::stdout();
         let mut writer = stdout.lock();
 
         let mut row_count = 0_usize;
         let mut with_header = true;
+        let mut max_rows_reached = false;
 
         while let Some(maybe_batch) = stream.next().await {
             let batch = maybe_batch?;
-            row_count += batch.num_rows();
-            self.format.print_batches(
-                &mut writer,
-                batch.schema(),
-                &[batch],
-                MaxRows::Unlimited,
-                with_header,
-            )?;
+            let curr_batch_rows = batch.num_rows();
+            if !max_rows_reached && row_count < max_count {
+                if row_count + curr_batch_rows > max_count {
+                    let needed = max_count - row_count;
+                    let batch_to_print = batch.slice(0, needed);
+                    self.format.print_batches(
+                        &mut writer,
+                        batch.schema(),
+                        &[batch_to_print],
+                        max_rows,
+                        with_header,
+                    )?;
+                    max_rows_reached = true;
+                } else {
+                    self.format.print_batches(
+                        &mut writer,
+                        batch.schema(),
+                        &[batch],
+                        max_rows,
+                        with_header,
+                    )?;
+                }
+            }
+            row_count += curr_batch_rows;
             with_header = false;
         }
 
-        let formatted_exec_details = get_execution_details_formatted(
-            row_count,
-            MaxRows::Unlimited,
-            query_start_time,
-        );
+        let formatted_exec_details =
+            self.get_execution_details_formatted(row_count, max_rows, 
query_start_time);
 
         if !self.quiet {
             writeln!(writer, "{formatted_exec_details}")?;
@@ -170,4 +279,26 @@ impl PrintOptions {
 
         Ok(())
     }
+
+    /// Build the summary printed after a query: the number of rows fetched, a
+    /// note when the display was limited by `maxrows` to fewer rows than were
+    /// fetched, and the seconds elapsed since `query_start_time` (3 decimals).
+    pub fn get_execution_details_formatted(
+        &self,
+        row_count: usize,
+        maxrows: MaxRows,
+        query_start_time: Instant,
+    ) -> String {
+        let mut nrows_shown_msg = String::new();
+        if let MaxRows::Limited(nrows) = maxrows {
+            if nrows < row_count {
+                nrows_shown_msg = format!("(First {nrows} displayed. Use --maxrows to adjust)");
+            }
+        }
+
+        let elapsed_secs = query_start_time.elapsed().as_secs_f64();
+        format!(
+            "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
+            row_count, nrows_shown_msg, elapsed_secs
+        )
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to