sdd commented on code in PR #207:
URL: https://github.com/apache/iceberg-rust/pull/207#discussion_r1507126728


##########
crates/iceberg/src/scan.rs:
##########
@@ -163,6 +178,54 @@ impl TableScan {
 
         Ok(iter(file_scan_tasks).boxed())
     }
+
+    /// Transforms a stream of FileScanTasks from plan_files into a stream of
+    /// Arrow RecordBatches.
+    pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
+        let projection_mask = self.get_arrow_projection_mask();
+        let row_selection = self.get_arrow_row_selection();
+
+        Ok(
+            try_stream! {
+                while let Some(Ok(task)) = tasks.next().await {
+                    let parquet_reader = file_io
+                        .new_input(task.data_file().file_path())?
+                        .reader()
+                        .await?;
+
+                    let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                        .await
+                        .map_err(|err| {
+                            Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
+                        })?
+                        .with_batch_size(batch_size)
+                        .with_offset(task.start() as usize)
+                        .with_limit(task.length() as usize)
+                        .with_projection(projection_mask.clone())
+                        .with_row_selection(row_selection.clone())
+                        .build()
+                        .unwrap()
+                        .map_err(|err| Error::new(ErrorKind::Unexpected, "Fail to read data").with_source(err));
+
+                    while let Some(batch) = batch_stream.next().await {
+                        yield batch?;
+                    }
+                }
+            }.boxed()
+        )
+    }
+
+    fn get_arrow_projection_mask(&self) -> ProjectionMask {

Review Comment:
   Aah, of course. Will update



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to