fresh-borzoni commented on code in PR #411:
URL: https://github.com/apache/fluss-rust/pull/411#discussion_r2877847701
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -20,13 +20,16 @@ use crate::client::metadata::Metadata;
use crate::client::table::partition_getter::PartitionGetter;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
+use crate::record::ArrowRecordBatchInnerBuilder;
Review Comment:
Do we use this import?
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -85,6 +88,35 @@ impl LookupResult {
.map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
Review Comment:
This is a new user-facing API. Shall we add tests and docs?
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -85,6 +88,35 @@ impl LookupResult {
.map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
+ let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+
+ for bytes in &self.rows {
+ let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {
Review Comment:
Would you mind also changing `get_single_row()` and `get_rows()` to the same
pattern? Otherwise they are inconsistent with this change.
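
For illustration, the checked-slice pattern could be factored into a shared helper along these lines. This is only a sketch: `row_payload`, `payloads`, the local `Error` enum, and the value of `SCHEMA_ID_LENGTH` are hypothetical stand-ins, not the crate's actual definitions.

```rust
// Hypothetical stand-ins; the real crate defines its own `Error` type and
// `SCHEMA_ID_LENGTH` constant. The value 2 is an assumption for this sketch.
const SCHEMA_ID_LENGTH: usize = 2;

#[derive(Debug, PartialEq)]
enum Error {
    RowConvertError { message: String },
}

/// Returns the bytes after the schema id prefix, or an error (instead of a
/// panic) when the row is shorter than the prefix.
fn row_payload(bytes: &[u8]) -> Result<&[u8], Error> {
    bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| Error::RowConvertError {
        message: format!(
            "LookupResult row payload too short: {} bytes, need at least {} bytes for schema id",
            bytes.len(),
            SCHEMA_ID_LENGTH
        ),
    })
}

/// Sketch of a `get_rows()`-style accessor: the first short row aborts the
/// whole collection with an error.
fn payloads(rows: &[Vec<u8>]) -> Result<Vec<&[u8]>, Error> {
    rows.iter().map(|bytes| row_payload(bytes)).collect()
}
```

With a helper like this, `get_single_row()`, `get_rows()`, and `to_record_batch()` would all report a short row the same way rather than one returning an error and the others panicking on the slice index.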
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -85,6 +88,35 @@ impl LookupResult {
.map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
Review Comment:
What about the bindings?
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -85,6 +88,35 @@ impl LookupResult {
.map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
+ let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+
+ for bytes in &self.rows {
+ let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {
+ Error::RowConvertError {
+ message: format!(
+ "LookupResult row payload too short: {} bytes, need at least {} bytes for schema id",
+ bytes.len(),
+ SCHEMA_ID_LENGTH
+ ),
+ }
+ })?;
+
+ let row = CompactedRow::from_bytes(&self.row_type, payload);
+ builder.append(&row)?;
+ }
+
+ let arc_batch = builder.build_arrow_record_batch()?;
Review Comment:
```rust
builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)
```
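
To show what the one-liner does, here is a small standalone sketch. It assumes the builder returns a `Result` wrapping an `Arc`, as the `arc_batch` name suggests; `Batch`, `build_shared`, and `to_owned_batch` are hypothetical stand-ins for the real types. `Arc::unwrap_or_clone` requires Rust 1.76 or later.

```rust
use std::sync::Arc;

// Hypothetical stand-in for Arrow's `RecordBatch`.
#[derive(Clone, Debug, PartialEq)]
struct Batch {
    rows: Vec<u32>,
}

// Hypothetical stand-in for `builder.build_arrow_record_batch()`, assumed to
// return the batch behind an `Arc`.
fn build_shared() -> Result<Arc<Batch>, String> {
    Ok(Arc::new(Batch { rows: vec![1, 2, 3] }))
}

// Moves the value out of the `Arc` when this is the only reference and
// clones it otherwise; errors pass through `map` untouched.
fn to_owned_batch() -> Result<Batch, String> {
    build_shared().map(Arc::unwrap_or_clone)
}
```

Because the `Arc` here has just been created and is not shared, the unwrap takes the move path and no clone of the batch happens.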