This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3d5c9a0 chore: added is_retriable() to FlussError (#422)
3d5c9a0 is described below
commit 3d5c9a04a4569d1aecddbe3cc6ba8f2a9b29b677
Author: Aryamaan Singh <[email protected]>
AuthorDate: Wed Mar 11 21:03:11 2026 +0530
chore: added is_retriable() to FlussError (#422)
---
bindings/cpp/include/fluss.hpp | 15 +++++
bindings/python/fluss/__init__.pyi | 2 +
bindings/python/src/error.rs | 10 +++
crates/fluss/src/error.rs | 11 ++++
crates/fluss/src/rpc/fluss_api_error.rs | 77 ++++++++++++++++++++++++
website/docs/user-guide/cpp/error-handling.md | 47 +++++++++++++++
website/docs/user-guide/python/error-handling.md | 40 ++++++++++++
website/docs/user-guide/rust/error-handling.md | 42 +++++++++++++
8 files changed, 244 insertions(+)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index b507da7..d0da617 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -180,6 +180,18 @@ struct ErrorCode {
static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56;
/// Deletion operations are disabled on this table.
static constexpr int DELETION_DISABLED_EXCEPTION = 57;
+
+ /// Returns true if retrying the request may succeed. Mirrors Java's
RetriableException hierarchy.
+ static constexpr bool IsRetriable(int32_t code) {
+ return code == NETWORK_EXCEPTION || code == CORRUPT_MESSAGE ||
+ code == SCHEMA_NOT_EXIST || code == LOG_STORAGE_EXCEPTION ||
+ code == KV_STORAGE_EXCEPTION || code == NOT_LEADER_OR_FOLLOWER
||
+ code == CORRUPT_RECORD_EXCEPTION ||
+ code == UNKNOWN_TABLE_OR_BUCKET_EXCEPTION || code ==
REQUEST_TIME_OUT ||
+ code == STORAGE_EXCEPTION ||
+ code == NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION ||
+ code == NOT_ENOUGH_REPLICAS_EXCEPTION || code ==
LEADER_NOT_AVAILABLE_EXCEPTION;
+ }
};
struct Date {
@@ -326,6 +338,9 @@ struct Result {
std::string error_message;
bool Ok() const { return error_code == 0; }
+
+ /// Returns true if retrying the request may succeed. Client-side errors
always return false.
+ bool IsRetriable() const { return ErrorCode::IsRetriable(error_code); }
};
struct TablePath {
diff --git a/bindings/python/fluss/__init__.pyi
b/bindings/python/fluss/__init__.pyi
index e06e932..63be0e2 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -837,6 +837,8 @@ class FlussError(Exception):
error_code: int
def __init__(self, message: str, error_code: int = -2) -> None: ...
def __str__(self) -> str: ...
+ @property
+ def is_retriable(self) -> bool: ...
class LakeSnapshot:
def __init__(self, snapshot_id: int) -> None: ...
diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs
index 606b9f4..10c6cfa 100644
--- a/bindings/python/src/error.rs
+++ b/bindings/python/src/error.rs
@@ -51,6 +51,16 @@ impl FlussError {
format!("FlussError: {}", self.message)
}
}
+
+ /// Returns ``True`` if retrying the request may succeed. Client-side
errors always return ``False``.
+ #[getter]
+ fn is_retriable(&self) -> bool {
+ use fluss::rpc::FlussError as CoreFlussError;
+ if self.error_code == CLIENT_ERROR_CODE {
+ return false;
+ }
+ CoreFlussError::for_code(self.error_code).is_retriable()
+ }
}
impl FlussError {
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 5cf0d4b..d56432c 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -165,6 +165,17 @@ impl Error {
None
}
}
+
+ /// Returns `true` if retrying the request may succeed.
+ /// [`Error::RpcError`] is always retriable; [`Error::FlussAPIError`]
delegates to
+ /// [`ApiError::is_retriable`]; all other variants are not.
+ pub fn is_retriable(&self) -> bool {
+ match self {
+ Error::RpcError { .. } => true,
+ Error::FlussAPIError { api_error } => api_error.is_retriable(),
+ _ => false,
+ }
+ }
}
impl From<ArrowError> for Error {
diff --git a/crates/fluss/src/rpc/fluss_api_error.rs
b/crates/fluss/src/rpc/fluss_api_error.rs
index a501b99..95a39c6 100644
--- a/crates/fluss/src/rpc/fluss_api_error.rs
+++ b/crates/fluss/src/rpc/fluss_api_error.rs
@@ -39,6 +39,13 @@ impl Display for ApiError {
}
}
+impl ApiError {
+ /// Returns `true` if retrying the request may succeed. Delegates to
[`FlussError::is_retriable`].
+ pub fn is_retriable(&self) -> bool {
+ FlussError::for_code(self.code).is_retriable()
+ }
+}
+
/// Fluss protocol errors. These errors are part of the client-server protocol.
/// The error codes cannot be changed, but the names can be.
///
@@ -172,6 +179,25 @@ impl FlussError {
*self as i32
}
+ pub fn is_retriable(&self) -> bool {
+ matches!(
+ self,
+ FlussError::NetworkException
+ | FlussError::CorruptMessage
+ | FlussError::SchemaNotExist
+ | FlussError::LogStorageException
+ | FlussError::KvStorageException
+ | FlussError::NotLeaderOrFollower
+ | FlussError::CorruptRecordException
+ | FlussError::UnknownTableOrBucketException
+ | FlussError::RequestTimeOut
+ | FlussError::StorageException
+ | FlussError::NotEnoughReplicasAfterAppendException
+ | FlussError::NotEnoughReplicasException
+ | FlussError::LeaderNotAvailableException
+ )
+ }
+
/// Returns a friendly description of the error.
pub fn message(&self) -> &'static str {
match self {
@@ -403,4 +429,55 @@ mod tests {
let fluss_error = FlussError::from(api_error);
assert_eq!(fluss_error, FlussError::TableNotExist);
}
+
+ #[test]
+ fn is_retriable_known_retriable_errors() {
+ let retriable = [
+ FlussError::NetworkException,
+ FlussError::CorruptMessage,
+ FlussError::SchemaNotExist,
+ FlussError::LogStorageException,
+ FlussError::KvStorageException,
+ FlussError::NotLeaderOrFollower,
+ FlussError::CorruptRecordException,
+ FlussError::UnknownTableOrBucketException,
+ FlussError::RequestTimeOut,
+ FlussError::StorageException,
+ FlussError::NotEnoughReplicasAfterAppendException,
+ FlussError::NotEnoughReplicasException,
+ FlussError::LeaderNotAvailableException,
+ ];
+ for err in &retriable {
+ assert!(err.is_retriable(), "{err:?} should be retriable");
+ }
+ }
+
+ #[test]
+ fn is_retriable_known_non_retriable_errors() {
+ let non_retriable = [
+ FlussError::UnknownServerError,
+ FlussError::None,
+ FlussError::TableNotExist,
+ FlussError::AuthenticateException,
+ FlussError::AuthorizationException,
+ FlussError::RecordTooLargeException,
+ FlussError::DeletionDisabledException,
+ FlussError::InvalidCoordinatorException,
+ FlussError::FencedLeaderEpochException,
+ FlussError::FencedTieringEpochException,
+ FlussError::RetriableAuthenticateException,
+ ];
+ for err in &non_retriable {
+ assert!(!err.is_retriable(), "{err:?} should not be retriable");
+ }
+ }
+
+ #[test]
+ fn api_error_is_retriable_delegates_to_fluss_error() {
+ let retriable_api = FlussError::RequestTimeOut.to_api_error(None);
+ assert!(retriable_api.is_retriable());
+
+ let permanent_api = FlussError::TableNotExist.to_api_error(None);
+ assert!(!permanent_api.is_retriable());
+ }
}
diff --git a/website/docs/user-guide/cpp/error-handling.md
b/website/docs/user-guide/cpp/error-handling.md
index 3ded0c2..7447a26 100644
--- a/website/docs/user-guide/cpp/error-handling.md
+++ b/website/docs/user-guide/cpp/error-handling.md
@@ -98,6 +98,53 @@ if (!result.Ok()) {
See `fluss::ErrorCode` in `fluss.hpp` for the full list of named constants.
+## Retry Logic
+
+Some errors are transient, where the server may be temporarily unavailable,
mid-election, or under load. `IsRetriable()` can be used for deciding whether to
retry an operation rather than treating the error as permanent.
+
+`Result::IsRetriable()` is a member function available directly on the
result:
+
+```cpp
+fluss::Result result = writer.Append(row);
+if (!result.Ok()) {
+ if (result.IsRetriable()) {
+ // Transient failure — safe to retry
+ } else {
+ // Permanent failure — log and abort
+ std::cerr << "Fatal error (code " << result.error_code
+ << "): " << result.error_message << std::endl;
+ }
+}
+```
+
+`Result::IsRetriable()` delegates to `ErrorCode::IsRetriable()`, so you can
also call it directly on the code:
+
+```cpp
+if (fluss::ErrorCode::IsRetriable(result.error_code)) {
+ // retry
+}
+```
+
+### Retriable Error Codes
+
+| Constant | Code | Reason
|
+|-------------------------------------------------------------|------|-------------------------------------------|
+| `ErrorCode::NETWORK_EXCEPTION` | 1 | Server
disconnected |
+| `ErrorCode::CORRUPT_MESSAGE` | 3 | CRC or size
error |
+| `ErrorCode::SCHEMA_NOT_EXIST` | 9 | Schema may
not exist |
+| `ErrorCode::LOG_STORAGE_EXCEPTION` | 10 | Transient
log storage error |
+| `ErrorCode::KV_STORAGE_EXCEPTION` | 11 | Transient
KV storage error |
+| `ErrorCode::NOT_LEADER_OR_FOLLOWER` | 12 | Leader
election in progress |
+| `ErrorCode::CORRUPT_RECORD_EXCEPTION` | 14 | Corrupt
record |
+| `ErrorCode::UNKNOWN_TABLE_OR_BUCKET_EXCEPTION` | 21 | Metadata
not yet available |
+| `ErrorCode::REQUEST_TIME_OUT` | 25 | Request
timed out |
+| `ErrorCode::STORAGE_EXCEPTION` | 26 | Transient
storage error |
+| `ErrorCode::NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION` | 28 | Wrote to
server but with low ISR size |
+| `ErrorCode::NOT_ENOUGH_REPLICAS_EXCEPTION` | 29 | Low ISR
size at write time |
+| `ErrorCode::LEADER_NOT_AVAILABLE_EXCEPTION` | 44 | No leader
available for partition |
+
+Client-side errors (`ErrorCode::CLIENT_ERROR`, code -2) always return `false`
from `IsRetriable()`.
+
## Common Error Scenarios
### Connection Refused
diff --git a/website/docs/user-guide/python/error-handling.md
b/website/docs/user-guide/python/error-handling.md
index 50a9e46..5bef366 100644
--- a/website/docs/user-guide/python/error-handling.md
+++ b/website/docs/user-guide/python/error-handling.md
@@ -57,6 +57,46 @@ except fluss.FlussError as e:
See `fluss.ErrorCode` for the full list of named constants.
+## Retry Logic
+
+Some errors are transient, where the server may be temporarily unavailable,
mid-election, or under load. `is_retriable` can be used for deciding to retry
an operation rather than treating the error as permanent.
+
+`FlussError.is_retriable` is a property available directly on the exception:
+
+```python
+import fluss
+
+try:
+ await writer.append(row)
+except fluss.FlussError as e:
+ if e.is_retriable:
+ # Transient failure — safe to retry
+ pass
+ else:
+ # Permanent failure — log and abort
+ print(f"Fatal error (code {e.error_code}): {e.message}")
+```
+
+### Retriable Error Codes
+
+| Constant | Code | Reason
|
+|--------------------------------------------------------------|------|-------------------------------------------|
+| `ErrorCode.NETWORK_EXCEPTION` | 1 | Server
disconnected |
+| `ErrorCode.CORRUPT_MESSAGE` | 3 | CRC or
size error |
+| `ErrorCode.SCHEMA_NOT_EXIST` | 9 | Schema
may not exist |
+| `ErrorCode.LOG_STORAGE_EXCEPTION` | 10 |
Transient log storage error |
+| `ErrorCode.KV_STORAGE_EXCEPTION` | 11 |
Transient KV storage error |
+| `ErrorCode.NOT_LEADER_OR_FOLLOWER` | 12 | Leader
election in progress |
+| `ErrorCode.CORRUPT_RECORD_EXCEPTION` | 14 | Corrupt
record |
+| `ErrorCode.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION` | 21 |
Metadata not yet available |
+| `ErrorCode.REQUEST_TIME_OUT` | 25 | Request
timed out |
+| `ErrorCode.STORAGE_EXCEPTION` | 26 |
Transient storage error |
+| `ErrorCode.NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION` | 28 | Wrote
to server but with low ISR size |
+| `ErrorCode.NOT_ENOUGH_REPLICAS_EXCEPTION` | 29 | Low ISR
size at write time |
+| `ErrorCode.LEADER_NOT_AVAILABLE_EXCEPTION` | 44 | No
leader available for partition |
+
+Client-side errors (`ErrorCode.CLIENT_ERROR`, code -2) always return `False`
from `is_retriable`.
+
## Common Error Scenarios
### Connection Refused
diff --git a/website/docs/user-guide/rust/error-handling.md
b/website/docs/user-guide/rust/error-handling.md
index 964f81f..b7e4b45 100644
--- a/website/docs/user-guide/rust/error-handling.md
+++ b/website/docs/user-guide/rust/error-handling.md
@@ -78,6 +78,48 @@ match result {
}
```
+## Retry Logic
+
+Some errors are transient, where the server may be temporarily unavailable,
mid-election, or under load. `is_retriable()` can be used for deciding to retry
an operation rather than treating the error as permanent.
+
+`Error::is_retriable()` is available directly on any `Error` value. `RpcError`
is always retriable; `FlussAPIError` delegates to the server error code; all
other variants return `false`.
+
+```rust
+use fluss::error::Error;
+
+match writer.append(&row) {
+ Ok(_) => {}
+ Err(ref e) if e.is_retriable() => {
+ // Transient failure — safe to retry
+ }
+ Err(e) => {
+ // Permanent failure — log and abort
+ eprintln!("Fatal error: {}", e);
+ }
+}
+```
+
+### Retriable Variants
+
+| Variant / Error | Code | Reason
|
+|----------------------------------------------|------|-------------------------------------------|
+| `Error::RpcError` | — | Network-level failure,
always retriable |
+| `FlussError::NetworkException` | 1 | Server disconnected
|
+| `FlussError::CorruptMessage` | 3 | CRC or size error
|
+| `FlussError::SchemaNotExist` | 9 | Schema may not exist
|
+| `FlussError::LogStorageException` | 10 | Transient log storage
error |
+| `FlussError::KvStorageException` | 11 | Transient KV storage
error |
+| `FlussError::NotLeaderOrFollower` | 12 | Leader election in
progress |
+| `FlussError::CorruptRecordException` | 14 | Corrupt record
|
+| `FlussError::UnknownTableOrBucketException` | 21 | Metadata not yet
available |
+| `FlussError::RequestTimeOut` | 25 | Request timed out
|
+| `FlussError::StorageException` | 26 | Transient storage
error |
+| `FlussError::NotEnoughReplicasAfterAppendException` | 28 | Wrote to server
but with low ISR size |
+| `FlussError::NotEnoughReplicasException` | 29 | Low ISR size at write
time |
+| `FlussError::LeaderNotAvailableException` | 44 | No leader available
for partition |
+
+All other `Error` variants (e.g. `RowConvertError`, `IllegalArgument`,
`UnsupportedOperation`) always return `false` from `is_retriable()`.
+
## Common Error Scenarios
### Connection Refused