This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1b26535 [feat] Add examples and stub files for Python bindings (#10)
1b26535 is described below
commit 1b265353f7329cbc83b50e443aa44288899d370e
Author: naivedogger <[email protected]>
AuthorDate: Fri Oct 17 16:54:04 2025 +0800
[feat] Add examples and stub files for Python bindings (#10)
---------
Co-authored-by: luoyuxia <[email protected]>
---
.licenserc.yaml | 1 +
bindings/python/README.md | 19 ++--
bindings/python/example/example.py | 188 +++++++++++++++++++++++++++++++++++++
bindings/python/fluss/__init__.pyi | 171 +++++++++++++++++++++++++++++++++
bindings/python/fluss/py.typed | 0
5 files changed, 369 insertions(+), 10 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3813b48..a3cfcd1 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,4 +26,5 @@ header:
- 'LICENSE'
- 'NOTICE'
- 'DISCLAIMER'
+ - 'bindings/python/fluss/py.typed'
comment: on-failure
diff --git a/bindings/python/README.md b/bindings/python/README.md
index 5258f53..44d6099 100644
--- a/bindings/python/README.md
+++ b/bindings/python/README.md
@@ -108,7 +108,7 @@ uv run python example/example.py
### Build API docs:
```bash
-uv run pdoc fluss_python
+uv run pdoc fluss
```
### Release
@@ -124,10 +124,10 @@ uv run maturin publish
## Project Structure
```
bindings/python/
-├── Cargo.toml # Rust dependency configuration
-├── pyproject.toml # Python project configuration
-├── README.md # This file
-├── src/ # Rust source code
+├── Cargo.toml # Rust dependency configuration
+├── pyproject.toml # Python project configuration
+├── README.md # This file
+├── src/ # Rust source code
│ ├── lib.rs # Main entry module
│ ├── config.rs # Configuration related
│ ├── connection.rs # Connection management
@@ -135,11 +135,10 @@ bindings/python/
│ ├── table.rs # Table operations
│ ├── types.rs # Data types
│ └── error.rs # Error handling
-├── python/ # Python package source
-│ └── fluss_python/
-│ ├── __init__.py # Python package entry
-│ ├── __init__.pyi # Stub file
-│ └── py.typed # Type declarations
+├── fluss/ # Python package source
+│ ├── __init__.py # Python package entry
+│ ├── __init__.pyi # Stub file
+│ └── py.typed # Type declarations
└── example/ # Example code
└── example.py
```
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
new file mode 100644
index 0000000..0523f94
--- /dev/null
+++ b/bindings/python/example/example.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import time
+
+import pandas as pd
+import pyarrow as pa
+
+import fluss
+
+
+async def main():
+ # Create connection configuration
+ config_spec = {
+ "bootstrap.servers": "127.0.0.1:9123",
+ # Add other configuration options as needed
+ "request.max.size": "10485760", # 10 MB
+ "writer.acks": "all", # Wait for all replicas to acknowledge
+ "writer.retries": "3", # Retry up to 3 times on failure
+ "writer.batch.size": "1000", # Batch size for writes
+ }
+ config = fluss.Config(config_spec)
+
+ # Create connection using the static connect method
+ conn = await fluss.FlussConnection.connect(config)
+
+ # Define fields for PyArrow
+ fields = [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.string()),
+ pa.field("score", pa.float32()),
+ pa.field("age", pa.int32()),
+ ]
+
+ # Create a PyArrow schema
+ schema = pa.schema(fields)
+
+ # Create a Fluss Schema first (this is what TableDescriptor expects)
+ fluss_schema = fluss.Schema(schema)
+
+ # Create a Fluss TableDescriptor
+ table_descriptor = fluss.TableDescriptor(fluss_schema)
+
+ # Get the admin for Fluss
+ admin = await conn.get_admin()
+
+ # Create a Fluss table
+ table_path = fluss.TablePath("fluss", "sample_table")
+
+ try:
+ await admin.create_table(table_path, table_descriptor, True)
+ print(f"Created table: {table_path}")
+ except Exception as e:
+ print(f"Table creation failed: {e}")
+
+ # Get table information via admin
+ try:
+ table_info = await admin.get_table(table_path)
+ print(f"Table info: {table_info}")
+ print(f"Table ID: {table_info.table_id}")
+ print(f"Schema ID: {table_info.schema_id}")
+ print(f"Created time: {table_info.created_time}")
+ print(f"Primary keys: {table_info.get_primary_keys()}")
+ except Exception as e:
+ print(f"Failed to get table info: {e}")
+
+ # Get the table instance
+ table = await conn.get_table(table_path)
+ print(f"Got table: {table}")
+
+ # Create a writer for the table
+ append_writer = await table.new_append_writer()
+ print(f"Created append writer: {append_writer}")
+
+ try:
+ # Test 1: Write PyArrow Table
+ print("\n--- Testing PyArrow Table write ---")
+ pa_table = pa.Table.from_arrays(
+ [
+ pa.array([1, 2, 3], type=pa.int32()),
+ pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
+ pa.array([95.2, 87.2, 92.1], type=pa.float32()),
+ pa.array([25, 30, 35], type=pa.int32()),
+ ],
+ schema=schema,
+ )
+
+ append_writer.write_arrow(pa_table)
+ print("Successfully wrote PyArrow Table")
+
+ # Test 2: Write PyArrow RecordBatch
+ print("\n--- Testing PyArrow RecordBatch write ---")
+ pa_record_batch = pa.RecordBatch.from_arrays(
+ [
+ pa.array([4, 5], type=pa.int32()),
+ pa.array(["David", "Eve"], type=pa.string()),
+ pa.array([88.5, 91.0], type=pa.float32()),
+ pa.array([28, 32], type=pa.int32()),
+ ],
+ schema=schema,
+ )
+
+ append_writer.write_arrow_batch(pa_record_batch)
+ print("Successfully wrote PyArrow RecordBatch")
+
+ # Test 3: Write Pandas DataFrame
+ print("\n--- Testing Pandas DataFrame write ---")
+ df = pd.DataFrame(
+ {
+ "id": [6, 7],
+ "name": ["Frank", "Grace"],
+ "score": [89.3, 94.7],
+ "age": [29, 27],
+ }
+ )
+
+ append_writer.write_pandas(df)
+ print("Successfully wrote Pandas DataFrame")
+
+ # Flush all pending data
+ print("\n--- Flushing data ---")
+ append_writer.flush()
+ print("Successfully flushed data")
+
+ except Exception as e:
+ print(f"Error during writing: {e}")
+
+ # Now scan the table to verify data was written
+ print("\n--- Scanning table ---")
+ try:
+ log_scanner = await table.new_log_scanner()
+ print(f"Created log scanner: {log_scanner}")
+
+ # Subscribe to scan from earliest to latest
+ # start_timestamp=None (earliest), end_timestamp=None (latest)
+ log_scanner.subscribe(None, None)
+
+ print("Scanning results using to_arrow():")
+
+ # Try to get as PyArrow Table
+ try:
+ pa_table_result = log_scanner.to_arrow()
+ print(f"\nAs PyArrow Table: {pa_table_result}")
+ except Exception as e:
+ print(f"Could not convert to PyArrow: {e}")
+
+ # Let's subscribe from the beginning again.
+ # Reset subscription
+ log_scanner.subscribe(None, None)
+
+ # Try to get as Pandas DataFrame
+ try:
+ df_result = log_scanner.to_pandas()
+ print(f"\nAs Pandas DataFrame:\n{df_result}")
+ except Exception as e:
+ print(f"Could not convert to Pandas: {e}")
+
+ # TODO: support to_arrow_batch_reader()
+ # which is reserved for streaming use cases
+
+ # TODO: support to_duckdb()
+
+ except Exception as e:
+ print(f"Error during scanning: {e}")
+
+ # Close connection
+ conn.close()
+ print("\nConnection closed")
+
+
+if __name__ == "__main__":
+ # Run the async main function
+ asyncio.run(main())
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
new file mode 100644
index 0000000..4565242
--- /dev/null
+++ b/bindings/python/fluss/__init__.pyi
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Type stubs for Fluss Python bindings."""
+
+from types import TracebackType
+from typing import Dict, List, Optional, Tuple
+
+import pandas as pd
+import pyarrow as pa
+
+class Config:
+ def __init__(self, properties: Optional[Dict[str, str]] = None) -> None: ...
+ @property
+ def bootstrap_server(self) -> Optional[str]: ...
+ @bootstrap_server.setter
+ def bootstrap_server(self, server: str) -> None: ...
+ @property
+ def request_max_size(self) -> int: ...
+ @request_max_size.setter
+ def request_max_size(self, size: int) -> None: ...
+ @property
+ def writer_batch_size(self) -> int: ...
+ @writer_batch_size.setter
+ def writer_batch_size(self, size: int) -> None: ...
+
+class FlussConnection:
+ @staticmethod
+ async def connect(config: Config) -> FlussConnection: ...
+ async def get_admin(self) -> FlussAdmin: ...
+ async def get_table(self, table_path: TablePath) -> FlussTable: ...
+ def close(self) -> None: ...
+ def __enter__(self) -> FlussConnection: ...
+ def __exit__(self, exc_type: Optional[type], exc_value: Optional[BaseException], traceback: Optional[TracebackType]) -> bool: ...
+ def __repr__(self) -> str: ...
+
+class FlussAdmin:
+ async def create_table(
+ self,
+ table_path: TablePath,
+ table_descriptor: TableDescriptor,
+ ignore_if_exists: Optional[bool] = False,
+ ) -> None: ...
+ async def get_table(self, table_path: TablePath) -> TableInfo: ...
+ async def get_latest_lake_snapshot(self, table_path: TablePath) -> LakeSnapshot: ...
+ def __repr__(self) -> str: ...
+
+class FlussTable:
+ async def new_append_writer(self) -> AppendWriter: ...
+ async def new_log_scanner(self) -> LogScanner: ...
+ def get_table_info(self) -> TableInfo: ...
+ def get_table_path(self) -> TablePath: ...
+ def has_primary_key(self) -> bool: ...
+ def __repr__(self) -> str: ...
+
+class AppendWriter:
+ def write_arrow(self, table: pa.Table) -> None: ...
+ def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
+ def write_pandas(self, df: pd.DataFrame) -> None: ...
+ def flush(self) -> None: ...
+ def __repr__(self) -> str: ...
+
+class LogScanner:
+ def subscribe(
+ self, start_timestamp: Optional[int], end_timestamp: Optional[int]
+ ) -> None: ...
+ def to_pandas(self) -> pd.DataFrame: ...
+ def to_arrow(self) -> pa.Table: ...
+ def __repr__(self) -> str: ...
+
+class Schema:
+ def __init__(self, schema: pa.Schema, primary_keys: Optional[List[str]] = None) -> None: ...
+ def get_column_names(self) -> List[str]: ...
+ def get_column_types(self) -> List[str]: ...
+ def get_columns(self) -> List[Tuple[str,str]]: ...
+ def __str__(self) -> str: ...
+
+class TableDescriptor:
+ def __init__(self, schema: Schema, **kwargs: str) -> None: ...
+ def get_schema(self) -> Schema: ...
+
+class TablePath:
+ def __init__(self, database: str, table: str) -> None: ...
+ @property
+ def database_name(self) -> str: ...
+ @property
+ def table_name(self) -> str: ...
+ def table_path_str(self) -> str: ...
+ def __str__(self) -> str: ...
+ def __repr__(self) -> str: ...
+ def __hash__(self) -> int: ...
+ def __eq__(self, other: object) -> bool: ...
+
+class TableInfo:
+ @property
+ def table_id(self) -> int: ...
+ @property
+ def schema_id(self) -> int: ...
+ @property
+ def created_time(self) -> int: ...
+ @property
+ def modified_time(self) -> int: ...
+ @property
+ def table_path(self) -> TablePath: ...
+ @property
+ def num_buckets(self) -> int: ...
+ @property
+ def comment(self) -> Optional[str]: ...
+ def get_primary_keys(self) -> List[str]: ...
+ def get_bucket_keys(self) -> List[str]: ...
+ def get_partition_keys(self) -> List[str]: ...
+ def has_primary_key(self) -> bool: ...
+ def is_partitioned(self) -> bool: ...
+ def get_properties(self) -> Dict[str, str]: ...
+ def get_custom_properties(self) -> Dict[str, str]: ...
+ def get_schema(self) -> Schema: ...
+ def get_column_names(self) -> List[str]: ...
+ def get_column_count(self) -> int: ...
+
+class FlussError(Exception):
+ message: str
+ def __init__(self, message: str) -> None: ...
+ def __str__(self) -> str: ...
+
+class LakeSnapshot:
+ def __init__(self, snapshot_id: int) -> None: ...
+ @property
+ def snapshot_id(self) -> int: ...
+ @property
+ def table_buckets_offset(self) -> Dict[TableBucket, int]: ...
+ def get_bucket_offset(self, bucket: TableBucket) -> Optional[int]: ...
+ def get_table_buckets(self) -> List[TableBucket]: ...
+ def __str__(self) -> str: ...
+ def __repr__(self) -> str: ...
+
+class TableBucket:
+ def __init__(self, table_id: int, bucket: int) -> None: ...
+ @staticmethod
+ def with_partition(
+ table_id: int, partition_id: int, bucket: int
+ ) -> TableBucket: ...
+ @property
+ def table_id(self) -> int: ...
+ @property
+ def bucket_id(self) -> int: ...
+ @property
+ def partition_id(self) -> Optional[int]: ...
+ def __hash__(self) -> int: ...
+ def __eq__(self, other: object) -> bool: ...
+ def __str__(self) -> str: ...
+ def __repr__(self) -> str: ...
+
+class TableDistribution:
+ def bucket_keys(self) -> List[str]: ...
+ def bucket_count(self) -> Optional[int]: ...
+
+__version__: str
diff --git a/bindings/python/fluss/py.typed b/bindings/python/fluss/py.typed
new file mode 100644
index 0000000..e69de29