This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b7b5dd11 feat: extend table scan to support v2 deletes (#489)
b7b5dd11 is described below
commit b7b5dd11f0e6fbfe3b7b49c35f65b2651ac9fa5b
Author: Gang Wu <[email protected]>
AuthorDate: Tue Jan 6 18:03:04 2026 +0800
feat: extend table scan to support v2 deletes (#489)
Co-authored-by: Guotao Yu <[email protected]>
Co-authored-by: Zehua Zou <[email protected]>
---
example/demo_example.cc | 1 +
src/iceberg/table.cc | 2 +-
src/iceberg/table_scan.cc | 380 +++++++++++++++++++++---------
src/iceberg/table_scan.h | 325 ++++++++++++++++---------
src/iceberg/test/file_scan_task_test.cc | 6 +-
src/iceberg/util/snapshot_util.cc | 31 ++-
src/iceberg/util/snapshot_util_internal.h | 8 +
7 files changed, 522 insertions(+), 231 deletions(-)
diff --git a/example/demo_example.cc b/example/demo_example.cc
index ab011fee..6869aa37 100644
--- a/example/demo_example.cc
+++ b/example/demo_example.cc
@@ -22,6 +22,7 @@
#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/catalog/memory/in_memory_catalog.h"
+#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/parquet/parquet_register.h"
#include "iceberg/table.h"
#include "iceberg/table_scan.h"
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index ee3ce594..f2e6d320 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -141,7 +141,7 @@ const std::shared_ptr<TableMetadata>& Table::metadata() const { return metadata_
const std::shared_ptr<Catalog>& Table::catalog() const { return catalog_; }
Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
- return std::make_unique<TableScanBuilder>(metadata_, io_);
+ return TableScanBuilder::Make(metadata_, io_);
}
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 700cab1f..12410270 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -20,22 +20,40 @@
#include "iceberg/table_scan.h"
#include <cstring>
-#include <vector>
-#include "iceberg/arrow_c_data.h"
+#include "iceberg/expression/expression.h"
#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
-#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_group.h"
+#include "iceberg/result.h"
#include "iceberg/schema.h"
-#include "iceberg/schema_field.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
namespace iceberg {
namespace {
+
+const std::vector<std::string> kScanColumns = {
+ "snapshot_id", "file_path", "file_ordinal", "file_format",
+ "block_size_in_bytes", "file_size_in_bytes", "record_count", "partition",
+ "key_metadata", "split_offsets", "sort_order_id",
+};
+
+const std::vector<std::string> kStatsColumns = {
+ "value_counts", "null_value_counts", "nan_value_counts",
+ "lower_bounds", "upper_bounds", "column_sizes",
+};
+
+const std::vector<std::string> kScanColumnsWithStats = [] {
+ auto cols = kScanColumns;
+ cols.insert(cols.end(), kStatsColumns.begin(), kStatsColumns.end());
+ return cols;
+}();
+
/// \brief Private data structure to hold the Reader and error state
struct ReaderStreamPrivateData {
std::unique_ptr<Reader> reader;
@@ -135,6 +153,25 @@ Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
} // namespace
+namespace internal {
+
+Status TableScanContext::Validate() const {
+ ICEBERG_CHECK(columns_to_keep_stats.empty() || return_column_stats,
+ "Cannot select columns to keep stats when column stats are not returned");
+ ICEBERG_CHECK(projected_schema == nullptr || selected_columns.empty(),
+ "Cannot set projection schema and selected columns at the same time");
+ ICEBERG_CHECK(!snapshot_id.has_value() ||
+ (!from_snapshot_id.has_value() && !to_snapshot_id.has_value()),
+ "Cannot mix snapshot scan and incremental scan");
+ ICEBERG_CHECK(!min_rows_requested.has_value() || min_rows_requested.value() >= 0,
+ "Min rows requested cannot be negative");
+ return {};
+}
+
+} // namespace internal
+
+ScanTask::~ScanTask() = default;
+
// FileScanTask implementation
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file,
@@ -142,22 +179,10 @@ FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file,
std::shared_ptr<Expression> residual_filter)
: data_file_(std::move(data_file)),
delete_files_(std::move(delete_files)),
- residual_filter_(std::move(residual_filter)) {}
-
-const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return data_file_; }
-
-const std::vector<std::shared_ptr<DataFile>>& FileScanTask::delete_files() const {
- return delete_files_;
-}
-
-const std::shared_ptr<Expression>& FileScanTask::residual_filter() const {
- return residual_filter_;
+ residual_filter_(std::move(residual_filter)) {
+ ICEBERG_DCHECK(data_file_ != nullptr, "Data file cannot be null for FileScanTask");
}
-bool FileScanTask::has_deletes() const { return !delete_files_.empty(); }
-
-bool FileScanTask::has_residual_filter() const { return residual_filter_ != nullptr; }
-
int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; }
int32_t FileScanTask::files_count() const { return 1; }
@@ -165,17 +190,16 @@ int32_t FileScanTask::files_count() const { return 1; }
int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; }
Result<ArrowArrayStream> FileScanTask::ToArrow(
- const std::shared_ptr<FileIO>& io, const std::shared_ptr<Schema>& projected_schema,
- const std::shared_ptr<Expression>& filter) const {
- if (has_deletes()) {
+ const std::shared_ptr<FileIO>& io, std::shared_ptr<Schema> projected_schema) const {
+ if (!delete_files_.empty()) {
return NotSupported("Reading data files with delete files is not yet supported.");
}
const ReaderOptions options{.path = data_file_->file_path,
.length = data_file_->file_size_in_bytes,
.io = io,
- .projection = projected_schema,
- .filter = filter};
+ .projection = std::move(projected_schema),
+ .filter = residual_filter_};
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ReaderFactoryRegistry::Open(data_file_->file_format, options));
@@ -183,138 +207,272 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
return MakeArrowArrayStream(std::move(reader));
}
+Result<std::unique_ptr<TableScanBuilder>> TableScanBuilder::Make(
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
+ ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+ return std::unique_ptr<TableScanBuilder>(
+ new TableScanBuilder(std::move(metadata), std::move(io)));
+}
+
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
std::shared_ptr<FileIO> file_io)
- : file_io_(std::move(file_io)) {
- context_.table_metadata = std::move(table_metadata);
-}
+ : metadata_(std::move(table_metadata)), io_(std::move(file_io)) {}
-TableScanBuilder& TableScanBuilder::WithColumnNames(
- std::vector<std::string> column_names) {
- column_names_ = std::move(column_names);
+TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value) {
+ context_.options[std::move(key)] = std::move(value);
return *this;
}
-TableScanBuilder& TableScanBuilder::WithProjectedSchema(std::shared_ptr<Schema> schema) {
+TableScanBuilder& TableScanBuilder::Project(std::shared_ptr<Schema> schema) {
context_.projected_schema = std::move(schema);
return *this;
}
-TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
- snapshot_id_ = snapshot_id;
+TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) {
+ context_.case_sensitive = case_sensitive;
return *this;
}
-TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr<Expression> filter) {
+TableScanBuilder& TableScanBuilder::IncludeColumnStats() {
+ context_.return_column_stats = true;
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::IncludeColumnStats(
+ const std::vector<std::string>& requested_columns) {
+ context_.return_column_stats = true;
+ context_.columns_to_keep_stats.clear();
+ context_.columns_to_keep_stats.reserve(requested_columns.size());
+
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_ref, ResolveSnapshotSchema());
+ const auto& schema = schema_ref.get();
+ for (const auto& column_name : requested_columns) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field, schema->FindFieldByName(column_name));
+ if (field.has_value()) {
+ context_.columns_to_keep_stats.insert(field.value().get().field_id());
+ }
+ }
+
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::Select(const std::vector<std::string>& column_names) {
+ context_.selected_columns = column_names;
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr<Expression> filter) {
context_.filter = std::move(filter);
return *this;
}
-TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) {
- context_.case_sensitive = case_sensitive;
+TableScanBuilder& TableScanBuilder::IgnoreResiduals() {
+ context_.ignore_residuals = true;
return *this;
}
-TableScanBuilder& TableScanBuilder::WithOption(std::string property, std::string value) {
- context_.options[std::move(property)] = std::move(value);
+TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) {
+ context_.min_rows_requested = num_rows;
return *this;
}
-TableScanBuilder& TableScanBuilder::WithLimit(std::optional<int64_t> limit) {
- context_.limit = limit;
+TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
+ ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
+ "Cannot override snapshot, already set snapshot id={}",
+ context_.snapshot_id.value());
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id));
+ context_.snapshot_id = snapshot_id;
+ // Drop any schema cached before a snapshot was chosen (e.g. by
+ // IncludeColumnStats) so ResolveSnapshotSchema() uses this snapshot's schema.
+ snapshot_schema_ = nullptr;
return *this;
}
-Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
- const auto& table_metadata = context_.table_metadata;
- auto snapshot_id = snapshot_id_ ? snapshot_id_ : table_metadata->current_snapshot_id;
- if (!snapshot_id) {
- return InvalidArgument("No snapshot ID specified for table {}",
- table_metadata->table_uuid);
+TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
+ if (ref == SnapshotRef::kMainBranch) {
+ snapshot_schema_ = nullptr;
+ context_.snapshot_id.reset();
+ return *this;
}
- ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_metadata->SnapshotById(*snapshot_id));
- if (!context_.projected_schema) {
- const auto& snapshot = context_.snapshot;
- auto schema_id = table_metadata->current_schema_id;
- ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id));
+ ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
+ "Cannot override ref, already set snapshot id={}",
+ context_.snapshot_id.value());
+ auto iter = metadata_->refs.find(ref);
+ ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref {}", ref);
+ ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
+ // Snapshot IDs are 64-bit; narrowing to int32_t would corrupt them.
+ int64_t snapshot_id = iter->second->snapshot_id;
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id));
+ context_.snapshot_id = snapshot_id;
+ // Drop any schema cached before the ref was chosen.
+ snapshot_schema_ = nullptr;
+
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto time_point_ms,
+ TimePointMsFromUnixMs(timestamp_millis));
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(
+ auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_, time_point_ms));
+ return UseSnapshot(snapshot_id);
+}
+
+TableScanBuilder& TableScanBuilder::FromSnapshot(
+ [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
- if (column_names_.empty()) {
- context_.projected_schema = schema;
+TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
+ [[maybe_unused]] bool inclusive) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
+ context_.branch = branch;
+ return *this;
+}
+
+Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
+TableScanBuilder::ResolveSnapshotSchema() {
+ if (snapshot_schema_ == nullptr) {
+ if (context_.snapshot_id.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
+ metadata_->SnapshotById(*context_.snapshot_id));
+ // NOTE(review): Java SnapshotUtil.schemaFor falls back to the current table
+ // schema when a snapshot has no schema id; confirm kInitialSchemaId is intended.
+ int32_t schema_id = snapshot->schema_id.value_or(Schema::kInitialSchemaId);
+ ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->SchemaById(schema_id));
} else {
- // TODO(gty404): collect touched columns from filter expression
- std::vector<SchemaField> projected_fields;
- projected_fields.reserve(column_names_.size());
- for (const auto& column_name : column_names_) {
- // TODO(gty404): support case-insensitive column names
- auto field_opt = schema->GetFieldByName(column_name);
- if (!field_opt) {
- return InvalidArgument("Column {} not found in schema '{}'", column_name,
- schema_id);
- }
- projected_fields.emplace_back(field_opt.value()->get());
- }
- context_.projected_schema =
- std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
+ ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->Schema());
}
- } else if (!column_names_.empty()) {
- return InvalidArgument(
- "Cannot specify column names when a projected schema is provided");
}
+ ICEBERG_CHECK(snapshot_schema_ != nullptr, "Snapshot schema is null");
+ return snapshot_schema_;
+}
+
+bool TableScanBuilder::IsIncrementalScan() const {
+ return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
+}
+
+Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
+ ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+ ICEBERG_RETURN_UNEXPECTED(context_.Validate());
+
+ if (IsIncrementalScan()) {
+ return NotImplemented("Incremental scan is not yet implemented");
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
+ // Pass a copy: moving out of context_ would make a second Build() silently
+ // produce a scan from a moved-from (unspecified) context.
+ return DataTableScan::Make(metadata_, schema.get(), io_, context_);
+}
+
+TableScan::TableScan(std::shared_ptr<TableMetadata> metadata,
+ std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> file_io,
+ internal::TableScanContext context)
+ : metadata_(std::move(metadata)),
+ schema_(std::move(schema)),
+ io_(std::move(file_io)),
+ context_(std::move(context)) {}
+
+TableScan::~TableScan() = default;
- return std::make_unique<DataTableScan>(std::move(context_), file_io_);
+const std::shared_ptr<TableMetadata>& TableScan::metadata() const { return metadata_; }
+
+Result<std::shared_ptr<Snapshot>> TableScan::snapshot() const {
+ auto snapshot_id = context_.snapshot_id ? context_.snapshot_id.value()
+ : metadata_->current_snapshot_id;
+ return metadata_->SnapshotById(snapshot_id);
+}
+
+Result<std::shared_ptr<Schema>> TableScan::schema() const {
+ return ResolveProjectedSchema();
+}
+
+const internal::TableScanContext& TableScan::context() const { return context_; }
+
+const std::shared_ptr<FileIO>& TableScan::io() const { return io_; }
+
+const std::shared_ptr<Expression>& TableScan::filter() const {
+ const static std::shared_ptr<Expression> true_expr = True::Instance();
+ if (!context_.filter) {
+ return true_expr;
+ }
+ return context_.filter;
}
-TableScan::TableScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
- : context_(std::move(context)), file_io_(std::move(file_io)) {}
+bool TableScan::is_case_sensitive() const { return context_.case_sensitive; }
+
+Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
+TableScan::ResolveProjectedSchema() const {
+ if (projected_schema_ != nullptr) {
+ return projected_schema_;
+ }
-const std::shared_ptr<Snapshot>& TableScan::snapshot() const { return context_.snapshot; }
+ if (!context_.selected_columns.empty()) {
+ /// TODO(gangwu): port Java BaseScan.lazyColumnProjection to collect field ids
+ /// from selected column names and bound references in the filter, and then create
+ /// projected schema based on the collected field ids.
+ return NotImplemented(
+ "Selecting columns by name to create projected schema is not yet implemented");
+ } else if (context_.projected_schema != nullptr) {
+ projected_schema_ = context_.projected_schema;
+ } else {
+ projected_schema_ = schema_;
+ }
-const std::shared_ptr<Schema>& TableScan::projection() const {
- return context_.projected_schema;
+ return projected_schema_;
}
-const TableScanContext& TableScan::context() const { return context_; }
+const std::vector<std::string>& TableScan::ScanColumns() const {
+ return context_.return_column_stats ? kScanColumnsWithStats : kScanColumns;
+}
-const std::shared_ptr<FileIO>& TableScan::io() const { return file_io_; }
+Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> io, internal::TableScanContext context) {
+ ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+ ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+ return std::unique_ptr<DataTableScan>(new DataTableScan(
+ std::move(metadata), std::move(schema), std::move(io), std::move(context)));
+}
-DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
- : TableScan(std::move(context), std::move(file_io)) {}
+DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
+ std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
+ internal::TableScanContext context)
+ : TableScan(std::move(metadata), std::move(schema), std::move(io),
+ std::move(context)) {}
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
- ICEBERG_ASSIGN_OR_RAISE(
- auto manifest_list_reader,
- ManifestListReader::Make(context_.snapshot->manifest_list, file_io_));
- ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());
-
- std::vector<std::shared_ptr<FileScanTask>> tasks;
- ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec());
-
- // Get the table schema
- ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema());
-
- for (const auto& manifest_file : manifest_files) {
- ICEBERG_ASSIGN_OR_RAISE(
- auto manifest_reader,
- ManifestReader::Make(manifest_file, file_io_, current_schema, partition_spec));
- ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
-
- // TODO(gty404): filter manifests using partition spec and filter expression
-
- for (auto& manifest_entry : manifests) {
- const auto& data_file = manifest_entry.data_file;
- switch (data_file->content) {
- case DataFile::Content::kData:
- tasks.emplace_back(std::make_shared<FileScanTask>(manifest_entry.data_file));
- break;
- case DataFile::Content::kPositionDeletes:
- case DataFile::Content::kEqualityDeletes:
- return NotSupported("Equality/Position deletes are not supported in data scan");
- }
- }
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
+ if (!snapshot) {
+ return std::vector<std::shared_ptr<FileScanTask>>{};
}
- return tasks;
+ TableMetadataCache metadata_cache(metadata_.get());
+ ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());
+
+ SnapshotCache snapshot_cache(snapshot.get());
+ ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, snapshot_cache.DataManifests(io_));
+ ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests, snapshot_cache.DeleteManifests(io_));
+
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto manifest_group,
+ ManifestGroup::Make(io_, schema_, specs_by_id,
+ {data_manifests.begin(), data_manifests.end()},
+ {delete_manifests.begin(), delete_manifests.end()}));
+ manifest_group->CaseSensitive(context_.case_sensitive)
+ .Select(ScanColumns())
+ .FilterData(filter())
+ .IgnoreDeleted()
+ .ColumnsToKeepStats(context_.columns_to_keep_stats);
+ if (context_.ignore_residuals) {
+ manifest_group->IgnoreResiduals();
+ }
+ return manifest_group->PlanFiles();
}
} // namespace iceberg
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index 4f2ddfde..cc26830f 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -19,19 +19,30 @@
#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <optional>
#include <string>
+#include <unordered_map>
+#include <unordered_set>
#include <vector>
#include "iceberg/arrow_c_data.h"
-#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
namespace iceberg {
/// \brief An abstract scan task.
class ICEBERG_EXPORT ScanTask {
public:
- virtual ~ScanTask() = default;
+ enum class Kind : uint8_t {
+ kFileScanTask,
+ };
+
+ /// \brief The kind of scan task.
+ virtual Kind kind() const = 0;
/// \brief The number of bytes that should be read by this scan task.
virtual int64_t size_bytes() const = 0;
@@ -41,6 +52,8 @@ class ICEBERG_EXPORT ScanTask {
/// \brief The number of rows that should be read by this scan task.
virtual int64_t estimated_row_count() const = 0;
+
+ virtual ~ScanTask();
};
/// \brief Task representing a data file and its corresponding delete files.
@@ -50,175 +63,271 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
///
/// \param data_file The data file to read.
/// \param delete_files Delete files that apply to this data file.
- /// \param residual_filter Optional residual filter to apply after reading.
+ /// \param filter Optional residual filter to apply after reading.
explicit FileScanTask(std::shared_ptr<DataFile> data_file,
std::vector<std::shared_ptr<DataFile>> delete_files = {},
- std::shared_ptr<Expression> residual_filter = nullptr);
+ std::shared_ptr<Expression> filter = nullptr);
/// \brief The data file that should be read by this scan task.
- const std::shared_ptr<DataFile>& data_file() const;
+ const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
/// \brief Delete files that apply to this data file.
- const std::vector<std::shared_ptr<DataFile>>& delete_files() const;
+ const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
+ return delete_files_;
+ }
/// \brief Residual filter to apply after reading.
- const std::shared_ptr<Expression>& residual_filter() const;
-
- /// \brief Check if any deletes need to be applied.
- bool has_deletes() const;
-
- /// \brief Check if a residual filter needs to be applied.
- bool has_residual_filter() const;
+ const std::shared_ptr<Expression>& residual_filter() const { return residual_filter_; }
+ Kind kind() const override { return Kind::kFileScanTask; }
int64_t size_bytes() const override;
int32_t files_count() const override;
int64_t estimated_row_count() const override;
- /**
- * \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task.
- *
- * \param io The FileIO instance for accessing the file data.
- * \param projected_schema The projected schema for reading the data.
- * \param filter Optional filter expression to apply during reading.
- * \return A Result containing an ArrowArrayStream, or an error on failure.
- */
+ /// TODO(gangwu): move it to iceberg/data/task_scanner.h
+ ///
+ /// \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task.
+ ///
+ /// The task's residual filter is passed to the reader as its filter.
+ ///
+ /// \param io The FileIO instance for accessing the file data.
+ /// \param projected_schema The projected schema for reading the data.
+ /// \return A Result containing an ArrowArrayStream, or an error on failure.
+ /// \note Returns NotSupported if this task carries any delete files.
Result<ArrowArrayStream> ToArrow(const std::shared_ptr<FileIO>& io,
- const std::shared_ptr<Schema>& projected_schema,
- const std::shared_ptr<Expression>& filter) const;
+ std::shared_ptr<Schema> projected_schema) const;
private:
- /// \brief Data file metadata.
std::shared_ptr<DataFile> data_file_;
- /// \brief Delete files that apply to this data file.
std::vector<std::shared_ptr<DataFile>> delete_files_;
- /// \brief Residual filter to apply after reading.
std::shared_ptr<Expression> residual_filter_;
};
-/// \brief Scan context holding snapshot and scan-specific metadata.
+namespace internal {
+
+// Internal table scan context used by different scan implementations.
struct TableScanContext {
- /// \brief Table metadata.
- std::shared_ptr<TableMetadata> table_metadata;
- /// \brief Snapshot to scan.
- std::shared_ptr<Snapshot> snapshot;
- /// \brief Projected schema.
- std::shared_ptr<Schema> projected_schema;
- /// \brief Filter expression to apply.
+ // Snapshot to scan; when unset, TableScan uses the table's current snapshot.
+ std::optional<int64_t> snapshot_id;
+ // Row filter; a null filter is treated as True by TableScan::filter().
std::shared_ptr<Expression> filter;
- /// \brief Whether the scan is case-sensitive.
- bool case_sensitive = false;
- /// \brief Additional options for the scan.
+ // Filter files only, not rows within those files (set by IgnoreResiduals()).
+ bool ignore_residuals{false};
+ // Whether column names are matched case-sensitively.
+ bool case_sensitive{true};
+ // Whether column stats are read with each data file (see TableScan::ScanColumns()).
+ bool return_column_stats{false};
+ // Field IDs whose stats are kept; only valid when return_column_stats is set.
+ std::unordered_set<int32_t> columns_to_keep_stats;
+ // Column names from Select(); cannot be combined with projected_schema.
+ std::vector<std::string> selected_columns;
+ // Explicit projection set via Project().
+ std::shared_ptr<Schema> projected_schema;
+ // Option overrides set via Option().
std::unordered_map<std::string, std::string> options;
- /// \brief Optional limit on the number of rows to scan.
- std::optional<int64_t> limit;
+ // Incremental-scan bounds; incremental scans are not implemented yet.
+ bool from_snapshot_id_inclusive{false};
+ std::optional<int64_t> from_snapshot_id;
+ std::optional<int64_t> to_snapshot_id;
+ // Branch recorded by UseBranch().
+ std::string branch{};
+ // Row-count hint from MinRowsRequested(); must be non-negative.
+ std::optional<int64_t> min_rows_requested;
+
+ // Validate the context parameters to see if they have conflicts.
+ [[nodiscard]] Status Validate() const;
};
+} // namespace internal
+
/// \brief Builder class for creating TableScan instances.
-class ICEBERG_EXPORT TableScanBuilder {
+class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector {
public:
/// \brief Constructs a TableScanBuilder for the given table.
- /// \param table_metadata The metadata of the table to scan.
- /// \param file_io The FileIO instance for reading manifests and data files.
- explicit TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
- std::shared_ptr<FileIO> file_io);
-
- /// \brief Sets the snapshot ID to scan.
- /// \param snapshot_id The ID of the snapshot.
- /// \return Reference to the builder.
- TableScanBuilder& WithSnapshotId(int64_t snapshot_id);
-
- /// \brief Selects columns to include in the scan.
- /// \param column_names A list of column names. If empty, all columns will
be selected.
- /// \return Reference to the builder.
- TableScanBuilder& WithColumnNames(std::vector<std::string> column_names);
-
- /// \brief Sets the schema to use for the scan.
- /// \param schema The schema to use.
- /// \return Reference to the builder.
- TableScanBuilder& WithProjectedSchema(std::shared_ptr<Schema> schema);
-
- /// \brief Applies a filter expression to the scan.
- /// \param filter Filter expression to use.
- /// \return Reference to the builder.
- TableScanBuilder& WithFilter(std::shared_ptr<Expression> filter);
-
- /// \brief Sets whether the scan should be case-sensitive.
- /// \param case_sensitive Whether the scan is case-sensitive.
- /// /return Reference to the builder.
- TableScanBuilder& WithCaseSensitive(bool case_sensitive);
-
- /// \brief Sets an option for the scan.
- /// \param property The name of the option.
- /// \param value The value of the option.
- /// \return Reference to the builder.
- TableScanBuilder& WithOption(std::string property, std::string value);
-
- /// \brief Sets an optional limit on the number of rows to scan.
- /// \param limit Optional limit on the number of rows.
- /// \return Reference to the builder.
- TableScanBuilder& WithLimit(std::optional<int64_t> limit);
+ /// \param metadata Current table metadata.
+ /// \param io FileIO instance for reading manifests files.
+ static Result<std::unique_ptr<TableScanBuilder>> Make(
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io);
+
+ /// \brief Update property that will override the table's behavior
+ /// based on the incoming pair. Unknown properties will be ignored.
+ /// \param key name of the table property to be overridden
+ /// \param value value to override with
+ TableScanBuilder& Option(std::string key, std::string value);
+
+ /// \brief Set the projected schema.
+ /// \param schema a projection schema
+ TableScanBuilder& Project(std::shared_ptr<Schema> schema);
+
+ /// \brief If data columns are selected via Select(), controls whether
+ /// the match to the schema will be done with case sensitivity. Default is
true.
+ /// \param case_sensitive whether the scan is case-sensitive
+ TableScanBuilder& CaseSensitive(bool case_sensitive);
+
+ /// \brief Request this scan to load the column stats with each data file.
+ ///
+ /// Column stats include: value count, null value count, lower bounds, and
upper bounds.
+ TableScanBuilder& IncludeColumnStats();
+
+ /// \brief Request this scan to load the column stats for the specific
columns with each
+ /// data file.
+ ///
+ /// Column stats include: value count, null value count, lower bounds, and
upper bounds.
+ ///
+ /// \param requested_columns column names for which to keep the stats.
+ TableScanBuilder& IncludeColumnStats(const std::vector<std::string>&
requested_columns);
+
+ /// \brief Request this scan to read the given data columns.
+ ///
+ /// This produces an expected schema that includes all fields that are
either selected
+ /// or used by this scan's filter expression.
+ ///
+ /// \param column_names column names from the table's schema
+ TableScanBuilder& Select(const std::vector<std::string>& column_names);
+
+ /// \brief Set the expression to filter data.
+ /// \param filter a filter expression
+ TableScanBuilder& Filter(std::shared_ptr<Expression> filter);
+
+ /// \brief Request data filtering to files but not to rows in those files.
+ TableScanBuilder& IgnoreResiduals();
+
+ /// \brief Request this scan to return at least the given number of rows.
+ ///
+ /// This is used as a hint and is entirely optional in order to not have to
return more
+ /// rows than necessary. This may return fewer rows if the scan does not
contain that
+ /// many, or it may return more than requested.
+ ///
+ /// \param num_rows The minimum number of rows requested
+ TableScanBuilder& MinRowsRequested(int64_t num_rows);
+
+ /// \brief Request this scan to use the given snapshot by ID.
+ /// \param snapshot_id a snapshot ID
+ /// \note InvalidArgument will be returned if the snapshot cannot be found
+ TableScanBuilder& UseSnapshot(int64_t snapshot_id);
+
+ /// \brief Request this scan to use the given reference.
+ /// \param ref reference
+ /// \note InvalidArgument will be returned if a reference with the given name
+ /// could not be found
+ TableScanBuilder& UseRef(const std::string& ref);
+
+ /// \brief Request this scan to use the most recent snapshot as of the given
time
+ /// in milliseconds on the branch in the scan or main if no branch is set.
+ /// \param timestamp_millis a timestamp in milliseconds.
+ /// \note InvalidArgument will be returned if the snapshot cannot be found
or time
+ /// travel is attempted on a tag
+ TableScanBuilder& AsOfTime(int64_t timestamp_millis);
+
+ /// \brief Instructs this scan to look for changes starting from a particular snapshot.
+ ///
+ /// If the start snapshot is not configured, it defaults to the oldest ancestor of the
+ /// end snapshot (inclusive).
+ ///
+ /// \param from_snapshot_id the start snapshot ID
+ /// \param inclusive whether the start snapshot is inclusive, default is false
+ /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of
+ /// the end snapshot
+ TableScanBuilder& FromSnapshot(int64_t from_snapshot_id, bool inclusive = false);
+
+ /// \brief Instructs this scan to look for changes starting from a particular snapshot.
+ ///
+ /// If the start snapshot is not configured, it defaults to the oldest ancestor of the
+ /// end snapshot (inclusive).
+ ///
+ /// \param ref the start ref name that points to a particular snapshot ID
+ /// \param inclusive whether the start snapshot is inclusive, default is false
+ /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of
+ /// the end snapshot
+ TableScanBuilder& FromSnapshot(const std::string& ref, bool inclusive = false);
+
+ /// \brief Instructs this scan to look for changes up to a particular snapshot
+ /// (inclusive).
+ ///
+ /// If the end snapshot is not configured, it defaults to the current table snapshot
+ /// (inclusive).
+ ///
+ /// \param to_snapshot_id the end snapshot ID (inclusive)
+ TableScanBuilder& ToSnapshot(int64_t to_snapshot_id);
+
+ /// \brief Instructs this scan to look for changes up to a particular snapshot ref
+ /// (inclusive).
+ ///
+ /// If the end snapshot is not configured, it defaults to the current table snapshot
+ /// (inclusive).
+ ///
+ /// \param ref the end snapshot ref name (inclusive)
+ TableScanBuilder& ToSnapshot(const std::string& ref);
+
+ /// \brief Use the specified branch
+ /// \param branch the branch name
+ TableScanBuilder& UseBranch(const std::string& branch);
/// \brief Builds and returns a TableScan instance.
/// \return A Result containing the TableScan or an error.
Result<std::unique_ptr<TableScan>> Build();
private:
- /// \brief the file I/O instance for reading manifests and data files.
- std::shared_ptr<FileIO> file_io_;
- /// \brief column names to project in the scan.
- std::vector<std::string> column_names_;
- /// \brief snapshot ID to scan, if specified.
- std::optional<int64_t> snapshot_id_;
- /// \brief Context for the scan, including snapshot, schema, and filter.
- TableScanContext context_;
+ /// \brief Private; builders are obtained through TableScanBuilder::Make().
+ TableScanBuilder(std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io);
+
+ /// \brief Returns the schema bound to the specified snapshot.
+ Result<std::reference_wrapper<const std::shared_ptr<Schema>>> ResolveSnapshotSchema();
+
+ /// \brief Returns whether the current configuration indicates an incremental scan.
+ bool IsIncrementalScan() const;
+
+ std::shared_ptr<TableMetadata> metadata_;
+ std::shared_ptr<FileIO> io_;
+ internal::TableScanContext context_;
+ std::shared_ptr<Schema> snapshot_schema_;
};
/// \brief Represents a configured scan operation on a table.
class ICEBERG_EXPORT TableScan {
public:
- virtual ~TableScan() = default;
+ virtual ~TableScan();
- /// \brief Constructs a TableScan with the given context and file I/O.
- /// \param context Scan context including snapshot, schema, and filter.
- /// \param file_io File I/O instance for reading manifests and data files.
- TableScan(TableScanContext context, std::shared_ptr<FileIO> file_io);
+ /// \brief Returns the table metadata being scanned.
+ const std::shared_ptr<TableMetadata>& metadata() const;
- /// \brief Returns the snapshot being scanned.
- /// \return A shared pointer to the snapshot.
- const std::shared_ptr<Snapshot>& snapshot() const;
+ /// \brief Returns the snapshot to scan.
+ Result<std::shared_ptr<Snapshot>> snapshot() const;
/// \brief Returns the projected schema for the scan.
- /// \return A shared pointer to the projected schema.
- const std::shared_ptr<Schema>& projection() const;
+ Result<std::shared_ptr<Schema>> schema() const;
/// \brief Returns the scan context.
- /// \return A reference to the TableScanContext.
- const TableScanContext& context() const;
+ const internal::TableScanContext& context() const;
- /// \brief Returns the file I/O instance used for reading manifests and data files.
- /// \return A shared pointer to the FileIO instance.
+ /// \brief Returns the file I/O instance used for reading files.
const std::shared_ptr<FileIO>& io() const;
+ /// \brief Returns this scan's filter expression.
+ const std::shared_ptr<Expression>& filter() const;
+
+ /// \brief Returns whether this scan is case-sensitive.
+ bool is_case_sensitive() const;
+
/// \brief Plans the scan tasks by resolving manifests and data files.
/// \return A Result containing scan tasks or an error.
virtual Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const = 0;
protected:
- /// \brief context for the scan, including snapshot, schema, and filter.
- const TableScanContext context_;
- /// \brief File I/O instance for reading manifests and data files.
- std::shared_ptr<FileIO> file_io_;
+ /// \brief For subclasses only; concrete scans are created via their own factory
+ /// (e.g. DataTableScan::Make()).
+ TableScan(std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> io, internal::TableScanContext context);
+
+ /// \brief Resolves the projected schema of this scan.
+ /// NOTE(review): presumably memoized in the `mutable` projected_schema_; if so, a
+ /// const method writes it without synchronization — confirm before sharing a scan
+ /// across threads.
+ Result<std::reference_wrapper<const std::shared_ptr<Schema>>> ResolveProjectedSchema()
+ const;
+
+ /// \brief Returns the column names this scan selects (presumably from the context);
+ /// virtual so subclasses can override it.
+ virtual const std::vector<std::string>& ScanColumns() const;
+
+ const std::shared_ptr<TableMetadata> metadata_;
+ const std::shared_ptr<Schema> schema_;
+ const std::shared_ptr<FileIO> io_;
+ const internal::TableScanContext context_;
+ mutable std::shared_ptr<Schema> projected_schema_;
};
/// \brief A scan that reads data files and applies delete files to filter
rows.
class ICEBERG_EXPORT DataTableScan : public TableScan {
public:
- /// \brief Constructs a DataScan with the given context and file I/O.
- DataTableScan(TableScanContext context, std::shared_ptr<FileIO> file_io);
+ /// \brief Constructs a DataTableScan instance.
+ static Result<std::unique_ptr<DataTableScan>> Make(
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> io, internal::TableScanContext context);
/// \brief Plans the scan tasks by resolving manifests and data files.
/// \return A Result containing scan tasks or an error.
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const
override;
+
+ protected:
+ DataTableScan(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> io, internal::TableScanContext
context);
};
} // namespace iceberg
diff --git a/src/iceberg/test/file_scan_task_test.cc b/src/iceberg/test/file_scan_task_test.cc
index b7252850..ba0c41b3 100644
--- a/src/iceberg/test/file_scan_task_test.cc
+++ b/src/iceberg/test/file_scan_task_test.cc
@@ -137,7 +137,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) {
FileScanTask task(data_file);
- auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr);
+ auto stream_result = task.ToArrow(file_io_, projected_schema);
ASSERT_THAT(stream_result, IsOk());
auto stream = std::move(stream_result.value());
@@ -156,7 +156,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
FileScanTask task(data_file);
- auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr);
+ auto stream_result = task.ToArrow(file_io_, projected_schema);
ASSERT_THAT(stream_result, IsOk());
auto stream = std::move(stream_result.value());
@@ -175,7 +175,7 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) {
FileScanTask task(data_file);
- auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr);
+ auto stream_result = task.ToArrow(file_io_, projected_schema);
ASSERT_THAT(stream_result, IsOk());
auto stream = std::move(stream_result.value());
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
index e76426f3..2ec36478 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -228,9 +228,30 @@ Result<std::shared_ptr<Snapshot>> SnapshotUtil::SnapshotAfter(const Table& table
snapshot_id);
}
+namespace {
+
+/// \brief Returns the snapshot ID of the last snapshot-log entry whose timestamp is at
+/// or before `timestamp_ms`, or std::nullopt if no entry qualifies.
+///
+/// Walking the log from the back finds the same entry a forward scan that keeps the
+/// last match would, but stops at the first hit instead of visiting every entry.
+std::optional<int64_t> OptionalSnapshotIdAsOfTimeImpl(const TableMetadata& metadata,
+ TimePointMs timestamp_ms) {
+ const auto& entries = metadata.snapshot_log;
+ for (auto it = entries.rbegin(); it != entries.rend(); ++it) {
+ if (it->timestamp_ms <= timestamp_ms) {
+ return it->snapshot_id;
+ }
+ }
+ return std::nullopt;
+}
+
+} // namespace
+
Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const Table& table,
TimePointMs timestamp_ms) {
- auto snapshot_id = OptionalSnapshotIdAsOfTime(table, timestamp_ms);
+ ICEBERG_PRECHECK(table.metadata() != nullptr, "Table metadata is null");
+ // Once metadata is known to be present, defer to the TableMetadata overload.
+ const TableMetadata& metadata = *table.metadata();
+ return SnapshotIdAsOfTime(metadata, timestamp_ms);
+}
+
+Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const TableMetadata& metadata,
+ TimePointMs timestamp_ms) {
+ auto snapshot_id = OptionalSnapshotIdAsOfTimeImpl(metadata, timestamp_ms);
+ // Unlike OptionalSnapshotIdAsOfTime(), a missing snapshot is reported as an error.
ICEBERG_CHECK(snapshot_id.has_value(), "Cannot find a snapshot older than {}",
FormatTimePointMs(timestamp_ms));
return snapshot_id.value();
@@ -238,13 +259,7 @@ Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const Table& table,
std::optional<int64_t> SnapshotUtil::OptionalSnapshotIdAsOfTime(
const Table& table, TimePointMs timestamp_ms) {
- std::optional<int64_t> snapshot_id = std::nullopt;
- for (const auto& log_entry : table.history()) {
- if (log_entry.timestamp_ms <= timestamp_ms) {
- snapshot_id = log_entry.snapshot_id;
- }
- }
- return snapshot_id;
+ // SnapshotIdAsOfTime() rejects a table without metadata; the optional variant reports
+ // "no snapshot" instead of dereferencing a null pointer.
+ if (table.metadata() == nullptr) {
+ return std::nullopt;
+ }
+ return OptionalSnapshotIdAsOfTimeImpl(*table.metadata(), timestamp_ms);
}
Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h
index e0d8830f..2b11168e 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -170,6 +170,14 @@ class ICEBERG_EXPORT SnapshotUtil {
/// \return The snapshot ID
static Result<int64_t> SnapshotIdAsOfTime(const Table& table, TimePointMs
timestamp_ms);
+ /// \brief Returns the ID of the most recent snapshot for the table as of the timestamp.
+ ///
+ /// The lookup uses the metadata's snapshot log and picks the latest entry whose
+ /// timestamp is at or before `timestamp_ms`.
+ ///
+ /// \param metadata The table metadata
+ /// \param timestamp_ms The timestamp in millis since the Unix epoch
+ /// \return The snapshot ID, or an error if no snapshot-log entry is at or before the
+ /// timestamp
+ static Result<int64_t> SnapshotIdAsOfTime(const TableMetadata& metadata,
+ TimePointMs timestamp_ms);
+
/// \brief Returns the ID of the most recent snapshot for the table as of
the timestamp,
/// or nullopt if not found.
///