liurenjie1024 commented on code in PR #207:
URL: https://github.com/apache/iceberg-rust/pull/207#discussion_r1506881445


##########
crates/iceberg/src/scan.rs:
##########
@@ -178,9 +241,16 @@ pub struct FileScanTask {
 pub type ArrowRecordBatchStream = BoxStream<'static, 
crate::Result<RecordBatch>>;
 
 impl FileScanTask {
-    /// Returns a stream of arrow record batches.
-    pub async fn execute(&self) -> crate::Result<ArrowRecordBatchStream> {
-        todo!()
+    pub fn data_file(&self) -> ManifestEntryRef {
+        self.data_file.clone()
+    }
+
+    pub fn start(&self) -> u64 {
+        self.start
+    }
+
+    pub fn length(&self) -> u64 {
+        self.length

Review Comment:
   Currently these fields are only used by the `open` method, so I think it's 
unnecessary to expose them?



##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> 
crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+        let projection_mask = self.get_arrow_projection_mask();
+        let row_selection = self.get_arrow_row_selection();
+
+        Ok(
+            try_stream! {
+                while let Some(Ok(task)) = tasks.next().await {
+                    let parquet_reader = file_io
+                        .new_input(task.data_file().file_path())?
+                        .reader()
+                        .await?;
+
+                    let mut batch_stream = 
ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await
+                        .map_err(|err| {
+                            Error::new(ErrorKind::Unexpected, "failed to load 
parquet file").with_source(err)
+                        })?
+                        .with_batch_size(batch_size)
+                        .with_offset(task.start() as usize)
+                        .with_limit(task.length() as usize)
+                        .with_projection(projection_mask.clone())
+                        .with_row_selection(row_selection.clone())
+                        .build()
+                        .unwrap()
+                        .map_err(|err| Error::new(ErrorKind::Unexpected, "Fail 
to read data").with_source(err));
+
+                    while let Some(batch) = batch_stream.next().await {
+                        yield batch?;
+                    }
+                }
+            }.boxed()
+        )
+    }
+
+    fn get_arrow_projection_mask(&self) -> ProjectionMask {

Review Comment:
   In fact, we need to calculate the `ProjectionMask` for each parquet file: due 
to Iceberg's schema evolution, the schema projection of each parquet file may 
be different.



##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> 
crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+        let projection_mask = self.get_arrow_projection_mask();
+        let row_selection = self.get_arrow_row_selection();
+
+        Ok(
+            try_stream! {
+                while let Some(Ok(task)) = tasks.next().await {
+                    let parquet_reader = file_io
+                        .new_input(task.data_file().file_path())?
+                        .reader()
+                        .await?;
+
+                    let mut batch_stream = 
ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await
+                        .map_err(|err| {
+                            Error::new(ErrorKind::Unexpected, "failed to load 
parquet file").with_source(err)
+                        })?
+                        .with_batch_size(batch_size)
+                        .with_offset(task.start() as usize)
+                        .with_limit(task.length() as usize)

Review Comment:
   I think we should omit these two for now. `start`/`length` were designed 
to be file positions, while `offset`/`limit` are row numbers. We need a 
conversion between them, but that can be left for later.



##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> 
crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+        let projection_mask = self.get_arrow_projection_mask();
+        let row_selection = self.get_arrow_row_selection();
+
+        Ok(
+            try_stream! {
+                while let Some(Ok(task)) = tasks.next().await {
+                    let parquet_reader = file_io
+                        .new_input(task.data_file().file_path())?
+                        .reader()
+                        .await?;
+
+                    let mut batch_stream = 
ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await
+                        .map_err(|err| {
+                            Error::new(ErrorKind::Unexpected, "failed to load 
parquet file").with_source(err)
+                        })?
+                        .with_batch_size(batch_size)
+                        .with_offset(task.start() as usize)
+                        .with_limit(task.length() as usize)
+                        .with_projection(projection_mask.clone())
+                        .with_row_selection(row_selection.clone())
+                        .build()
+                        .unwrap()

Review Comment:
   Why do we panic here?



##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> 
crate::Result<ArrowRecordBatchStream> {

Review Comment:
   This method body's implementation is correct for one parquet file. For a 
stream of input files, we can use 
[futures::StreamExt::flat_map](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.flat_map)
 to combine them into a stream of record batches.



##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> 
crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+        let projection_mask = self.get_arrow_projection_mask();
+        let row_selection = self.get_arrow_row_selection();
+
+        Ok(
+            try_stream! {
+                while let Some(Ok(task)) = tasks.next().await {
+                    let parquet_reader = file_io
+                        .new_input(task.data_file().file_path())?
+                        .reader()
+                        .await?;
+
+                    let mut batch_stream = 
ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await
+                        .map_err(|err| {
+                            Error::new(ErrorKind::Unexpected, "failed to load 
parquet file").with_source(err)
+                        })?
+                        .with_batch_size(batch_size)
+                        .with_offset(task.start() as usize)
+                        .with_limit(task.length() as usize)
+                        .with_projection(projection_mask.clone())
+                        .with_row_selection(row_selection.clone())
+                        .build()
+                        .unwrap()
+                        .map_err(|err| Error::new(ErrorKind::Unexpected, "Fail 
to read data").with_source(err));
+
+                    while let Some(batch) = batch_stream.next().await {
+                        yield batch?;
+                    }
+                }
+            }.boxed()
+        )
+    }
+
+    fn get_arrow_projection_mask(&self) -> ProjectionMask {
+        // TODO, dummy implementation
+        todo!()
+    }
+
+    fn get_arrow_row_selection(&self) -> RowSelection {

Review Comment:
   We can ignore this for now as it's non-trivial to implement.



##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> 
crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+        let projection_mask = self.get_arrow_projection_mask();
+        let row_selection = self.get_arrow_row_selection();
+
+        Ok(
+            try_stream! {
+                while let Some(Ok(task)) = tasks.next().await {
+                    let parquet_reader = file_io
+                        .new_input(task.data_file().file_path())?
+                        .reader()
+                        .await?;
+
+                    let mut batch_stream = 
ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await
+                        .map_err(|err| {
+                            Error::new(ErrorKind::Unexpected, "failed to load 
parquet file").with_source(err)

Review Comment:
   ```suggestion
                               Error::new(ErrorKind::Unexpected, "Failed to 
load parquet file").with_source(err).with_context("filename", "<input file 
path>")
   ```
   Please also add context here for better error reporting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to