This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e599010e feat(data): coalesce position deletes into range inserts (#645)
e599010e is described below

commit e599010ef210d0be073815f493339cf6029a9a6a
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed May 27 05:16:00 2026 +0200

    feat(data): coalesce position deletes into range inserts (#645)
    
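    Add ForEachPositionDelete (the C++ equivalent of Java's
    PositionDeleteRangeConsumer) and route DeleteLoader through it,
    replacing the per-position PositionDeleteIndex::Delete(pos) call. For
    inputs of at least 64 positions, the function samples ("sniffs") up to a
    1024-position prefix and counts how often neighboring positions are not
    consecutive. Above a 10% boundary density it bulk-inserts via CRoaring's
    addMany, grouped by high-32-bit key; otherwise (and always for smaller
    inputs) it coalesces consecutive runs into range inserts (addRange).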
    
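    Also rework DeleteLoader::LoadPositionDelete to read Arrow batches via
    nanoarrow's ArrowArrayView directly. When the delete file's
    referenced_data_file matches the target (V2 writer hint), positions are
    passed as a zero-copy span; otherwise a per-batch staging vector filters
    by path. Because these direct buffer reads skip the validity bitmap,
    batches with nulls in the required pos or file_path columns are now
    rejected with an InvalidArrowData error.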
    
    Local microbenchmarks show speedups of 2.2x-10.6x for
    ForEachPositionDelete and 2.1x-2.5x end-to-end through
    LoadPositionDeletes. This is the C++ equivalent of apache/iceberg#16052.
---
 src/iceberg/CMakeLists.txt                         |   1 +
 src/iceberg/data/delete_loader.cc                  |  98 ++++++++++--
 src/iceberg/deletes/position_delete_index.cc       |   5 +
 src/iceberg/deletes/position_delete_index.h        |  11 ++
 .../deletes/position_delete_range_consumer.cc      | 145 +++++++++++++++++
 .../deletes/position_delete_range_consumer.h       |  44 ++++++
 src/iceberg/deletes/roaring_position_bitmap.cc     |   6 +
 src/iceberg/deletes/roaring_position_bitmap.h      |   9 ++
 src/iceberg/meson.build                            |   1 +
 src/iceberg/test/CMakeLists.txt                    |   1 +
 src/iceberg/test/delete_loader_test.cc             |  32 ++++
 src/iceberg/test/meson.build                       |   1 +
 .../test/position_delete_range_consumer_test.cc    | 175 +++++++++++++++++++++
 13 files changed, 515 insertions(+), 14 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 672cf54a..7be689f8 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -171,6 +171,7 @@ set(ICEBERG_DATA_SOURCES
     data/position_delete_writer.cc
     data/writer.cc
     deletes/position_delete_index.cc
+    deletes/position_delete_range_consumer.cc
     deletes/roaring_position_bitmap.cc
     puffin/file_metadata.cc
     puffin/json_serde.cc
diff --git a/src/iceberg/data/delete_loader.cc 
b/src/iceberg/data/delete_loader.cc
index 35bc926b..92217340 100644
--- a/src/iceberg/data/delete_loader.cc
+++ b/src/iceberg/data/delete_loader.cc
@@ -19,14 +19,21 @@
 
 #include "iceberg/data/delete_loader.h"
 
+#include <cstring>
+#include <span>
 #include <string>
 #include <vector>
 
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
 #include "iceberg/arrow_c_data_guard_internal.h"
 #include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/deletes/position_delete_range_consumer.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/metadata_columns.h"
+#include "iceberg/result.h"
 #include "iceberg/row/arrow_array_wrapper.h"
 #include "iceberg/schema.h"
 #include "iceberg/util/macros.h"
@@ -57,6 +64,25 @@ Result<std::unique_ptr<Reader>> OpenDeleteFile(const 
DataFile& file,
   return ReaderFactoryRegistry::Open(file.file_format, options);
 }
 
+/// Returns `view`'s raw int64 values buffer, advanced by `view->offset`. Ignores
+/// the validity bitmap (nulls must be rejected by the caller) and the storage type.
+const int64_t* Int64ValuesBuffer(const ArrowArrayView* view) {
+  return view->buffer_views[1].data.as_int64 + view->offset;  // [1] = values
+}
+
+/// True iff the string at `row_idx` equals `target` byte-for-byte (unchecked
+/// nanoarrow access). Skips validity: `kDeleteFilePath` is required per V2 spec.
+bool StringEquals(const ArrowArrayView* view, int64_t row_idx, 
std::string_view target) {
+  ArrowStringView sv = ArrowArrayViewGetStringUnsafe(view, row_idx);
+  if (static_cast<size_t>(sv.size_bytes) != target.size()) {  // cheap reject
+    return false;
+  }
+  if (target.empty()) {  // equal empties; avoids memcmp on a possibly-null ptr
+    return true;
+  }
+  return sv.data != nullptr && std::memcmp(sv.data, target.data(), 
target.size()) == 0;
+}
+
 }  // namespace
 
 DeleteLoader::DeleteLoader(std::shared_ptr<FileIO> io) : io_(std::move(io)) {}
@@ -71,6 +97,28 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& 
file, PositionDeleteInde
   ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
   internal::ArrowSchemaGuard schema_guard(&arrow_schema);
 
+  // Reused across batches; reads child buffers directly to avoid the
+  // per-row `Scalar` dispatch in `ArrowArrayStructLike`.
+  ArrowArrayView array_view;
+  internal::ArrowArrayViewGuard view_guard(&array_view);
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewInitFromSchema(&array_view, &arrow_schema, &error), error);
+
+  // Fast path when the writer's `referenced_data_file` hint matches our
+  // target: skip the path column, hand `pos_data` straight to
+  // `ForEachPositionDelete`. Trusts the hint -- spec-compliant writers
+  // only set it when all rows share one data file.
+  const bool use_referenced_data_file_fast_path =
+      file.referenced_data_file.has_value() &&
+      file.referenced_data_file.value() == data_file_path;
+
+  // Filter-path staging buffer; reused across batches via `clear()`.
+  std::vector<int64_t> positions;
+  // Scratch buffer for `ForEachPositionDelete`'s bulk dispatch path;
+  // reused across batches and across both routing branches.
+  std::vector<uint32_t> bulk_scratch;
+
   while (true) {
     ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
     if (!batch_opt.has_value()) break;
@@ -78,23 +126,45 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& 
file, PositionDeleteInde
     auto& batch = batch_opt.value();
     internal::ArrowArrayGuard batch_guard(&batch);
 
-    ICEBERG_ASSIGN_OR_RAISE(
-        auto row, ArrowArrayStructLike::Make(arrow_schema, batch, 
/*row_index=*/0));
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+        ArrowArrayViewSetArray(&array_view, &batch, &error), error);
 
-    for (int64_t i = 0; i < batch.length; ++i) {
-      if (i > 0) {
-        ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
-      }
-      // Field 0: file_path
-      ICEBERG_ASSIGN_OR_RAISE(auto path_scalar, row->GetField(0));
-      auto path = std::get<std::string_view>(path_scalar);
-
-      if (path == data_file_path) {
-        // Field 1: pos
-        ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, row->GetField(1));
-        index.Delete(std::get<int64_t>(pos_scalar));
+    const int64_t length = batch.length;
+    if (length <= 0) {
+      continue;
+    }
+
+    // Child indices must match `PosDeleteSchema()`: 0 = file_path, 1 = pos.
+    const ArrowArrayView* path_view = array_view.children[0];
+    const ArrowArrayView* pos_view = array_view.children[1];
+
+    // V2 spec marks pos and file_path as required (NOT NULL). The direct
+    // buffer access below skips the validity bitmap, so a non-compliant
+    // batch would silently corrupt the index. Fail fast instead.
+    if (ArrowArrayViewComputeNullCount(pos_view) != 0 ||
+        ArrowArrayViewComputeNullCount(path_view) != 0) {
+      return InvalidArrowData(
+          "position delete file has null values in required pos/file_path 
columns");
+    }
+
+    const int64_t* pos_data = Int64ValuesBuffer(pos_view);
+
+    if (use_referenced_data_file_fast_path) {
+      ForEachPositionDelete(std::span<const int64_t>(pos_data, length), index,
+                            bulk_scratch);
+      continue;
+    }
+
+    positions.clear();
+    if (positions.capacity() < static_cast<size_t>(length)) {
+      positions.reserve(static_cast<size_t>(length));
+    }
+    for (int64_t i = 0; i < length; ++i) {
+      if (StringEquals(path_view, i, data_file_path)) {
+        positions.push_back(pos_data[i]);
       }
     }
+    ForEachPositionDelete(positions, index, bulk_scratch);
   }
 
   return reader->Close();
diff --git a/src/iceberg/deletes/position_delete_index.cc 
b/src/iceberg/deletes/position_delete_index.cc
index 0ff8f830..f09d0b48 100644
--- a/src/iceberg/deletes/position_delete_index.cc
+++ b/src/iceberg/deletes/position_delete_index.cc
@@ -39,4 +39,9 @@ void PositionDeleteIndex::Merge(const PositionDeleteIndex& 
other) {
   bitmap_.Or(other.bitmap_);
 }
 
+void PositionDeleteIndex::BulkAddForKey(int32_t key,  // shared high 32 bits
+                                        std::span<const uint32_t> positions) {
+  bitmap_.AddManyForKey(key, positions);  // thin forwarder; no key validation here
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/deletes/position_delete_index.h 
b/src/iceberg/deletes/position_delete_index.h
index 5de82a59..592216ed 100644
--- a/src/iceberg/deletes/position_delete_index.h
+++ b/src/iceberg/deletes/position_delete_index.h
@@ -24,6 +24,8 @@
 
 #include <cstdint>
 #include <memory>
+#include <span>
+#include <vector>
 
 #include "iceberg/deletes/roaring_position_bitmap.h"
 #include "iceberg/iceberg_data_export.h"
@@ -65,6 +67,15 @@ class ICEBERG_DATA_EXPORT PositionDeleteIndex {
   void Merge(const PositionDeleteIndex& other);
 
  private:
+  // Bulk-add positions sharing high-32-bit `key`. Private hook for
+  // `ForEachPositionDelete`'s bulk path; keeps `Delete` the sole public
+  // mutation surface.
+  void BulkAddForKey(int32_t key, std::span<const uint32_t> positions);
+
+  friend void ICEBERG_DATA_EXPORT
+  ForEachPositionDelete(std::span<const int64_t> positions, 
PositionDeleteIndex& target,
+                        std::vector<uint32_t>& scratch);
+
   RoaringPositionBitmap bitmap_;
 };
 
diff --git a/src/iceberg/deletes/position_delete_range_consumer.cc 
b/src/iceberg/deletes/position_delete_range_consumer.cc
new file mode 100644
index 00000000..f7cf258c
--- /dev/null
+++ b/src/iceberg/deletes/position_delete_range_consumer.cc
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/deletes/position_delete_range_consumer.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <span>
+#include <vector>
+
+#include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/deletes/roaring_position_bitmap.h"
+
+namespace iceberg {
+
+namespace {
+
+bool IsValidPosition(int64_t pos) {  // in [0, kMaxPosition]
+  return pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition;
+}
+
+// True iff next == prev + 1 (mod 2^64). Unsigned math avoids signed-overflow UB;
+// INT64_MAX -> INT64_MIN still counts as adjacent, but both are invalid positions.
+bool IsAdjacent(int64_t prev, int64_t next) {  // callers: pre-filtered or heuristic
+  return (static_cast<uint64_t>(next) - static_cast<uint64_t>(prev)) == 1;
+}
+
+// `RoaringPositionBitmap` shards positions by their high 32 bits; the bulk
+// path groups valid (non-negative) positions by this key for `BulkAddForKey`.
+int32_t HighKeyFromPosition(int64_t pos) { return static_cast<int32_t>(pos >> 
32); }
+
+// Emit `[range_start, last_position]`, collapsing singletons. Callers
+// pre-filter via `IsValidPosition`, so `last_position + 1` cannot overflow.
+void EmitRange(PositionDeleteIndex& target, int64_t range_start, int64_t 
last_position) {
+  if (range_start == last_position) {
+    target.Delete(range_start);  // single position
+  } else {
+    target.Delete(range_start, last_position + 1);  // end is exclusive
+  }
+}
+
+// Emit each run of consecutive values as one closed interval. Out-of-range
+// positions are skipped (as `Delete(pos)` does) and do not split a run.
+void CoalesceIntoRanges(std::span<const int64_t> positions, 
PositionDeleteIndex& target) {
+  const size_t n = positions.size();
+
+  size_t i = 0;
+  while (i < n && !IsValidPosition(positions[i])) {  // find the first valid seed
+    ++i;
+  }
+  if (i == n) {  // nothing valid to emit
+    return;
+  }
+
+  int64_t range_start = positions[i];
+  int64_t last_position = range_start;
+  ++i;
+
+  for (; i < n; ++i) {
+    const int64_t pos = positions[i];
+    if (!IsValidPosition(pos)) {
+      continue;
+    }
+    if (!IsAdjacent(last_position, pos)) {  // gap, duplicate, or descent ends run
+      EmitRange(target, range_start, last_position);
+      range_start = pos;
+    }
+    last_position = pos;
+  }
+
+  EmitRange(target, range_start, last_position);  // flush the final run
+}
+
+}  // namespace
+
+void ForEachPositionDelete(std::span<const int64_t> positions,
+                           PositionDeleteIndex& target, std::vector<uint32_t>& 
scratch) {
+  if (positions.empty()) {
+    return;
+  }
+
+  // Small inputs skip the sniff and always coalesce; sniff + bulk cost dominates.
+  constexpr size_t kMinSniffSize = 64;
+  if (positions.size() < kMinSniffSize) {
+    CoalesceIntoRanges(positions, target);
+    return;
+  }
+
+  // Bounded prefix size for the boundary-density estimate.
+  constexpr size_t kSniffSize = 1024;
+  // Boundary density (% of sniffed pairs not consecutive) above which bulk wins.
+  constexpr size_t kBulkThresholdPercent = 10;
+
+  const size_t sniff = std::min(positions.size(), kSniffSize);
+  size_t boundaries = 0;
+  for (size_t i = 1; i < sniff; ++i) {  // examines sniff - 1 neighbor pairs
+    boundaries += static_cast<size_t>(!IsAdjacent(positions[i - 1], 
positions[i]));
+  }
+
+  // boundaries / (sniff - 1) > kBulkThresholdPercent / 100, in integer math.
+  if (boundaries * 100 > (sniff - 1) * kBulkThresholdPercent) {
+    // Bulk path: batch each run of consecutive same-key valid entries into
+    // `scratch` (no sorting -- unsorted input may flush one key several times)
+    // and hand it to CRoaring's `addMany` via `BulkAddForKey`. Capacity is kept.
+    const size_t n = positions.size();
+    size_t i = 0;
+    while (i < n) {
+      while (i < n && !IsValidPosition(positions[i])) {  // skip invalid entries
+        ++i;
+      }
+      if (i == n) {
+        break;
+      }
+      const int32_t key = HighKeyFromPosition(positions[i]);
+      scratch.clear();
+      while (i < n && IsValidPosition(positions[i]) &&
+             HighKeyFromPosition(positions[i]) == key) {
+        scratch.push_back(static_cast<uint32_t>(positions[i] & 0xFFFFFFFFu));
+        ++i;
+      }
+      target.BulkAddForKey(key, scratch);
+    }
+    return;
+  }
+
+  CoalesceIntoRanges(positions, target);  // low boundary density: runs dominate
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/deletes/position_delete_range_consumer.h 
b/src/iceberg/deletes/position_delete_range_consumer.h
new file mode 100644
index 00000000..69d1c33d
--- /dev/null
+++ b/src/iceberg/deletes/position_delete_range_consumer.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <span>
+#include <vector>
+
+#include "iceberg/iceberg_data_export.h"
+
+namespace iceberg {
+
+class PositionDeleteIndex;
+
+/// \brief Apply `positions` to `target` as deletes; semantically equivalent to
+/// calling `target.Delete(pos)` for each entry. Any order and duplicates are fine;
+/// positions outside [0, RoaringPositionBitmap::kMaxPosition] are ignored.
+/// Sorted input with long consecutive runs is fastest.
+/// \param scratch Caller-owned reusable buffer for the bulk dispatch path.
+///     Cleared and reused per key group; retain across calls to amortize
+///     allocations. Pass a distinct `scratch` per thread when calling
+///     concurrently with disjoint `target`.
+void ICEBERG_DATA_EXPORT ForEachPositionDelete(std::span<const int64_t> 
positions,
+                                               PositionDeleteIndex& target,
+                                               std::vector<uint32_t>& scratch);
+
+}  // namespace iceberg
diff --git a/src/iceberg/deletes/roaring_position_bitmap.cc 
b/src/iceberg/deletes/roaring_position_bitmap.cc
index 2bf74958..e31efeec 100644
--- a/src/iceberg/deletes/roaring_position_bitmap.cc
+++ b/src/iceberg/deletes/roaring_position_bitmap.cc
@@ -105,6 +105,12 @@ void RoaringPositionBitmap::Add(int64_t pos) {
   impl_->bitmaps[key].add(pos32);
 }
 
+void RoaringPositionBitmap::AddManyForKey(int32_t key,
+                                          std::span<const uint32_t> positions) 
{
+  impl_->AllocateBitmapsIfNeeded(key + 1);  // NOTE(review): key not range-checked
+  impl_->bitmaps[key].addMany(positions.size(), positions.data());  // low 32 bits
+}
+
 void RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {
   pos_start = std::max(pos_start, int64_t{0});
   pos_end = std::min(pos_end, kMaxPosition + 1);
diff --git a/src/iceberg/deletes/roaring_position_bitmap.h 
b/src/iceberg/deletes/roaring_position_bitmap.h
index 8d4b3586..bfb7d7c9 100644
--- a/src/iceberg/deletes/roaring_position_bitmap.h
+++ b/src/iceberg/deletes/roaring_position_bitmap.h
@@ -25,6 +25,7 @@
 #include <cstdint>
 #include <functional>
 #include <memory>
+#include <span>
 #include <string>
 #include <string_view>
 
@@ -33,6 +34,8 @@
 
 namespace iceberg {
 
+class PositionDeleteIndex;
+
 /// \brief A bitmap that supports positive 64-bit positions, optimized
 /// for cases where most positions fit in 32 bits.
 ///
@@ -110,6 +113,12 @@ class ICEBERG_DATA_EXPORT RoaringPositionBitmap {
   std::unique_ptr<Impl> impl_;
 
   explicit RoaringPositionBitmap(std::unique_ptr<Impl> impl);
+
+  // Bulk-add positions sharing high-32-bit `key`. Internal hook for
+  // `PositionDeleteIndex::BulkAddForKey`; per-key grouping is the caller's
+  // job, keeping this a thin wrapper around CRoaring's `addMany`.
+  void AddManyForKey(int32_t key, std::span<const uint32_t> positions);
+  friend class PositionDeleteIndex;
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 69fe733b..48b5d425 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -152,6 +152,7 @@ iceberg_data_sources = files(
     'data/position_delete_writer.cc',
     'data/writer.cc',
     'deletes/position_delete_index.cc',
+    'deletes/position_delete_range_consumer.cc',
     'deletes/roaring_position_bitmap.cc',
     'puffin/file_metadata.cc',
     'puffin/json_serde.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index b415154d..d9059c56 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -131,6 +131,7 @@ add_iceberg_test(util_test
                  math_util_internal_test.cc
                  roaring_position_bitmap_test.cc
                  position_delete_index_test.cc
+                 position_delete_range_consumer_test.cc
                  retry_util_test.cc
                  string_util_test.cc
                  struct_like_set_test.cc
diff --git a/src/iceberg/test/delete_loader_test.cc 
b/src/iceberg/test/delete_loader_test.cc
index c365b8ba..b392065c 100644
--- a/src/iceberg/test/delete_loader_test.cc
+++ b/src/iceberg/test/delete_loader_test.cc
@@ -169,6 +169,10 @@ TEST_F(DeleteLoaderTest, 
LoadPositionDeletesFiltersByDataFilePath) {
                                                          {"data_b.parquet", 
10},
                                                          {"data_b.parquet", 
20}});
 
+  // Mixed paths -> writer must NOT set the hint, forcing the loader's
+  // per-row filter path. Locks the routing in case the writer behavior 
changes.
+  ASSERT_FALSE(delete_file->referenced_data_file.has_value());
+
   std::vector<std::shared_ptr<DataFile>> files = {delete_file};
 
   // Load only positions for data_a.parquet
@@ -207,6 +211,34 @@ TEST_F(DeleteLoaderTest, 
LoadPositionDeletesSkipsMismatchedReferencedDataFile) {
   ASSERT_TRUE(result.value().IsEmpty());
 }
 
+TEST_F(DeleteLoaderTest, LoadPositionDeletesFastPathHonorsReferencedDataFile) {
+  // Single-file writes -> writer sets referenced_data_file -> loader takes
+  // the zero-copy fast path. Sized above the consumer's 64-element sniff
+  // threshold so the dispatcher's real coalesce/bulk logic runs end-to-end,
+  // not just the small-input shortcut covered by 
LoadPositionDeletesSingleFile.
+  constexpr int64_t kRowCount = 128;
+  std::vector<std::pair<std::string, int64_t>> deletes;
+  deletes.reserve(kRowCount);
+  for (int64_t i = 0; i < kRowCount; ++i) {  // contiguous: sniff picks coalesce
+    deletes.emplace_back("data.parquet", i);
+  }
+  auto delete_file = WritePositionDeletes("pos_deletes_fast_path.parquet", 
deletes);
+
+  ASSERT_TRUE(delete_file->referenced_data_file.has_value());  // fast-path hint
+  ASSERT_EQ(delete_file->referenced_data_file.value(), "data.parquet");
+
+  std::vector<std::shared_ptr<DataFile>> files = {delete_file};
+  auto result = loader_->LoadPositionDeletes(files, "data.parquet");
+  ASSERT_THAT(result, IsOk());
+
+  auto& index = result.value();
+  ASSERT_EQ(index.Cardinality(), kRowCount);
+  ASSERT_TRUE(index.IsDeleted(0));
+  ASSERT_TRUE(index.IsDeleted(kRowCount / 2));
+  ASSERT_TRUE(index.IsDeleted(kRowCount - 1));
+  ASSERT_FALSE(index.IsDeleted(kRowCount));  // one past the last written pos
+}
+
 TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsDV) {
   auto dv_file = std::make_shared<DataFile>(DataFile{
       .content = DataFile::Content::kPositionDeletes,
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index 6928ab82..e7f6165c 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -94,6 +94,7 @@ iceberg_tests = {
             'location_util_test.cc',
             'math_util_internal_test.cc',
             'position_delete_index_test.cc',
+            'position_delete_range_consumer_test.cc',
             'retry_util_test.cc',
             'roaring_position_bitmap_test.cc',
             'string_util_test.cc',
diff --git a/src/iceberg/test/position_delete_range_consumer_test.cc 
b/src/iceberg/test/position_delete_range_consumer_test.cc
new file mode 100644
index 00000000..5a58fa5a
--- /dev/null
+++ b/src/iceberg/test/position_delete_range_consumer_test.cc
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/deletes/position_delete_range_consumer.h"
+
+#include <cstdint>
+#include <limits>
+#include <set>
+#include <span>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/deletes/roaring_position_bitmap.h"
+
+namespace iceberg {
+
+namespace {
+
+// Reference set: positions a per-pos `Delete(pos)` loop would accept.
+std::set<int64_t> ExpectedValidSet(const std::vector<int64_t>& positions) {
+  std::set<int64_t> expected;
+  for (int64_t pos : positions) {
+    if (pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition) {
+      expected.insert(pos);
+    }
+  }
+  return expected;
+}
+
+// Strict contents check: equal cardinality plus expected-subset-of-actual
+// implies set equality, catching e.g. divergences at the 32-bit key boundary.
+void AssertMatchesBaseline(const std::vector<int64_t>& positions) {
+  PositionDeleteIndex index;
+  std::vector<uint32_t> scratch;
+  ForEachPositionDelete(std::span<const int64_t>(positions), index, scratch);
+  const auto expected = ExpectedValidSet(positions);
+  ASSERT_EQ(index.Cardinality(), static_cast<int64_t>(expected.size()))
+      << "input size=" << positions.size();
+  for (int64_t pos : expected) {
+    ASSERT_TRUE(index.IsDeleted(pos)) << "missing pos=" << pos;
+  }
+}
+
+}  // namespace
+
+TEST(PositionDeleteRangeConsumerTest, EmptySpan) { AssertMatchesBaseline({}); }
+
+TEST(PositionDeleteRangeConsumerTest, SinglePosition) { 
AssertMatchesBaseline({42}); }
+
+TEST(PositionDeleteRangeConsumerTest, FullyContiguousRunBecomesSingleRange) {
+  std::vector<int64_t> positions;
+  for (int64_t i = 100; i < 200; ++i) {
+    positions.push_back(i);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, AlternatingPositionsProduceNoCoalescing) 
{
+  std::vector<int64_t> positions;
+  for (int64_t i = 0; i < 50; ++i) {
+    positions.push_back(i * 2);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, MixedShortAndLongRuns) {
+  AssertMatchesBaseline({1, 2, 3, 7, 10, 11, 20, 30, 31, 32, 33, 34});
+}
+
+TEST(PositionDeleteRangeConsumerTest, UnsortedInputStillCorrect) {
+  AssertMatchesBaseline({10, 5, 11, 12, 4, 13, 100});
+}
+
+TEST(PositionDeleteRangeConsumerTest, DuplicatesAreIdempotent) {
+  AssertMatchesBaseline({5, 5, 5, 6, 6, 7});
+}
+
+TEST(PositionDeleteRangeConsumerTest, InvalidPositionsSilentlySkipped) {
+  // Invalids at the edges, mid-run, and mixed with valid contiguous runs
+  // must all be dropped without breaking coalescing around them. Valid
+  // positions stay far below `kMaxPosition` (kMaxPosition + 1 is invalid and
+  // skipped), so the bitmap never grows toward ~2^31 empty containers.
+  AssertMatchesBaseline({std::numeric_limits<int64_t>::min(), -5, -4, 10, 11, 
-999, 12,
+                         13, RoaringPositionBitmap::kMaxPosition + 1,
+                         std::numeric_limits<int64_t>::max()});
+}
+
+TEST(PositionDeleteRangeConsumerTest, ContiguousRunAcrossKeyBoundary) {
+  // Pins `last_position + 1` and the adjacency check at a non-zero
+  // high-32 key. The coalesced run must survive the key transition.
+  constexpr int64_t kBoundary = int64_t{1} << 32;
+  std::vector<int64_t> positions;
+  for (int64_t i = kBoundary - 3; i < kBoundary + 3; ++i) {
+    positions.push_back(i);
+  }
+  AssertMatchesBaseline(positions);
+}
+
+TEST(PositionDeleteRangeConsumerTest, DispatcherAgreesAtBothDensities) {
+  // Above the sniff threshold at densities below and above the 10%
+  // cutoff. We can't observe the choice directly; agreement with the
+  // baseline is the contract.
+  std::vector<int64_t> low_density;
+  std::vector<int64_t> high_density;
+  int64_t lo = 0;
+  int64_t hi = 0;
+  for (int64_t i = 0; i < 2'048; ++i) {
+    low_density.push_back(lo);
+    ++lo;
+    if ((i + 1) % 20 == 0) {  // one gap per 20 steps: ~5% boundaries
+      lo += 5;
+    }
+    high_density.push_back(hi);
+    hi += ((i % 5 == 0) ? 5 : 1);  // one gap per 5 steps: ~20% boundaries
+  }
+  AssertMatchesBaseline(low_density);
+  AssertMatchesBaseline(high_density);
+}
+
+TEST(PositionDeleteRangeConsumerTest, DispatcherSkipsSniffOnSmallInputs) {
+  // Below the 64-element threshold the dispatcher bypasses the sniff.
+  // Exercise both a scattered tiny input (where bulk would win at large
+  // n) and a contiguous tiny input (the range path always wins).
+  std::vector<int64_t> scattered;
+  std::vector<int64_t> contiguous;
+  for (int64_t i = 0; i < 32; ++i) {
+    scattered.push_back(i * 100);
+    contiguous.push_back(i);
+  }
+  AssertMatchesBaseline(scattered);
+  AssertMatchesBaseline(contiguous);
+}
+
+TEST(PositionDeleteRangeConsumerTest, DispatcherAgreesAtThresholdBoundary) {
+  // The dispatcher selects the bulk path when
+  //   boundaries * 100 > (sniff - 1) * kBulkThresholdPercent
+  // With `sniff = 1024` and `kBulkThresholdPercent = 10`, the cutoff is
+  // 102.3 boundaries: 102 stays on coalesce, 103 flips to bulk. Both
+  // inputs must still produce the same cardinality and membership as
+  // the per-position baseline; this test guards against arithmetic
+  // regressions around the threshold constant.
+  auto build = [](int64_t target_boundaries) {
+    std::vector<int64_t> positions;
+    positions.reserve(1024);
+    int64_t pos = 0;
+    positions.push_back(pos);
+    for (int64_t i = 1; i < 1024; ++i) {
+      pos += (i <= target_boundaries) ? 2 : 1;  // first N steps leave a gap
+      positions.push_back(pos);
+    }
+    return positions;
+  };
+  AssertMatchesBaseline(build(/*target_boundaries=*/102));
+  AssertMatchesBaseline(build(/*target_boundaries=*/103));
+}
+
+}  // namespace iceberg

Reply via email to