liurenjie1024 commented on code in PR #207:
URL: https://github.com/apache/iceberg-rust/pull/207#discussion_r1506881445
##########
crates/iceberg/src/scan.rs:
##########
@@ -178,9 +241,16 @@ pub struct FileScanTask {
pub type ArrowRecordBatchStream = BoxStream<'static, crate::Result<RecordBatch>>;
impl FileScanTask {
- /// Returns a stream of arrow record batches.
- pub async fn execute(&self) -> crate::Result<ArrowRecordBatchStream> {
- todo!()
+ pub fn data_file(&self) -> ManifestEntryRef {
+ self.data_file.clone()
+ }
+
+ pub fn start(&self) -> u64 {
+ self.start
+ }
+
+ pub fn length(&self) -> u64 {
+ self.length
Review Comment:
Currently these fields are only used by the `open` method, so I think it's
unnecessary to expose them?
##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
Ok(iter(file_scan_tasks).boxed())
}
+
+ /// Transforms a stream of FileScanTasks from plan_files into a stream of
+ /// Arrow RecordBatches.
+ pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+ let projection_mask = self.get_arrow_projection_mask();
+ let row_selection = self.get_arrow_row_selection();
+
+ Ok(
+ try_stream! {
+ while let Some(Ok(task)) = tasks.next().await {
+ let parquet_reader = file_io
+ .new_input(task.data_file().file_path())?
+ .reader()
+ .await?;
+
+ let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
+ })?
+ .with_batch_size(batch_size)
+ .with_offset(task.start() as usize)
+ .with_limit(task.length() as usize)
+ .with_projection(projection_mask.clone())
+ .with_row_selection(row_selection.clone())
+ .build()
+ .unwrap()
+ .map_err(|err| Error::new(ErrorKind::Unexpected, "Fail to read data").with_source(err));
+
+ while let Some(batch) = batch_stream.next().await {
+ yield batch?;
+ }
+ }
+ }.boxed()
+ )
+ }
+
+ fn get_arrow_projection_mask(&self) -> ProjectionMask {
Review Comment:
In fact, we need to calculate the `ProjectionMask` for each parquet file: due
to Iceberg's schema evolution, the schema projection of each parquet file may
be different.
##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
Ok(iter(file_scan_tasks).boxed())
}
+
+ /// Transforms a stream of FileScanTasks from plan_files into a stream of
+ /// Arrow RecordBatches.
+ pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+ let projection_mask = self.get_arrow_projection_mask();
+ let row_selection = self.get_arrow_row_selection();
+
+ Ok(
+ try_stream! {
+ while let Some(Ok(task)) = tasks.next().await {
+ let parquet_reader = file_io
+ .new_input(task.data_file().file_path())?
+ .reader()
+ .await?;
+
+ let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
+ })?
+ .with_batch_size(batch_size)
+ .with_offset(task.start() as usize)
+ .with_limit(task.length() as usize)
Review Comment:
I think we should omit these two for now. `start`/`length` were designed as
file positions (byte offsets), while `offset`/`limit` refer to row numbers. We
need a conversion between them, but that can be left for later.
##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
Ok(iter(file_scan_tasks).boxed())
}
+
+ /// Transforms a stream of FileScanTasks from plan_files into a stream of
+ /// Arrow RecordBatches.
+ pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+ let projection_mask = self.get_arrow_projection_mask();
+ let row_selection = self.get_arrow_row_selection();
+
+ Ok(
+ try_stream! {
+ while let Some(Ok(task)) = tasks.next().await {
+ let parquet_reader = file_io
+ .new_input(task.data_file().file_path())?
+ .reader()
+ .await?;
+
+ let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
+ })?
+ .with_batch_size(batch_size)
+ .with_offset(task.start() as usize)
+ .with_limit(task.length() as usize)
+ .with_projection(projection_mask.clone())
+ .with_row_selection(row_selection.clone())
+ .build()
+ .unwrap()
Review Comment:
Why do we panic here?
##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
Ok(iter(file_scan_tasks).boxed())
}
+
+ /// Transforms a stream of FileScanTasks from plan_files into a stream of
+ /// Arrow RecordBatches.
+ pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
Review Comment:
This method body's implementation is correct for one parquet file. For a
stream of input files, we can use
[futures::StreamExt::flat_map](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.flat_map)
to combine them into a stream of record batches.
##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
Ok(iter(file_scan_tasks).boxed())
}
+
+ /// Transforms a stream of FileScanTasks from plan_files into a stream of
+ /// Arrow RecordBatches.
+ pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+ let projection_mask = self.get_arrow_projection_mask();
+ let row_selection = self.get_arrow_row_selection();
+
+ Ok(
+ try_stream! {
+ while let Some(Ok(task)) = tasks.next().await {
+ let parquet_reader = file_io
+ .new_input(task.data_file().file_path())?
+ .reader()
+ .await?;
+
+ let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
+ })?
+ .with_batch_size(batch_size)
+ .with_offset(task.start() as usize)
+ .with_limit(task.length() as usize)
+ .with_projection(projection_mask.clone())
+ .with_row_selection(row_selection.clone())
+ .build()
+ .unwrap()
+ .map_err(|err| Error::new(ErrorKind::Unexpected, "Fail to read data").with_source(err));
+
+ while let Some(batch) = batch_stream.next().await {
+ yield batch?;
+ }
+ }
+ }.boxed()
+ )
+ }
+
+ fn get_arrow_projection_mask(&self) -> ProjectionMask {
+ // TODO, dummy implementation
+ todo!()
+ }
+
+ fn get_arrow_row_selection(&self) -> RowSelection {
Review Comment:
We can ignore this for now, as it's non-trivial to do.
##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
Ok(iter(file_scan_tasks).boxed())
}
+
+ /// Transforms a stream of FileScanTasks from plan_files into a stream of
+ /// Arrow RecordBatches.
+ pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+ let projection_mask = self.get_arrow_projection_mask();
+ let row_selection = self.get_arrow_row_selection();
+
+ Ok(
+ try_stream! {
+ while let Some(Ok(task)) = tasks.next().await {
+ let parquet_reader = file_io
+ .new_input(task.data_file().file_path())?
+ .reader()
+ .await?;
+
+ let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
Review Comment:
```suggestion
Error::new(ErrorKind::Unexpected, "Failed to load parquet file").with_source(err).with_context("filename", "<input file path>")
```
Please also add context here for better error reporting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]