This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new e599010e feat(data): coalesce position deletes into range inserts
(#645)
e599010e is described below
commit e599010ef210d0be073815f493339cf6029a9a6a
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed May 27 05:16:00 2026 +0200
feat(data): coalesce position deletes into range inserts (#645)
Add ForEachPositionDelete (the C++ equivalent of Java's
PositionDeleteRangeConsumer) and route DeleteLoader through it,
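replacing the per-position PositionDeleteIndex::Delete(pos) call. For
inputs of at least 64 positions the function inspects up to a
1024-position prefix to estimate run-boundary density, then dispatches
either to run coalescing (CRoaring addRange) or to bulk addMany grouped
by high-32-bit key; smaller inputs are always coalesced.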
Also rework DeleteLoader::LoadPositionDelete to read Arrow batches via
nanoarrow's ArrowArrayView directly. When the delete file's
referenced_data_file matches the target (V2 writer hint), positions are
passed as a zero-copy span; otherwise a per-batch staging vector filters
by path.
Local microbenchmarks show speedups of 2.2x-10.6x for
ForEachPositionDelete and 2.1x-2.5x end-to-end through
LoadPositionDeletes. Equivalent of apache/iceberg#16052.
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/data/delete_loader.cc | 98 ++++++++++--
src/iceberg/deletes/position_delete_index.cc | 5 +
src/iceberg/deletes/position_delete_index.h | 11 ++
.../deletes/position_delete_range_consumer.cc | 145 +++++++++++++++++
.../deletes/position_delete_range_consumer.h | 44 ++++++
src/iceberg/deletes/roaring_position_bitmap.cc | 6 +
src/iceberg/deletes/roaring_position_bitmap.h | 9 ++
src/iceberg/meson.build | 1 +
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/delete_loader_test.cc | 32 ++++
src/iceberg/test/meson.build | 1 +
.../test/position_delete_range_consumer_test.cc | 175 +++++++++++++++++++++
13 files changed, 515 insertions(+), 14 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 672cf54a..7be689f8 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -171,6 +171,7 @@ set(ICEBERG_DATA_SOURCES
data/position_delete_writer.cc
data/writer.cc
deletes/position_delete_index.cc
+ deletes/position_delete_range_consumer.cc
deletes/roaring_position_bitmap.cc
puffin/file_metadata.cc
puffin/json_serde.cc
diff --git a/src/iceberg/data/delete_loader.cc
b/src/iceberg/data/delete_loader.cc
index 35bc926b..92217340 100644
--- a/src/iceberg/data/delete_loader.cc
+++ b/src/iceberg/data/delete_loader.cc
@@ -19,14 +19,21 @@
#include "iceberg/data/delete_loader.h"
+#include <cstring>
+#include <span>
#include <string>
#include <vector>
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/deletes/position_delete_range_consumer.h"
#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/metadata_columns.h"
+#include "iceberg/result.h"
#include "iceberg/row/arrow_array_wrapper.h"
#include "iceberg/schema.h"
#include "iceberg/util/macros.h"
@@ -57,6 +64,25 @@ Result<std::unique_ptr<Reader>> OpenDeleteFile(const
DataFile& file,
return ReaderFactoryRegistry::Open(file.file_format, options);
}
+/// Returns a pointer to the first logical element of an `int64` array view.
+///
+/// The values buffer (buffer index 1) is advanced by the view's own `offset`,
+/// so element `i` of the view is `result[i]`. The storage type is not checked
+/// here and the validity bitmap is not consulted: `kDeleteFilePos` is a
+/// required column in the V2 spec, and callers must reject nulls first.
+const int64_t* Int64ValuesBuffer(const ArrowArrayView* view) {
+  const int64_t* const values = view->buffer_views[1].data.as_int64;
+  return values + view->offset;
+}
+
+/// Compares the string at `row_idx` of `view` with `target`, byte for byte.
+///
+/// Uses nanoarrow's unchecked accessor, so the validity bitmap is ignored:
+/// `kDeleteFilePath` is a required column in the V2 spec.
+bool StringEquals(const ArrowArrayView* view, int64_t row_idx,
+                  std::string_view target) {
+  const ArrowStringView candidate = ArrowArrayViewGetStringUnsafe(view, row_idx);
+  const bool same_length =
+      static_cast<size_t>(candidate.size_bytes) == target.size();
+  if (!same_length) {
+    return false;
+  }
+  // Two empty strings are equal even when `candidate.data` is null.
+  if (target.empty()) {
+    return true;
+  }
+  if (candidate.data == nullptr) {
+    return false;
+  }
+  return std::memcmp(candidate.data, target.data(), target.size()) == 0;
+}
+
} // namespace
DeleteLoader::DeleteLoader(std::shared_ptr<FileIO> io) : io_(std::move(io)) {}
@@ -71,6 +97,28 @@ Status DeleteLoader::LoadPositionDelete(const DataFile&
file, PositionDeleteInde
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+ // Reused across batches; reads child buffers directly to avoid the
+ // per-row `Scalar` dispatch in `ArrowArrayStructLike`.
+ ArrowArrayView array_view;
+ internal::ArrowArrayViewGuard view_guard(&array_view);
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayViewInitFromSchema(&array_view, &arrow_schema, &error), error);
+
+ // Fast path when the writer's `referenced_data_file` hint matches our
+ // target: skip the path column, hand `pos_data` straight to
+ // `ForEachPositionDelete`. Trusts the hint -- spec-compliant writers
+ // only set it when all rows share one data file.
+ const bool use_referenced_data_file_fast_path =
+ file.referenced_data_file.has_value() &&
+ file.referenced_data_file.value() == data_file_path;
+
+ // Filter-path staging buffer; reused across batches via `clear()`.
+ std::vector<int64_t> positions;
+ // Scratch buffer for `ForEachPositionDelete`'s bulk dispatch path;
+ // reused across batches and across both routing branches.
+ std::vector<uint32_t> bulk_scratch;
+
while (true) {
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
if (!batch_opt.has_value()) break;
@@ -78,23 +126,45 @@ Status DeleteLoader::LoadPositionDelete(const DataFile&
file, PositionDeleteInde
auto& batch = batch_opt.value();
internal::ArrowArrayGuard batch_guard(&batch);
- ICEBERG_ASSIGN_OR_RAISE(
- auto row, ArrowArrayStructLike::Make(arrow_schema, batch,
/*row_index=*/0));
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayViewSetArray(&array_view, &batch, &error), error);
- for (int64_t i = 0; i < batch.length; ++i) {
- if (i > 0) {
- ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
- }
- // Field 0: file_path
- ICEBERG_ASSIGN_OR_RAISE(auto path_scalar, row->GetField(0));
- auto path = std::get<std::string_view>(path_scalar);
-
- if (path == data_file_path) {
- // Field 1: pos
- ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, row->GetField(1));
- index.Delete(std::get<int64_t>(pos_scalar));
+    const int64_t length = batch.length;
+    if (length <= 0) {
+      continue;
+    }
+
+    // Child indices must match `PosDeleteSchema()`: 0 = file_path, 1 = pos.
+    if (array_view.n_children < 2) {
+      return InvalidArrowData(
+          "position delete file batch is missing file_path/pos columns");
+    }
+    const ArrowArrayView* path_view = array_view.children[0];
+    const ArrowArrayView* pos_view = array_view.children[1];
+
+    // `Int64ValuesBuffer` reinterprets the values buffer as int64; any other
+    // physical type would be silently misread, so reject it up front.
+    if (pos_view->storage_type != NANOARROW_TYPE_INT64) {
+      return InvalidArrowData("position delete file pos column is not int64");
+    }
+
+    // V2 spec marks pos and file_path as required (NOT NULL). The direct
+    // buffer access below skips the validity bitmap, so a non-compliant
+    // batch would silently corrupt the index. Fail fast instead.
+    if (ArrowArrayViewComputeNullCount(pos_view) != 0 ||
+        ArrowArrayViewComputeNullCount(path_view) != 0) {
+      return InvalidArrowData(
+          "position delete file has null values in required pos/file_path columns");
+    }
+
+    // Struct children are addressed relative to the parent's offset: batch
+    // row `i` is child element `row_offset + i`. Child views only apply their
+    // own offsets, so the parent (batch) offset must be added here.
+    const int64_t row_offset = array_view.offset;
+    const int64_t* pos_data = Int64ValuesBuffer(pos_view) + row_offset;
+
+    if (use_referenced_data_file_fast_path) {
+      ForEachPositionDelete(std::span<const int64_t>(pos_data, length), index,
+                            bulk_scratch);
+      continue;
+    }
+
+    // `clear()` keeps capacity and `reserve()` is a no-op once it suffices,
+    // so the staging buffer is allocated at most a few times overall.
+    positions.clear();
+    positions.reserve(static_cast<size_t>(length));
+    for (int64_t i = 0; i < length; ++i) {
+      if (StringEquals(path_view, row_offset + i, data_file_path)) {
+        positions.push_back(pos_data[i]);
       }
     }
+    ForEachPositionDelete(positions, index, bulk_scratch);
}
return reader->Close();
diff --git a/src/iceberg/deletes/position_delete_index.cc
b/src/iceberg/deletes/position_delete_index.cc
index 0ff8f830..f09d0b48 100644
--- a/src/iceberg/deletes/position_delete_index.cc
+++ b/src/iceberg/deletes/position_delete_index.cc
@@ -39,4 +39,9 @@ void PositionDeleteIndex::Merge(const PositionDeleteIndex&
other) {
bitmap_.Or(other.bitmap_);
}
+/// Adds each low-32-bit value in `positions` under high-32-bit `key`.
+/// Forwards straight to the bitmap; grouping positions by key is the
+/// caller's (`ForEachPositionDelete`'s) responsibility.
+void PositionDeleteIndex::BulkAddForKey(const int32_t key,
+                                        const std::span<const uint32_t> positions) {
+  bitmap_.AddManyForKey(key, positions);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/deletes/position_delete_index.h
b/src/iceberg/deletes/position_delete_index.h
index 5de82a59..592216ed 100644
--- a/src/iceberg/deletes/position_delete_index.h
+++ b/src/iceberg/deletes/position_delete_index.h
@@ -24,6 +24,8 @@
#include <cstdint>
#include <memory>
+#include <span>
+#include <vector>
#include "iceberg/deletes/roaring_position_bitmap.h"
#include "iceberg/iceberg_data_export.h"
@@ -65,6 +67,15 @@ class ICEBERG_DATA_EXPORT PositionDeleteIndex {
void Merge(const PositionDeleteIndex& other);
private:
+ // Bulk-add positions sharing high-32-bit `key`. Private hook for
+ // `ForEachPositionDelete`'s bulk path; keeps `Delete` the sole public
+ // mutation surface.
+ void BulkAddForKey(int32_t key, std::span<const uint32_t> positions);
+
+ friend void ICEBERG_DATA_EXPORT
+ ForEachPositionDelete(std::span<const int64_t> positions,
PositionDeleteIndex& target,
+ std::vector<uint32_t>& scratch);
+
RoaringPositionBitmap bitmap_;
};
diff --git a/src/iceberg/deletes/position_delete_range_consumer.cc
b/src/iceberg/deletes/position_delete_range_consumer.cc
new file mode 100644
index 00000000..f7cf258c
--- /dev/null
+++ b/src/iceberg/deletes/position_delete_range_consumer.cc
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/deletes/position_delete_range_consumer.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <span>
+#include <vector>
+
+#include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/deletes/roaring_position_bitmap.h"
+
+namespace iceberg {
+
+namespace {
+
+// Inputs shorter than this skip the density sniff and are always coalesced:
+// for so few positions the sniff plus bulk grouping does not pay off.
+constexpr size_t kMinSniffSize = 64;
+// Upper bound on the prefix inspected when estimating boundary density.
+constexpr size_t kSniffSize = 1024;
+// Boundary density, in percent of sniffed adjacent pairs that are not
+// consecutive, above which the bulk path replaces range coalescing.
+constexpr size_t kBulkThresholdPercent = 10;
+
+// True when `pos` lies in [0, RoaringPositionBitmap::kMaxPosition]; other
+// values are dropped, matching `Delete(pos)`.
+bool IsValidPosition(int64_t pos) {
+  return 0 <= pos && pos <= RoaringPositionBitmap::kMaxPosition;
+}
+
+// True when `next == prev + 1`. Computed in unsigned arithmetic so that
+// negative or extreme inputs wrap instead of hitting signed-overflow UB.
+bool IsAdjacent(int64_t prev, int64_t next) {
+  const auto prev_bits = static_cast<uint64_t>(prev);
+  const auto next_bits = static_cast<uint64_t>(next);
+  return next_bits - prev_bits == uint64_t{1};
+}
+
+// High 32 bits of a position: the key `RoaringPositionBitmap` shards by and
+// the grouping key for the bulk path.
+int32_t HighKeyFromPosition(int64_t pos) {
+  const int64_t high_bits = pos >> 32;
+  return static_cast<int32_t>(high_bits);
+}
+
+// Deletes the closed interval [first, last], using the single-position
+// overload for one-element runs. Both ends are valid positions (callers
+// filter with `IsValidPosition`), so `last + 1` cannot overflow.
+void EmitRange(PositionDeleteIndex& target, int64_t first, int64_t last) {
+  if (first != last) {
+    target.Delete(first, last + 1);
+    return;
+  }
+  target.Delete(first);
+}
+
+// Single pass that merges consecutive valid positions into runs and emits
+// each run via `EmitRange`. Invalid positions are dropped without breaking
+// the run around them.
+void CoalesceIntoRanges(std::span<const int64_t> positions,
+                        PositionDeleteIndex& target) {
+  bool have_run = false;
+  int64_t run_first = 0;
+  int64_t run_last = 0;
+  for (const int64_t pos : positions) {
+    if (!IsValidPosition(pos)) {
+      continue;
+    }
+    if (!have_run) {
+      run_first = pos;
+      run_last = pos;
+      have_run = true;
+      continue;
+    }
+    if (!IsAdjacent(run_last, pos)) {
+      EmitRange(target, run_first, run_last);
+      run_first = pos;
+    }
+    run_last = pos;
+  }
+  if (have_run) {
+    EmitRange(target, run_first, run_last);
+  }
+}
+
+}  // namespace
+
+void ForEachPositionDelete(std::span<const int64_t> positions,
+                           PositionDeleteIndex& target, std::vector<uint32_t>& scratch) {
+  const size_t count = positions.size();
+  if (count == 0) {
+    return;
+  }
+  if (count < kMinSniffSize) {
+    CoalesceIntoRanges(positions, target);
+    return;
+  }
+
+  // Count run boundaries (neighbouring pairs that are not consecutive) in a
+  // bounded prefix to estimate how fragmented the input is.
+  const size_t sniff = std::min(count, kSniffSize);
+  size_t boundaries = 0;
+  for (size_t i = 1; i < sniff; ++i) {
+    if (!IsAdjacent(positions[i - 1], positions[i])) {
+      ++boundaries;
+    }
+  }
+
+  // Integer form of `boundaries / (sniff - 1) > kBulkThresholdPercent / 100`.
+  const bool prefer_bulk = boundaries * 100 > (sniff - 1) * kBulkThresholdPercent;
+  if (!prefer_bulk) {
+    CoalesceIntoRanges(positions, target);
+    return;
+  }
+
+  // Bulk path: split the input into maximal stretches of valid positions that
+  // share a high-32-bit key and hand each to CRoaring's `addMany` through
+  // `BulkAddForKey`. `scratch` is cleared per stretch, so its capacity is
+  // reused. This loop must live here: `BulkAddForKey` is private and only
+  // `ForEachPositionDelete` is a friend of `PositionDeleteIndex`.
+  size_t cursor = 0;
+  while (cursor < count) {
+    if (!IsValidPosition(positions[cursor])) {
+      ++cursor;
+      continue;
+    }
+    const int32_t key = HighKeyFromPosition(positions[cursor]);
+    scratch.clear();
+    for (; cursor < count; ++cursor) {
+      const int64_t pos = positions[cursor];
+      if (!IsValidPosition(pos) || HighKeyFromPosition(pos) != key) {
+        break;
+      }
+      scratch.push_back(static_cast<uint32_t>(pos & 0xFFFFFFFFu));
+    }
+    target.BulkAddForKey(key, scratch);
+  }
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/deletes/position_delete_range_consumer.h
b/src/iceberg/deletes/position_delete_range_consumer.h
new file mode 100644
index 00000000..69d1c33d
--- /dev/null
+++ b/src/iceberg/deletes/position_delete_range_consumer.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <span>
+#include <vector>
+
+#include "iceberg/iceberg_data_export.h"
+
+namespace iceberg {
+
+class PositionDeleteIndex;
+
+/// \brief Marks every entry of `positions` as deleted in `target`.
+///
+/// The result equals calling `target.Delete(pos)` once per entry. Positions
+/// outside the range a `RoaringPositionBitmap` accepts are skipped without
+/// error, and unsorted or duplicated input is allowed. Depending on how
+/// fragmented a bounded prefix of the input looks, entries are either
+/// coalesced into contiguous ranges or grouped by high-32-bit key for bulk
+/// insertion; sorted, mostly contiguous input is the fastest case.
+///
+/// \param positions Row positions to delete.
+/// \param target Index that receives the deletes.
+/// \param scratch Caller-owned buffer used by the bulk path; its contents on
+///     return are unspecified. Keep it alive across calls to reuse its
+///     allocation, and give each thread its own `scratch` when calling
+///     concurrently with disjoint `target`s.
+void ICEBERG_DATA_EXPORT ForEachPositionDelete(std::span<const int64_t> positions,
+                                               PositionDeleteIndex& target,
+                                               std::vector<uint32_t>& scratch);
+
+}  // namespace iceberg
diff --git a/src/iceberg/deletes/roaring_position_bitmap.cc
b/src/iceberg/deletes/roaring_position_bitmap.cc
index 2bf74958..e31efeec 100644
--- a/src/iceberg/deletes/roaring_position_bitmap.cc
+++ b/src/iceberg/deletes/roaring_position_bitmap.cc
@@ -105,6 +105,12 @@ void RoaringPositionBitmap::Add(int64_t pos) {
impl_->bitmaps[key].add(pos32);
}
+/// Adds every value in `positions` as a low-32-bit offset under high-32-bit
+/// `key`, via CRoaring's `addMany`. Per-key grouping is the caller's job.
+void RoaringPositionBitmap::AddManyForKey(int32_t key,
+                                          std::span<const uint32_t> positions) {
+  // Nothing to add: return before `AllocateBitmapsIfNeeded(key + 1)`, which
+  // (presumably) grows per-key storage even though no bit would be set.
+  // A negative key cannot come from a valid position and would index out of
+  // bounds, so it is ignored like other out-of-range input.
+  if (positions.empty() || key < 0) {
+    return;
+  }
+  impl_->AllocateBitmapsIfNeeded(key + 1);
+  impl_->bitmaps[key].addMany(positions.size(), positions.data());
+}
+
void RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {
pos_start = std::max(pos_start, int64_t{0});
pos_end = std::min(pos_end, kMaxPosition + 1);
diff --git a/src/iceberg/deletes/roaring_position_bitmap.h
b/src/iceberg/deletes/roaring_position_bitmap.h
index 8d4b3586..bfb7d7c9 100644
--- a/src/iceberg/deletes/roaring_position_bitmap.h
+++ b/src/iceberg/deletes/roaring_position_bitmap.h
@@ -25,6 +25,7 @@
#include <cstdint>
#include <functional>
#include <memory>
+#include <span>
#include <string>
#include <string_view>
@@ -33,6 +34,8 @@
namespace iceberg {
+class PositionDeleteIndex;
+
/// \brief A bitmap that supports positive 64-bit positions, optimized
/// for cases where most positions fit in 32 bits.
///
@@ -110,6 +113,12 @@ class ICEBERG_DATA_EXPORT RoaringPositionBitmap {
std::unique_ptr<Impl> impl_;
explicit RoaringPositionBitmap(std::unique_ptr<Impl> impl);
+
+ // Bulk-add positions sharing high-32-bit `key`. Internal hook for
+ // `PositionDeleteIndex::BulkAddForKey`; per-key grouping is the caller's
+ // job, keeping this a thin wrapper around CRoaring's `addMany`.
+ void AddManyForKey(int32_t key, std::span<const uint32_t> positions);
+ friend class PositionDeleteIndex;
};
} // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 69fe733b..48b5d425 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -152,6 +152,7 @@ iceberg_data_sources = files(
'data/position_delete_writer.cc',
'data/writer.cc',
'deletes/position_delete_index.cc',
+ 'deletes/position_delete_range_consumer.cc',
'deletes/roaring_position_bitmap.cc',
'puffin/file_metadata.cc',
'puffin/json_serde.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index b415154d..d9059c56 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -131,6 +131,7 @@ add_iceberg_test(util_test
math_util_internal_test.cc
roaring_position_bitmap_test.cc
position_delete_index_test.cc
+ position_delete_range_consumer_test.cc
retry_util_test.cc
string_util_test.cc
struct_like_set_test.cc
diff --git a/src/iceberg/test/delete_loader_test.cc
b/src/iceberg/test/delete_loader_test.cc
index c365b8ba..b392065c 100644
--- a/src/iceberg/test/delete_loader_test.cc
+++ b/src/iceberg/test/delete_loader_test.cc
@@ -169,6 +169,10 @@ TEST_F(DeleteLoaderTest,
LoadPositionDeletesFiltersByDataFilePath) {
{"data_b.parquet",
10},
{"data_b.parquet",
20}});
+ // Mixed paths -> writer must NOT set the hint, forcing the loader's
+ // per-row filter path. Locks the routing in case the writer behavior
changes.
+ ASSERT_FALSE(delete_file->referenced_data_file.has_value());
+
std::vector<std::shared_ptr<DataFile>> files = {delete_file};
// Load only positions for data_a.parquet
@@ -207,6 +211,34 @@ TEST_F(DeleteLoaderTest,
LoadPositionDeletesSkipsMismatchedReferencedDataFile) {
ASSERT_TRUE(result.value().IsEmpty());
}
+TEST_F(DeleteLoaderTest, LoadPositionDeletesFastPathHonorsReferencedDataFile) {
+  // Single-file writes -> writer sets referenced_data_file -> loader takes
+  // the zero-copy fast path. Both inputs exceed the consumer's 64-element
+  // sniff threshold (assuming each file is read back as one batch), so the
+  // dispatcher inspects them. The dense file is fully contiguous, which
+  // selects range coalescing. The sparse file has a gap between every pair,
+  // which selects the bulk path. Small inputs are covered by
+  // LoadPositionDeletesSingleFile.
+  constexpr int64_t kRowCount = 128;
+
+  std::vector<std::pair<std::string, int64_t>> dense;
+  std::vector<std::pair<std::string, int64_t>> sparse;
+  dense.reserve(kRowCount);
+  sparse.reserve(kRowCount);
+  for (int64_t i = 0; i < kRowCount; ++i) {
+    dense.emplace_back("data.parquet", i);
+    sparse.emplace_back("data.parquet", i * 2);
+  }
+
+  {
+    auto delete_file = WritePositionDeletes("pos_deletes_fast_path.parquet", dense);
+    ASSERT_TRUE(delete_file->referenced_data_file.has_value());
+    ASSERT_EQ(delete_file->referenced_data_file.value(), "data.parquet");
+
+    std::vector<std::shared_ptr<DataFile>> files = {delete_file};
+    auto result = loader_->LoadPositionDeletes(files, "data.parquet");
+    ASSERT_THAT(result, IsOk());
+
+    auto& index = result.value();
+    ASSERT_EQ(index.Cardinality(), kRowCount);
+    ASSERT_TRUE(index.IsDeleted(0));
+    ASSERT_TRUE(index.IsDeleted(kRowCount / 2));
+    ASSERT_TRUE(index.IsDeleted(kRowCount - 1));
+    ASSERT_FALSE(index.IsDeleted(kRowCount));
+  }
+
+  {
+    auto delete_file =
+        WritePositionDeletes("pos_deletes_fast_path_sparse.parquet", sparse);
+    ASSERT_TRUE(delete_file->referenced_data_file.has_value());
+    ASSERT_EQ(delete_file->referenced_data_file.value(), "data.parquet");
+
+    std::vector<std::shared_ptr<DataFile>> files = {delete_file};
+    auto result = loader_->LoadPositionDeletes(files, "data.parquet");
+    ASSERT_THAT(result, IsOk());
+
+    auto& index = result.value();
+    ASSERT_EQ(index.Cardinality(), kRowCount);
+    ASSERT_TRUE(index.IsDeleted(0));
+    ASSERT_FALSE(index.IsDeleted(1));
+    ASSERT_TRUE(index.IsDeleted(2 * (kRowCount - 1)));
+    ASSERT_FALSE(index.IsDeleted(2 * kRowCount));
+  }
+}
+
TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsDV) {
auto dv_file = std::make_shared<DataFile>(DataFile{
.content = DataFile::Content::kPositionDeletes,
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index 6928ab82..e7f6165c 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -94,6 +94,7 @@ iceberg_tests = {
'location_util_test.cc',
'math_util_internal_test.cc',
'position_delete_index_test.cc',
+ 'position_delete_range_consumer_test.cc',
'retry_util_test.cc',
'roaring_position_bitmap_test.cc',
'string_util_test.cc',
diff --git a/src/iceberg/test/position_delete_range_consumer_test.cc
b/src/iceberg/test/position_delete_range_consumer_test.cc
new file mode 100644
index 00000000..5a58fa5a
--- /dev/null
+++ b/src/iceberg/test/position_delete_range_consumer_test.cc
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/deletes/position_delete_range_consumer.h"
+
+#include <cstdint>
+#include <limits>
+#include <set>
+#include <span>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/deletes/roaring_position_bitmap.h"
+
+namespace iceberg {
+
+namespace {
+
+// Reference set: positions a per-pos `Delete(pos)` loop would accept.
+std::set<int64_t> ExpectedValidSet(const std::vector<int64_t>& positions) {
+  std::set<int64_t> expected;
+  for (int64_t pos : positions) {
+    if (pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition) {
+      expected.insert(pos);
+    }
+  }
+  return expected;
+}
+
+// Strict contents check: equal cardinality plus membership of every expected
+// position implies the index holds exactly the expected set. Weaker checks
+// would miss divergences at the 32-bit key boundary.
+void AssertMatchesBaseline(const std::vector<int64_t>& positions) {
+  PositionDeleteIndex index;
+  std::vector<uint32_t> scratch;
+  ForEachPositionDelete(std::span<const int64_t>(positions), index, scratch);
+  const auto expected = ExpectedValidSet(positions);
+  ASSERT_EQ(index.Cardinality(), static_cast<int64_t>(expected.size()))
+      << "input size=" << positions.size();
+  for (int64_t pos : expected) {
+    ASSERT_TRUE(index.IsDeleted(pos)) << "missing pos=" << pos;
+  }
+}
+
+}  // namespace
+
+TEST(PositionDeleteRangeConsumerTest, EmptySpan) { AssertMatchesBaseline({}); }
+
+TEST(PositionDeleteRangeConsumerTest, SinglePosition) { AssertMatchesBaseline({42}); }
+
+TEST(PositionDeleteRangeConsumerTest, FullyContiguousRunBecomesSingleRange) {
+  std::vector<int64_t> positions;
+  for (int64_t i = 100; i < 200; ++i) {
+    positions.push_back(i);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, AlternatingPositionsProduceNoCoalescing) {
+  std::vector<int64_t> positions;
+  for (int64_t i = 0; i < 50; ++i) {
+    positions.push_back(i * 2);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, MixedShortAndLongRuns) {
+  AssertMatchesBaseline({1, 2, 3, 7, 10, 11, 20, 30, 31, 32, 33, 34});
+}
+
+TEST(PositionDeleteRangeConsumerTest, UnsortedInputStillCorrect) {
+  AssertMatchesBaseline({10, 5, 11, 12, 4, 13, 100});
+}
+
+TEST(PositionDeleteRangeConsumerTest, DuplicatesAreIdempotent) {
+  AssertMatchesBaseline({5, 5, 5, 6, 6, 7});
+}
+
+TEST(PositionDeleteRangeConsumerTest, InvalidPositionsSilentlySkipped) {
+  // Invalids at the edges, mid-run, and mixed with valid contiguous runs
+  // must all be dropped without breaking coalescing around them. This input
+  // is below the 64-element sniff threshold, so it covers the coalesce path;
+  // BulkPathSkipsInvalidPositions covers the bulk path. We stay well below
+  // `kMaxPosition` to avoid forcing the bitmap to resize its backing vector
+  // to ~2^31 empty containers.
+  AssertMatchesBaseline({std::numeric_limits<int64_t>::min(), -5, -4, 10, 11, -999, 12,
+                         13, RoaringPositionBitmap::kMaxPosition + 1,
+                         std::numeric_limits<int64_t>::max()});
+}
+
+TEST(PositionDeleteRangeConsumerTest, ContiguousRunAcrossKeyBoundary) {
+  // Pins `last_position + 1` and the adjacency check at a non-zero
+  // high-32 key. The coalesced run must survive the key transition.
+  constexpr int64_t kBoundary = int64_t{1} << 32;
+  std::vector<int64_t> positions;
+  for (int64_t i = kBoundary - 3; i < kBoundary + 3; ++i) {
+    positions.push_back(i);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, DispatcherAgreesAtBothDensities) {
+  // Above the sniff threshold at densities below and above the 10%
+  // cutoff. We can't observe the choice directly; agreement with the
+  // baseline is the contract.
+  std::vector<int64_t> low_density;
+  std::vector<int64_t> high_density;
+  int64_t lo = 0;
+  int64_t hi = 0;
+  for (int64_t i = 0; i < 2'048; ++i) {
+    low_density.push_back(lo);
+    ++lo;
+    if ((i + 1) % 20 == 0) {
+      lo += 5;
+    }
+    high_density.push_back(hi);
+    hi += ((i % 5 == 0) ? 5 : 1);
+  }
+  AssertMatchesBaseline(low_density);
+  AssertMatchesBaseline(high_density);
+}
+
+TEST(PositionDeleteRangeConsumerTest, DispatcherSkipsSniffOnSmallInputs) {
+  // Below the 64-element threshold the dispatcher bypasses the sniff.
+  // Exercise both a scattered tiny input (where bulk would win at large
+  // n) and a contiguous tiny input (the range path always wins).
+  std::vector<int64_t> scattered;
+  std::vector<int64_t> contiguous;
+  for (int64_t i = 0; i < 32; ++i) {
+    scattered.push_back(i * 100);
+    contiguous.push_back(i);
+  }
+  AssertMatchesBaseline(scattered);
+  AssertMatchesBaseline(contiguous);
+}
+
+TEST(PositionDeleteRangeConsumerTest, DispatcherAgreesAtThresholdBoundary) {
+  // The dispatcher selects the bulk path when
+  //   boundaries * 100 > (sniff - 1) * kBulkThresholdPercent
+  // With `sniff = 1024` and `kBulkThresholdPercent = 10`, the cutoff is
+  // 102.3 boundaries: 102 stays on coalesce, 103 flips to bulk. Both
+  // inputs must still produce the same cardinality and membership as
+  // the per-position baseline; this test guards against arithmetic
+  // regressions around the threshold constant.
+  auto build = [](int64_t target_boundaries) {
+    std::vector<int64_t> positions;
+    positions.reserve(1024);
+    int64_t pos = 0;
+    positions.push_back(pos);
+    for (int64_t i = 1; i < 1024; ++i) {
+      pos += (i <= target_boundaries) ? 2 : 1;
+      positions.push_back(pos);
+    }
+    return positions;
+  };
+  AssertMatchesBaseline(build(/*target_boundaries=*/102));
+  AssertMatchesBaseline(build(/*target_boundaries=*/103));
+}
+
+TEST(PositionDeleteRangeConsumerTest, BulkPathSkipsInvalidPositions) {
+  // Well above the sniff threshold and fully fragmented (every neighbouring
+  // pair is a boundary), so the bulk path is selected. Invalid values are
+  // interleaved so the bulk loop's skip logic runs between key groups. The
+  // only huge values are invalid ones, so no far-away key is allocated.
+  std::vector<int64_t> positions;
+  for (int64_t i = 0; i < 256; ++i) {
+    positions.push_back(i * 3);
+    if (i % 17 == 0) {
+      positions.push_back(-1 - i);
+    }
+  }
+  positions.push_back(RoaringPositionBitmap::kMaxPosition + 1);
+  positions.push_back(std::numeric_limits<int64_t>::max());
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, BulkPathSplitsGroupsAtKeyBoundary) {
+  // Scattered positions straddling 2^32 force the bulk path to flush the
+  // key-0 group and start a fresh key-1 group mid-input.
+  constexpr int64_t kBoundary = int64_t{1} << 32;
+  std::vector<int64_t> positions;
+  for (int64_t i = -100; i < 100; ++i) {
+    positions.push_back(kBoundary + i * 2);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, BulkPathHandlesInterleavedKeys) {
+  // Alternating keys produce many one-element groups, and the same key is
+  // flushed repeatedly; results must still match the baseline.
+  constexpr int64_t kBoundary = int64_t{1} << 32;
+  std::vector<int64_t> positions;
+  for (int64_t i = 0; i < 128; ++i) {
+    positions.push_back(i * 2);
+    positions.push_back(kBoundary + i * 2);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+}  // namespace iceberg