This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new c79273e9f fix: Don't use `maxrows` as the "fetched rows" count; calculate
it from the batches instead (#1480)
c79273e9f is described below
commit c79273e9f92510edc19a8944f10316562411e27f
Author: Martin Grigorov <[email protected]>
AuthorDate: Mon Mar 2 13:25:51 2026 +0200
fix: Don't use `maxrows` as the "fetched rows" count; calculate it from the
batches instead (#1480)
* fix: Don't use `maxrows` as the "fetched rows" count; calculate it from the
batches instead
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
* Apply a limit to the DataFrame when max_rows is Limited
---------
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---
ballista-cli/src/exec.rs | 25 +++++++++++--------------
1 file changed, 11 insertions(+), 14 deletions(-)
diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs
index 887728f65..508226e50 100644
--- a/ballista-cli/src/exec.rs
+++ b/ballista-cli/src/exec.rs
@@ -41,10 +41,6 @@ pub async fn exec_from_lines(
print_options: &PrintOptions,
) {
let mut query = "".to_owned();
- let max_rows = match print_options.maxrows {
- datafusion_cli::print_options::MaxRows::Unlimited => usize::MAX,
- datafusion_cli::print_options::MaxRows::Limited(max_rows) => max_rows,
- };
for line in reader.lines() {
match line {
@@ -55,7 +51,7 @@ pub async fn exec_from_lines(
let line = line.trim_end();
query.push_str(line);
if line.ends_with(';') {
- match exec_and_print(ctx, print_options, query,
max_rows).await {
+ match exec_and_print(ctx, print_options, query).await {
Ok(_) => {}
Err(err) => println!("{err:?}"),
}
@@ -76,7 +72,7 @@ pub async fn exec_from_lines(
// run the left over query if the last statement doesn't contain ‘;’
if !query.is_empty() {
- match exec_and_print(ctx, print_options, query, max_rows).await {
+ match exec_and_print(ctx, print_options, query).await {
Ok(_) => {}
Err(err) => println!("{err:?}"),
}
@@ -109,11 +105,6 @@ pub async fn exec_from_repl(ctx: &SessionContext,
print_options: &mut PrintOptio
let mut print_options = print_options.clone();
- let max_rows = match print_options.maxrows {
- datafusion_cli::print_options::MaxRows::Unlimited => usize::MAX,
- datafusion_cli::print_options::MaxRows::Limited(max_rows) => max_rows,
- };
-
loop {
match rl.readline("❯ ") {
Ok(line) if line.starts_with('\\') => {
@@ -152,7 +143,7 @@ pub async fn exec_from_repl(ctx: &SessionContext,
print_options: &mut PrintOptio
}
Ok(line) => {
rl.add_history_entry(line.trim_end()).unwrap();
- match exec_and_print(ctx, &print_options, line,
max_rows).await {
+ match exec_and_print(ctx, &print_options, line).await {
Ok(_) => {}
Err(err) => eprintln!("{err:?}"),
}
@@ -179,13 +170,19 @@ async fn exec_and_print(
ctx: &SessionContext,
print_options: &PrintOptions,
sql: String,
- row_count: usize,
) -> Result<()> {
let now = Instant::now();
let df = ctx.sql(&sql).await?;
+ let df = match print_options.maxrows {
+ datafusion_cli::print_options::MaxRows::Unlimited => df,
+ datafusion_cli::print_options::MaxRows::Limited(max_rows) => {
+ df.limit(0, Some(max_rows))?
+ }
+ };
let schema = Arc::new(df.schema().as_arrow().clone());
let results = df.collect().await?;
- print_options.print_batches(schema, &results, now, row_count,
&Default::default())?;
+ let rows = &results.iter().map(|b| b.num_rows()).sum::<usize>();
+ print_options.print_batches(schema, &results, now, *rows,
&Default::default())?;
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]