This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ad3119  [client] Add LookupResult::to_record_batch() (#411)
3ad3119 is described below

commit 3ad3119d8e52269b3cfa393836d5e64a63093be5
Author: Prajwal banakar <[email protected]>
AuthorDate: Sun Mar 15 07:20:11 2026 +0530

    [client] Add LookupResult::to_record_batch() (#411)
---
 crates/fluss/src/client/table/lookup.rs            | 124 +++++++++++++++++++--
 crates/fluss/src/record/arrow.rs                   |   9 ++
 website/docs/user-guide/rust/api-reference.md      |   3 +-
 .../user-guide/rust/example/primary-key-tables.md  |   8 ++
 4 files changed, 135 insertions(+), 9 deletions(-)

diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
index ce15491..3d643ed 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -20,6 +20,7 @@ use crate::client::metadata::Metadata;
 use crate::client::table::partition_getter::PartitionGetter;
 use crate::error::{Error, Result};
 use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
+use crate::record::RowAppendRecordBatchBuilder;
 use crate::record::kv::SCHEMA_ID_LENGTH;
 use crate::row::InternalRow;
 use crate::row::compacted::CompactedRow;
@@ -27,6 +28,7 @@ use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
 use crate::rpc::ApiError;
 use crate::rpc::RpcClient;
 use crate::rpc::message::LookupRequest;
+use arrow::array::RecordBatch;
 use std::sync::Arc;
 
 /// The result of a lookup operation.
@@ -53,6 +55,19 @@ impl LookupResult {
         }
     }
 
+    /// Extracts the row payload by stripping the schema id prefix.
+    fn extract_payload(bytes: &[u8]) -> Result<&[u8]> {
+        bytes
+            .get(SCHEMA_ID_LENGTH..)
+            .ok_or_else(|| Error::RowConvertError {
+                message: format!(
+                    "Row payload too short: {} bytes, need at least {} for schema id",
+                    bytes.len(),
+                    SCHEMA_ID_LENGTH
+                ),
+            })
+    }
+
     /// Returns the only row in the result set as a [`CompactedRow`].
     ///
     /// This method provides a zero-copy view of the row data, which means the returned
@@ -62,14 +77,14 @@ impl LookupResult {
     /// - `Ok(Some(row))`: If exactly one row exists.
     /// - `Ok(None)`: If the result set is empty.
     /// - `Err(Error::UnexpectedError)`: If the result set contains more than one row.
-    ///
+    /// - `Err(Error)`: If the row payload is too short to contain a schema id.
     pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
         match self.rows.len() {
             0 => Ok(None),
-            1 => Ok(Some(CompactedRow::from_bytes(
-                &self.row_type,
-                &self.rows[0][SCHEMA_ID_LENGTH..],
-            ))),
+            1 => {
+                let payload = Self::extract_payload(&self.rows[0])?;
+                Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
+            }
             _ => Err(Error::UnexpectedError {
                 message: "LookupResult contains multiple rows, use get_rows() instead".to_string(),
                 source: None,
@@ -77,14 +92,42 @@ impl LookupResult {
         }
     }
 
-    /// Returns all rows as CompactedRows.
-    pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+    /// Returns all rows in the result set as [`CompactedRow`]s.
+    ///
+    /// # Returns
+    /// - `Ok(rows)` - All rows in the result set.
+    /// - `Err(Error)` - If any row payload is too short to contain a schema id.
+    pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
         self.rows
             .iter()
             // TODO Add schema id check and fetch when implementing prefix lookup
-            .map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
+            .map(|bytes| {
+                let payload = Self::extract_payload(bytes)?;
+                Ok(CompactedRow::from_bytes(&self.row_type, payload))
+            })
             .collect()
     }
+
+    /// Converts all rows in this result into an Arrow [`RecordBatch`].
+    ///
+    /// This is useful for integration with DataFusion or other Arrow-based tools.
+    ///
+    /// # Returns
+    /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
+    ///   batch (with the correct schema) if the result set is empty.
+    /// - `Err(Error)` - If the conversion fails.
+    pub fn to_record_batch(&self) -> Result<RecordBatch> {
+        let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+
+        for bytes in &self.rows {
+            let payload = Self::extract_payload(bytes)?;
+
+            let row = CompactedRow::from_bytes(&self.row_type, payload);
+            builder.append(&row)?;
+        }
+
+        builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)
+    }
 }
 
 /// Configuration and factory struct for creating lookup operations.
@@ -306,3 +349,68 @@ impl Lookuper {
         &self.table_info
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{DataField, DataTypes};
+    use crate::row::binary::BinaryWriter;
+    use crate::row::compacted::CompactedRowWriter;
+    use arrow::array::Int32Array;
+
+    fn make_row_bytes(schema_id: i16, row_data: &[u8]) -> Vec<u8> {
+        let mut bytes = Vec::with_capacity(SCHEMA_ID_LENGTH + row_data.len());
+        bytes.extend_from_slice(&schema_id.to_le_bytes());
+        bytes.extend_from_slice(row_data);
+        bytes
+    }
+
+    #[test]
+    fn test_to_record_batch_empty() {
+        let row_type = Arc::new(RowType::new(vec![DataField::new(
+            "id",
+            DataTypes::int(),
+            None,
+        )]));
+        let result = LookupResult::empty(row_type);
+        let batch = result.to_record_batch().unwrap();
+        assert_eq!(batch.num_rows(), 0);
+        assert_eq!(batch.num_columns(), 1);
+    }
+
+    #[test]
+    fn test_to_record_batch_with_row() {
+        let row_type = Arc::new(RowType::new(vec![DataField::new(
+            "id",
+            DataTypes::int(),
+            None,
+        )]));
+
+        let mut writer = CompactedRowWriter::new(1);
+        writer.write_int(42);
+        let row_bytes = make_row_bytes(0, writer.buffer());
+
+        let result = LookupResult::new(vec![row_bytes], Arc::clone(&row_type));
+        let batch = result.to_record_batch().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        let col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(col.value(0), 42);
+    }
+
+    #[test]
+    fn test_to_record_batch_payload_too_short() {
+        let row_type = Arc::new(RowType::new(vec![DataField::new(
+            "id",
+            DataTypes::int(),
+            None,
+        )]));
+        // Only 1 byte — shorter than SCHEMA_ID_LENGTH (2)
+        let result = LookupResult::new(vec![vec![0u8]], Arc::clone(&row_type));
+        assert!(result.to_record_batch().is_err());
+    }
+}
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index a0dfc84..83d102a 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -246,6 +246,15 @@ impl RowAppendRecordBatchBuilder {
             records_count: 0,
         })
     }
