xudong963 commented on code in PR #17867:
URL: https://github.com/apache/datafusion/pull/17867#discussion_r2396911503


##########
datafusion-cli/src/progress/display.rs:
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Progress bar display functionality
+
+use crate::progress::{ProgressInfo, ProgressStyle, ProgressUnit};
+use datafusion_common::instant::Instant;
+use std::io::{self, Write};
+use std::time::Duration;
+
+/// Displays progress information to the terminal
+pub struct ProgressDisplay {
+    style: ProgressStyle,
+    start_time: Instant,
+    last_display: Option<String>,
+}
+
+impl ProgressDisplay {
+    pub fn new(style: ProgressStyle) -> Self {
+        Self {
+            style,
+            start_time: Instant::now(),
+            last_display: None,
+        }
+    }
+
+    /// Update the progress display
+    pub fn update(&mut self, progress: &ProgressInfo, eta: Option<Duration>) {
+        let display_text =
+            match progress.percent.is_some() && self.style == 
ProgressStyle::Bar {
+                true => self.format_progress_bar(progress, eta),
+                false => self.format_spinner(progress),
+            };
+
+        // Only update if the display text has changed
+        if self.last_display.as_ref() != Some(&display_text) {
+            self.print_line(&display_text);
+            self.last_display = Some(display_text);
+        }
+    }
+
+    /// Finish the progress display and clean up
+    pub fn finish(&mut self) {
+        if self.last_display.is_some() {
+            // Clear the progress line and move to next line
+            print!("\r\x1b[K");
+            let _ = io::stdout().flush();
+            self.last_display = None;
+        }
+    }
+
+    /// Format a progress bar with percentage
+    fn format_progress_bar(
+        &self,
+        progress: &ProgressInfo,
+        eta: Option<Duration>,
+    ) -> String {
+        let percent = progress.percent.unwrap_or(0.0);
+        let bar = self.create_bar(percent);
+
+        let current_formatted = progress.unit.format_value(progress.current);
+        let total_formatted = progress
+            .total
+            .map(|t| progress.unit.format_value(t))
+            .unwrap_or_else(|| "?".to_string());
+
+        let throughput = self.calculate_throughput(progress);
+        let eta_text = eta
+            .map(format_duration)
+            .unwrap_or_else(|| "??:??".to_string());
+        let elapsed = format_duration(self.start_time.elapsed());
+
+        format!(
+            "\r{}  {:5.1}%  {} / {}  •  {}  •  ETA {} / {}",
+            bar,
+            percent,
+            current_formatted,
+            total_formatted,
+            throughput,
+            eta_text,
+            elapsed
+        )
+    }
+
+    /// Format a spinner without percentage
+    fn format_spinner(&self, progress: &ProgressInfo) -> String {
+        let spinner = self.get_spinner_char();
+        let current_formatted = progress.unit.format_value(progress.current);
+        let elapsed = format_duration(self.start_time.elapsed());
+
+        format!(
+            "\r{}  {}: {}  elapsed: {}",
+            spinner,
+            match progress.unit {
+                ProgressUnit::Bytes => "bytes",
+                ProgressUnit::Rows => "rows",
+            },
+            current_formatted,
+            elapsed
+        )
+    }
+
+    /// Create a visual progress bar
+    fn create_bar(&self, percent: f64) -> String {

Review Comment:
   FYI, there is a [progress bar crate](https://docs.rs/indicatif) that you may 
like.



##########
datafusion-cli/src/progress/estimator.rs:
##########


Review Comment:
   Some doc for the algorithms is helpful
   
   And some tuning comments for Kalman params make a lot of sense for reviewers.



##########
datafusion-cli/src/progress/estimator.rs:
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ETA estimation algorithms
+
+use crate::progress::{ProgressEstimator as EstimatorType, ProgressInfo};
+use datafusion_common::instant::Instant;
+use std::time::Duration;
+
+/// Estimates time to completion based on progress
+pub struct ProgressEstimator {
+    estimator_type: EstimatorType,
+    start_time: Instant,
+    linear_estimator: LinearEstimator,
+    kalman_estimator: KalmanEstimator,
+}
+
+impl ProgressEstimator {
+    pub fn new(estimator_type: EstimatorType) -> Self {
+        Self {
+            estimator_type,
+            start_time: Instant::now(),
+            linear_estimator: LinearEstimator::new(),
+            kalman_estimator: KalmanEstimator::new(),
+        }
+    }
+
+    /// Update the estimator with new progress and return ETA
+    pub fn update(&mut self, progress: ProgressInfo) -> Option<Duration> {
+        let elapsed = self.start_time.elapsed();
+
+        // Need at least some progress and time to estimate
+        let percent = progress.percent?;
+        if percent <= 0.0 || elapsed.as_secs_f64() < 1.0 {
+            return None;
+        }
+
+        match self.estimator_type {
+            EstimatorType::Linear => self.linear_estimator.update(percent, 
elapsed),
+            EstimatorType::Kalman => self.kalman_estimator.update(percent, 
elapsed),
+        }
+    }
+}
+
+/// Simple linear ETA estimation
+struct LinearEstimator {
+    last_update: Option<(f64, Duration)>,
+}
+
+impl LinearEstimator {
+    fn new() -> Self {
+        Self { last_update: None }
+    }
+
+    fn update(&mut self, percent: f64, elapsed: Duration) -> Option<Duration> {
+        // Store this update for next calculation
+        self.last_update = Some((percent, elapsed));
+
+        if percent <= 5.0 {
+            // Too early to provide reliable estimate
+            return None;
+        }
+
+        // Simple linear extrapolation: if we're X% done in Y time,
+        // total time = Y * 100 / X
+        let total_time_secs = elapsed.as_secs_f64() * 100.0 / percent;
+        let remaining_secs = total_time_secs - elapsed.as_secs_f64();
+
+        if remaining_secs > 0.0 {
+            Some(Duration::from_secs_f64(remaining_secs))
+        } else {
+            Some(Duration::from_secs(0))
+        }
+    }
+}
+
+/// Kalman filter ETA estimation for smoother predictions
+struct KalmanEstimator {
+    // State: [progress_rate, acceleration]
+    state: [f64; 2],
+    // Covariance matrix (2x2, stored as [P00, P01, P10, P11])
+    covariance: [f64; 4],
+    // Process noise
+    process_noise: f64,
+    // Measurement noise
+    measurement_noise: f64,
+    // Previous time for calculating dt
+    last_time: Option<Duration>,
+    // Previous progress
+    last_progress: Option<f64>,
+    // Number of observations
+    observations: usize,
+}
+
+impl KalmanEstimator {
+    fn new() -> Self {
+        Self {
+            state: [0.0, 0.0],                // Initial rate = 0, 
acceleration = 0
+            covariance: [1.0, 0.0, 0.0, 1.0], // Identity matrix
+            process_noise: 0.1,
+            measurement_noise: 1.0,
+            last_time: None,
+            last_progress: None,
+            observations: 0,
+        }
+    }
+
+    fn update(&mut self, percent: f64, elapsed: Duration) -> Option<Duration> {
+        self.observations += 1;
+
+        // Need at least two observations to calculate rate
+        if let (Some(last_time), Some(last_progress)) =
+            (self.last_time, self.last_progress)
+        {
+            let dt = (elapsed - last_time).as_secs_f64();
+            if dt > 0.0 {
+                let measured_rate = (percent - last_progress) / dt;
+                self.kalman_update(measured_rate, dt);
+            }
+        }
+
+        self.last_time = Some(elapsed);
+        self.last_progress = Some(percent);
+
+        // Don't provide estimate until we have enough observations and 
progress
+        if self.observations < 3 || percent <= 5.0 {
+            return None;
+        }
+
+        self.calculate_eta(percent)
+    }
+
+    /// Kalman filter prediction step
+    fn predict(&mut self, dt: f64) {
+        // State transition: progress_rate(k+1) = progress_rate(k) + 
acceleration(k) * dt
+        //                  acceleration(k+1) = acceleration(k)
+
+        // Update state
+        self.state[0] += self.state[1] * dt; // rate += acceleration * dt
+                                             // acceleration stays the same
+
+        // Update covariance with process noise
+        // F = [[1, dt], [0, 1]] (state transition matrix)
+        // P = F * P * F^T + Q
+
+        let p00 = self.covariance[0];
+        let p01 = self.covariance[1];
+        let p10 = self.covariance[2];
+        let p11 = self.covariance[3];
+
+        self.covariance[0] = p00 + 2.0 * dt * p01 + dt * dt * p11 + 
self.process_noise;
+        self.covariance[1] = p01 + dt * p11;
+        self.covariance[2] = p10 + dt * p11;
+        self.covariance[3] = p11 + self.process_noise;
+    }
+
+    /// Kalman filter update step  
+    fn kalman_update(&mut self, measured_rate: f64, dt: f64) {

Review Comment:
   FYI, there is an [algebra lib](https://docs.rs/nalgebra/latest/nalgebra/) 
which may make code neat.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to