Copilot commented on code in PR #291: URL: https://github.com/apache/fluss-rust/pull/291#discussion_r2781055180
########## docs/python-client.md: ########## @@ -0,0 +1,449 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python Client Guide + +This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. + +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. + +## Key Concepts + +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. +- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. + +## Prerequisites + +You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../README.md#quick-start) for how to start a local cluster. + +## Installation + +```bash +pip install pyfluss +``` + +To build from source instead, see the [Python bindings README](../bindings/python/README.md). + +## Quick Start + +A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. 
+ +```python +import asyncio +import pyarrow as pa +import fluss + +async def main(): + # Connect + config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) + conn = await fluss.FlussConnection.connect(config) + admin = await conn.get_admin() + + # Create a log table + schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + ])) + table_path = fluss.TablePath("fluss", "quick_start") + await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + + # Write + table = await conn.get_table(table_path) + writer = await table.new_append_writer() + writer.append({"id": 1, "name": "Alice", "score": 95.5}) + writer.append({"id": 2, "name": "Bob", "score": 87.0}) + await writer.flush() + + # Read + num_buckets = (await admin.get_table(table_path)).num_buckets + scanner = await table.new_scan().create_batch_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(scanner.to_pandas()) + + # Cleanup + await admin.drop_table(table_path, ignore_if_not_exists=True) + conn.close() + +asyncio.run(main()) +``` + +## Connection Setup + +```python +config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) +conn = await fluss.FlussConnection.connect(config) +``` + +The connection also supports context managers: + +```python +with await fluss.FlussConnection.connect(config) as conn: + ... +``` + +### Configuration Options + +| Key | Description | Default | +|-----|-------------|---------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | Review Comment: The markdown table has an extra leading pipe (`||`), which renders as an unintended empty first column in GitHub-flavored Markdown. Use a single leading pipe (`|`) for the header and each row. ########## docs/python-client.md: ########## @@ -0,0 +1,449 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python Client Guide + +This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. + +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. + +## Key Concepts + +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. 
+- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. + +## Prerequisites + +You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../README.md#quick-start) for how to start a local cluster. + +## Installation + +```bash +pip install pyfluss +``` + +To build from source instead, see the [Python bindings README](../bindings/python/README.md). + +## Quick Start + +A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. + +```python +import asyncio +import pyarrow as pa +import fluss + +async def main(): + # Connect + config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) + conn = await fluss.FlussConnection.connect(config) + admin = await conn.get_admin() + + # Create a log table + schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + ])) + table_path = fluss.TablePath("fluss", "quick_start") + await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + + # Write + table = await conn.get_table(table_path) + writer = await table.new_append_writer() + writer.append({"id": 1, "name": "Alice", "score": 95.5}) + writer.append({"id": 2, "name": "Bob", "score": 87.0}) + await writer.flush() + + # Read + num_buckets = (await admin.get_table(table_path)).num_buckets + scanner = await table.new_scan().create_batch_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(scanner.to_pandas()) + + # Cleanup + await admin.drop_table(table_path, ignore_if_not_exists=True) + conn.close() + +asyncio.run(main()) +``` + +## Connection Setup + +```python +config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) +conn = await fluss.FlussConnection.connect(config) +``` + +The connection also supports context managers: + +```python +with await fluss.FlussConnection.connect(config) as conn: + ... +``` + +### Configuration Options + +| Key | Description | Default | +|-----|-------------|---------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | + Review Comment: `Config` currently extracts all property values as strings (the bindings expect `dict[str, str]`). Consider clarifying here that values like `request.max.size` / `writer.retries` / `writer.batch.size` must be passed as strings (or update bindings to accept ints) to avoid runtime type errors when users provide numeric literals. ########## docs/python-client.md: ########## @@ -0,0 +1,449 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python Client Guide + +This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. + +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. + +## Key Concepts + +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. +- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. + +## Prerequisites + +You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../README.md#quick-start) for how to start a local cluster. + +## Installation + +```bash +pip install pyfluss +``` + +To build from source instead, see the [Python bindings README](../bindings/python/README.md). + +## Quick Start + +A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. 
+ +```python +import asyncio +import pyarrow as pa +import fluss + +async def main(): + # Connect + config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) + conn = await fluss.FlussConnection.connect(config) + admin = await conn.get_admin() + + # Create a log table + schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + ])) + table_path = fluss.TablePath("fluss", "quick_start") + await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + + # Write + table = await conn.get_table(table_path) + writer = await table.new_append_writer() + writer.append({"id": 1, "name": "Alice", "score": 95.5}) + writer.append({"id": 2, "name": "Bob", "score": 87.0}) + await writer.flush() + + # Read + num_buckets = (await admin.get_table(table_path)).num_buckets + scanner = await table.new_scan().create_batch_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(scanner.to_pandas()) + + # Cleanup + await admin.drop_table(table_path, ignore_if_not_exists=True) + conn.close() + +asyncio.run(main()) +``` + +## Connection Setup + +```python +config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) +conn = await fluss.FlussConnection.connect(config) +``` + +The connection also supports context managers: + +```python +with await fluss.FlussConnection.connect(config) as conn: + ... +``` + +### Configuration Options + +| Key | Description | Default | +|-----|-------------|---------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | + +## Admin Operations + +```python +admin = await conn.get_admin() +``` + +### Databases + +```python +await admin.create_database("my_database", ignore_if_exists=True) +databases = await admin.list_databases() +exists = await admin.database_exists("my_database") +await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True) +``` + +### Tables + +Schemas are defined using PyArrow and wrapped in `fluss.Schema`: + +```python +import pyarrow as pa + +schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("amount", pa.int64()), +])) + +table_path = fluss.TablePath("my_database", "my_table") +await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + +table_info = await admin.get_table(table_path) +tables = await admin.list_tables("my_database") +await admin.drop_table(table_path, ignore_if_not_exists=True) +``` + +`TableDescriptor` accepts these optional parameters: + +| Parameter | Description | +|---|---| +| `partition_keys` | Column names to partition by (e.g. `["region"]`) | +| `bucket_count` | Number of buckets (parallelism units) for the table | +| `bucket_keys` | Columns used to determine bucket assignment | +| `comment` | Table comment / description | + Review Comment: `TableDescriptor` supports additional kwargs beyond the four listed here (e.g., `properties`, `custom_properties`, `log_format`, `kv_format`). As written, the docs imply these are the only supported options; please either list all supported parameters or reword to indicate this is a partial list. 
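For illustration, the guide could show the fuller parameter set along these lines. The kwarg names below follow the `TableDescriptor` constructor signature quoted later in this review (`properties`, `custom_properties`, etc.); the specific property keys and values are placeholders, not verified defaults:

```python
import pyarrow as pa
import fluss

schema = fluss.Schema(pa.schema([
    pa.field("id", pa.int32()),
    pa.field("region", pa.string()),
    pa.field("amount", pa.int64()),
]))

# Sketch only: kwarg names follow the signature discussed in this review;
# the property keys/values here are illustrative assumptions.
descriptor = fluss.TableDescriptor(
    schema,
    partition_keys=["region"],
    bucket_count=4,
    bucket_keys=["id"],
    comment="demo table",
    properties={"table.log.ttl": "7d"},            # server-recognized table properties (key assumed)
    custom_properties={"owner": "data-platform"},  # free-form user metadata
)
```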
########## docs/python-api-reference.md: ########## @@ -0,0 +1,278 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python API Reference + +Complete API reference for the Fluss Python client. For a usage guide with examples, see the [Python Client Guide](python-client.md). + +## `Config` + +| Method / Property | Description | +|---|---| +| `Config(properties: dict = None)` | Create config from a dict of key-value pairs | +| `.bootstrap_server` | Get/set coordinator server address | +| `.request_max_size` | Get/set max request size in bytes | +| `.writer_batch_size` | Get/set write batch size in bytes | + Review Comment: The markdown tables throughout this reference start with `||`, which introduces an empty first column when rendered. Use a single leading `|` for the header and each row to avoid a blank column. ########## docs/python-api-reference.md: ########## @@ -0,0 +1,278 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python API Reference + +Complete API reference for the Fluss Python client. For a usage guide with examples, see the [Python Client Guide](python-client.md). + +## `Config` + +| Method / Property | Description | +|---|---| +| `Config(properties: dict = None)` | Create config from a dict of key-value pairs | +| `.bootstrap_server` | Get/set coordinator server address | +| `.request_max_size` | Get/set max request size in bytes | +| `.writer_batch_size` | Get/set write batch size in bytes | + +## `FlussConnection` + +| Method | Description | +|---|---| +| `await FlussConnection.connect(config) -> FlussConnection` | Connect to a Fluss cluster | +| `await conn.get_admin() -> FlussAdmin` | Get admin interface | +| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations | +| `conn.close()` | Close the connection | + +Supports `with` statement (context manager). 
+ +## `FlussAdmin` + +| Method | Description | +|---|---| +| `await create_database(name, ignore_if_exists=False, database_descriptor=None)` | Create a database | +| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database | +| `await list_databases() -> list[str]` | List all databases | +| `await database_exists(name) -> bool` | Check if a database exists | +| `await get_database_info(name) -> DatabaseInfo` | Get database metadata | +| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table | +| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table | +| `await get_table(table_path) -> TableInfo` | Get table metadata | +| `await list_tables(database_name) -> list[str]` | List tables in a database | +| `await table_exists(table_path) -> bool` | Check if a table exists | +| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for buckets | +| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's buckets | +| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition | +| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition | +| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions | +| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest lake snapshot | + +## `FlussTable` + +| Method | Description | +|---|---| +| `new_scan() -> TableScan` | Create a scan builder | +| `await new_append_writer() -> AppendWriter` | Create writer for log tables | +| `new_upsert(columns=None, column_indices=None) -> UpsertWriter` | Create writer for PK tables (optionally partial) | +| `new_lookup() -> Lookuper` | Create lookuper for PK tables | +| `get_table_info() -> TableInfo` | Get table metadata | +| `get_table_path() -> TablePath` | Get table path | +| `has_primary_key() -> bool` | Check if table has a primary key | + +## `TableScan` + +| Method | Description | +|---|---| +| `.project(indices) -> TableScan` | Project columns by index | +| `.project_by_name(names) -> TableScan` | Project columns by name | +| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) | +| `await .create_batch_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) 
| + +## `AppendWriter` + +| Method | Description | +|---|---| +| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) | +| `.write_arrow(table)` | Write a PyArrow Table | +| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch | +| `.write_pandas(df)` | Write a Pandas DataFrame | +| `await .flush()` | Flush all pending writes | + +## `UpsertWriter` + +| Method | Description | +|---|---| +| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) | +| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key | +| `await .flush()` | Flush all pending operations | + +## `WriteResultHandle` + +| Method | Description | +|---|---| +| `await .wait()` | Wait for server acknowledgment of this write | + +## `Lookuper` + +| Method | Description | +|---|---| +| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key | + +## `LogScanner` + +| Method | Description | +|---|---| +| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket | +| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) | +| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket | +| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) | +| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket | +| `.poll(timeout_ms) -> list[ScanRecord]` | Poll individual records (record scanner only) | +| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | +| `.poll_batches(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | +| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | +| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | + +## `ScanRecord` + +| Property | Description | +|---|---| +| `.bucket -> TableBucket` | Bucket this record belongs to | +| `.offset -> int` | Record offset in the log | +| `.timestamp -> int` | Record timestamp | +| `.change_type -> ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) | +| `.row -> dict` | Row data as `{column_name: value}` | + +## `RecordBatch` + +| Property | Description | +|---|---| +| `.batch -> pa.RecordBatch` | Arrow RecordBatch data | +| `.bucket -> TableBucket` | Bucket this batch belongs to | +| `.base_offset -> int` | First record offset | +| `.last_offset -> int` | Last record offset | + +## `Schema` + +| Method | Description | +|---|---| +| `Schema(schema: pa.Schema, primary_keys=None)` | Create from PyArrow schema | +| `.get_column_names() -> list[str]` | Get column names | +| `.get_column_types() -> list[str]` | Get column type names | + +## `TableDescriptor` + +| Method | Description | +|---|---| +| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, **properties)` | Create table descriptor | +| `.get_schema() -> Schema` | Get the schema | + +## `TablePath` + +| Method / Property | Description | +|---|---| +| `TablePath(database, table)` | Create a table path | +| `.database_name -> str` | Database name | +| `.table_name -> str` | Table name | + +## `TableInfo` + +| Property / Method | Description | +|---|---| +| `.table_id -> int` | Table ID | +| `.table_path -> TablePath` | Table path | +| `.num_buckets -> int` | Number of buckets | +| `.schema_id -> int` | Schema ID | +| 
`.comment -> str \| None` | Table comment | +| `.created_time -> int` | Creation timestamp | +| `.modified_time -> int` | Last modification timestamp | +| `.get_primary_keys() -> list[str]` | Primary key columns | +| `.get_partition_keys() -> list[str]` | Partition columns | +| `.get_bucket_keys() -> list[str]` | Bucket key columns | +| `.has_primary_key() -> bool` | Has primary key? | +| `.is_partitioned() -> bool` | Is partitioned? | +| `.get_schema() -> Schema` | Get table schema | +| `.get_column_names() -> list[str]` | Column names | +| `.get_column_count() -> int` | Number of columns | +| `.get_properties() -> dict` | All table properties | +| `.get_custom_properties() -> dict` | Custom properties only | + +## `PartitionInfo` + +| Property | Description | +|---|---| +| `.partition_id -> int` | Partition ID | +| `.partition_name -> str` | Partition name | + +## `DatabaseDescriptor` + +| Method / Property | Description | +|---|---| +| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create descriptor | +| `.comment -> str \| None` | Database comment | +| `.get_custom_properties() -> dict` | Custom properties | + +## `DatabaseInfo` + +| Property / Method | Description | +|---|---| +| `.database_name -> str` | Database name | +| `.created_time -> int` | Creation timestamp | +| `.modified_time -> int` | Last modification timestamp | +| `.get_database_descriptor() -> DatabaseDescriptor` | Get descriptor | + +## `LakeSnapshot` + +| Property / Method | Description | +|---|---| +| `.snapshot_id -> int` | Snapshot ID | +| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets | +| `.get_bucket_offset(bucket) -> int \| None` | Get offset for a bucket | +| `.get_table_buckets() -> list[TableBucket]` | Get all buckets | + +## `TableBucket` + +| Method / Property | Description | +|---|---| +| `TableBucket(table_id, bucket)` | Create non-partitioned bucket | +| `TableBucket.with_partition(table_id, partition_id, bucket)` | Create partitioned bucket | +| `.table_id -> int` | Table ID | +| `.bucket_id -> int` | Bucket ID | +| `.partition_id -> int \| None` | Partition ID (None if non-partitioned) | + +## `FlussError` + +| Property | Description | +|---|---| +| `.message -> str` | Error message | + +Raised for all Fluss-specific errors (connection failures, table not found, schema mismatches, etc.). Inherits from `Exception`. + +## Constants + +| Constant | Value | Description | +|---|---|---| +| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from earliest available offset | +| `fluss.LATEST_OFFSET` | `-1` | Start reading from latest offset (only new records) | +| `fluss.OffsetType.EARLIEST` | `"earliest"` | For `list_offsets()` | +| `fluss.OffsetType.LATEST` | `"latest"` | For `list_offsets()` | +| `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with timestamp | + Review Comment: This reference documents `fluss.LATEST_OFFSET = -1`, but the Python module currently only exports `EARLIEST_OFFSET` (and `OffsetType.*` for `list_offsets`). Please remove or correct `LATEST_OFFSET` here, and document the supported way to start scanning from the end (e.g., fetch latest offsets via `OffsetType.LATEST`). 
```suggestion | `fluss.OffsetType.EARLIEST` | `"earliest"` | For `list_offsets()` | | `fluss.OffsetType.LATEST` | `"latest"` | For `list_offsets()` (to fetch latest offsets) | | `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with timestamp | To start reading from the latest offsets (only new records), first call `list_offsets()` with `fluss.OffsetType.LATEST` to obtain the current end offsets, and then use those offsets as the starting positions for your scans. ``` ########## docs/python-client.md: ########## @@ -0,0 +1,449 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python Client Guide + +This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. + +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. + +## Key Concepts + +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. +- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. Review Comment: The guide refers to `LATEST_OFFSET` as a start position constant, but the Python bindings currently only expose `fluss.EARLIEST_OFFSET` (there is no `fluss.LATEST_OFFSET`). Update the docs to either instruct users to compute the latest offset via `admin.list_offsets(..., offset_type=fluss.OffsetType.LATEST)` or document the correct constant/value that the bindings actually expose. ```suggestion - **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `fluss.EARLIEST_OFFSET` to read all data, or use `admin.list_offsets(..., offset_type=fluss.OffsetType.LATEST)` to obtain the latest offset if you only want to read new records. ``` ########## docs/python-client.md: ########## @@ -0,0 +1,449 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python Client Guide + +This guide covers how to use the Fluss Python client for reading and writing data to log tables and primary key tables. + +The Python client is async-first, built on top of the Rust core via [PyO3](https://pyo3.rs/), and uses [PyArrow](https://arrow.apache.org/docs/python/) for schema definitions and data interchange. + +## Key Concepts + +- **Log table** — an append-only table (no primary key). Records are immutable once written. Use for event streams, logs, and audit trails. +- **Primary key (PK) table** — a table with a primary key. Supports upsert, delete, and point lookups. +- **Bucket** — the unit of parallelism within a table (similar to Kafka partitions). Each table has one or more buckets. Readers subscribe to individual buckets. +- **Partition** — a way to organize data by column values (e.g. by date or region). Each partition contains its own set of buckets. Partitions must be created explicitly before writing. +- **Offset** — the position of a record within a bucket. Used to track reading progress. Start from `EARLIEST_OFFSET` to read all data, or `LATEST_OFFSET` to only read new records. + +## Prerequisites + +You need a running Fluss cluster to use the Python client. See the [Quick-Start guide](../README.md#quick-start) for how to start a local cluster. + +## Installation + +```bash +pip install pyfluss +``` + +To build from source instead, see the [Python bindings README](../bindings/python/README.md). + +## Quick Start + +A minimal end-to-end example: connect, create a table, write data, and read it back. Assumes a Fluss cluster is running on `localhost:9123`. 
+ +```python +import asyncio +import pyarrow as pa +import fluss + +async def main(): + # Connect + config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) + conn = await fluss.FlussConnection.connect(config) + admin = await conn.get_admin() + + # Create a log table + schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + ])) + table_path = fluss.TablePath("fluss", "quick_start") + await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + + # Write + table = await conn.get_table(table_path) + writer = await table.new_append_writer() + writer.append({"id": 1, "name": "Alice", "score": 95.5}) + writer.append({"id": 2, "name": "Bob", "score": 87.0}) + await writer.flush() + + # Read + num_buckets = (await admin.get_table(table_path)).num_buckets + scanner = await table.new_scan().create_batch_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(scanner.to_pandas()) + + # Cleanup + await admin.drop_table(table_path, ignore_if_not_exists=True) + conn.close() + +asyncio.run(main()) +``` + +## Connection Setup + +```python +config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"}) +conn = await fluss.FlussConnection.connect(config) +``` + +The connection also supports context managers: + +```python +with await fluss.FlussConnection.connect(config) as conn: + ... +``` + +### Configuration Options + +| Key | Description | Default | +|-----|-------------|---------| +| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` | +| `request.max.size` | Maximum request size in bytes | `10485760` (10 MB) | +| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` | +| `writer.retries` | Number of retries on failure | `2147483647` | +| `writer.batch.size` | Batch size for writes in bytes | `2097152` (2 MB) | + +## Admin Operations + +```python +admin = await conn.get_admin() +``` + +### Databases + +```python +await admin.create_database("my_database", ignore_if_exists=True) +databases = await admin.list_databases() +exists = await admin.database_exists("my_database") +await admin.drop_database("my_database", ignore_if_not_exists=True, cascade=True) +``` + +### Tables + +Schemas are defined using PyArrow and wrapped in `fluss.Schema`: + +```python +import pyarrow as pa + +schema = fluss.Schema(pa.schema([ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("amount", pa.int64()), +])) + +table_path = fluss.TablePath("my_database", "my_table") +await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=True) + +table_info = await admin.get_table(table_path) +tables = await admin.list_tables("my_database") +await admin.drop_table(table_path, ignore_if_not_exists=True) +``` + +`TableDescriptor` accepts these optional parameters: + +| Parameter | Description | +|---|---| +| `partition_keys` | Column names to partition by (e.g. 
`["region"]`) | +| `bucket_count` | Number of buckets (parallelism units) for the table | +| `bucket_keys` | Columns used to determine bucket assignment | +| `comment` | Table comment / description | + +### Offsets + +```python +# Latest offsets for buckets +offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], offset_type="latest") + +# By timestamp +offsets = await admin.list_offsets(table_path, bucket_ids=[0], offset_type="timestamp", timestamp=1704067200000) + +# Per-partition offsets +offsets = await admin.list_partition_offsets(table_path, partition_name="US", bucket_ids=[0], offset_type="latest") +``` + +## Log Tables + +Log tables are append-only tables without primary keys, suitable for event streaming. + +### Writing + +Rows can be appended as dicts, lists, or tuples. For bulk writes, use `write_arrow()`, `write_arrow_batch()`, or `write_pandas()`. + +Write methods like `append()` and `write_arrow_batch()` return a `WriteResultHandle`. You can ignore it for fire-and-forget semantics (flush at the end), or `await handle.wait()` to block until the server acknowledges that specific write. + +```python +table = await conn.get_table(table_path) +writer = await table.new_append_writer() + +# Fire-and-forget: queue writes, flush at the end +writer.append({"id": 1, "name": "Alice", "score": 95.5}) +writer.append([2, "Bob", 87.0]) +await writer.flush() + +# Per-record acknowledgment +handle = writer.append({"id": 3, "name": "Charlie", "score": 91.0}) +await handle.wait() + +# Bulk writes +writer.write_arrow(pa_table) # PyArrow Table +writer.write_arrow_batch(record_batch) # PyArrow RecordBatch +writer.write_pandas(df) # Pandas DataFrame +await writer.flush() +``` + +### Reading + +There are two scanner types: +- **Batch scanner** (`create_batch_scanner()`) — returns Arrow Tables or DataFrames, best for analytics +- **Record scanner** (`create_log_scanner()`) — returns individual records with metadata (offset, timestamp, change type), best for streaming + +And two reading modes: +- **`to_arrow()` / `to_pandas()`** — reads all data from subscribed buckets up to the current latest offset, then returns. Best for one-shot batch reads. +- **`poll_arrow()` / `poll()` / `poll_batches()`** — returns whatever data is available within the timeout, then returns. Call in a loop for continuous streaming. 
+ +#### Batch Read (One-Shot) + +```python +num_buckets = (await admin.get_table(table_path)).num_buckets + +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +# Reads everything up to current latest offset, then returns +arrow_table = scanner.to_arrow() +df = scanner.to_pandas() +``` + +#### Continuous Polling + +Use `poll_arrow()` or `poll()` in a loop for streaming consumption: + +```python +# Batch scanner: poll as Arrow Tables +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + +while True: + result = scanner.poll_arrow(timeout_ms=5000) + if result.num_rows > 0: + print(result.to_pandas()) + +# Record scanner: poll individual records with metadata +scanner = await table.new_scan().create_log_scanner() +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +while True: + for record in scanner.poll(timeout_ms=5000): + print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}") +``` + +#### Subscribe from Latest Offset + +To only consume new records (skip existing data), use `LATEST_OFFSET`: + +```python +scanner = await table.new_scan().create_batch_scanner() +scanner.subscribe(bucket_id=0, start_offset=fluss.LATEST_OFFSET) Review Comment: This “Subscribe from Latest Offset” example uses `fluss.LATEST_OFFSET`, which is not currently exported by the bindings (only `EARLIEST_OFFSET` exists). The example as written will fail for users; please adjust to the supported way of starting from the end (e.g., fetch latest offsets via `list_offsets` / `OffsetType.LATEST`). ```suggestion To only consume new records (skip existing data), start from the latest offset: ```python offsets = await admin.list_offsets( table_path, bucket_ids=[0], offset_type=fluss.OffsetType.LATEST, ) latest_offset = offsets[0] scanner = await table.new_scan().create_batch_scanner() scanner.subscribe(bucket_id=0, start_offset=latest_offset) ``` ########## docs/python-api-reference.md: ########## @@ -0,0 +1,278 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Python API Reference + +Complete API reference for the Fluss Python client. For a usage guide with examples, see the [Python Client Guide](python-client.md). 
+ +## `Config` + +| Method / Property | Description | +|---|---| +| `Config(properties: dict = None)` | Create config from a dict of key-value pairs | +| `.bootstrap_server` | Get/set coordinator server address | +| `.request_max_size` | Get/set max request size in bytes | +| `.writer_batch_size` | Get/set write batch size in bytes | + +## `FlussConnection` + +| Method | Description | +|---|---| +| `await FlussConnection.connect(config) -> FlussConnection` | Connect to a Fluss cluster | +| `await conn.get_admin() -> FlussAdmin` | Get admin interface | +| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations | +| `conn.close()` | Close the connection | + +Supports `with` statement (context manager). + +## `FlussAdmin` + +| Method | Description | +|---|---| +| `await create_database(name, ignore_if_exists=False, database_descriptor=None)` | Create a database | +| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database | +| `await list_databases() -> list[str]` | List all databases | +| `await database_exists(name) -> bool` | Check if a database exists | +| `await get_database_info(name) -> DatabaseInfo` | Get database metadata | +| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table | +| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table | +| `await get_table(table_path) -> TableInfo` | Get table metadata | +| `await list_tables(database_name) -> list[str]` | List tables in a database | +| `await table_exists(table_path) -> bool` | Check if a table exists | +| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for buckets | +| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's buckets | +| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition | +| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition | +| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions | +| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest lake snapshot | + +## `FlussTable` + +| Method | Description | +|---|---| +| `new_scan() -> TableScan` | Create a scan builder | +| `await new_append_writer() -> AppendWriter` | Create writer for log tables | +| `new_upsert(columns=None, column_indices=None) -> UpsertWriter` | Create writer for PK tables (optionally partial) | +| `new_lookup() -> Lookuper` | Create lookuper for PK tables | +| `get_table_info() -> TableInfo` | Get table metadata | +| `get_table_path() -> TablePath` | Get table path | +| `has_primary_key() -> bool` | Check if table has a primary key | + +## `TableScan` + +| Method | Description | +|---|---| +| `.project(indices) -> TableScan` | Project columns by index | +| `.project_by_name(names) -> TableScan` | Project columns by name | +| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) | +| `await .create_batch_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) 
| + +## `AppendWriter` + +| Method | Description | +|---|---| +| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) | +| `.write_arrow(table)` | Write a PyArrow Table | +| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch | +| `.write_pandas(df)` | Write a Pandas DataFrame | +| `await .flush()` | Flush all pending writes | + +## `UpsertWriter` + +| Method | Description | +|---|---| +| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) | +| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key | +| `await .flush()` | Flush all pending operations | + +## `WriteResultHandle` + +| Method | Description | +|---|---| +| `await .wait()` | Wait for server acknowledgment of this write | + +## `Lookuper` + +| Method | Description | +|---|---| +| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key | + +## `LogScanner` + +| Method | Description | +|---|---| +| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket | +| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) | +| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket | +| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) | +| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket | +| `.poll(timeout_ms) -> list[ScanRecord]` | Poll individual records (record scanner only) | +| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | +| `.poll_batches(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | +| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | +| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | + +## `ScanRecord` + +| Property | Description | +|---|---| +| `.bucket -> TableBucket` | Bucket this record belongs to | +| `.offset -> int` | Record offset in the log | +| `.timestamp -> int` | Record timestamp | +| `.change_type -> ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) | +| `.row -> dict` | Row data as `{column_name: value}` | + +## `RecordBatch` + +| Property | Description | +|---|---| +| `.batch -> pa.RecordBatch` | Arrow RecordBatch data | +| `.bucket -> TableBucket` | Bucket this batch belongs to | +| `.base_offset -> int` | First record offset | +| `.last_offset -> int` | Last record offset | + +## `Schema` + +| Method | Description | +|---|---| +| `Schema(schema: pa.Schema, primary_keys=None)` | Create from PyArrow schema | +| `.get_column_names() -> list[str]` | Get column names | +| `.get_column_types() -> list[str]` | Get column type names | + +## `TableDescriptor` + +| Method | Description | +|---|---| +| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, **properties)` | Create table descriptor | Review Comment: The `TableDescriptor` signature here shows `**properties`, but the bindings implementation does not consume arbitrary keyword args as properties; it expects `properties={...}` / `custom_properties={...}` dictionaries (and a few named kwargs like `log_format`/`kv_format`). Please update the signature/documentation to match actual supported kwargs to avoid users passing silently-ignored `**properties`. 
```suggestion | `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, properties=None, custom_properties=None, log_format=None, kv_format=None)` | Create table descriptor | ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]