This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e1de63d3 fix: support empty scans (#1166)
e1de63d3 is described below
commit e1de63d3ec47ee97d2b22cd193634e9aa9ff6ed8
Author: Dan King <[email protected]>
AuthorDate: Sat Apr 5 00:00:26 2025 -0400
fix: support empty scans (#1166)
## Which issue does this PR close?
Closes #1145.
## What changes are included in this PR?
Allow scanning a table that has no snapshots. Instead of failing with "Can't scan table without snapshots", such a scan now produces no rows.
As far as I can tell, iceberg-python has [analogous
logic](https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L1684-L1686)
that bails out early when there is no snapshot.
## Are these changes tested?
Yes: `test_plan_files_on_table_without_any_snapshots` scans a fixture table with no snapshots and asserts that no record batches are returned.
---
crates/iceberg/src/scan/mod.rs | 98 ++++++++++++++++++----
.../testdata/example_empty_table_metadata_v2.json | 57 +++++++++++++
2 files changed, 139 insertions(+), 16 deletions(-)
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 7cddcfeb..776419a9 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -214,14 +214,23 @@ impl<'a> TableScanBuilder<'a> {
)
})?
.clone(),
- None => self
- .table
- .metadata()
- .current_snapshot()
- .ok_or_else(|| {
- Error::new(ErrorKind::Unexpected, "Can't scan table
without snapshots")
- })?
- .clone(),
+ None => {
+ let Some(current_snapshot_id) =
self.table.metadata().current_snapshot() else {
+ return Ok(TableScan {
+ batch_size: self.batch_size,
+ column_names: self.column_names,
+ file_io: self.table.file_io().clone(),
+ plan_context: None,
+ concurrency_limit_data_files:
self.concurrency_limit_data_files,
+ concurrency_limit_manifest_entries:
self.concurrency_limit_manifest_entries,
+ concurrency_limit_manifest_files:
self.concurrency_limit_manifest_files,
+ row_group_filtering_enabled:
self.row_group_filtering_enabled,
+ row_selection_enabled: self.row_selection_enabled,
+ delete_file_processing_enabled:
self.delete_file_processing_enabled,
+ });
+ };
+ current_snapshot_id.clone()
+ }
};
let schema = snapshot.schema(self.table.metadata())?;
@@ -302,7 +311,7 @@ impl<'a> TableScanBuilder<'a> {
batch_size: self.batch_size,
column_names: self.column_names,
file_io: self.table.file_io().clone(),
- plan_context,
+ plan_context: Some(plan_context),
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries:
self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files:
self.concurrency_limit_manifest_files,
@@ -316,7 +325,10 @@ impl<'a> TableScanBuilder<'a> {
/// Table scan.
#[derive(Debug)]
pub struct TableScan {
- plan_context: PlanContext,
+ /// A [PlanContext], if this table has at least one snapshot, otherwise
None.
+ ///
+ /// If this is None, then the scan contains no rows.
+ plan_context: Option<PlanContext>,
batch_size: Option<usize>,
file_io: FileIO,
column_names: Option<Vec<String>>,
@@ -340,6 +352,10 @@ pub struct TableScan {
impl TableScan {
/// Returns a stream of [`FileScanTask`]s.
pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+ let Some(plan_context) = self.plan_context.as_ref() else {
+ return Ok(Box::pin(futures::stream::empty()));
+ };
+
let concurrency_limit_manifest_files =
self.concurrency_limit_manifest_files;
let concurrency_limit_manifest_entries =
self.concurrency_limit_manifest_entries;
@@ -359,12 +375,12 @@ impl TableScan {
None
};
- let manifest_list = self.plan_context.get_manifest_list().await?;
+ let manifest_list = plan_context.get_manifest_list().await?;
// get the [`ManifestFile`]s from the [`ManifestList`], filtering out
any
// whose partitions cannot match this
// scan's filter
- let manifest_file_contexts =
self.plan_context.build_manifest_file_contexts(
+ let manifest_file_contexts = plan_context.build_manifest_file_contexts(
manifest_list,
manifest_entry_data_ctx_tx,
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
@@ -463,8 +479,8 @@ impl TableScan {
}
/// Returns a reference to the snapshot of the table scan.
+ ///
+ /// Returns `None` when the scan was built without an explicit snapshot ID on a
+ /// table that had no current snapshot; such a scan yields no rows.
- pub fn snapshot(&self) -> &SnapshotRef {
- &self.plan_context.snapshot
+ pub fn snapshot(&self) -> Option<&SnapshotRef> {
+ self.plan_context.as_ref().map(|x| &x.snapshot)
}
async fn process_data_manifest_entry(
@@ -652,6 +668,45 @@ pub mod tests {
}
}
+ /// Builds a test fixture whose table metadata is rendered from
+ /// `testdata/example_empty_table_metadata_v2.json`, which lists no snapshots
+ /// and no refs, so the resulting table has no current snapshot.
+ ///
+ /// NOTE(review): the `clippy::new_without_default` allow looks unnecessary here;
+ /// that lint targets argument-less `new` functions, not `new_empty`. Confirm and drop.
+ #[allow(clippy::new_without_default)]
+ pub fn new_empty() -> Self {
+ // NOTE(review): `tmp_dir` is not stored in the returned fixture, so it is dropped
+ // when this function returns (presumably removing the directory). Only the path
+ // string survives; confirm no test relies on files existing under it.
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().join("table1");
+ let table_metadata1_location =
table_location.join("metadata/v1.json");
+
+ let file_io =
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+ .unwrap()
+ .build()
+ .unwrap();
+
+ // Render the metadata template, substituting the temporary table location and
+ // metadata file path, then parse it into `TableMetadata`.
+ let table_metadata = {
+ let template_json_str = fs::read_to_string(format!(
+ "{}/testdata/example_empty_table_metadata_v2.json",
+ env!("CARGO_MANIFEST_DIR")
+ ))
+ .unwrap();
+ let mut context = Context::new();
+ context.insert("table_location", &table_location);
+ context.insert("table_metadata_1_location",
&table_metadata1_location);
+
+ let metadata_json = Tera::one_off(&template_json_str,
&context, false).unwrap();
+ serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+ };
+
+ let table = Table::builder()
+ .metadata(table_metadata)
+ .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+ .file_io(file_io.clone())
+
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+ .build()
+ .unwrap();
+
+ Self {
+ table_location: table_location.to_str().unwrap().to_string(),
+ table,
+ }
+ }
+
pub fn new_unpartitioned() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
@@ -1178,7 +1233,7 @@ pub mod tests {
let table_scan = table.scan().build().unwrap();
assert_eq!(
table.metadata().current_snapshot().unwrap().snapshot_id(),
- table_scan.snapshot().snapshot_id()
+ table_scan.snapshot().unwrap().snapshot_id()
);
}
@@ -1200,7 +1255,18 @@ pub mod tests {
.with_row_selection_enabled(true)
.build()
.unwrap();
- assert_eq!(table_scan.snapshot().snapshot_id(), 3051729675574597004);
+ assert_eq!(
+ table_scan.snapshot().unwrap().snapshot_id(),
+ 3051729675574597004
+ );
+ }
+
+ /// Regression test: building and executing a scan on a table with no snapshots
+ /// must succeed (rather than failing with "Can't scan table without snapshots")
+ /// and must produce no record batches.
+ #[tokio::test]
+ async fn test_plan_files_on_table_without_any_snapshots() {
+ let table = TableTestFixture::new_empty().table;
+ let batch_stream =
table.scan().build().unwrap().to_arrow().await.unwrap();
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+ assert!(batches.is_empty());
}
#[tokio::test]
diff --git a/crates/iceberg/testdata/example_empty_table_metadata_v2.json
b/crates/iceberg/testdata/example_empty_table_metadata_v2.json
new file mode 100644
index 00000000..75e09b09
--- /dev/null
+++ b/crates/iceberg/testdata/example_empty_table_metadata_v2.json
@@ -0,0 +1,57 @@
+{
+ "format-version": 2,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "{{ table_location }}",
+ "last-sequence-number": 34,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 3,
+ "current-schema-id": 1,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "x", "required": true, "type": "long"}
+ ]},
+ {
+ "type": "struct",
+ "schema-id": 1,
+ "identifier-field-ids": [1, 2],
+ "fields": [
+ {"id": 1, "name": "x", "required": true, "type": "long"},
+ {"id": 2, "name": "y", "required": true, "type": "long", "doc":
"comment"},
+ {"id": 3, "name": "z", "required": true, "type": "long"},
+ {"id": 4, "name": "a", "required": true, "type": "string"},
+ {"id": 5, "name": "dbl", "required": true, "type": "double"},
+ {"id": 6, "name": "i32", "required": true, "type": "int"},
+ {"id": 7, "name": "i64", "required": true, "type": "long"},
+ {"id": 8, "name": "bool", "required": true, "type": "boolean"}
+ ]
+ }
+ ],
+ "default-spec-id": 0,
+ "partition-specs": [
+ {
+ "spec-id": 0,
+ "fields": [
+ {"name": "x", "transform": "identity", "source-id": 1, "field-id":
1000}
+ ]
+ }
+ ],
+ "last-partition-id": 1000,
+ "default-sort-order-id": 3,
+ "sort-orders": [
+ {
+ "order-id": 3,
+ "fields": [
+ {"transform": "identity", "source-id": 2, "direction": "asc",
"null-order": "nulls-first"},
+ {"transform": "bucket[4]", "source-id": 3, "direction": "desc",
"null-order": "nulls-last"}
+ ]
+ }
+ ],
+ "properties": {"read.split.target.size": "134217728"},
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": [{"metadata-file": "{{ table_metadata_1_location }}",
"timestamp-ms": 1515100}],
+ "refs": {}
+}