This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e1de63d3 fix: support empty scans (#1166)
e1de63d3 is described below

commit e1de63d3ec47ee97d2b22cd193634e9aa9ff6ed8
Author: Dan King <[email protected]>
AuthorDate: Sat Apr 5 00:00:26 2025 -0400

    fix: support empty scans (#1166)
    
    ## Which issue does this PR close?
    
    Closes #1145.
    
    ## What changes are included in this PR?
    
    Allow scanning of a table with no snapshots.
    
    AFAICT, iceberg-python has [analogous
    logic](https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L1684-L1686)
    to bail out when there is no snapshot.
    
    ## Are these changes tested?
    
    Yes: `test_plan_files_on_table_without_any_snapshots`.
---
 crates/iceberg/src/scan/mod.rs                     | 98 ++++++++++++++++++----
 .../testdata/example_empty_table_metadata_v2.json  | 57 +++++++++++++
 2 files changed, 139 insertions(+), 16 deletions(-)

diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 7cddcfeb..776419a9 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -214,14 +214,23 @@ impl<'a> TableScanBuilder<'a> {
                     )
                 })?
                 .clone(),
-            None => self
-                .table
-                .metadata()
-                .current_snapshot()
-                .ok_or_else(|| {
-                    Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
-                })?
-                .clone(),
+            None => {
+                let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
+                    return Ok(TableScan {
+                        batch_size: self.batch_size,
+                        column_names: self.column_names,
+                        file_io: self.table.file_io().clone(),
+                        plan_context: None,
+                        concurrency_limit_data_files: self.concurrency_limit_data_files,
+                        concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
+                        concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
+                        row_group_filtering_enabled: self.row_group_filtering_enabled,
+                        row_selection_enabled: self.row_selection_enabled,
+                        delete_file_processing_enabled: self.delete_file_processing_enabled,
+                    });
+                };
+                current_snapshot_id.clone()
+            }
         };
 
         let schema = snapshot.schema(self.table.metadata())?;
