This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8642ea4 feat: allow configuring scanner fetch parameters (#417)
8642ea4 is described below
commit 8642ea454d95b283653e49489e60216d07c8ebb7
Author: Prajwal banakar <[email protected]>
AuthorDate: Thu Mar 12 07:37:20 2026 +0530
feat: allow configuring scanner fetch parameters (#417)
---
bindings/cpp/include/fluss.hpp | 8 ++
bindings/cpp/src/ffi_converter.hpp | 4 +
bindings/cpp/src/lib.rs | 8 ++
bindings/python/fluss/__init__.pyi | 16 ++++
bindings/python/src/config.rs | 74 +++++++++++++++++++
crates/fluss/src/client/connection.rs | 2 +
crates/fluss/src/client/table/scanner.rs | 87 ++++++++++++++++------
crates/fluss/src/config.rs | 97 +++++++++++++++++++++++++
website/docs/user-guide/cpp/api-reference.md | 4 +
website/docs/user-guide/python/api-reference.md | 4 +
website/docs/user-guide/rust/api-reference.md | 4 +
11 files changed, 286 insertions(+), 22 deletions(-)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index d0da617..79561a1 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -1012,6 +1012,14 @@ struct Configuration {
size_t scanner_remote_log_read_concurrency{4};
// Maximum number of records returned in a single call to Poll() for LogScanner
size_t scanner_log_max_poll_records{500};
+ // Maximum bytes per fetch response for LogScanner (16 MB)
+ int32_t scanner_log_fetch_max_bytes{16 * 1024 * 1024};
+ // Minimum bytes to accumulate before server returns a fetch response
+ int32_t scanner_log_fetch_min_bytes{1};
+ // Maximum time (ms) the server may wait to satisfy min bytes
+ int32_t scanner_log_fetch_wait_max_time_ms{500};
+ // Maximum bytes per fetch response per bucket for LogScanner (1 MB)
+ int32_t scanner_log_fetch_max_bytes_for_bucket{1024 * 1024};
int64_t writer_batch_timeout_ms{100};
// Connect timeout in milliseconds for TCP transport connect
uint64_t connect_timeout_ms{120000};
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 93a60bf..754ed0f 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -64,6 +64,10 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.remote_file_download_thread_num =
config.remote_file_download_thread_num;
ffi_config.scanner_remote_log_read_concurrency =
config.scanner_remote_log_read_concurrency;
ffi_config.scanner_log_max_poll_records =
config.scanner_log_max_poll_records;
+ ffi_config.scanner_log_fetch_max_bytes =
config.scanner_log_fetch_max_bytes;
+ ffi_config.scanner_log_fetch_min_bytes =
config.scanner_log_fetch_min_bytes;
+ ffi_config.scanner_log_fetch_wait_max_time_ms =
config.scanner_log_fetch_wait_max_time_ms;
+ ffi_config.scanner_log_fetch_max_bytes_for_bucket =
config.scanner_log_fetch_max_bytes_for_bucket;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
ffi_config.connect_timeout_ms = config.connect_timeout_ms;
ffi_config.security_protocol = rust::String(config.security_protocol);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 36b9c51..8d5153e 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -48,6 +48,10 @@ mod ffi {
remote_file_download_thread_num: usize,
scanner_remote_log_read_concurrency: usize,
scanner_log_max_poll_records: usize,
+ scanner_log_fetch_max_bytes: i32,
+ scanner_log_fetch_min_bytes: i32,
+ scanner_log_fetch_wait_max_time_ms: i32,
+ scanner_log_fetch_max_bytes_for_bucket: i32,
writer_batch_timeout_ms: i64,
connect_timeout_ms: u64,
security_protocol: String,
@@ -668,6 +672,10 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
remote_file_download_thread_num:
config.remote_file_download_thread_num,
scanner_remote_log_read_concurrency:
config.scanner_remote_log_read_concurrency,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
+ scanner_log_fetch_max_bytes: config.scanner_log_fetch_max_bytes,
+ scanner_log_fetch_min_bytes: config.scanner_log_fetch_min_bytes,
+ scanner_log_fetch_wait_max_time_ms:
config.scanner_log_fetch_wait_max_time_ms,
+ scanner_log_fetch_max_bytes_for_bucket:
config.scanner_log_fetch_max_bytes_for_bucket,
connect_timeout_ms: config.connect_timeout_ms,
security_protocol: config.security_protocol.to_string(),
security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 63be0e2..20b259e 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -174,6 +174,22 @@ class Config:
@scanner_log_max_poll_records.setter
def scanner_log_max_poll_records(self, num: int) -> None: ...
@property
+ def scanner_log_fetch_max_bytes(self) -> int: ...
+ @scanner_log_fetch_max_bytes.setter
+ def scanner_log_fetch_max_bytes(self, bytes: int) -> None: ...
+ @property
+ def scanner_log_fetch_min_bytes(self) -> int: ...
+ @scanner_log_fetch_min_bytes.setter
+ def scanner_log_fetch_min_bytes(self, bytes: int) -> None: ...
+ @property
+ def scanner_log_fetch_wait_max_time_ms(self) -> int: ...
+ @scanner_log_fetch_wait_max_time_ms.setter
+ def scanner_log_fetch_wait_max_time_ms(self, ms: int) -> None: ...
+ @property
+ def scanner_log_fetch_max_bytes_for_bucket(self) -> int: ...
+ @scanner_log_fetch_max_bytes_for_bucket.setter
+ def scanner_log_fetch_max_bytes_for_bucket(self, bytes: int) -> None: ...
+ @property
def writer_batch_timeout_ms(self) -> int: ...
@writer_batch_timeout_ms.setter
def writer_batch_timeout_ms(self, timeout: int) -> None: ...
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index f99f9c6..fd3c980 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -97,6 +97,32 @@ impl Config {
))
})?;
}
+ "scanner.log.fetch.max-bytes" => {
+ config.scanner_log_fetch_max_bytes =
value.parse::<i32>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+ })?;
+ }
+ "scanner.log.fetch.min-bytes" => {
+ config.scanner_log_fetch_min_bytes =
value.parse::<i32>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+ })?;
+ }
+ "scanner.log.fetch.wait-max-time-ms" => {
+ config.scanner_log_fetch_wait_max_time_ms =
+ value.parse::<i32>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for '{key}': {e}"
+ ))
+ })?;
+ }
+ "scanner.log.fetch.max-bytes-for-bucket" => {
+ config.scanner_log_fetch_max_bytes_for_bucket =
+ value.parse::<i32>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for '{key}': {e}"
+ ))
+ })?;
+ }
"writer.bucket.no-key-assigner" => {
config.writer_bucket_no_key_assigner =
value.parse::<fcore::config::NoKeyAssigner>().map_err(|e| {
@@ -329,6 +355,54 @@ impl Config {
fn set_security_sasl_password(&mut self, password: String) {
self.inner.security_sasl_password = password;
}
+
+ /// Get the maximum bytes per fetch response for LogScanner
+ #[getter]
+ fn scanner_log_fetch_max_bytes(&self) -> i32 {
+ self.inner.scanner_log_fetch_max_bytes
+ }
+
+ /// Set the maximum bytes per fetch response for LogScanner
+ #[setter]
+ fn set_scanner_log_fetch_max_bytes(&mut self, bytes: i32) {
+ self.inner.scanner_log_fetch_max_bytes = bytes;
+ }
+
+ /// Get the minimum bytes to accumulate before returning a fetch response
+ #[getter]
+ fn scanner_log_fetch_min_bytes(&self) -> i32 {
+ self.inner.scanner_log_fetch_min_bytes
+ }
+
+ /// Set the minimum bytes to accumulate before returning a fetch response
+ #[setter]
+ fn set_scanner_log_fetch_min_bytes(&mut self, bytes: i32) {
+ self.inner.scanner_log_fetch_min_bytes = bytes;
+ }
+
+ /// Get the maximum time (ms) the server may wait to satisfy min-bytes
+ #[getter]
+ fn scanner_log_fetch_wait_max_time_ms(&self) -> i32 {
+ self.inner.scanner_log_fetch_wait_max_time_ms
+ }
+
+ /// Set the maximum time (ms) the server may wait to satisfy min-bytes
+ #[setter]
+ fn set_scanner_log_fetch_wait_max_time_ms(&mut self, ms: i32) {
+ self.inner.scanner_log_fetch_wait_max_time_ms = ms;
+ }
+
+ /// Get the maximum bytes per fetch response per bucket for LogScanner
+ #[getter]
+ fn scanner_log_fetch_max_bytes_for_bucket(&self) -> i32 {
+ self.inner.scanner_log_fetch_max_bytes_for_bucket
+ }
+
+ /// Set the maximum bytes per fetch response per bucket for LogScanner
+ #[setter]
+ fn set_scanner_log_fetch_max_bytes_for_bucket(&mut self, bytes: i32) {
+ self.inner.scanner_log_fetch_max_bytes_for_bucket = bytes;
+ }
}
impl Config {
diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs
index 703b588..2610f6d 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -39,6 +39,8 @@ impl FlussConnection {
pub async fn new(arg: Config) -> Result<Self> {
arg.validate_security()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
+ arg.validate_scanner_fetch()
+ .map_err(|msg| Error::IllegalArgument { message: msg })?;
let timeout = Duration::from_millis(arg.connect_timeout_ms);
let connections = if arg.is_sasl_enabled() {
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index e837ba7..4302539 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,17 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use arrow_schema::SchemaRef;
-use log::{debug, warn};
-use parking_lot::{Mutex, RwLock};
-use std::{
- collections::{HashMap, HashSet},
- slice::from_ref,
- sync::Arc,
- time::{Duration, Instant},
-};
-use tempfile::TempDir;
-
use crate::client::connection::FlussConnection;
use crate::client::credentials::SecurityTokenManager;
use crate::client::metadata::Metadata;
@@ -44,12 +33,16 @@ use crate::record::{
use crate::rpc::{RpcClient, RpcError, message};
use crate::util::FairBucketStatusMap;
use crate::{PartitionId, TableId};
-
-const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
-#[allow(dead_code)]
-const LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024;
-const LOG_FETCH_MIN_BYTES: i32 = 1;
-const LOG_FETCH_WAIT_MAX_TIME: i32 = 500;
+use arrow_schema::SchemaRef;
+use log::{debug, warn};
+use parking_lot::{Mutex, RwLock};
+use std::{
+ collections::{HashMap, HashSet},
+ slice::from_ref,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+use tempfile::TempDir;
pub struct TableScan<'a> {
conn: &'a FlussConnection,
@@ -637,6 +630,10 @@ struct LogFetcher {
log_fetch_buffer: Arc<LogFetchBuffer>,
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
max_poll_records: usize,
+ fetch_max_bytes: i32,
+ fetch_min_bytes: i32,
+ fetch_wait_max_time_ms: i32,
+ fetch_max_bytes_for_bucket: i32,
}
struct FetchResponseContext {
@@ -697,6 +694,10 @@ impl LogFetcher {
log_fetch_buffer,
nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
max_poll_records: config.scanner_log_max_poll_records,
+ fetch_max_bytes: config.scanner_log_fetch_max_bytes,
+ fetch_min_bytes: config.scanner_log_fetch_min_bytes,
+ fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
+ fetch_max_bytes_for_bucket:
config.scanner_log_fetch_max_bytes_for_bucket,
})
}
@@ -1479,8 +1480,7 @@ impl LogFetcher {
partition_id: bucket.partition_id(),
bucket_id: bucket.bucket_id(),
fetch_offset: offset,
- // 1M
- max_fetch_bytes: 1024 * 1024,
+ max_fetch_bytes: self.fetch_max_bytes_for_bucket,
};
fetch_log_req_for_buckets
@@ -1514,10 +1514,10 @@ impl LogFetcher {
let fetch_log_request = FetchLogRequest {
follower_server_id: -1,
- max_bytes: LOG_FETCH_MAX_BYTES,
+ max_bytes: self.fetch_max_bytes,
tables_req: vec![req_for_table],
- max_wait_ms: Some(LOG_FETCH_WAIT_MAX_TIME),
- min_bytes: Some(LOG_FETCH_MIN_BYTES),
+ max_wait_ms: Some(self.fetch_wait_max_time_ms),
+ min_bytes: Some(self.fetch_min_bytes),
};
(leader_id, fetch_log_request)
})
@@ -1990,4 +1990,47 @@ mod tests {
let result = validate_scan_support(&table_path, &table_info);
assert!(result.is_ok());
}
+ #[tokio::test]
+ async fn prepare_fetch_log_requests_uses_configured_fetch_params() ->
Result<()> {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = build_table_info(table_path.clone(), 1, 1);
+ let cluster = build_cluster_arc(&table_path, 1, 1);
+ let metadata = Arc::new(Metadata::new_for_test(cluster));
+ let status = Arc::new(LogScannerStatus::new());
+ status.assign_scan_bucket(TableBucket::new(1, 0), 0);
+
+ let config = crate::config::Config {
+ scanner_log_fetch_max_bytes: 1234,
+ scanner_log_fetch_min_bytes: 7,
+ scanner_log_fetch_wait_max_time_ms: 89,
+ scanner_log_fetch_max_bytes_for_bucket: 512,
+ ..crate::config::Config::default()
+ };
+
+ let fetcher = LogFetcher::new(
+ table_info,
+ Arc::new(RpcClient::new()),
+ metadata,
+ status,
+ &config,
+ None,
+ )?;
+
+ let requests = fetcher.prepare_fetch_log_requests().await;
+ // In this test cluster, leader id should exist; but even if it changes,
+ // assert over all built requests.
+ assert!(!requests.is_empty());
+ for req in requests.values() {
+ assert_eq!(req.max_bytes, 1234);
+ assert_eq!(req.min_bytes, Some(7));
+ assert_eq!(req.max_wait_ms, Some(89));
+
+ for table_req in &req.tables_req {
+ for bucket_req in &table_req.buckets_req {
+ assert_eq!(bucket_req.max_fetch_bytes, 512);
+ }
+ }
+ }
+ Ok(())
+ }
}
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 08ffbfa..e85a449 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -27,7 +27,11 @@ const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
+const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
+const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1;
+const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500;
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
+const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024;
const DEFAULT_ACKS: &str = "all";
const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
@@ -91,11 +95,31 @@ pub struct Config {
#[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
pub scanner_log_max_poll_records: usize,
+ /// Maximum bytes per fetch response for LogScanner.
+ /// Default: 16777216 (16MB)
+ #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]
+ pub scanner_log_fetch_max_bytes: i32,
+
+ /// Minimum bytes to accumulate before returning a fetch response.
+ /// Default: 1
+ #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES)]
+ pub scanner_log_fetch_min_bytes: i32,
+
+ /// Maximum time the server may wait (ms) to satisfy min-bytes.
+ /// Default: 500
+ #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS)]
+ pub scanner_log_fetch_wait_max_time_ms: i32,
+
/// The maximum time to wait for a batch to be completed in milliseconds.
/// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
#[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
pub writer_batch_timeout_ms: i64,
+ /// Maximum bytes per fetch response **per bucket** for LogScanner.
+ /// Default: 1048576 (1MB)
+ #[arg(long, default_value_t =
DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)]
+ pub scanner_log_fetch_max_bytes_for_bucket: i32,
+
/// Connect timeout in milliseconds for TCP transport connect.
/// Default: 120000 (120 seconds).
#[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
@@ -139,6 +163,22 @@ impl std::fmt::Debug for Config {
"scanner_log_max_poll_records",
&self.scanner_log_max_poll_records,
)
+ .field(
+ "scanner_log_fetch_max_bytes",
+ &self.scanner_log_fetch_max_bytes,
+ )
+ .field(
+ "scanner_log_fetch_min_bytes",
+ &self.scanner_log_fetch_min_bytes,
+ )
+ .field(
+ "scanner_log_fetch_max_bytes_for_bucket",
+ &self.scanner_log_fetch_max_bytes_for_bucket,
+ )
+ .field(
+ "scanner_log_fetch_wait_max_time_ms",
+ &self.scanner_log_fetch_wait_max_time_ms,
+ )
.field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
.field("connect_timeout_ms", &self.connect_timeout_ms)
.field("security_protocol", &self.security_protocol)
@@ -162,6 +202,10 @@ impl Default for Config {
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
scanner_remote_log_read_concurrency:
DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
+ scanner_log_fetch_max_bytes: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES,
+ scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES,
+ scanner_log_fetch_wait_max_time_ms:
DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS,
+ scanner_log_fetch_max_bytes_for_bucket:
DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
@@ -205,6 +249,32 @@ impl Config {
}
Ok(())
}
+ pub fn validate_scanner_fetch(&self) -> Result<(), String> {
+ if self.scanner_log_fetch_min_bytes <= 0 {
+ return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
+ }
+ if self.scanner_log_fetch_max_bytes <= 0 {
+ return Err("scanner_log_fetch_max_bytes must be > 0".to_string());
+ }
+ if self.scanner_log_fetch_max_bytes < self.scanner_log_fetch_min_bytes
{
+ return Err(
+ "scanner_log_fetch_max_bytes must be >= scanner_log_fetch_min_bytes".to_string(),
+ );
+ }
+ if self.scanner_log_fetch_wait_max_time_ms < 0 {
+ return Err("scanner_log_fetch_wait_max_time_ms must be >= 0".to_string());
+ }
+ if self.scanner_log_fetch_max_bytes_for_bucket <= 0 {
+ return Err("scanner_log_fetch_max_bytes_for_bucket must be > 0".to_string());
+ }
+ if self.scanner_log_fetch_max_bytes_for_bucket >
self.scanner_log_fetch_max_bytes {
+ return Err(
+ "scanner_log_fetch_max_bytes_for_bucket must be <= scanner_log_fetch_max_bytes"
+ .to_string(),
+ );
+ }
+ Ok(())
+ }
}
#[cfg(test)]
@@ -274,4 +344,31 @@ mod tests {
};
assert!(config.validate_security().is_err());
}
+ #[test]
+ fn test_scanner_fetch_defaults_valid() {
+ let config = Config::default();
+ assert!(config.validate_scanner_fetch().is_ok());
+ assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
+ assert_eq!(config.scanner_log_fetch_min_bytes, 1);
+ assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500);
+ }
+
+ #[test]
+ fn test_scanner_fetch_invalid_ranges() {
+ let config = Config {
+ scanner_log_fetch_min_bytes: 2,
+ scanner_log_fetch_max_bytes: 1,
+ ..Config::default()
+ };
+ assert!(config.validate_scanner_fetch().is_err());
+ }
+
+ #[test]
+ fn test_scanner_fetch_negative_wait() {
+ let config = Config {
+ scanner_log_fetch_wait_max_time_ms: -1,
+ ..Config::default()
+ };
+ assert!(config.validate_scanner_fetch().is_err());
+ }
}
diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md
index debd311..d14cf16 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -28,6 +28,10 @@ Complete API reference for the Fluss C++ client.
| `remote_file_download_thread_num` | `size_t` | `3` | Number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `size_t` | `4` | Streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `size_t` | `500` | Maximum number of records returned in a single Poll() |
+| `scanner_log_fetch_max_bytes` | `int32_t` | `16777216` (16 MB) | Maximum bytes per fetch response for LogScanner |
+| `scanner_log_fetch_min_bytes` | `int32_t` | `1` | Minimum bytes the server must accumulate before returning a fetch response |
+| `scanner_log_fetch_wait_max_time_ms` | `int32_t` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes |
+| `scanner_log_fetch_max_bytes_for_bucket`| `int32_t` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for LogScanner |
| `connect_timeout_ms` | `uint64_t` | `120000` | TCP connect timeout in milliseconds |
| `security_protocol` | `std::string` | `"PLAINTEXT"` | `"PLAINTEXT"` (default) or `"sasl"` for SASL auth |
| `security_sasl_mechanism` | `std::string` | `"PLAIN"` | SASL mechanism (only `"PLAIN"` is supported) |
diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md
index fef10a8..a4b594b 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -21,6 +21,10 @@ Complete API reference for the Fluss Python client.
| `remote_file_download_thread_num` | `remote-file.download-thread-num` | Get/set number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single poll() |
+| `scanner_log_fetch_max_bytes` | `scanner.log.fetch.max-bytes` | Get/set maximum bytes per fetch response for LogScanner |
+| `scanner_log_fetch_min_bytes` | `scanner.log.fetch.min-bytes` | Get/set minimum bytes the server must accumulate before returning a fetch response |
+| `scanner_log_fetch_wait_max_time_ms` | `scanner.log.fetch.wait-max-time-ms` | Get/set maximum time (ms) the server may wait to satisfy min-bytes |
+| `scanner_log_fetch_max_bytes_for_bucket` | `scanner.log.fetch.max-bytes-for-bucket` | Get/set maximum bytes per fetch response per bucket for LogScanner |
| `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds |
| `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) |
| `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) |
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md
index d539a86..7f45522 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -20,6 +20,10 @@ Complete API reference for the Fluss Rust client.
| `remote_file_download_thread_num` | `usize` | `3` | Number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `usize` | `4` | Streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single poll() |
+| `scanner_log_fetch_max_bytes` | `i32` | `16777216` (16 MB) | Maximum bytes per fetch response for LogScanner |
+| `scanner_log_fetch_min_bytes` | `i32` | `1` | Minimum bytes the server must accumulate before returning a fetch response |
+| `scanner_log_fetch_wait_max_time_ms` | `i32` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes |
+| `scanner_log_fetch_max_bytes_for_bucket`| `i32` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for LogScanner |
| `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds |
| `security_protocol` | `String` | `"PLAINTEXT"` | `PLAINTEXT` (default) or `sasl` for SASL auth |
| `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) |