EeshanBembi commented on code in PR #17867:
URL: https://github.com/apache/datafusion/pull/17867#discussion_r2399580798


##########
datafusion-cli/src/progress/estimator.rs:
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ETA estimation algorithms
+
+use crate::progress::{ProgressEstimator as EstimatorType, ProgressInfo};
+use datafusion_common::instant::Instant;
+use std::time::Duration;
+
+/// Estimates time to completion based on progress
+pub struct ProgressEstimator {
+    estimator_type: EstimatorType,
+    start_time: Instant,
+    linear_estimator: LinearEstimator,
+    kalman_estimator: KalmanEstimator,
+}
+
+impl ProgressEstimator {
+    pub fn new(estimator_type: EstimatorType) -> Self {
+        Self {
+            estimator_type,
+            start_time: Instant::now(),
+            linear_estimator: LinearEstimator::new(),
+            kalman_estimator: KalmanEstimator::new(),
+        }
+    }
+
+    /// Update the estimator with new progress and return ETA
+    pub fn update(&mut self, progress: ProgressInfo) -> Option<Duration> {
+        let elapsed = self.start_time.elapsed();
+
+        // Need at least some progress and time to estimate
+        let percent = progress.percent?;
+        if percent <= 0.0 || elapsed.as_secs_f64() < 1.0 {
+            return None;
+        }
+
+        match self.estimator_type {
+            EstimatorType::Linear => self.linear_estimator.update(percent, 
elapsed),
+            EstimatorType::Kalman => self.kalman_estimator.update(percent, 
elapsed),
+        }
+    }
+}
+
+/// Simple linear ETA estimation
+struct LinearEstimator {
+    last_update: Option<(f64, Duration)>,
+}
+
+impl LinearEstimator {
+    fn new() -> Self {
+        Self { last_update: None }
+    }
+
+    fn update(&mut self, percent: f64, elapsed: Duration) -> Option<Duration> {
+        // Store this update for next calculation
+        self.last_update = Some((percent, elapsed));
+
+        if percent <= 5.0 {
+            // Too early to provide reliable estimate
+            return None;
+        }
+
+        // Simple linear extrapolation: if we're X% done in Y time,
+        // total time = Y * 100 / X
+        let total_time_secs = elapsed.as_secs_f64() * 100.0 / percent;
+        let remaining_secs = total_time_secs - elapsed.as_secs_f64();
+
+        if remaining_secs > 0.0 {
+            Some(Duration::from_secs_f64(remaining_secs))
+        } else {
+            Some(Duration::from_secs(0))
+        }
+    }
+}
+
+/// Kalman filter ETA estimation for smoother predictions
+struct KalmanEstimator {
+    // State: [progress_rate, acceleration]
+    state: [f64; 2],
+    // Covariance matrix (2x2, stored as [P00, P01, P10, P11])
+    covariance: [f64; 4],
+    // Process noise
+    process_noise: f64,
+    // Measurement noise
+    measurement_noise: f64,
+    // Previous time for calculating dt
+    last_time: Option<Duration>,
+    // Previous progress
+    last_progress: Option<f64>,
+    // Number of observations
+    observations: usize,
+}
+
+impl KalmanEstimator {
+    fn new() -> Self {
+        Self {
+            state: [0.0, 0.0],                // Initial rate = 0, 
acceleration = 0
+            covariance: [1.0, 0.0, 0.0, 1.0], // Identity matrix
+            process_noise: 0.1,
+            measurement_noise: 1.0,
+            last_time: None,
+            last_progress: None,
+            observations: 0,
+        }
+    }
+
+    fn update(&mut self, percent: f64, elapsed: Duration) -> Option<Duration> {
+        self.observations += 1;
+
+        // Need at least two observations to calculate rate
+        if let (Some(last_time), Some(last_progress)) =
+            (self.last_time, self.last_progress)
+        {
+            let dt = (elapsed - last_time).as_secs_f64();
+            if dt > 0.0 {
+                let measured_rate = (percent - last_progress) / dt;
+                self.kalman_update(measured_rate, dt);
+            }
+        }
+
+        self.last_time = Some(elapsed);
+        self.last_progress = Some(percent);
+
+        // Don't provide estimate until we have enough observations and 
progress
+        if self.observations < 3 || percent <= 5.0 {
+            return None;
+        }
+
+        self.calculate_eta(percent)
+    }
+
+    /// Kalman filter prediction step
+    fn predict(&mut self, dt: f64) {
+        // State transition: progress_rate(k+1) = progress_rate(k) + 
acceleration(k) * dt
+        //                  acceleration(k+1) = acceleration(k)
+
+        // Update state
+        self.state[0] += self.state[1] * dt; // rate += acceleration * dt
+                                             // acceleration stays the same
+
+        // Update covariance with process noise
+        // F = [[1, dt], [0, 1]] (state transition matrix)
+        // P = F * P * F^T + Q
+
+        let p00 = self.covariance[0];
+        let p01 = self.covariance[1];
+        let p10 = self.covariance[2];
+        let p11 = self.covariance[3];
+
+        self.covariance[0] = p00 + 2.0 * dt * p01 + dt * dt * p11 + 
self.process_noise;
+        self.covariance[1] = p01 + dt * p11;
+        self.covariance[2] = p10 + dt * p11;
+        self.covariance[3] = p11 + self.process_noise;
+    }
+
+    /// Kalman filter update step  
+    fn kalman_update(&mut self, measured_rate: f64, dt: f64) {

Review Comment:
   @xudong963 Great suggestion on the algebra library! I actually looked into 
using nalgebra for
     the Kalman filter matrix operations.
   
     After some consideration, I decided to stick with the current 
hand-optimized approach for a
     few reasons:
     - The 2x2 matrix operations are pretty straightforward and fast as-is
     - Adding nalgebra would bring in quite a few extra dependencies
     - The current code is actually quite readable with the explicit math
     - Keeps the CLI nice and lightweight
   
     That said, if we need more complex state estimation, we can consider using 
it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to