This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8642ea4  feat: allow configuring scanner fetch parameters (#417)
8642ea4 is described below

commit 8642ea454d95b283653e49489e60216d07c8ebb7
Author: Prajwal banakar <[email protected]>
AuthorDate: Thu Mar 12 07:37:20 2026 +0530

    feat: allow configuring scanner fetch parameters (#417)
---
 bindings/cpp/include/fluss.hpp                  |  8 ++
 bindings/cpp/src/ffi_converter.hpp              |  4 +
 bindings/cpp/src/lib.rs                         |  8 ++
 bindings/python/fluss/__init__.pyi              | 16 ++++
 bindings/python/src/config.rs                   | 74 +++++++++++++++++++
 crates/fluss/src/client/connection.rs           |  2 +
 crates/fluss/src/client/table/scanner.rs        | 87 ++++++++++++++++------
 crates/fluss/src/config.rs                      | 97 +++++++++++++++++++++++++
 website/docs/user-guide/cpp/api-reference.md    |  4 +
 website/docs/user-guide/python/api-reference.md |  4 +
 website/docs/user-guide/rust/api-reference.md   |  4 +
 11 files changed, 286 insertions(+), 22 deletions(-)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index d0da617..79561a1 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -1012,6 +1012,14 @@ struct Configuration {
     size_t scanner_remote_log_read_concurrency{4};
     // Maximum number of records returned in a single call to Poll() for LogScanner
     size_t scanner_log_max_poll_records{500};
+    // Maximum bytes per fetch response for LogScanner (16 MB)
+    int32_t scanner_log_fetch_max_bytes{16 * 1024 * 1024};
+    // Minimum bytes to accumulate before server returns a fetch response
+    int32_t scanner_log_fetch_min_bytes{1};
+    // Maximum time (ms) the server may wait to satisfy min bytes
+    int32_t scanner_log_fetch_wait_max_time_ms{500};
+    // Maximum bytes per fetch response per bucket for LogScanner (1 MB)
+    int32_t scanner_log_fetch_max_bytes_for_bucket{1024 * 1024};
     int64_t writer_batch_timeout_ms{100};
     // Connect timeout in milliseconds for TCP transport connect
     uint64_t connect_timeout_ms{120000};
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 93a60bf..754ed0f 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -64,6 +64,10 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
     ffi_config.remote_file_download_thread_num = 
config.remote_file_download_thread_num;
     ffi_config.scanner_remote_log_read_concurrency = 
config.scanner_remote_log_read_concurrency;
     ffi_config.scanner_log_max_poll_records = 
config.scanner_log_max_poll_records;
+    ffi_config.scanner_log_fetch_max_bytes = 
config.scanner_log_fetch_max_bytes;
+    ffi_config.scanner_log_fetch_min_bytes = 
config.scanner_log_fetch_min_bytes;
+    ffi_config.scanner_log_fetch_wait_max_time_ms = 
config.scanner_log_fetch_wait_max_time_ms;
+    ffi_config.scanner_log_fetch_max_bytes_for_bucket = 
config.scanner_log_fetch_max_bytes_for_bucket;
     ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
     ffi_config.connect_timeout_ms = config.connect_timeout_ms;
     ffi_config.security_protocol = rust::String(config.security_protocol);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 36b9c51..8d5153e 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -48,6 +48,10 @@ mod ffi {
         remote_file_download_thread_num: usize,
         scanner_remote_log_read_concurrency: usize,
         scanner_log_max_poll_records: usize,
+        scanner_log_fetch_max_bytes: i32,
+        scanner_log_fetch_min_bytes: i32,
+        scanner_log_fetch_wait_max_time_ms: i32,
+        scanner_log_fetch_max_bytes_for_bucket: i32,
         writer_batch_timeout_ms: i64,
         connect_timeout_ms: u64,
         security_protocol: String,
@@ -668,6 +672,10 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
         remote_file_download_thread_num: 
config.remote_file_download_thread_num,
         scanner_remote_log_read_concurrency: 
config.scanner_remote_log_read_concurrency,
         scanner_log_max_poll_records: config.scanner_log_max_poll_records,
+        scanner_log_fetch_max_bytes: config.scanner_log_fetch_max_bytes,
+        scanner_log_fetch_min_bytes: config.scanner_log_fetch_min_bytes,
+        scanner_log_fetch_wait_max_time_ms: 
config.scanner_log_fetch_wait_max_time_ms,
+        scanner_log_fetch_max_bytes_for_bucket: 
config.scanner_log_fetch_max_bytes_for_bucket,
         connect_timeout_ms: config.connect_timeout_ms,
         security_protocol: config.security_protocol.to_string(),
         security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 63be0e2..20b259e 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -174,6 +174,22 @@ class Config:
     @scanner_log_max_poll_records.setter
     def scanner_log_max_poll_records(self, num: int) -> None: ...
     @property
+    def scanner_log_fetch_max_bytes(self) -> int: ...
+    @scanner_log_fetch_max_bytes.setter
+    def scanner_log_fetch_max_bytes(self, bytes: int) -> None: ...
+    @property
+    def scanner_log_fetch_min_bytes(self) -> int: ...
+    @scanner_log_fetch_min_bytes.setter
+    def scanner_log_fetch_min_bytes(self, bytes: int) -> None: ...
+    @property
+    def scanner_log_fetch_wait_max_time_ms(self) -> int: ...
+    @scanner_log_fetch_wait_max_time_ms.setter
+    def scanner_log_fetch_wait_max_time_ms(self, ms: int) -> None: ...
+    @property
+    def scanner_log_fetch_max_bytes_for_bucket(self) -> int: ...
+    @scanner_log_fetch_max_bytes_for_bucket.setter
+    def scanner_log_fetch_max_bytes_for_bucket(self, bytes: int) -> None: ...
+    @property
     def writer_batch_timeout_ms(self) -> int: ...
     @writer_batch_timeout_ms.setter
     def writer_batch_timeout_ms(self, timeout: int) -> None: ...
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index f99f9c6..fd3c980 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -97,6 +97,32 @@ impl Config {
                                 ))
                             })?;
                     }
