Baunsgaard commented on code in PR #16052:
URL: https://github.com/apache/iceberg/pull/16052#discussion_r3272647001
########## core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java: ##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {

Review Comment:
Position delete files are spec-sorted, but my implementation does not depend on input order. Engines can take any iterable or `long[]` of positions they want to delete and flush it through this API to update the `PositionDeleteIndex` quickly.

To further justify the addition, I also routed the merging of two `PositionDeleteIndex` instances through the same accumulator. Both the default method `PositionDeleteIndex.merge` and the `else` branch of `BitmapPositionDeleteIndex.merge` now delegate to a new `PositionDeleteRangeConsumer.forEach(source, target)` overload. As a result, the per-position fallback also gets the range-coalescing fast path.
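The sketch below illustrates why routing the merge fallback through a coalescer helps. It assumes a hypothetical `DeleteIndex` interface that mirrors a single-position delete, a `[start, end)` range delete, and a `forEach(LongConsumer)` over deleted positions. `CountingIndex` and `mergeCoalesced` are illustrative names, not Iceberg API. The coalescer is plain greedy and omits the sniff/escape logic of `RangeAccumulator`. The sketch counts how many calls reach the target under the per-position fallback and under run coalescing.

```java
import java.util.TreeSet;
import java.util.function.LongConsumer;

public class MergeFallbackSketch {

  /** Hypothetical stand-in for PositionDeleteIndex; range deletes cover [start, endExclusive). */
  interface DeleteIndex {
    void delete(long pos);

    void delete(long start, long endExclusive);

    void forEach(LongConsumer consumer);
  }

  /** Test double: a sorted set that also counts the delete calls it receives. */
  static final class CountingIndex implements DeleteIndex {
    final TreeSet<Long> positions = new TreeSet<>();
    int calls = 0;

    @Override
    public void delete(long pos) {
      calls++;
      positions.add(pos);
    }

    @Override
    public void delete(long start, long endExclusive) {
      calls++;
      for (long p = start; p < endExclusive; p++) {
        positions.add(p);
      }
    }

    @Override
    public void forEach(LongConsumer consumer) {
      for (long p : positions) { // ascending, like iterating a bitmap
        consumer.accept(p);
      }
    }
  }

  /** Merge fallback that emits one call per run of consecutive positions, not per position. */
  static void mergeCoalesced(DeleteIndex source, DeleteIndex target) {
    long[] run = new long[2]; // run[0] = start, run[1] = last position of the active run
    boolean[] active = {false};
    source.forEach(
        pos -> {
          if (active[0] && pos == run[1] + 1) {
            run[1] = pos; // extend the active run
            return;
          }
          if (active[0]) {
            emit(target, run[0], run[1]); // a gap closes the previous run
          }
          run[0] = pos;
          run[1] = pos;
          active[0] = true;
        });
    if (active[0]) {
      emit(target, run[0], run[1]);
    }
  }

  /** Singletons use the single-position delete; longer runs use one range delete. */
  static void emit(DeleteIndex target, long start, long last) {
    if (start == last) {
      target.delete(start);
    } else {
      target.delete(start, last + 1);
    }
  }

  public static void main(String[] args) {
    CountingIndex source = new CountingIndex();
    for (long p : new long[] {1, 2, 3, 7, 8, 20}) {
      source.delete(p);
    }

    CountingIndex perPosition = new CountingIndex();
    source.forEach(perPosition::delete); // the per-position fallback: that.forEach(this::delete)

    CountingIndex coalesced = new CountingIndex();
    mergeCoalesced(source, coalesced);

    // prints "6 calls vs 3 calls"
    System.out.println(perPosition.calls + " calls vs " + coalesced.calls + " calls");
  }
}
```

On this input the per-position fallback makes six calls and the coalescing merge makes three (two ranges and one singleton). The gap widens as runs get longer.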
The new helper is exercised by every caller of `PositionDeleteIndex.merge`, which today includes:

- `PositionDeleteIndexUtil.merge`: used by `BaseDeleteLoader.getOrReadPosDeletes` on the cached multi-file V2 read path (the sibling you flagged below).
- `BaseDVFileWriter.delete(path, index, ...)` and `BaseDVFileWriter.close()`: the V3 DV writer, for bulk-add and for merge-with-previous on finalize.
- `SortingPositionOnlyDeleteWriter.complete()`: the V2 writer, for merge-with-previous before sort and write.

Honest caveat: in production today the source is almost always another `BitmapPositionDeleteIndex`. In that case the `bitmap.setAll` fast path is taken and the new helper is not invoked. The win is defensive. Any non-bitmap source (custom engine implementations, test doubles, future implementations) now gets coalescing for free, and there is one implementation of the fallback instead of two copies of `that.forEach(this::delete)`.

That said, to be accommodating, I added a class-level comment that names the V2 workload and states explicitly that V3 DVs bypass this class via `PositionDeleteIndex.deserialize`. Once V2 position delete files are no longer read, this class can be retired along with them.

########## core/src/main/java/org/apache/iceberg/deletes/Deletes.java: ##########
@@ -177,7 +177,7 @@ private static PositionDeleteIndex toPositionIndex(
       CloseableIterable<Long> posDeletes, List<DeleteFile> files) {
     try (CloseableIterable<Long> deletes = posDeletes) {
       PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(files);
-      deletes.forEach(positionDeleteIndex::delete);
+      PositionDeleteRangeConsumer.forEach(deletes, positionDeleteIndex);

Review Comment:
Good point: for engines that don't bundle `iceberg-arrow`, the cached path is indeed the more heavily trafficked of the two.
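In concrete terms, the change for this path (detailed in the rest of this reply) groups delete records by adjacent `file_path` and drains each group into the index for that path. An out-of-order revisit therefore appends to the index that already exists. The sketch below shows that grouping under a few assumptions. The per-path index is a `TreeSet` stand-in, and `DeleteRecord`, `Lookahead` (a one-element lookahead in place of the `PeekingIterator`), and `positionsWhilePath` are hypothetical names, not the PR's code.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PrimitiveIterator;
import java.util.TreeSet;

public class GroupByPathSketch {

  /** Hypothetical delete record: a data file path plus a deleted row position in that file. */
  static final class DeleteRecord {
    final String path;
    final long pos;

    DeleteRecord(String path, long pos) {
      this.path = path;
      this.pos = pos;
    }
  }

  /** Minimal one-element lookahead over the records, standing in for a PeekingIterator. */
  static final class Lookahead {
    private final Iterator<DeleteRecord> it;
    private DeleteRecord next;

    Lookahead(Iterator<DeleteRecord> it) {
      this.it = it;
      this.next = it.hasNext() ? it.next() : null;
    }

    DeleteRecord peek() {
      return next;
    }

    DeleteRecord advance() {
      DeleteRecord current = next;
      next = it.hasNext() ? it.next() : null;
      return current;
    }
  }

  /** Yields unboxed positions for as long as the adjacent records stay on {@code path}. */
  static PrimitiveIterator.OfLong positionsWhilePath(Lookahead records, String path) {
    return new PrimitiveIterator.OfLong() {
      @Override
      public boolean hasNext() {
        DeleteRecord r = records.peek();
        return r != null && r.path.equals(path);
      }

      @Override
      public long nextLong() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return records.advance().pos;
      }
    };
  }

  /** One index per path; revisiting an earlier path appends to the index it already has. */
  static Map<String, TreeSet<Long>> toPositionIndexes(List<DeleteRecord> records) {
    Map<String, TreeSet<Long>> indexes = new HashMap<>();
    Lookahead lookahead = new Lookahead(records.iterator());
    while (lookahead.peek() != null) {
      String path = lookahead.peek().path;
      TreeSet<Long> index = indexes.computeIfAbsent(path, p -> new TreeSet<>());
      PrimitiveIterator.OfLong group = positionsWhilePath(lookahead, path);
      while (group.hasNext()) {
        // The TreeSet stand-in boxes here; a primitive consumer would keep this loop unboxed.
        index.add(group.nextLong());
      }
    }
    return indexes;
  }

  public static void main(String[] args) {
    List<DeleteRecord> records =
        List.of(
            new DeleteRecord("a.parquet", 0),
            new DeleteRecord("a.parquet", 1),
            new DeleteRecord("b.parquet", 5),
            new DeleteRecord("a.parquet", 9)); // out-of-order revisit of a.parquet
    Map<String, TreeSet<Long>> indexes = toPositionIndexes(records);
    // prints "[0, 1, 9] [5]"
    System.out.println(indexes.get("a.parquet") + " " + indexes.get("b.parquet"));
  }
}
```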

The Arrow PR (#16440) only registers `PositionDeleteIndexReader` for Parquet. The following cases therefore still flow through `Deletes.toPositionIndexes` to a per-row `index.delete(position)`:

- Flink reads of position delete files
- Avro / ORC position delete files in any engine

I've wired `Deletes.toPositionIndexes` through `PositionDeleteRangeConsumer.forEach`, grouping records by adjacent `file_path` with a `PeekingIterator`. The fast path now kicks in per data-file group, and out-of-order revisits of a path append correctly into its existing bitmap. To keep autoboxing out of the inner loop for this many-path shape, I added a package-private `forEach(PrimitiveIterator.OfLong, target)` overload, and `drainPositionsForPath` now returns that primitive iterator directly.

This is covered by `TestDeletesToPositionIndexes` (empty, single path, multi-path, out-of-order revisit, mixed `String`/`Utf8`, sparse, `DeleteFile` propagation, and close failure surfacing as `UncheckedIOException`) and exercised end to end by `TestGenericReaderDeletes`.

########## core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java: ##########
@@ -0,0 +1,208 @@
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {

Review Comment:
It is, and it makes sense for it to be, since it enables faster combining of `PositionDeleteIndex` instances. However, the public surface I originally proposed was wider than the design warrants. I reshaped it around what the Arrow stack in PR #16440 actually needs. The public surface is now exactly three members (the constructor, `acceptAll`, and `flush`) plus two static `forEach` overloads:

```
new PositionDeleteRangeConsumer(target);
consumer.acceptAll(long[] positions, int from, int to);
consumer.flush();
public static void forEach(Iterable<Long> positions, PositionDeleteIndex target);
public static void forEach(PositionDeleteIndex source, PositionDeleteIndex target);
```

########## core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java: ##########
@@ -0,0 +1,208 @@
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {
+
+  /**
+   * Batch size for {@link #forEach}. Sized to fit comfortably in L1 (512 bytes). Smaller buffers
+   * miss the bulk-path branch elision; larger buffers add allocation cost without improving the
+   * inner-loop throughput (the {@code acceptAll} body is the same regardless of slice length).
+   */
+  private static final int FOREACH_BATCH_SIZE = 64;
+
+  private PositionDeleteRangeConsumer() {}
+
+  /**
+   * Drains {@code positions} into a {@link RangeAccumulator} and flushes.
+   *
+   * <p>Boxed positions are buffered into a small primitive slice and then handed to {@link
+   * RangeAccumulator#acceptAll(long[], int, int)}, which keeps the sniff/escape state machine out
+   * of the inner loop. Compared to per-element {@link RangeAccumulator#accept(long)}, this gives a
+   * ~12% reduction in run-to-run time on dense inputs and -- more importantly -- removes the JIT
+   * inlining sensitivity that produces a 2 ms standard deviation on the per-element path.
+   */
+  public static void forEach(Iterable<Long> positions, PositionDeleteIndex target) {
+    RangeAccumulator acc = new RangeAccumulator(target);
+    long[] buffer = new long[FOREACH_BATCH_SIZE];
+    int filled = 0;
+    for (Long pos : positions) {
+      buffer[filled++] = pos;
+      if (filled == FOREACH_BATCH_SIZE) {
+        acc.acceptAll(buffer, 0, FOREACH_BATCH_SIZE);
+        filled = 0;
+      }
+    }
+    if (filled > 0) {
+      acc.acceptAll(buffer, 0, filled);
+    }
+    acc.flush();
+  }
+
+  /**
+   * Coalesces consecutive positions into range deletes on the target index. The first {@value
+   * #SNIFF_SIZE} positions are inspected; if more than {@value #BOUNDARY_THRESHOLD_PERCENT}% cross
+   * gaps, the accumulator falls back to per-position deletes for the rest of its life.
+   *
+   * <p>Single-threaded; one instance per target index. Callers that already have positions in a
+   * primitive {@code long[]} should call {@link #acceptAll(long[], int, int)} directly -- the bulk
+   * path keeps the state-machine dispatch out of the inner loop. {@link #accept(long)} exists for
+   * truly streaming callers that do not buffer; {@link PositionDeleteRangeConsumer#forEach} is the
+   * standard entry for boxed iterables and handles its own small primitive batching internally.
+   */
+  public static final class RangeAccumulator {
+
+    private static final int SNIFF_SIZE = 256;
+    private static final int BOUNDARY_THRESHOLD_PERCENT = 30;
+
+    private final PositionDeleteIndex target;
+    private boolean hasRun;
+    private long rangeStart;
+    private long lastPosition;
+
+    private int processed;
+    private int boundaries;
+    private boolean escaped;
+
+    public RangeAccumulator(PositionDeleteIndex target) {
+      Preconditions.checkArgument(target != null, "Invalid target index: null");
+      this.target = target;
+    }
+
+    public void accept(long pos) {
+      if (escaped) {
+        target.delete(pos);
+        return;
+      }
+      if (!hasRun) {
+        initRun(pos);
+        return;
+      }
+      coalesceSniff(pos);
+      if (processed == SNIFF_SIZE && shouldEscape()) {
+        enterEscape();
+      }
+    }
+
+    /**
+     * Bulk variant of {@link #accept(long)}. Runs the entire sniff/coalesce loop inside this method
+     * so the per-element work in steady state is identical to the original tight inline loop -- one
+     * gap-check branch and one position store, with no per-call frame. The small private helpers
+     * are inlined by HotSpot on the hot path.
+     */
+    public void acceptAll(long[] positions, int from, int to) {
+      Preconditions.checkArgument(positions != null, "Invalid positions array: null");
+      Preconditions.checkPositionIndexes(from, to, positions.length);
+      if (from >= to) {
+        return;
+      }
+
+      int cursor = from;
+
+      if (escaped) {
+        drainEscaped(positions, cursor, to);
+        return;
+      }
+
+      if (!hasRun) {
+        initRun(positions[cursor++]);
+      }
+
+      while (cursor < to && processed < SNIFF_SIZE) {
+        coalesceSniff(positions[cursor++]);
+      }
+
+      if (processed == SNIFF_SIZE && shouldEscape()) {
+        enterEscape();
+        drainEscaped(positions, cursor, to);
+        return;
+      }
+
+      while (cursor < to) {
+        coalesce(positions[cursor++]);
+      }
+    }
+
+    /** Emits the active run, if any. The escape decision is sticky across flushes. */
+    public void flush() {
+      if (hasRun) {
+        emit();
+        hasRun = false;
+      }
+    }
+
+    /** Starts a new active run anchored at {@code first}. */
+    private void initRun(long first) {
+      rangeStart = first;
+      lastPosition = first;
+      hasRun = true;
+      processed = 1;
+    }
+
+    /** Extends the active run with {@code pos} during sniffing; counts gaps to inform escape. */
+    private void coalesceSniff(long pos) {
+      if (pos - lastPosition != 1) {
+        boundaries++;
+        emit();
+        rangeStart = pos;
+      }
+      lastPosition = pos;
+      processed++;
+    }
+
+    /** Extends the active run with {@code pos} after sniffing has decided not to escape. */
+    private void coalesce(long pos) {
+      if (pos - lastPosition != 1) {
+        emit();
+        rangeStart = pos;
+      }
+      lastPosition = pos;
+    }
+
+    /** True if the sniffed prefix has too many gaps to make coalescing worthwhile. */
+    private boolean shouldEscape() {
+      return boundaries * 100 > (SNIFF_SIZE - 1) * BOUNDARY_THRESHOLD_PERCENT;

Review Comment:
> position delete files are spec-sorted by (file_path, pos), so within a single file the stream is monotonic, at which point a greedy pos == lastEnd + 1 check gets us coalescing without the sniff window, boundary counter, or escape mode.
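The greedy shape from the quote fits in a few lines: extend the run while `pos == last + 1`, otherwise emit it and re-anchor. The sketch below is illustrative. `GreedySketch` and its `Target` are hypothetical names, not the `GreedyRangeConsumer` benchmarked below, and for simplicity it emits singletons as one-element ranges. It also shows that the greedy check never emits a wrong range on unsorted or descending input; it just coalesces less.

```java
import java.util.ArrayList;
import java.util.List;

public class GreedySketch {

  /** Hypothetical target that records every emitted [start, endExclusive) range. */
  static final class Target {
    final List<long[]> ranges = new ArrayList<>();

    void deleteRange(long start, long endExclusive) {
      ranges.add(new long[] {start, endExclusive});
    }
  }

  private final Target target;
  private boolean hasRun;
  private long rangeStart;
  private long lastPosition;

  GreedySketch(Target target) {
    this.target = target;
  }

  /** Extends the active run when pos == last + 1; otherwise emits it and re-anchors at pos. */
  void acceptAll(long[] positions, int from, int to) {
    for (int i = from; i < to; i++) {
      long pos = positions[i];
      if (hasRun && pos == lastPosition + 1) {
        lastPosition = pos;
        continue;
      }
      flush(); // every gap pays an emit plus a re-anchor
      rangeStart = pos;
      lastPosition = pos;
      hasRun = true;
    }
  }

  /** Emits the active run, if any, as a single range (singletons included, for simplicity). */
  void flush() {
    if (hasRun) {
      target.deleteRange(rangeStart, lastPosition + 1);
      hasRun = false;
    }
  }

  /** Runs one batch through a fresh consumer and returns the number of emitted ranges. */
  static int rangesFor(long[] positions) {
    Target target = new Target();
    GreedySketch greedy = new GreedySketch(target);
    greedy.acceptAll(positions, 0, positions.length);
    greedy.flush();
    return target.ranges.size();
  }

  public static void main(String[] args) {
    System.out.println(rangesFor(new long[] {0, 1, 2, 3, 10, 11})); // 2: sorted input
    System.out.println(rangesFor(new long[] {10, 11, 0, 1, 2, 3})); // 2: unsorted, still correct
    System.out.println(rangesFor(new long[] {3, 2, 1, 0})); // 4: descending, no coalescing
  }
}
```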

The consumer is intentionally agnostic to input order. The goal is a single primitive that any caller can drop a batch of positions into without knowing whether the producer emitted them in order. Those callers include the V2 file reader, the `PositionDeleteIndex.merge` call sites now routed through here, and future query paths. Relying on the V2 spec's sort order would make this a file-format helper rather than a general utility.

> did you measure that shape? my intuition is the 2% sparse regression is mostly the sniff cost itself, not per-element branching so if that holds, we keep the dense win and drop the two magic constants. wdyt?

I'd like that to be true, but the numbers say otherwise. I added a `GreedyRangeConsumer` (same coalescing logic, no sniff/escape, no boundary counter) and ran the full distribution sweep:

| distribution | prod `acceptAll` | greedy `acceptAll` | Δ% (greedy vs. prod) | prod `forEach` (boxed) | greedy `forEach` (boxed) | Δ% (greedy vs. prod) |
|---|---:|---:|---:|---:|---:|---:|
| FULL | 2.24 ± 0.44 | 2.25 ± 0.24 | +0.6 % | 9.17 ± 2.01 | 6.36 ± 0.42 | −30.7 % * |
| MEDIUM | 6.28 ± 0.11 | 5.90 ± 0.10 | −6.1 % | 13.61 ± 1.53 | 12.48 ± 0.58 | −8.3 % |
| SHORT | 41.28 ± 0.17 | 42.30 ± 1.71 | +2.5 % | 48.41 ± 2.47 | 48.93 ± 0.39 | +1.1 % |
| SPARSE_95 | 36.72 ± 3.49 | 36.64 ± 0.35 | −0.2 % | 45.43 ± 3.11 | 44.98 ± 0.48 | −1.0 % |
| **SPARSE_50** | **48.88 ± 4.80** | **81.59 ± 9.62** | **+66.9 %** | **60.95 ± 14.99** | **91.74 ± 0.42** | **+50.5 %** |
| SPARSE_5 | 16.69 ± 0.79 | 16.33 ± 4.57 | −2.2 % | 23.97 ± 1.54 | 23.57 ± 0.16 | −1.7 % |
| NONE | 43.82 ± 2.86 | 44.76 ± 1.01 | +2.2 % | 44.38 ± 0.72 | 39.36 ± 0.19 | −11.3 % |

SPARSE_50 (alternating consecutive/gap, roughly "every other row deleted in a file") regresses 1.5–1.7× under greedy. The cost isn't sniff overhead. Every gap forces greedy to emit the previous singleton via `target.delete(rangeStart)` and re-anchor a fresh run.
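The escape decision itself is easy to check by hand. With the diff's `SNIFF_SIZE = 256` and `BOUNDARY_THRESHOLD_PERCENT = 30`, `shouldEscape` compares `boundaries * 100` against `255 * 30 = 7650`. It therefore first fires at 77 gaps out of 255 transitions (about 30 %). The sketch below reproduces only that decision. `countBoundaries`, `positions`, and the static `shouldEscape` are hypothetical helpers. The every-other-position input is an assumed worst case, not the benchmark's actual SPARSE_50 generator.

```java
public class EscapeDecisionSketch {

  // Constants copied from RangeAccumulator in the diff above.
  static final int SNIFF_SIZE = 256;
  static final int BOUNDARY_THRESHOLD_PERCENT = 30;

  /** Counts gaps (pos - last != 1) among the first SNIFF_SIZE positions, as coalesceSniff does. */
  static int countBoundaries(long[] positions) {
    int boundaries = 0;
    int limit = Math.min(positions.length, SNIFF_SIZE);
    for (int i = 1; i < limit; i++) {
      if (positions[i] - positions[i - 1] != 1) {
        boundaries++;
      }
    }
    return boundaries;
  }

  /** Same inequality as shouldEscape(): more than 30% of the 255 transitions are gaps. */
  static boolean shouldEscape(int boundaries) {
    return boundaries * 100 > (SNIFF_SIZE - 1) * BOUNDARY_THRESHOLD_PERCENT;
  }

  /** Generates {@code count} positions spaced {@code stride} apart, starting at 0. */
  static long[] positions(int count, int stride) {
    long[] out = new long[count];
    for (int i = 0; i < count; i++) {
      out[i] = (long) i * stride;
    }
    return out;
  }

  public static void main(String[] args) {
    int dense = countBoundaries(positions(SNIFF_SIZE, 1)); // 0, 1, 2, ...
    int alternating = countBoundaries(positions(SNIFF_SIZE, 2)); // 0, 2, 4, ...
    // prints "dense: 0 gaps, escape=false"
    System.out.println("dense: " + dense + " gaps, escape=" + shouldEscape(dense));
    // prints "alternating: 255 gaps, escape=true"
    System.out.println("alternating: " + alternating + " gaps, escape=" + shouldEscape(alternating));
    // prints "76 gaps escape=false, 77 gaps escape=true"
    System.out.println("76 gaps escape=" + shouldEscape(76) + ", 77 gaps escape=" + shouldEscape(77));
  }
}
```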
The adaptive escape avoids this per-gap re-anchoring. It measures gap density over the first 256 positions and, when that density is too high, falls through to the per-position `target.delete(pos)` path, which doesn't pay the `rangeStart = pos` write. Apart from SPARSE_50, the `acceptAll` deltas are all within about 6 %. The boxed `forEach` deltas are within about 8 %, except on FULL and NONE, where greedy comes out ahead. Those wins don't offset the SPARSE_50 cost, so I'd rather keep the two constants than give up that ratio on a plausible workload.

--
This is an automated message from the Apache Git Service.