+    /// Appends a row to the builder.
+    pub fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
+        ArrowRecordBatchInnerBuilder::append(self, row)
+    }
+
+    /// Builds the final Arrow RecordBatch.
+    pub fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
+        ArrowRecordBatchInnerBuilder::build_arrow_record_batch(self)
+    }
 }
 
 impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md
index 7f45522..fbe3428 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -229,7 +229,8 @@ for record in records {
 | Method                                                         |  Description                     |
 |----------------------------------------------------------------|----------------------------------|
 | `fn get_single_row(&self) -> Result<Option<impl InternalRow>>` | Get a single row from the result |
-| `fn get_rows(&self) -> Vec<impl InternalRow>`                  | Get all rows from the result     |
+| `fn get_rows(&self) -> Result<Vec<impl InternalRow>>`          | Get all rows from the result     |
+| `fn to_record_batch(&self) -> Result<RecordBatch>`             | Convert all rows to an Arrow `RecordBatch` for DataFusion or other Arrow-based tools    |
 
 ## `WriteResultFuture`
 
diff --git a/website/docs/user-guide/rust/example/primary-key-tables.md b/website/docs/user-guide/rust/example/primary-key-tables.md
index 9e81979..7fe8a55 100644
--- a/website/docs/user-guide/rust/example/primary-key-tables.md
+++ b/website/docs/user-guide/rust/example/primary-key-tables.md
@@ -112,3 +112,11 @@ if let Some(row) = result.get_single_row()? {
     println!("Record not found");
 }
 ```
+## Looking Up Records as Arrow RecordBatch
+
+Use `to_record_batch()` to get lookup results in Arrow format, for example when integrating with DataFusion.
+```rust
+let result = lookuper.lookup(&key).await?;
+let batch = result.to_record_batch()?;
+println!("Rows: {}", batch.num_rows());
+```

Reply via email to