+                    "scanner.log.fetch.max-bytes" => {
+                        config.scanner_log_fetch_max_bytes = 
value.parse::<i32>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value 
'{value}' for '{key}': {e}"))
+                        })?;
+                    }
+                    "scanner.log.fetch.min-bytes" => {
+                        config.scanner_log_fetch_min_bytes = 
value.parse::<i32>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value 
'{value}' for '{key}': {e}"))
+                        })?;
+                    }
+                    "scanner.log.fetch.wait-max-time-ms" => {
+                        config.scanner_log_fetch_wait_max_time_ms =
+                            value.parse::<i32>().map_err(|e| {
+                                FlussError::new_err(format!(
+                                    "Invalid value '{value}' for '{key}': {e}"
+                                ))
+                            })?;
+                    }
+                    "scanner.log.fetch.max-bytes-for-bucket" => {
+                        config.scanner_log_fetch_max_bytes_for_bucket =
+                            value.parse::<i32>().map_err(|e| {
+                                FlussError::new_err(format!(
+                                    "Invalid value '{value}' for '{key}': {e}"
+                                ))
+                            })?;
+                    }
                     "writer.bucket.no-key-assigner" => {
                         config.writer_bucket_no_key_assigner =
                             
value.parse::<fcore::config::NoKeyAssigner>().map_err(|e| {
@@ -329,6 +355,54 @@ impl Config {
     fn set_security_sasl_password(&mut self, password: String) {
         self.inner.security_sasl_password = password;
     }
+
+    /// Get the maximum bytes per fetch response for LogScanner
+    #[getter]
+    fn scanner_log_fetch_max_bytes(&self) -> i32 {
+        self.inner.scanner_log_fetch_max_bytes
+    }
+
+    /// Set the maximum bytes per fetch response for LogScanner
+    #[setter]
+    fn set_scanner_log_fetch_max_bytes(&mut self, bytes: i32) {
+        self.inner.scanner_log_fetch_max_bytes = bytes;
+    }
+
+    /// Get the minimum bytes to accumulate before returning a fetch response
+    #[getter]
+    fn scanner_log_fetch_min_bytes(&self) -> i32 {
+        self.inner.scanner_log_fetch_min_bytes
+    }
+
+    /// Set the minimum bytes to accumulate before returning a fetch response
+    #[setter]
+    fn set_scanner_log_fetch_min_bytes(&mut self, bytes: i32) {
+        self.inner.scanner_log_fetch_min_bytes = bytes;
+    }
+
+    /// Get the maximum time (ms) the server may wait to satisfy min-bytes
+    #[getter]
+    fn scanner_log_fetch_wait_max_time_ms(&self) -> i32 {
+        self.inner.scanner_log_fetch_wait_max_time_ms
+    }
+
+    /// Set the maximum time (ms) the server may wait to satisfy min-bytes
+    #[setter]
+    fn set_scanner_log_fetch_wait_max_time_ms(&mut self, ms: i32) {
+        self.inner.scanner_log_fetch_wait_max_time_ms = ms;
+    }
+
+    /// Get the maximum bytes per fetch response per bucket for LogScanner
+    #[getter]
+    fn scanner_log_fetch_max_bytes_for_bucket(&self) -> i32 {
+        self.inner.scanner_log_fetch_max_bytes_for_bucket
+    }
+
+    /// Set the maximum bytes per fetch response per bucket for LogScanner
+    #[setter]
+    fn set_scanner_log_fetch_max_bytes_for_bucket(&mut self, bytes: i32) {
+        self.inner.scanner_log_fetch_max_bytes_for_bucket = bytes;
+    }
 }
 
 impl Config {
diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs
index 703b588..2610f6d 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -39,6 +39,8 @@ impl FlussConnection {
     pub async fn new(arg: Config) -> Result<Self> {
         arg.validate_security()
             .map_err(|msg| Error::IllegalArgument { message: msg })?;
+        arg.validate_scanner_fetch()
+            .map_err(|msg| Error::IllegalArgument { message: msg })?;
 
         let timeout = Duration::from_millis(arg.connect_timeout_ms);
         let connections = if arg.is_sasl_enabled() {
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index e837ba7..4302539 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,17 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_schema::SchemaRef;
-use log::{debug, warn};
-use parking_lot::{Mutex, RwLock};
-use std::{
-    collections::{HashMap, HashSet},
-    slice::from_ref,
-    sync::Arc,
-    time::{Duration, Instant},
-};
-use tempfile::TempDir;
-
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::SecurityTokenManager;
 use crate::client::metadata::Metadata;
@@ -44,12 +33,16 @@ use crate::record::{
 use crate::rpc::{RpcClient, RpcError, message};
 use crate::util::FairBucketStatusMap;
 use crate::{PartitionId, TableId};
-
-const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
-#[allow(dead_code)]
-const LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024;
-const LOG_FETCH_MIN_BYTES: i32 = 1;
-const LOG_FETCH_WAIT_MAX_TIME: i32 = 500;
+use arrow_schema::SchemaRef;
+use log::{debug, warn};
+use parking_lot::{Mutex, RwLock};
+use std::{
+    collections::{HashMap, HashSet},
+    slice::from_ref,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+use tempfile::TempDir;
 
 pub struct TableScan<'a> {
     conn: &'a FlussConnection,
@@ -637,6 +630,10 @@ struct LogFetcher {
     log_fetch_buffer: Arc<LogFetchBuffer>,
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
     max_poll_records: usize,
+    fetch_max_bytes: i32,
+    fetch_min_bytes: i32,
+    fetch_wait_max_time_ms: i32,
+    fetch_max_bytes_for_bucket: i32,
 }
 
 struct FetchResponseContext {
@@ -697,6 +694,10 @@ impl LogFetcher {
             log_fetch_buffer,
             nodes_with_pending_fetch_requests: 
Arc::new(Mutex::new(HashSet::new())),
             max_poll_records: config.scanner_log_max_poll_records,
+            fetch_max_bytes: config.scanner_log_fetch_max_bytes,
+            fetch_min_bytes: config.scanner_log_fetch_min_bytes,
+            fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
+            fetch_max_bytes_for_bucket: 
config.scanner_log_fetch_max_bytes_for_bucket,
         })
     }
 
@@ -1479,8 +1480,7 @@ impl LogFetcher {
                             partition_id: bucket.partition_id(),
                             bucket_id: bucket.bucket_id(),
                             fetch_offset: offset,
-                            // 1M
-                            max_fetch_bytes: 1024 * 1024,
+                            max_fetch_bytes: self.fetch_max_bytes_for_bucket,
                         };
 
                         fetch_log_req_for_buckets
@@ -1514,10 +1514,10 @@ impl LogFetcher {
 
                     let fetch_log_request = FetchLogRequest {
                         follower_server_id: -1,
-                        max_bytes: LOG_FETCH_MAX_BYTES,
+                        max_bytes: self.fetch_max_bytes,
                         tables_req: vec![req_for_table],
-                        max_wait_ms: Some(LOG_FETCH_WAIT_MAX_TIME),
-                        min_bytes: Some(LOG_FETCH_MIN_BYTES),
+                        max_wait_ms: Some(self.fetch_wait_max_time_ms),
+                        min_bytes: Some(self.fetch_min_bytes),
                     };
                     (leader_id, fetch_log_request)
                 })
@@ -1990,4 +1990,47 @@ mod tests {
         let result = validate_scan_support(&table_path, &table_info);
         assert!(result.is_ok());
     }
+    #[tokio::test]
+    async fn prepare_fetch_log_requests_uses_configured_fetch_params() -> 
Result<()> {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let table_info = build_table_info(table_path.clone(), 1, 1);
+        let cluster = build_cluster_arc(&table_path, 1, 1);
+        let metadata = Arc::new(Metadata::new_for_test(cluster));
+        let status = Arc::new(LogScannerStatus::new());
+        status.assign_scan_bucket(TableBucket::new(1, 0), 0);
+
+        let config = crate::config::Config {
+            scanner_log_fetch_max_bytes: 1234,
+            scanner_log_fetch_min_bytes: 7,
+            scanner_log_fetch_wait_max_time_ms: 89,
+            scanner_log_fetch_max_bytes_for_bucket: 512,
+            ..crate::config::Config::default()
+        };
+
+        let fetcher = LogFetcher::new(
+            table_info,
+            Arc::new(RpcClient::new()),
+            metadata,
+            status,
+            &config,
+            None,
+        )?;
+
+        let requests = fetcher.prepare_fetch_log_requests().await;
+        // In this test cluster, leader id should exist; but even if it 
changes,
+        // assert over all built requests.
+        assert!(!requests.is_empty());
+        for req in requests.values() {
+            assert_eq!(req.max_bytes, 1234);
+            assert_eq!(req.min_bytes, Some(7));
+            assert_eq!(req.max_wait_ms, Some(89));
+
+            for table_req in &req.tables_req {
+                for bucket_req in &table_req.buckets_req {
+                    assert_eq!(bucket_req.max_fetch_bytes, 512);
+                }
+            }
+        }
+        Ok(())
+    }
 }
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 08ffbfa..e85a449 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -27,7 +27,11 @@ const DEFAULT_PREFETCH_NUM: usize = 4;
 const DEFAULT_DOWNLOAD_THREADS: usize = 3;
 const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
 const DEFAULT_MAX_POLL_RECORDS: usize = 500;
+const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
+const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1;
+const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500;
 const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
+const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024;
 
 const DEFAULT_ACKS: &str = "all";
 const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
@@ -91,11 +95,31 @@ pub struct Config {
     #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
     pub scanner_log_max_poll_records: usize,
 
+    /// Maximum bytes per fetch response for LogScanner.
+    /// Default: 16777216 (16MB)
+    #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]
+    pub scanner_log_fetch_max_bytes: i32,
+
+    /// Minimum bytes to accumulate before returning a fetch response.
+    /// Default: 1
+    #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES)]
+    pub scanner_log_fetch_min_bytes: i32,
+
+    /// Maximum time the server may wait (ms) to satisfy min-bytes.
+    /// Default: 500
+    #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS)]
+    pub scanner_log_fetch_wait_max_time_ms: i32,
+
     /// The maximum time to wait for a batch to be completed in milliseconds.
     /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
     #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
     pub writer_batch_timeout_ms: i64,
 
+    /// Maximum bytes per fetch response **per bucket** for LogScanner.
+    /// Default: 1048576 (1MB)
+    #[arg(long, default_value_t = 
DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)]
+    pub scanner_log_fetch_max_bytes_for_bucket: i32,
+
     /// Connect timeout in milliseconds for TCP transport connect.
     /// Default: 120000 (120 seconds).
     #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
@@ -139,6 +163,22 @@ impl std::fmt::Debug for Config {
                 "scanner_log_max_poll_records",
                 &self.scanner_log_max_poll_records,
             )
+            .field(
+                "scanner_log_fetch_max_bytes",
+                &self.scanner_log_fetch_max_bytes,
+            )
+            .field(
+                "scanner_log_fetch_min_bytes",
+                &self.scanner_log_fetch_min_bytes,
+            )
+            .field(
+                "scanner_log_fetch_max_bytes_for_bucket",
+                &self.scanner_log_fetch_max_bytes_for_bucket,
+            )
+            .field(
+                "scanner_log_fetch_wait_max_time_ms",
+                &self.scanner_log_fetch_wait_max_time_ms,
+            )
             .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
             .field("connect_timeout_ms", &self.connect_timeout_ms)
             .field("security_protocol", &self.security_protocol)
@@ -162,6 +202,10 @@ impl Default for Config {
             remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
             scanner_remote_log_read_concurrency: 
DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
             scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
+            scanner_log_fetch_max_bytes: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES,
+            scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES,
+            scanner_log_fetch_wait_max_time_ms: 
DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS,
+            scanner_log_fetch_max_bytes_for_bucket: 
DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
             writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
             connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
             security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
@@ -205,6 +249,32 @@ impl Config {
         }
         Ok(())
     }
+    pub fn validate_scanner_fetch(&self) -> Result<(), String> {
+        if self.scanner_log_fetch_min_bytes <= 0 {
+            return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
+        }
+        if self.scanner_log_fetch_max_bytes <= 0 {
+            return Err("scanner_log_fetch_max_bytes must be > 0".to_string());
+        }
+        if self.scanner_log_fetch_max_bytes < self.scanner_log_fetch_min_bytes 
{
+            return Err(
+                "scanner_log_fetch_max_bytes must be >= 
scanner_log_fetch_min_bytes".to_string(),
+            );
+        }
+        if self.scanner_log_fetch_wait_max_time_ms < 0 {
+            return Err("scanner_log_fetch_wait_max_time_ms must be >= 
0".to_string());
+        }
+        if self.scanner_log_fetch_max_bytes_for_bucket <= 0 {
+            return Err("scanner_log_fetch_max_bytes_for_bucket must be > 
0".to_string());
+        }
+        if self.scanner_log_fetch_max_bytes_for_bucket > 
self.scanner_log_fetch_max_bytes {
+            return Err(
+                "scanner_log_fetch_max_bytes_for_bucket must be <= 
scanner_log_fetch_max_bytes"
+                    .to_string(),
+            );
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]
@@ -274,4 +344,31 @@ mod tests {
         };
         assert!(config.validate_security().is_err());
     }
+    #[test]
+    fn test_scanner_fetch_defaults_valid() {
+        let config = Config::default();
+        assert!(config.validate_scanner_fetch().is_ok());
+        assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
+        assert_eq!(config.scanner_log_fetch_min_bytes, 1);
+        assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500);
+    }
+
+    #[test]
+    fn test_scanner_fetch_invalid_ranges() {
+        let config = Config {
+            scanner_log_fetch_min_bytes: 2,
+            scanner_log_fetch_max_bytes: 1,
+            ..Config::default()
+        };
+        assert!(config.validate_scanner_fetch().is_err());
+    }
+
+    #[test]
+    fn test_scanner_fetch_negative_wait() {
+        let config = Config {
+            scanner_log_fetch_wait_max_time_ms: -1,
+            ..Config::default()
+        };
+        assert!(config.validate_scanner_fetch().is_err());
+    }
 }
diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md
index debd311..d14cf16 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -28,6 +28,10 @@ Complete API reference for the Fluss C++ client.
 | `remote_file_download_thread_num`     | `size_t`      | `3`                  | Number of threads for remote log downloads                                               |
 | `scanner_remote_log_read_concurrency` | `size_t`      | `4`                  | Streaming read concurrency within a remote log file                                      |
 | `scanner_log_max_poll_records`        | `size_t`      | `500`                | Maximum number of records returned in a single Poll()                                    |
+| `scanner_log_fetch_max_bytes`         | `int32_t`     | `16777216` (16 MB)   | Maximum bytes per fetch response for LogScanner                                          |
+| `scanner_log_fetch_min_bytes`         | `int32_t`     | `1`                  | Minimum bytes the server must accumulate before returning a fetch response               |
+| `scanner_log_fetch_wait_max_time_ms`  | `int32_t`     | `500`                | Maximum time (ms) the server may wait to satisfy min-bytes                               |
+| `scanner_log_fetch_max_bytes_for_bucket`| `int32_t`   | `1048576` (1 MB)     | Maximum bytes per fetch response per bucket for LogScanner                               |
 | `connect_timeout_ms`                  | `uint64_t`    | `120000`             | TCP connect timeout in milliseconds                                                      |
 | `security_protocol`                   | `std::string` | `"PLAINTEXT"`        | `"PLAINTEXT"` (default) or `"sasl"` for SASL auth                                        |
 | `security_sasl_mechanism`             | `std::string` | `"PLAIN"`            | SASL mechanism (only `"PLAIN"` is supported)                                             |
diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md
index fef10a8..a4b594b 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -21,6 +21,10 @@ Complete API reference for the Fluss Python client.
 | `remote_file_download_thread_num`     | `remote-file.download-thread-num`     | Get/set number of threads for remote log downloads                                      |
 | `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file                             |
 | `scanner_log_max_poll_records`        | `scanner.log.max-poll-records`        | Get/set max number of records returned in a single poll()                               |
+| `scanner_log_fetch_max_bytes`         | `scanner.log.fetch.max-bytes`         | Get/set maximum bytes per fetch response for LogScanner                                 |
+| `scanner_log_fetch_min_bytes`         | `scanner.log.fetch.min-bytes`         | Get/set minimum bytes the server must accumulate before returning a fetch response      |
+| `scanner_log_fetch_wait_max_time_ms`  | `scanner.log.fetch.wait-max-time-ms`  | Get/set maximum time (ms) the server may wait to satisfy min-bytes                      |
+| `scanner_log_fetch_max_bytes_for_bucket` | `scanner.log.fetch.max-bytes-for-bucket` | Get/set maximum bytes per fetch response per bucket for LogScanner                |
 | `connect_timeout_ms`                  | `connect-timeout`                     | Get/set TCP connect timeout in milliseconds                                             |
 | `security_protocol`                   | `security.protocol`                   | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`)                                   |
 | `security_sasl_mechanism`             | `security.sasl.mechanism`             | Get/set SASL mechanism (only `"PLAIN"` is supported)                                    |
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md
index d539a86..7f45522 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -20,6 +20,10 @@ Complete API reference for the Fluss Rust client.
 | `remote_file_download_thread_num`     | `usize`         | `3`                | Number of threads for remote log downloads                                           |
 | `scanner_remote_log_read_concurrency` | `usize`         | `4`                | Streaming read concurrency within a remote log file                                  |
 | `scanner_log_max_poll_records`        | `usize`         | `500`              | Maximum number of records returned in a single poll()                                |
+| `scanner_log_fetch_max_bytes`         | `i32`           | `16777216` (16 MB) | Maximum bytes per fetch response for LogScanner                                      |
+| `scanner_log_fetch_min_bytes`         | `i32`           | `1`                | Minimum bytes the server must accumulate before returning a fetch response           |
+| `scanner_log_fetch_wait_max_time_ms`  | `i32`           | `500`              | Maximum time (ms) the server may wait to satisfy min-bytes                           |
+| `scanner_log_fetch_max_bytes_for_bucket`| `i32`         | `1048576` (1 MB)   | Maximum bytes per fetch response per bucket for LogScanner                           |
 | `connect_timeout_ms`                  | `u64`           | `120000`           | TCP connect timeout in milliseconds                                                  |
 | `security_protocol`                   | `String`        | `"PLAINTEXT"`      | `PLAINTEXT` (default) or `sasl` for SASL auth                                        |
 | `security_sasl_mechanism`             | `String`        | `"PLAIN"`          | SASL mechanism (only `PLAIN` is supported)                                           |

Reply via email to