Copilot commented on code in PR #411:
URL: https://github.com/apache/fluss-rust/pull/411#discussion_r2875822104
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -85,6 +88,24 @@ impl LookupResult {
.map(|bytes| CompactedRow::from_bytes(&self.row_type,
&bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based
tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an
empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
+ let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+ for bytes in &self.rows {
+ let row = CompactedRow::from_bytes(&self.row_type,
&bytes[SCHEMA_ID_LENGTH..]);
+ builder.append(&row)?;
Review Comment:
`&bytes[SCHEMA_ID_LENGTH..]` will panic if a returned row payload is shorter
than `SCHEMA_ID_LENGTH` (e.g., corrupted/partial response). Since this is a new
public API, it would be better to avoid panicking here and instead return a
structured `Error` by using a checked slice (`bytes.get(SCHEMA_ID_LENGTH..)`)
and failing gracefully if it’s missing.
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -85,6 +88,24 @@ impl LookupResult {
.map(|bytes| CompactedRow::from_bytes(&self.row_type,
&bytes[SCHEMA_ID_LENGTH..]))
.collect()
}
+ /// Converts all rows in this result into an Arrow [`RecordBatch`].
+ ///
+ /// This is useful for integration with DataFusion or other Arrow-based
tools.
+ ///
+ /// # Returns
+ /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an
empty
+ /// batch (with the correct schema) if the result set is empty.
+ /// - `Err(Error)` - If the conversion fails.
+ pub fn to_record_batch(&self) -> Result<RecordBatch> {
+ let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+ for bytes in &self.rows {
+ let row = CompactedRow::from_bytes(&self.row_type,
&bytes[SCHEMA_ID_LENGTH..]);
+ builder.append(&row)?;
+ }
+ let arc_batch = builder.build_arrow_record_batch()?;
+ // Unwrap the Arc — if we're the only owner, take it directly;
otherwise clone.
+ Ok(Arc::try_unwrap(arc_batch).unwrap_or_else(|arc: Arc<RecordBatch>|
(*arc).clone()))
Review Comment:
The `Arc::try_unwrap(...).unwrap_or_else(...)` logic is equivalent to
`Arc::unwrap_or_clone(arc_batch)` on the current MSRV (1.85), which is simpler
and avoids the extra closure/type annotation noise.
```suggestion
Ok(Arc::unwrap_or_clone(arc_batch))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]