This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 53fc94f22f Datafusion-cli: Redesign the datafusion-cli execution and
print, make it totally streaming printing without memory overhead (#14877)
53fc94f22f is described below
commit 53fc94f22fc1830a688b0cc474db578096bb72e7
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Feb 26 23:50:31 2025 +0800
Datafusion-cli: Redesign the datafusion-cli execution and print, make it
totally streaming printing without memory overhead (#14877)
* Initial version for streaming batch preview
* Add more reasonable code
* polish code
* polish code
* Add unit test
* fmt
* remove println
---
datafusion-cli/src/exec.rs | 69 +++---
datafusion-cli/src/print_format.rs | 477 ++++++++++++++++++++++++++++++++++++
datafusion-cli/src/print_options.rs | 201 ++++++++++++---
3 files changed, 678 insertions(+), 69 deletions(-)
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index a9e1e8efdf..4144e4f18c 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -26,12 +26,6 @@ use crate::{
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
-use futures::StreamExt;
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-
use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
@@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
-use datafusion::sql::sqlparser::dialect::dialect_from_str;
-
-use datafusion::execution::memory_pool::MemoryConsumer;
-use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::sql::sqlparser;
+use datafusion::sql::sqlparser::dialect::dialect_from_str;
use rustyline::error::ReadlineError;
use rustyline::Editor;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::sync::Arc;
use tokio::signal;
/// run and execute SQL statements and commands, against a context with the
given print options
@@ -230,18 +226,17 @@ pub(super) async fn exec_and_print(
for statement in statements {
let adjusted =
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
-
let plan = create_plan(ctx, statement).await?;
let adjusted = adjusted.with_plan(&plan);
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;
- // Track memory usage for the query result if it's bounded
- let mut reservation =
-
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
+ let is_unbounded = physical_plan.boundedness().is_unbounded();
+ let mut stream = execute_stream(Arc::clone(&physical_plan),
task_ctx.clone())?;
- if physical_plan.boundedness().is_unbounded() {
+ // Both bounded and unbounded streams are streaming prints
+ if is_unbounded {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
@@ -250,37 +245,43 @@ pub(super) async fn exec_and_print(
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
- let stream = execute_stream(physical_plan, task_ctx.clone())?;
- print_options.print_stream(stream, now).await?;
+ print_options
+ .print_stream(MaxRows::Unlimited, stream, now)
+ .await?;
} else {
// Bounded stream; collected results size is limited by the
maxrows option
let schema = physical_plan.schema();
- let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
- let mut results = vec![];
- let mut row_count = 0_usize;
let max_rows = match print_options.maxrows {
MaxRows::Unlimited => usize::MAX,
MaxRows::Limited(n) => n,
};
- while let Some(batch) = stream.next().await {
- let batch = batch?;
- let curr_num_rows = batch.num_rows();
- // Stop collecting results if the number of rows exceeds the
limit
- // results batch should include the last batch that exceeds
the limit
- if row_count < max_rows + curr_num_rows {
- // Try to grow the reservation to accommodate the batch in
memory
-
reservation.try_grow(get_record_batch_memory_size(&batch))?;
- results.push(batch);
- }
- row_count += curr_num_rows;
+ let stdout = std::io::stdout();
+ let mut writer = stdout.lock();
+
+ // If we don't want to print the table, we should use the
streaming print same as above
+ if print_options.format != PrintFormat::Table
+ && print_options.format != PrintFormat::Automatic
+ {
+ print_options
+ .print_stream(print_options.maxrows, stream, now)
+ .await?;
+ continue;
}
+
+ // into_inner will finalize the print options to table if it's
automatic
adjusted
.into_inner()
- .print_batches(schema, &results, now, row_count)?;
- reservation.free();
+ .print_table_batch(
+ print_options,
+ schema,
+ &mut stream,
+ max_rows,
+ &mut writer,
+ now,
+ )
+ .await?;
}
}
-
Ok(())
}
diff --git a/datafusion-cli/src/print_format.rs
b/datafusion-cli/src/print_format.rs
index 1fc9495935..ed3f03781c 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -23,8 +23,10 @@ use crate::print_options::MaxRows;
use arrow::csv::writer::WriterBuilder;
use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
use arrow::json::{ArrayWriter, LineDelimitedWriter};
use arrow::record_batch::RecordBatch;
+use arrow::util::display::{ArrayFormatter, ValueFormatter};
use arrow::util::pretty::pretty_format_batches_with_options;
use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
use datafusion::error::Result;
@@ -209,6 +211,145 @@ impl PrintFormat {
}
Ok(())
}
+
+    /// Feed one batch into the streaming table printer.
+    ///
+    /// Until column widths are known, batches are buffered in `preview_batches`
+    /// and their rows counted in `preview_row_count`. Once at least
+    /// `preview_limit` rows are buffered, widths are computed from the buffered
+    /// rows, the header is written and the buffer is flushed to `writer`.
+    /// After that every batch is written immediately with the fixed widths;
+    /// values wider than their column are not truncated, so such rows may be
+    /// misaligned.
+    ///
+    /// # Errors
+    /// Returns an error if a value cannot be formatted or `writer` fails.
+    // The streaming state lives in the caller, hence the long parameter list.
+    #[allow(clippy::too_many_arguments)]
+    pub fn process_batch(
+        &self,
+        batch: &RecordBatch,
+        schema: SchemaRef,
+        preview_batches: &mut Vec<RecordBatch>,
+        preview_row_count: &mut usize,
+        preview_limit: usize,
+        precomputed_widths: &mut Option<Vec<usize>>,
+        header_printed: &mut bool,
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        if let Some(widths) = precomputed_widths.as_deref() {
+            // Widths are already fixed: stream this batch straight through.
+            if !*header_printed {
+                self.print_header(&schema, widths, writer)?;
+                *header_printed = true;
+            }
+            return self.print_batch_with_widths(batch, widths, writer);
+        }
+
+        preview_batches.push(batch.clone());
+        *preview_row_count += batch.num_rows();
+        if *preview_row_count >= preview_limit {
+            // Preview is full: fix the widths, then emit header and buffer.
+            let widths: &[usize] = precomputed_widths
+                .insert(self.compute_column_widths(preview_batches, schema.clone())?);
+            self.print_header(&schema, widths, writer)?;
+            *header_printed = true;
+            for preview_batch in preview_batches.drain(..) {
+                self.print_batch_with_widths(&preview_batch, widths, writer)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Compute the display width of every column for a table layout.
+    ///
+    /// Each width starts at the length of the column name in `schema` and
+    /// grows to fit the longest formatted value in `batches`. Lengths are
+    /// counted in `char`s, which is how `format!("{:<width$}")` (used by
+    /// `pad_cell`/`pad_value`) pads, so non-ASCII values stay aligned.
+    /// NOTE: double-width glyphs (e.g. CJK) may still render wider.
+    ///
+    /// # Errors
+    /// Returns an error if a column cannot be formatted with the CLI format
+    /// options or a single value fails to format.
+    pub fn compute_column_widths(
+        &self,
+        batches: &[RecordBatch],
+        schema: SchemaRef,
+    ) -> Result<Vec<usize>> {
+        let mut widths: Vec<usize> = schema
+            .fields()
+            .iter()
+            .map(|f| f.name().chars().count())
+            .collect();
+        for batch in batches {
+            let formatters = batch
+                .columns()
+                .iter()
+                .map(|c| ArrayFormatter::try_new(c.as_ref(), &DEFAULT_CLI_FORMAT_OPTIONS))
+                .collect::<Result<Vec<_>, ArrowError>>()?;
+            for row in 0..batch.num_rows() {
+                for (width, formatter) in widths.iter_mut().zip(&formatters) {
+                    // `try_to_string` surfaces formatting errors instead of the
+                    // panic `to_string()` raises when `Display` fails.
+                    let cell = formatter.value(row).try_to_string()?;
+                    *width = (*width).max(cell.chars().count());
+                }
+            }
+        }
+        Ok(widths)
+    }
+
+    /// Write the table header for `schema`: a border, the column names each
+    /// left-aligned to the matching entry of `widths`, and a closing border.
+    pub fn print_header(
+        &self,
+        schema: &SchemaRef,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        Self::print_border(widths, writer)?;
+
+        let mut names = Vec::with_capacity(schema.fields().len());
+        for (idx, field) in schema.fields().iter().enumerate() {
+            names.push(Self::pad_cell(field.name(), widths[idx]));
+        }
+        writeln!(writer, "| {} |", names.join(" | "))?;
+
+        Self::print_border(widths, writer)
+    }
+
+    /// Write every row of `batch` as a `| v1 | v2 |` line, padding each cell to
+    /// the matching entry of `widths`. No borders are written.
+    pub fn print_batch_with_widths(
+        &self,
+        batch: &RecordBatch,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let mut formatters = Vec::with_capacity(batch.num_columns());
+        for column in batch.columns() {
+            formatters.push(ArrayFormatter::try_new(
+                column.as_ref(),
+                &DEFAULT_CLI_FORMAT_OPTIONS,
+            )?);
+        }
+
+        for row_idx in 0..batch.num_rows() {
+            let mut cells = Vec::with_capacity(formatters.len());
+            for (col_idx, fmt) in formatters.iter().enumerate() {
+                cells.push(Self::pad_value(&fmt.value(row_idx), widths[col_idx]));
+            }
+            writeln!(writer, "| {} |", cells.join(" | "))?;
+        }
+        Ok(())
+    }
+
+    /// Write one truncation-marker row: a `.` left-aligned in every column,
+    /// e.g. `| . | . |` for widths `[1, 1]`.
+    pub fn print_dotted_line(
+        &self,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let dots = widths
+            .iter()
+            .map(|&width| format!(" {: <width$} ", "."))
+            .collect::<Vec<String>>()
+            .join("|");
+        writeln!(writer, "|{}|", dots)?;
+        Ok(())
+    }
+
+    /// Write the closing `+---+---+` border that ends a table.
+    ///
+    /// The closing border is the same rule drawn around the header, so this
+    /// delegates to `print_border` rather than duplicating its logic.
+    pub fn print_bottom_border(
+        &self,
+        widths: &[usize],
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        Self::print_border(widths, writer)
+    }
+
+    /// Write a horizontal rule such as `+---+-----+`; each segment spans the
+    /// column width plus the single space on either side of the cell text.
+    fn print_border(widths: &[usize], writer: &mut dyn std::io::Write) -> Result<()> {
+        let segments: Vec<String> = widths.iter().map(|w| "-".repeat(w + 2)).collect();
+        writeln!(writer, "+{}+", segments.join("+"))?;
+        Ok(())
+    }
+
+    /// Left-align `cell` within `width` characters; longer text is kept whole.
+    fn pad_cell(cell: &str, width: usize) -> String {
+        format!("{cell:<width$}")
+    }
+
+    /// Render one value and left-align it within `width` characters. A value
+    /// that fails to format is rendered as an empty (padded) cell.
+    fn pad_value(formatter: &ValueFormatter, width: usize) -> String {
+        match formatter.try_to_string() {
+            Ok(text) => Self::pad_cell(&text, width),
+            Err(_) => Self::pad_cell("", width),
+        }
+    }
}
#[cfg(test)]
@@ -539,6 +680,329 @@ mod tests {
.run();
}
+    #[test]
+    fn test_compute_column_widths() {
+        // (schema, batch, expected widths); every column name is one char wide.
+        let cases = [
+            (three_column_schema(), three_column_batch(), vec![1, 1, 1]),
+            (one_column_schema(), one_column_batch(), vec![1]),
+            (
+                three_column_schema(),
+                three_column_batch_with_widths(),
+                vec![7, 5, 6],
+            ),
+        ];
+        let format = PrintFormat::Table;
+        for (schema, batch, expected) in cases {
+            let batches = vec![batch];
+            let widths = format.compute_column_widths(&batches, schema).unwrap();
+            assert_eq!(widths, expected);
+        }
+    }
+
+    #[test]
+    fn test_print_header() {
+        let mut out: Vec<u8> = Vec::new();
+        PrintFormat::Table
+            .print_header(&three_column_schema(), &[1, 1, 1], &mut out)
+            .unwrap();
+        let text = String::from_utf8(out).unwrap();
+        let lines: Vec<&str> = text.trim_end().split('\n').collect();
+        assert_eq!(lines, ["+---+---+---+", "| a | b | c |", "+---+---+---+"]);
+    }
+
+    #[test]
+    fn test_print_batch_with_same_widths() {
+        let mut out: Vec<u8> = Vec::new();
+        PrintFormat::Table
+            .print_batch_with_widths(&three_column_batch(), &[1, 1, 1], &mut out)
+            .unwrap();
+        let text = String::from_utf8(out).unwrap();
+        let rows: Vec<&str> = text.trim_end().split('\n').collect();
+        assert_eq!(rows, ["| 1 | 4 | 7 |", "| 2 | 5 | 8 |", "| 3 | 6 | 9 |"]);
+    }
+
+    #[test]
+    fn test_print_batch_with_different_widths() {
+        let batch = three_column_batch_with_widths();
+        let widths = vec![7, 5, 6];
+        let mut writer: Vec<u8> = Vec::new();
+        let format = PrintFormat::Table;
+        format
+            .print_batch_with_widths(&batch, &widths, &mut writer)
+            .unwrap();
+        // Every cell is left-aligned and padded to its column width.
+        let expected = &[
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+        ];
+        let binding = String::from_utf8(writer).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_print_dotted_line() {
+        let mut out: Vec<u8> = Vec::new();
+        PrintFormat::Table
+            .print_dotted_line(&[1, 1, 1], &mut out)
+            .unwrap();
+        let text = String::from_utf8(out).unwrap();
+        let lines: Vec<&str> = text.trim_end().split('\n').collect();
+        assert_eq!(lines, ["| . | . | . |"]);
+    }
+
+    #[test]
+    fn test_print_bottom_border() {
+        let mut out: Vec<u8> = Vec::new();
+        PrintFormat::Table
+            .print_bottom_border(&[1, 1, 1], &mut out)
+            .unwrap();
+        let text = String::from_utf8(out).unwrap();
+        let lines: Vec<&str> = text.trim_end().split('\n').collect();
+        assert_eq!(lines, ["+---+---+---+"]);
+    }
+
+    #[test]
+    fn test_print_batches_with_maxrows() {
+        let batch = one_column_batch();
+        let schema = one_column_schema();
+        let format = PrintFormat::Table;
+        // Render the single three-row batch under the given row limit.
+        let render = |max_rows: MaxRows| -> String {
+            let mut out: Vec<u8> = Vec::new();
+            format
+                .print_batches(&mut out, schema.clone(), &[batch.clone()], max_rows, true)
+                .unwrap();
+            String::from_utf8(out).unwrap()
+        };
+
+        // No limit, or a limit at or above the row count, prints every row
+        // without truncation markers.
+        let full = ["+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+"];
+        for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
+            let text = render(max_rows);
+            let lines: Vec<&str> = text.trim_end().split('\n').collect();
+            assert_eq!(lines, full);
+        }
+
+        // A limit below the row count keeps the first rows and marks the cut
+        // with dotted lines.
+        let text = render(MaxRows::Limited(1));
+        let lines: Vec<&str> = text.trim_end().split('\n').collect();
+        assert_eq!(
+            lines,
+            ["+---+", "| a |", "+---+", "| 1 |", "| . |", "| . |", "| . |", "+---+"]
+        );
+    }
+
+    // process_batch with a preview limit (2) smaller than the first batch
+    // (3 rows): widths come from the whole first batch, so the header and all
+    // of its rows are printed aligned.
+    #[test]
+    fn test_print_batches_with_preview_count_less_than_first_batch() {
+        let batch = three_column_batch_with_widths();
+        let schema = three_column_schema();
+        let format = PrintFormat::Table;
+        let preview_limit = 2;
+        let mut preview_batches = Vec::new();
+        let mut preview_row_count = 0;
+        let mut precomputed_widths = None;
+        let mut header_printed = false;
+        let mut writer: Vec<u8> = Vec::new();
+
+        format
+            .process_batch(
+                &batch,
+                schema.clone(),
+                &mut preview_batches,
+                &mut preview_row_count,
+                preview_limit,
+                &mut precomputed_widths,
+                &mut header_printed,
+                &mut writer,
+            )
+            .unwrap();
+
+        let expected = &[
+            "+---------+-------+--------+",
+            "| a       | b     | c      |",
+            "+---------+-------+--------+",
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+        ];
+        let binding = String::from_utf8(writer).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn test_print_batches_with_preview_and_later_batches() {
+        let schema = three_column_schema();
+        let format = PrintFormat::Table;
+        // The preview limit (2) is reached inside the first batch, so widths are
+        // fixed from its narrow values; the wider second batch is printed with
+        // those widths and is therefore not aligned.
+        let preview_limit = 2;
+        let mut preview_batches = Vec::new();
+        let mut preview_row_count = 0;
+        let mut precomputed_widths = None;
+        let mut header_printed = false;
+        let mut writer: Vec<u8> = Vec::new();
+
+        let batches = [
+            three_column_batch(),
+            three_column_batch_with_widths(),
+            three_column_batch(),
+        ];
+        for batch in &batches {
+            format
+                .process_batch(
+                    batch,
+                    schema.clone(),
+                    &mut preview_batches,
+                    &mut preview_row_count,
+                    preview_limit,
+                    &mut precomputed_widths,
+                    &mut header_printed,
+                    &mut writer,
+                )
+                .unwrap();
+        }
+
+        let expected = [
+            "+---+---+---+",
+            "| a | b | c |",
+            "+---+---+---+",
+            "| 1 | 4 | 7 |",
+            "| 2 | 5 | 8 |",
+            "| 3 | 6 | 9 |",
+            "| 1 | 42222 | 7 |",
+            "| 2222222 | 5 | 8 |",
+            "| 3 | 6 | 922222 |",
+            "| 1 | 4 | 7 |",
+            "| 2 | 5 | 8 |",
+            "| 3 | 6 | 9 |",
+        ];
+        let text = String::from_utf8(writer).unwrap();
+        let lines: Vec<&str> = text.trim_end().split('\n').collect();
+        assert_eq!(lines, expected);
+    }
+
+    #[test]
+    fn test_print_batches_with_preview_cover_later_batches() {
+        let schema = three_column_schema();
+        let format = PrintFormat::Table;
+        // The preview limit (4) exceeds the first batch (3 rows), so widths are
+        // computed from the first two batches together and every row aligns.
+        let preview_limit = 4;
+        let mut preview_batches = Vec::new();
+        let mut preview_row_count = 0;
+        let mut precomputed_widths = None;
+        let mut header_printed = false;
+        let mut writer: Vec<u8> = Vec::new();
+
+        let batches = [
+            three_column_batch(),
+            three_column_batch_with_widths(),
+            three_column_batch(),
+        ];
+        for batch in &batches {
+            format
+                .process_batch(
+                    batch,
+                    schema.clone(),
+                    &mut preview_batches,
+                    &mut preview_row_count,
+                    preview_limit,
+                    &mut precomputed_widths,
+                    &mut header_printed,
+                    &mut writer,
+                )
+                .unwrap();
+        }
+
+        let expected = &[
+            "+---------+-------+--------+",
+            "| a       | b     | c      |",
+            "+---------+-------+--------+",
+            "| 1       | 4     | 7      |",
+            "| 2       | 5     | 8      |",
+            "| 3       | 6     | 9      |",
+            "| 1       | 42222 | 7      |",
+            "| 2222222 | 5     | 8      |",
+            "| 3       | 6     | 922222 |",
+            "| 1       | 4     | 7      |",
+            "| 2       | 5     | 8      |",
+            "| 3       | 6     | 9      |",
+        ];
+        let binding = String::from_utf8(writer).unwrap();
+        let actual: Vec<_> = binding.trim_end().split('\n').collect();
+        assert_eq!(actual, expected);
+    }
+
#[derive(Debug)]
struct PrintBatchesTest {
format: PrintFormat,
@@ -672,6 +1136,19 @@ mod tests {
.unwrap()
}
+    /// Return a three-column, three-row batch whose values differ in length,
+    /// so the columns need display widths 7, 5 and 6 respectively.
+    fn three_column_batch_with_widths() -> RecordBatch {
+        let a = Int32Array::from(vec![1, 2222222, 3]);
+        let b = Int32Array::from(vec![42222, 5, 6]);
+        let c = Int32Array::from(vec![7, 8, 922222]);
+        RecordBatch::try_new(
+            three_column_schema(),
+            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
+        )
+        .unwrap()
+    }
+
/// Return a schema with one column
fn one_column_schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
diff --git a/datafusion-cli/src/print_options.rs
b/datafusion-cli/src/print_options.rs
index 9557e783e8..092483faed 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -29,6 +29,7 @@ use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;
+use datafusion::execution::SendableRecordBatchStream;
use futures::StreamExt;
#[derive(Debug, Clone, PartialEq, Copy)]
@@ -74,27 +75,6 @@ pub struct PrintOptions {
pub color: bool,
}
-// Returns the query execution details formatted
-fn get_execution_details_formatted(
- row_count: usize,
- maxrows: MaxRows,
- query_start_time: Instant,
-) -> String {
- let nrows_shown_msg = match maxrows {
- MaxRows::Limited(nrows) if nrows < row_count => {
- format!("(First {nrows} displayed. Use --maxrows to adjust)")
- }
- _ => String::new(),
- };
-
- format!(
- "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
- row_count,
- nrows_shown_msg,
- query_start_time.elapsed().as_secs_f64()
- )
-}
-
impl PrintOptions {
/// Print the batches to stdout using the specified format
pub fn print_batches(
@@ -110,7 +90,7 @@ impl PrintOptions {
self.format
.print_batches(&mut writer, schema, batches, self.maxrows, true)?;
- let formatted_exec_details = get_execution_details_formatted(
+ let formatted_exec_details = self.get_execution_details_formatted(
row_count,
if self.format == PrintFormat::Table {
self.maxrows
@@ -127,9 +107,119 @@ impl PrintOptions {
Ok(())
}
+    /// Print a bounded result stream as an ASCII table without collecting it.
+    ///
+    /// Column widths are computed from the first 1000 rows (the preview). The
+    /// header and the buffered preview rows are written once the preview is
+    /// full or the stream ends; later batches are written as they arrive using
+    /// those widths. At most `max_rows` rows are printed; when more rows exist,
+    /// three dotted lines mark the truncation. The rest of the stream is still
+    /// consumed so the footer reports the total row count.
+    ///
+    /// Formatting and `quiet` come from `print_options`; the row limit comes
+    /// from `max_rows`. NOTE(review): `self` is not read here.
+    ///
+    /// # Errors
+    /// Returns an error if the stream yields one, a value cannot be formatted,
+    /// or writing to `writer` fails.
+    pub async fn print_table_batch(
+        &self,
+        print_options: &PrintOptions,
+        schema: SchemaRef,
+        stream: &mut SendableRecordBatchStream,
+        max_rows: usize,
+        writer: &mut dyn std::io::Write,
+        now: Instant,
+    ) -> Result<()> {
+        // Number of rows buffered before column widths are fixed.
+        let preview_limit: usize = 1000;
+        let table_format = &print_options.format;
+        let mut preview_batches: Vec<RecordBatch> = vec![];
+        let mut preview_row_count = 0_usize;
+        let mut total_count = 0_usize;
+        let mut precomputed_widths: Option<Vec<usize>> = None;
+        let mut header_printed = false;
+        let mut max_rows_reached = false;
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let batch_rows = batch.num_rows();
+
+            if !max_rows_reached {
+                // While not truncated, total_count <= max_rows, so this cannot
+                // underflow; comparing against `remaining` also avoids
+                // overflowing `total_count + batch_rows` when max_rows is usize::MAX.
+                let remaining = max_rows - total_count;
+                if batch_rows > remaining {
+                    // This batch crosses the limit (`remaining` is 0 when earlier
+                    // batches hit it exactly): print what fits, make sure the
+                    // header and preview are out, then mark the truncation.
+                    table_format.process_batch(
+                        &batch.slice(0, remaining),
+                        schema.clone(),
+                        &mut preview_batches,
+                        &mut preview_row_count,
+                        preview_limit,
+                        &mut precomputed_widths,
+                        &mut header_printed,
+                        writer,
+                    )?;
+                    Self::flush_table_preview(
+                        table_format,
+                        &schema,
+                        &mut preview_batches,
+                        &mut precomputed_widths,
+                        &mut header_printed,
+                        writer,
+                    )?;
+                    if let Some(widths) = &precomputed_widths {
+                        for _ in 0..3 {
+                            table_format.print_dotted_line(widths, writer)?;
+                        }
+                        table_format.print_bottom_border(widths, writer)?;
+                    }
+                    max_rows_reached = true;
+                } else {
+                    table_format.process_batch(
+                        &batch,
+                        schema.clone(),
+                        &mut preview_batches,
+                        &mut preview_row_count,
+                        preview_limit,
+                        &mut precomputed_widths,
+                        &mut header_printed,
+                        writer,
+                    )?;
+                }
+            }
+
+            total_count += batch_rows;
+        }
+
+        // The stream ended without truncation: write any rows still buffered in
+        // the preview (results shorter than the preview limit) and close the table.
+        if !max_rows_reached
+            && (precomputed_widths.is_some() || !preview_batches.is_empty())
+        {
+            Self::flush_table_preview(
+                table_format,
+                &schema,
+                &mut preview_batches,
+                &mut precomputed_widths,
+                &mut header_printed,
+                writer,
+            )?;
+            if let Some(widths) = &precomputed_widths {
+                table_format.print_bottom_border(widths, writer)?;
+            }
+        }
+
+        if !print_options.quiet {
+            // Report against the limit that was actually applied above.
+            let formatted_exec_details = print_options.get_execution_details_formatted(
+                total_count,
+                MaxRows::Limited(max_rows),
+                now,
+            );
+            writeln!(writer, "{}", formatted_exec_details)?;
+        }
+
+        Ok(())
+    }
+
+    /// Make sure column widths are fixed and that the header and every
+    /// buffered preview batch have been written to `writer`.
+    ///
+    /// Widths are computed from `preview_batches` only when not already known;
+    /// the preview buffer is drained either way.
+    fn flush_table_preview(
+        format: &PrintFormat,
+        schema: &SchemaRef,
+        preview_batches: &mut Vec<RecordBatch>,
+        precomputed_widths: &mut Option<Vec<usize>>,
+        header_printed: &mut bool,
+        writer: &mut dyn std::io::Write,
+    ) -> Result<()> {
+        let widths = match precomputed_widths.take() {
+            Some(widths) => widths,
+            None => format.compute_column_widths(preview_batches, schema.clone())?,
+        };
+        if !*header_printed {
+            format.print_header(schema, &widths, writer)?;
+            *header_printed = true;
+        }
+        for preview_batch in preview_batches.drain(..) {
+            format.print_batch_with_widths(&preview_batch, &widths, writer)?;
+        }
+        *precomputed_widths = Some(widths);
+        Ok(())
+    }
+
/// Print the stream to stdout using the specified format
pub async fn print_stream(
&self,
+ max_rows: MaxRows,
mut stream: Pin<Box<dyn RecordBatchStream>>,
query_start_time: Instant,
) -> Result<()> {
@@ -139,30 +229,49 @@ impl PrintOptions {
));
};
+ let max_count = match self.maxrows {
+ MaxRows::Unlimited => usize::MAX,
+ MaxRows::Limited(n) => n,
+ };
+
let stdout = std::io::stdout();
let mut writer = stdout.lock();
let mut row_count = 0_usize;
let mut with_header = true;
+ let mut max_rows_reached = false;
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
- row_count += batch.num_rows();
- self.format.print_batches(
- &mut writer,
- batch.schema(),
- &[batch],
- MaxRows::Unlimited,
- with_header,
- )?;
+ let curr_batch_rows = batch.num_rows();
+ if !max_rows_reached && row_count < max_count {
+ if row_count + curr_batch_rows > max_count {
+ let needed = max_count - row_count;
+ let batch_to_print = batch.slice(0, needed);
+ self.format.print_batches(
+ &mut writer,
+ batch.schema(),
+ &[batch_to_print],
+ max_rows,
+ with_header,
+ )?;
+ max_rows_reached = true;
+ } else {
+ self.format.print_batches(
+ &mut writer,
+ batch.schema(),
+ &[batch],
+ max_rows,
+ with_header,
+ )?;
+ }
+ }
+ row_count += curr_batch_rows;
with_header = false;
}
- let formatted_exec_details = get_execution_details_formatted(
- row_count,
- MaxRows::Unlimited,
- query_start_time,
- );
+ let formatted_exec_details =
+ self.get_execution_details_formatted(row_count, max_rows,
query_start_time);
if !self.quiet {
writeln!(writer, "{formatted_exec_details}")?;
@@ -170,4 +279,26 @@ impl PrintOptions {
Ok(())
}
+
+    /// Build the footer printed after a query: the number of rows fetched, a
+    /// hint when `maxrows` hid some of them, and the elapsed time in seconds.
+    pub fn get_execution_details_formatted(
+        &self,
+        row_count: usize,
+        maxrows: MaxRows,
+        query_start_time: Instant,
+    ) -> String {
+        let truncation_hint = match maxrows {
+            MaxRows::Unlimited => String::new(),
+            MaxRows::Limited(shown) => (shown < row_count)
+                .then(|| format!("(First {shown} displayed. Use --maxrows to adjust)"))
+                .unwrap_or_default(),
+        };
+        let elapsed_secs = query_start_time.elapsed().as_secs_f64();
+
+        format!(
+            "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
+            row_count, truncation_hint, elapsed_secs
+        )
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]