@@ -302,7 +311,7 @@ impl<'a> TableScanBuilder<'a> {
             batch_size: self.batch_size,
             column_names: self.column_names,
             file_io: self.table.file_io().clone(),
-            plan_context,
+            plan_context: Some(plan_context),
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
             concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
@@ -316,7 +325,10 @@ impl<'a> TableScanBuilder<'a> {
 /// Table scan.
 #[derive(Debug)]
 pub struct TableScan {
-    plan_context: PlanContext,
+    /// A [PlanContext], if this table has at least one snapshot, otherwise None.
+    ///
+    /// If this is None, then the scan contains no rows.
+    plan_context: Option<PlanContext>,
     batch_size: Option<usize>,
     file_io: FileIO,
     column_names: Option<Vec<String>>,
@@ -340,6 +352,10 @@ pub struct TableScan {
 impl TableScan {
     /// Returns a stream of [`FileScanTask`]s.
     pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let Some(plan_context) = self.plan_context.as_ref() else {
+            return Ok(Box::pin(futures::stream::empty()));
+        };
+
         let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
         let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
 
@@ -359,12 +375,12 @@ impl TableScan {
                 None
             };
 
-        let manifest_list = self.plan_context.get_manifest_list().await?;
+        let manifest_list = plan_context.get_manifest_list().await?;
 
         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
         // whose partitions cannot match this
         // scan's filter
-        let manifest_file_contexts = self.plan_context.build_manifest_file_contexts(
+        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
             manifest_list,
             manifest_entry_data_ctx_tx,
             delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
@@ -463,8 +479,8 @@ impl TableScan {
     }
 
     /// Returns a reference to the snapshot of the table scan.
-    pub fn snapshot(&self) -> &SnapshotRef {
-        &self.plan_context.snapshot
+    pub fn snapshot(&self) -> Option<&SnapshotRef> {
+        self.plan_context.as_ref().map(|x| &x.snapshot)
     }
 
     async fn process_data_manifest_entry(
@@ -652,6 +668,45 @@ pub mod tests {
             }
         }
 
+        #[allow(clippy::new_without_default)]
+        pub fn new_empty() -> Self {
+            let tmp_dir = TempDir::new().unwrap();
+            let table_location = tmp_dir.path().join("table1");
+            let table_metadata1_location = table_location.join("metadata/v1.json");
+
+            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+                .unwrap()
+                .build()
+                .unwrap();
+
+            let table_metadata = {
+                let template_json_str = fs::read_to_string(format!(
+                    "{}/testdata/example_empty_table_metadata_v2.json",
+                    env!("CARGO_MANIFEST_DIR")
+                ))
+                .unwrap();
+                let mut context = Context::new();
+                context.insert("table_location", &table_location);
+                context.insert("table_metadata_1_location", &table_metadata1_location);
+
+                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
+                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+            };
+
+            let table = Table::builder()
+                .metadata(table_metadata)
+                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+                .file_io(file_io.clone())
+                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .build()
+                .unwrap();
+
+            Self {
+                table_location: table_location.to_str().unwrap().to_string(),
+                table,
+            }
+        }
+
         pub fn new_unpartitioned() -> Self {
             let tmp_dir = TempDir::new().unwrap();
             let table_location = tmp_dir.path().join("table1");
@@ -1178,7 +1233,7 @@ pub mod tests {
         let table_scan = table.scan().build().unwrap();
         assert_eq!(
             table.metadata().current_snapshot().unwrap().snapshot_id(),
-            table_scan.snapshot().snapshot_id()
+            table_scan.snapshot().unwrap().snapshot_id()
         );
     }
 
@@ -1200,7 +1255,18 @@ pub mod tests {
             .with_row_selection_enabled(true)
             .build()
             .unwrap();
-        assert_eq!(table_scan.snapshot().snapshot_id(), 3051729675574597004);
+        assert_eq!(
+            table_scan.snapshot().unwrap().snapshot_id(),
+            3051729675574597004
+        );
+    }
+
+    #[tokio::test]
+    async fn test_plan_files_on_table_without_any_snapshots() {
+        let table = TableTestFixture::new_empty().table;
+        let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+        assert!(batches.is_empty());
     }
 
     #[tokio::test]
diff --git a/crates/iceberg/testdata/example_empty_table_metadata_v2.json b/crates/iceberg/testdata/example_empty_table_metadata_v2.json
new file mode 100644
index 00000000..75e09b09
--- /dev/null
+++ b/crates/iceberg/testdata/example_empty_table_metadata_v2.json
@@ -0,0 +1,57 @@
+{
+  "format-version": 2,
+  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+  "location": "{{ table_location }}",
+  "last-sequence-number": 34,
+  "last-updated-ms": 1602638573590,
+  "last-column-id": 3,
+  "current-schema-id": 1,
+  "schemas": [
+    {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {"id": 1, "name": "x", "required": true, "type": "long"}
+      ]},
+    {
+      "type": "struct",
+      "schema-id": 1,
+      "identifier-field-ids": [1, 2],
+      "fields": [
+        {"id": 1, "name": "x", "required": true, "type": "long"},
+        {"id": 2, "name": "y", "required": true, "type": "long", "doc": 
"comment"},
+        {"id": 3, "name": "z", "required": true, "type": "long"},
+        {"id": 4, "name": "a", "required": true, "type": "string"},
+        {"id": 5, "name": "dbl", "required": true, "type": "double"},
+        {"id": 6, "name": "i32", "required": true, "type": "int"},
+        {"id": 7, "name": "i64", "required": true, "type": "long"},
+        {"id": 8, "name": "bool", "required": true, "type": "boolean"}
+      ]
+    }
+  ],
+  "default-spec-id": 0,
+  "partition-specs": [
+    {
+      "spec-id": 0,
+      "fields": [
+        {"name": "x", "transform": "identity", "source-id": 1, "field-id": 
1000}
+      ]
+    }
+  ],
+  "last-partition-id": 1000,
+  "default-sort-order-id": 3,
+  "sort-orders": [
+    {
+      "order-id": 3,
+      "fields": [
+        {"transform": "identity", "source-id": 2, "direction": "asc", 
"null-order": "nulls-first"},
+        {"transform": "bucket[4]", "source-id": 3, "direction": "desc", 
"null-order": "nulls-last"}
+      ]
+    }
+  ],
+  "properties": {"read.split.target.size": "134217728"},
+  "snapshots": [],
+  "snapshot-log": [],
+  "metadata-log": [{"metadata-file": "{{ table_metadata_1_location }}", 
"timestamp-ms": 1515100}],
+  "refs": {}
+}

Reply via email to