This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3ad3119 [client] Add LookupResult::to_record_batch() (#411)
3ad3119 is described below
commit 3ad3119d8e52269b3cfa393836d5e64a63093be5
Author: Prajwal banakar <[email protected]>
AuthorDate: Sun Mar 15 07:20:11 2026 +0530
[client] Add LookupResult::to_record_batch() (#411)
---
crates/fluss/src/client/table/lookup.rs | 124 +++++++++++++++++++--
crates/fluss/src/record/arrow.rs | 9 ++
website/docs/user-guide/rust/api-reference.md | 3 +-
.../user-guide/rust/example/primary-key-tables.md | 8 ++
4 files changed, 135 insertions(+), 9 deletions(-)
diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
index ce15491..3d643ed 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -20,6 +20,7 @@ use crate::client::metadata::Metadata;
use crate::client::table::partition_getter::PartitionGetter;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
+use crate::record::RowAppendRecordBatchBuilder;
use crate::record::kv::SCHEMA_ID_LENGTH;
use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
@@ -27,6 +28,7 @@ use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
use crate::rpc::ApiError;
use crate::rpc::RpcClient;
use crate::rpc::message::LookupRequest;
+use arrow::array::RecordBatch;
use std::sync::Arc;
/// The result of a lookup operation.
@@ -53,6 +55,19 @@ impl LookupResult {
}
}
+ /// Extracts the row payload by stripping the schema id prefix.
+ fn extract_payload(bytes: &[u8]) -> Result<&[u8]> {
+ bytes
+ .get(SCHEMA_ID_LENGTH..)
+ .ok_or_else(|| Error::RowConvertError {
+ message: format!(
+ "Row payload too short: {} bytes, need at least {} for schema id",
+ bytes.len(),
+ SCHEMA_ID_LENGTH
+ ),
+ })
+ }
+
/// Returns the only row in the result set as a [`CompactedRow`].
///
/// This method provides a zero-copy view of the row data, which means the returned
@@ -62,14 +77,14 @@ impl LookupResult {
/// - `Ok(Some(row))`: If exactly one row exists.
/// - `Ok(None)`: If the result set is empty.
/// - `Err(Error::UnexpectedError)`: If the result set contains more than one row.
- ///
+ /// - `Err(Error)`: If the row payload is too short to contain a schema id.
pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
match self.rows.len() {
0 => Ok(None),
- 1 => Ok(Some(CompactedRow::from_bytes(
- &self.row_type,
- &self.rows[0][SCHEMA_ID_LENGTH..],
- ))),
+ 1 => {
+ let payload = Self::extract_payload(&self.rows[0])?;
+ Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
+ }
_ => Err(Error::UnexpectedError {
message: "LookupResult contains multiple rows, use get_rows() instead".to_string(),
source: None,
@@ -77,14 +92,42 @@ impl LookupResult {
}
}
- /// Returns all rows as CompactedRows.
- pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+ /// Returns all rows in the result set as [`CompactedRow`]s.
+ ///
+ /// # Returns
+ /// - `Ok(rows)` - All rows in the result set.
+ /// - `Err(Error)` - If any row payload is too short to contain a schema id.
+ pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
self.rows
.iter()
// TODO Add schema id check and fetch when implementing prefix lookup
- .map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
+ .map(|bytes| {
+ let payload = Self::extract_payload(bytes)?;
+ Ok(CompactedRow::from_bytes(&self.row_type, payload))
+ })
.collect()
}
+
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
+ let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+
+ for bytes in &self.rows {
+ let payload = Self::extract_payload(bytes)?;
+
+ let row = CompactedRow::from_bytes(&self.row_type, payload);
+ builder.append(&row)?;
+ }
+
+ builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)
+ }
}
/// Configuration and factory struct for creating lookup operations.
@@ -306,3 +349,68 @@ impl Lookuper {
&self.table_info
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::{DataField, DataTypes};
+ use crate::row::binary::BinaryWriter;
+ use crate::row::compacted::CompactedRowWriter;
+ use arrow::array::Int32Array;
+
+ fn make_row_bytes(schema_id: i16, row_data: &[u8]) -> Vec<u8> {
+ let mut bytes = Vec::with_capacity(SCHEMA_ID_LENGTH + row_data.len());
+ bytes.extend_from_slice(&schema_id.to_le_bytes());
+ bytes.extend_from_slice(row_data);
+ bytes
+ }
+
+ #[test]
+ fn test_to_record_batch_empty() {
+ let row_type = Arc::new(RowType::new(vec![DataField::new(
+ "id",
+ DataTypes::int(),
+ None,
+ )]));
+ let result = LookupResult::empty(row_type);
+ let batch = result.to_record_batch().unwrap();
+ assert_eq!(batch.num_rows(), 0);
+ assert_eq!(batch.num_columns(), 1);
+ }
+
+ #[test]
+ fn test_to_record_batch_with_row() {
+ let row_type = Arc::new(RowType::new(vec![DataField::new(
+ "id",
+ DataTypes::int(),
+ None,
+ )]));
+
+ let mut writer = CompactedRowWriter::new(1);
+ writer.write_int(42);
+ let row_bytes = make_row_bytes(0, writer.buffer());
+
+ let result = LookupResult::new(vec![row_bytes], Arc::clone(&row_type));
+ let batch = result.to_record_batch().unwrap();
+
+ assert_eq!(batch.num_rows(), 1);
+ let col = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(col.value(0), 42);
+ }
+
+ #[test]
+ fn test_to_record_batch_payload_too_short() {
+ let row_type = Arc::new(RowType::new(vec![DataField::new(
+ "id",
+ DataTypes::int(),
+ None,
+ )]));
+ // Only 1 byte — shorter than SCHEMA_ID_LENGTH (2)
+ let result = LookupResult::new(vec![vec![0u8]], Arc::clone(&row_type));
+ assert!(result.to_record_batch().is_err());
+ }
+}
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index a0dfc84..83d102a 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -246,6 +246,15 @@ impl RowAppendRecordBatchBuilder {
records_count: 0,
})
}
+ /// Appends a row to the builder.
+ pub fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
+ ArrowRecordBatchInnerBuilder::append(self, row)
+ }
+
+ /// Builds the final Arrow RecordBatch.
+ pub fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
+ ArrowRecordBatchInnerBuilder::build_arrow_record_batch(self)
+ }
}
impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md
index 7f45522..fbe3428 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -229,7 +229,8 @@ for record in records {
| Method | Description |
|----------------------------------------------------------------|----------------------------------|
| `fn get_single_row(&self) -> Result<Option<impl InternalRow>>` | Get a single row from the result |
-| `fn get_rows(&self) -> Vec<impl InternalRow>` | Get all rows from the result |
+| `fn get_rows(&self) -> Result<Vec<impl InternalRow>>` | Get all rows from the result |
+| `fn to_record_batch(&self) -> Result<RecordBatch>` | Convert all rows to an Arrow `RecordBatch` for DataFusion or other Arrow-based tools |
## `WriteResultFuture`
diff --git a/website/docs/user-guide/rust/example/primary-key-tables.md b/website/docs/user-guide/rust/example/primary-key-tables.md
index 9e81979..7fe8a55 100644
--- a/website/docs/user-guide/rust/example/primary-key-tables.md
+++ b/website/docs/user-guide/rust/example/primary-key-tables.md
@@ -112,3 +112,11 @@ if let Some(row) = result.get_single_row()? {
println!("Record not found");
}
```
+## Looking Up Records as Arrow RecordBatch
+
+Use `to_record_batch()` to get lookup results in Arrow format, for example when integrating with DataFusion.
+```rust
+let result = lookuper.lookup(&key).await?;
+let batch = result.to_record_batch()?;
+println!("Rows: {}", batch.num_rows());
+```