This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new dd10579 feat: Add getter/setter property for writer_bucket_no_key_assigner config in python (#397)
dd10579 is described below
commit dd10579d2249775610c4871421293b413747a039
Author: Kaiqi Dong <[email protected]>
AuthorDate: Thu Mar 5 04:47:25 2026 +0100
feat: Add getter/setter property for writer_bucket_no_key_assigner config in python (#397)
---
bindings/cpp/src/lib.rs | 14 ++++++--------
bindings/python/fluss/__init__.pyi | 4 ++++
bindings/python/src/config.rs | 33 ++++++++++++++++++++++++---------
crates/fluss/src/config.rs | 18 +++++++-----------
4 files changed, 41 insertions(+), 28 deletions(-)
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 366a198..36b9c51 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -649,14 +649,12 @@ fn err_ptr_from_core(e: &fcore::error::Error) -> ffi::FfiPtrResult {
// Connection implementation
fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
- let assigner_type = match config.writer_bucket_no_key_assigner.as_str() {
- "round_robin" => fluss::config::NoKeyAssigner::RoundRobin,
- "sticky" => fluss::config::NoKeyAssigner::Sticky,
- other => {
- return client_err_ptr(format!(
- "Unknown bucket assigner type: '{other}', expected 'sticky' or 'round_robin'"
- ));
- }
+ let assigner_type = match config
+ .writer_bucket_no_key_assigner
+ .parse::<fluss::config::NoKeyAssigner>()
+ {
+ Ok(v) => v,
+ Err(e) => return client_err_ptr(format!("Invalid bucket assigner type: {e}")),
};
let config_core = fluss::config::Config {
bootstrap_servers: config.bootstrap_servers.to_string(),
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index c387d73..417ac9b 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -154,6 +154,10 @@ class Config:
@writer_batch_size.setter
def writer_batch_size(self, size: int) -> None: ...
@property
+ def writer_bucket_no_key_assigner(self) -> str: ...
+ @writer_bucket_no_key_assigner.setter
+ def writer_bucket_no_key_assigner(self, value: str) -> None: ...
+ @property
def scanner_remote_log_prefetch_num(self) -> int: ...
@scanner_remote_log_prefetch_num.setter
def scanner_remote_log_prefetch_num(self, num: int) -> None: ...
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 4582d43..f99f9c6 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -98,15 +98,12 @@ impl Config {
})?;
}
"writer.bucket.no-key-assigner" => {
- config.writer_bucket_no_key_assigner = match value.as_str() {
- "round_robin" => fcore::config::NoKeyAssigner::RoundRobin,
- "sticky" => fcore::config::NoKeyAssigner::Sticky,
- other => {
- return Err(FlussError::new_err(format!(
- "Unknown bucket assigner type: {other}, expected 'sticky' or 'round_robin'"
- )));
- }
- };
+ config.writer_bucket_no_key_assigner =
+ value.parse::<fcore::config::NoKeyAssigner>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for '{key}': {e}"
+ ))
+ })?;
}
"connect-timeout" => {
config.connect_timeout_ms = value.parse::<u64>().map_err(|e| {
@@ -255,6 +252,24 @@ impl Config {
self.inner.writer_batch_timeout_ms = timeout;
}
+ /// Get the bucket assignment strategy for tables without bucket keys
+ #[getter]
+ fn writer_bucket_no_key_assigner(&self) -> String {
+ self.inner.writer_bucket_no_key_assigner.to_string()
+ }
+
+ /// Set the bucket assignment strategy for tables without bucket keys
+ #[setter]
+ fn set_writer_bucket_no_key_assigner(&mut self, value: String) -> PyResult<()> {
+ self.inner.writer_bucket_no_key_assigner =
+ value.parse::<fcore::config::NoKeyAssigner>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for 'writer.bucket.no-key-assigner': {e}"
+ ))
+ })?;
+ Ok(())
+ }
+
/// Get the connect timeout in milliseconds
#[getter]
fn connect_timeout_ms(&self) -> u64 {
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 438c948..08ffbfa 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -17,7 +17,7 @@
use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};
-use std::fmt;
+use strum_macros::{Display, EnumString};
const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123";
const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024;
@@ -36,24 +36,20 @@ const DEFAULT_SASL_MECHANISM: &str = "PLAIN";
/// Bucket assigner strategy for tables without bucket keys.
/// Matches Java `client.writer.bucket.no-key-assigner`.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize)]
+#[derive(
+ Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize, EnumString, Display,
+)]
#[serde(rename_all = "snake_case")]
+#[strum(ascii_case_insensitive)]
pub enum NoKeyAssigner {
/// Sticks to one bucket until the batch is full, then switches.
+ #[strum(serialize = "sticky")]
Sticky,
/// Assigns each record to the next bucket in a rotating sequence.
+ #[strum(serialize = "round_robin")]
RoundRobin,
}
-impl fmt::Display for NoKeyAssigner {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- NoKeyAssigner::Sticky => write!(f, "sticky"),
- NoKeyAssigner::RoundRobin => write!(f, "round_robin"),
- }
- }
-}
-
#[derive(Parser, Clone, Deserialize, Serialize)]
#[command(author, version, about, long_about = None)]
pub struct Config {