Copilot commented on code in PR #415:
URL: https://github.com/apache/fluss-rust/pull/415#discussion_r2876632198
##########
crates/fluss/Cargo.toml:
##########
@@ -22,10 +22,12 @@ rust-version = { workspace = true }
version = { workspace = true }
name = "fluss-rs"
authors = { workspace = true }
-description = { workspace = true }
-homepage = { workspace = true }
+description = "The offical rust client of Apache Fluss (Incubating)"
Review Comment:
Typo in the crate description: "offical" should be "official" (and consider
capitalizing "Rust"). This string is published to crates.io metadata.
```suggestion
description = "The official Rust client of Apache Fluss (Incubating)"
```
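For reference, the corrected field in context of the crate's `[package]` table (surrounding keys abridged from the diff above; this is a sketch, not the full manifest):

```toml
[package]
name = "fluss-rs"
version = { workspace = true }
description = "The official Rust client of Apache Fluss (Incubating)"
```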
##########
crates/fluss/README.md:
##########
@@ -1,25 +1,104 @@
-# Apache Fluss™ Rust Client (Incubating)
+# Apache Fluss (Incubating) Official Rust Client
-Rust client library for [Apache Fluss™](https://fluss.apache.org/). This crate
-provides the core client used by the fluss-rust workspace and by the Python and
-C++ bindings.
+Official Rust client library for [Apache Fluss (Incubating)](https://fluss.apache.org/).
-# Todo: move how to use to the first, and how to build to the last, https://github.com/apache/opendal/blob/main/core/README.md
-# is a good reference
+[](https://crates.io/crates/fluss-rs)
+[](https://docs.rs/fluss-rs/)
-## Requirements
+## Usage
-- Rust (see [rust-toolchain.toml](../../rust-toolchain.toml) at repo root)
-- protobuf (for build)
+The following example shows both **primary key (KV) tables** and **log tables** in one flow: connect, create a KV table (upsert + lookup), then create a log table (append + scan).
-## Build
+```rust
+use fluss::client::EARLIEST_OFFSET;
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+use fluss::row::{GenericRow, InternalRow};
+use std::time::Duration;
-From the repository root:
+#[tokio::main]
+async fn main() -> Result<()> {
+ let mut config = Config::default();
+ config.bootstrap_servers = "127.0.0.1:9123".to_string();
+ let connection = FlussConnection::new(config).await?;
+ let admin = connection.get_admin().await?;
-```bash
-cargo build -p fluss-rs
+ // ---- Primary key (KV) table: upsert and lookup ----
+ let kv_path = TablePath::new("fluss", "users");
+ let mut kv_schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .column("age", DataTypes::bigint())
+ .primary_key(vec!["id"]);
+ let kv_descriptor = TableDescriptor::builder()
+ .schema(kv_schema.build()?)
+ .build()?;
+ admin.create_table(&kv_path, &kv_descriptor, false).await?;
+
+ let kv_table = connection.get_table(&kv_path).await?;
+ let upsert_writer = kv_table.new_upsert()?.create_writer()?;
+ let mut row = GenericRow::new(3);
+ row.set_field(0, 1i32);
+ row.set_field(1, "Alice");
+ row.set_field(2, 30i64);
+ upsert_writer.upsert(&row)?;
+ upsert_writer.flush().await?;
+
+ let mut lookuper = kv_table.new_lookup()?.create_lookuper()?;
+ let mut key = GenericRow::new(1);
+ key.set_field(0, 1i32);
+ let result = lookuper.lookup(&key).await?;
+ if let Some(r) = result.get_single_row()? {
+ println!("KV lookup: id={}, name={}, age={}",
+ r.get_int(0)?, r.get_string(1)?, r.get_long(2)?);
+ }
+
+ // ---- Log table: append and scan ----
+ let log_path = TablePath::new("fluss", "events");
+ let mut log_schema_builder = Schema::builder()
+ .column("ts", DataTypes::bigint())
+ .column("message", DataTypes::string());
+ let log_descriptor = TableDescriptor::builder()
+ .schema(log_schema_builder.build()?)
+ .build()?;
+ admin.create_table(&log_path, &log_descriptor, false).await?;
+
+ let log_table = connection.get_table(&log_path).await?;
+ let append_writer = log_table.new_append()?.create_writer()?;
+ let mut event = GenericRow::new(2);
+ event.set_field(0, 1700000000i64);
+ event.set_field(1, "hello");
+ append_writer.append(&event)?;
+ append_writer.flush().await?;
+
+ let scanner = log_table.new_scan().create_log_scanner()?;
+ scanner.subscribe(0, EARLIEST_OFFSET).await?;
+ let scan_records = scanner.poll(Duration::from_secs(1)).await?;
+ for record in scan_records {
+ let r = record.row();
+ println!("Log scan: ts={}, message={}", r.get_long(0)?, r.get_string(1)?);
Review Comment:
`scanner.poll(...)` returns `ScanRecords`, which is not directly iterable.
The `for record in scan_records` loop won’t compile; iterate via
`scan_records.records_by_buckets()` (and flatten) or pick a bucket and call
`scan_records.records(&table_bucket)` instead.
```suggestion
for (_bucket, records) in scan_records.records_by_buckets() {
for record in records {
let r = record.row();
        println!("Log scan: ts={}, message={}", r.get_long(0)?, r.get_string(1)?);
}
```
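To make the flattening pattern concrete without depending on the fluss API, here is a self-contained sketch using a plain `BTreeMap` as a stand-in for the per-bucket grouping that `records_by_buckets()` is described as returning (the grouping shape and the `i32` bucket key are assumptions made only to keep the sketch runnable):

```rust
use std::collections::BTreeMap;

// Stand-in for ScanRecords: records grouped per bucket id. The real type
// groups by TableBucket; i32 and String are used here only for illustration.
fn flatten_by_bucket(by_bucket: &BTreeMap<i32, Vec<String>>) -> Vec<String> {
    let mut out = Vec::new();
    // Mirrors `for (_bucket, records) in scan_records.records_by_buckets()`
    // followed by the inner record loop from the suggestion above.
    for (_bucket, records) in by_bucket {
        for record in records {
            out.push(record.clone());
        }
    }
    out
}

fn main() {
    let mut by_bucket = BTreeMap::new();
    by_bucket.insert(0, vec!["hello".to_string()]);
    by_bucket.insert(1, vec!["world".to_string()]);
    // BTreeMap iterates in key order, so the flattened output is deterministic.
    assert_eq!(flatten_by_bucket(&by_bucket), vec!["hello", "world"]);
}
```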
##########
crates/fluss/README.md:
##########
@@ -1,25 +1,104 @@
-## Quick start and examples
+## Storage Support
+
+The Fluss client reads remote data by accessing Fluss’s **remote files** (e.g. log segments and snapshots) directly. The following **remote file systems** are supported; enable the matching feature(s) for your deployment:
+
+| Storage Backend | Feature Flag | Status | Description |
+|----------------|--------------|--------|-------------|
+| Local Filesystem | `storage-fs` | ✅ Stable | Local filesystem storage |
+| Amazon S3 | `storage-s3` | ✅ Stable | Amazon S3 storage |
+| Alibaba Cloud OSS | `storage-oss` | ✅ Stable | Alibaba Cloud Object Storage Service |
-## TODO
-- [ ] Expand API documentation and usage examples in this README.
-- [ ] Add more examples for table, log scan, and write flows.
+You can enable all storage backends at once using the `storage-all` feature flag.
+
+Example usage in Cargo.toml:
+```toml
+[dependencies]
+fluss-rs = { version = "0.1.0", features = ["storage-s3", "storage-fs"] }
Review Comment:
The dependency example hardcodes `version = "0.1.0"`, but the workspace
version is `0.2.0` (and will drift again in the future). Consider updating this
to the current crate version or using a placeholder like `"0.x"` to avoid
immediately stale docs.
```suggestion
fluss-rs = { version = "0.x", features = ["storage-s3", "storage-fs"] }
```
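As general Cargo background (not specific to this crate): a bare requirement such as `"0.2"` is a caret requirement matching any `0.2.z` but not `0.3.0`, while a wildcard like `"0.*"` matches any `0.y.z` release. Either avoids pinning docs to a single patch version; a sketch of the caret form:

```toml
[dependencies]
# Caret (default) requirement: matches >=0.2.0, <0.3.0
fluss-rs = { version = "0.2", features = ["storage-s3", "storage-fs"] }
```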
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]