This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new fe98a6cc2 [Improvement] Hoist StructProjection.create out of
per-record loop in CombinedDeleteFilter (#4145)
fe98a6cc2 is described below
commit fe98a6cc28ebc620b5045051f0d8766ff0cd5d3e
Author: Jiwon Park <[email protected]>
AuthorDate: Sat Mar 28 02:49:01 2026 +0900
[Improvement] Hoist StructProjection.create out of per-record loop in
CombinedDeleteFilter (#4145)
* Hoist StructProjection.create out of per-record loop in
initializeBloomFilter
StructProjection is designed to be reused via .wrap(). Previously,
StructProjection.create(requiredSchema, deleteSchema) was called inside
the per-record forEach, creating N*M objects (records x schemas). Pre-build
a Map<Set<Integer>, StructProjection> once before iterating records, then
call .wrap(record) on the pre-built projection inside the loop.
---
.../amoro/io/reader/CombinedDeleteFilter.java | 10 +-
.../TestCombinedDeleteFilterStructProjection.java | 354 +++++++++++++++++++++
2 files changed, 358 insertions(+), 6 deletions(-)
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index a18b2ab81..66a92fcb8 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -288,22 +288,20 @@ public abstract class CombinedDeleteFilter<T extends
StructLike> {
BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
Map<Set<Integer>, InternalRecordWrapper> recordWrappers =
Maps.newHashMap();
+ Map<Set<Integer>, StructProjection> structProjections = Maps.newHashMap();
for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry :
deleteSchemaByDeleteIds.entrySet()) {
Set<Integer> ids = deleteSchemaEntry.getKey();
Schema deleteSchema = deleteSchemaEntry.getValue();
- InternalRecordWrapper internalRecordWrapper =
- new InternalRecordWrapper(deleteSchema.asStruct());
- recordWrappers.put(ids, internalRecordWrapper);
+ recordWrappers.put(ids, new
InternalRecordWrapper(deleteSchema.asStruct()));
+ structProjections.put(ids, StructProjection.create(requiredSchema,
deleteSchema));
}
try (CloseableIterable<Record> deletes = readRecords()) {
for (Record record : deletes) {
recordWrappers.forEach(
(ids, internalRecordWrapper) -> {
- Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
- StructProjection projection =
- StructProjection.create(requiredSchema,
deleteSchema).wrap(record);
+ StructProjection projection =
structProjections.get(ids).wrap(record);
StructLike deletePK = internalRecordWrapper.copyFor(projection);
bloomFilter.put(deletePK);
});
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestCombinedDeleteFilterStructProjection.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestCombinedDeleteFilterStructProjection.java
new file mode 100644
index 000000000..73d2cbd85
--- /dev/null
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestCombinedDeleteFilterStructProjection.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.io;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.TableTestBase;
+import org.apache.amoro.io.reader.CombinedDeleteFilter;
+import org.apache.amoro.io.reader.DeleteCache;
+import org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
+import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.types.TypeUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * Tests for StructProjection hoisting in CombinedDeleteFilter.initializeBloomFilter().
+ *
+ * <p>Verifies that hoisting StructProjection.create() outside the per-record loop (reusing the
+ * projection object via .wrap()) produces correct bloom filter initialization and equality-delete
+ * filtering results, including the multiple-delete-schema scenario.
+ */
+@RunWith(Parameterized.class)
+public class TestCombinedDeleteFilterStructProjection extends TableTestBase {
+
+ private final FileFormat fileFormat;
+
+ @Parameterized.Parameters(name = "fileFormat = {0}")
+ public static Object[][] parameters() {
+ return new Object[][] {{FileFormat.PARQUET}, {FileFormat.AVRO},
{FileFormat.ORC}};
+ }
+
+ public TestCombinedDeleteFilterStructProjection(FileFormat fileFormat) {
+ super(
+ new BasicCatalogTestHelper(TableFormat.ICEBERG),
+ new BasicTableTestHelper(false, false,
buildTableProperties(fileFormat)));
+ this.fileFormat = fileFormat;
+ System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "false");
+ }
+
+ private static Map<String, String> buildTableProperties(FileFormat
fileFormat) {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
+ props.put(TableProperties.FORMAT_VERSION, "2");
+ props.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+ props.put(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat.name());
+ return props;
+ }
+
+ /**
+ * Lower the bloom-filter threshold so the filter is activated with a small dataset (< 3 data
+ * records) while still having > threshold eq-delete records.
+ */
+ private static final long BLOOM_TRIGGER = 2L;
+
+ @Before
+ public void resetBloomFilterThreshold() {
+ CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = BLOOM_TRIGGER;
+ }
+
+ // ---------------------------------------------------------------------------
+ // helpers
+ // ---------------------------------------------------------------------------
+
+ private OutputFileFactory outputFileFactory() {
+ return OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+ .format(fileFormat)
+ .build();
+ }
+
+ private DataFile writeDataFile(List<Record> records) throws IOException {
+ return FileHelpers.writeDataFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory().newOutputFile(TestHelpers.Row.of()).encryptingOutputFile(),
+ TestHelpers.Row.of(),
+ records);
+ }
+
+ private DeleteFile writeEqDeleteFile(List<Record> records, Schema
deleteSchema)
+ throws IOException {
+ return FileHelpers.writeDeleteFile(
+ getMixedTable().asUnkeyedTable(),
+
outputFileFactory().newOutputFile(TestHelpers.Row.of()).encryptingOutputFile(),
+ TestHelpers.Row.of(),
+ records,
+ deleteSchema);
+ }
+
+ private GenericCombinedIcebergDataReader buildReader(RewriteFilesInput
input) {
+ return new GenericCombinedIcebergDataReader(
+ getMixedTable().io(),
+ getMixedTable().schema(),
+ getMixedTable().spec(),
+ getMixedTable().asUnkeyedTable().encryption(),
+ null,
+ false,
+ IdentityPartitionConverters::convertConstant,
+ false,
+ null,
+ input,
+ "");
+ }
+
+ // ---------------------------------------------------------------------------
+ // Test 1: bloom filter is active and correctly identifies equality-deleted rows
+ // with a single delete schema (hoisted StructProjection reuse path)
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Verifies that with the bloom-filter path active, records matched by equality-delete files are
+ * filtered out and the remainder survives — exercising the hoisted StructProjection code.
+ */
+ @Test
+ public void testBloomFilterWithHoistedProjection_singleDeleteSchema() throws
IOException {
+ // 3 data rows: id=1,2,3
+ DataFile dataFile =
+ writeDataFile(
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(1, "alice", 10L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(2, "bob", 20L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(3, "carol", 30L,
"1970-01-01T08:00:00")));
+
+ // eq-delete on id: delete id=1 and id=2 (> BLOOM_TRIGGER records so filter activates)
+ Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1));
+ GenericRecord idRec = GenericRecord.create(idSchema);
+ List<Record> deleteRecords = new ArrayList<>();
+ deleteRecords.add(idRec.copy("id", 1));
+ deleteRecords.add(idRec.copy("id", 2));
+ deleteRecords.add(idRec.copy("id", 99)); // extra to push count above
threshold
+ DeleteFile eqDeleteFile = writeEqDeleteFile(deleteRecords, idSchema);
+
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DeleteFile[] {},
+ new DeleteFile[]
{MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile, 2L)},
+ getMixedTable());
+
+ GenericCombinedIcebergDataReader reader = buildReader(input);
+ Assert.assertTrue("Bloom filter should be active",
reader.getDeleteFilter().isFilterEqDelete());
+
+ try (CloseableIterable<Record> surviving = reader.readData()) {
+ List<Record> result = Lists.newArrayList(surviving);
+ Assert.assertEquals("Only id=3 should survive", 1, result.size());
+ Assert.assertEquals(3, result.get(0).get(0));
+ }
+
+ try (CloseableIterable<Record> deleted = reader.readDeletedData()) {
+ Assert.assertEquals(
+ "id=1 and id=2 should be reported as deleted", 2,
Iterables.size(deleted));
+ }
+
+ reader.close();
+ }
+
+ // ---------------------------------------------------------------------------
+ // Test 2: multiple delete schemas — the per-schema StructProjection loop in
+ // initializeBloomFilter must wrap() each record against each schema
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Uses two equality-delete files whose schemas differ (id-only vs id+name). Both must be put into
+ * the bloom filter correctly so that applyEqDeletesForSchema can later verify membership.
+ */
+ @Test
+ public void testBloomFilterWithHoistedProjection_multipleDeleteSchemas()
throws IOException {
+ DataFile dataFile =
+ writeDataFile(
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(1, "alice", 10L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(2, "bob", 20L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(3, "carol", 30L,
"1970-01-01T08:00:00")));
+
+ // Schema A: delete by id only
+ Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1));
+ GenericRecord idRec = GenericRecord.create(idSchema);
+ List<Record> deleteByIdRecords = new ArrayList<>();
+ IntStream.rangeClosed(1, 3).forEach(id ->
deleteByIdRecords.add(idRec.copy("id", id)));
+ DeleteFile eqDeleteById = writeEqDeleteFile(deleteByIdRecords, idSchema);
+
+ // Schema B: delete by id + name (different schema → separate bloom-filter projection)
+ Schema idNameSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1, 2));
+ GenericRecord idNameRec = GenericRecord.create(idNameSchema);
+ List<Record> deleteByIdNameRecords = new ArrayList<>();
+ IntStream.rangeClosed(1, 3)
+ .forEach(
+ id ->
+ deleteByIdNameRecords.add(
+ idNameRec.copy("id", id, "name", id == 1 ? "alice" :
"other")));
+ DeleteFile eqDeleteByIdName = writeEqDeleteFile(deleteByIdNameRecords,
idNameSchema);
+
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DeleteFile[] {},
+ new DeleteFile[] {
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteById, 2L),
+ MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteByIdName, 3L)
+ },
+ getMixedTable());
+
+ GenericCombinedIcebergDataReader reader = buildReader(input);
+ Assert.assertTrue("Bloom filter should be active",
reader.getDeleteFilter().isFilterEqDelete());
+
+ // id=1,2,3 are all deleted by eqDeleteById; none should survive
+ try (CloseableIterable<Record> surviving = reader.readData()) {
+ Assert.assertEquals("All records should be deleted", 0,
Iterables.size(surviving));
+ }
+
+ try (CloseableIterable<Record> deleted = reader.readDeletedData()) {
+ Assert.assertEquals("All 3 rows should appear as deleted", 3,
Iterables.size(deleted));
+ }
+
+ reader.close();
+ }
+
+ // ---------------------------------------------------------------------------
+ // Test 3: verify that the bloom filter does NOT wrongly exclude rows that are
+ // present in the data but absent from the eq-delete files
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Ensures false-negative freedom: records NOT covered by any equality-delete survive even when
+ * the bloom filter path is active (i.e. the hoisted StructProjection wraps records faithfully).
+ */
+ @Test
+ public void testBloomFilterWithHoistedProjection_noFalseNegatives() throws
IOException {
+ DataFile dataFile =
+ writeDataFile(
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(10, "diana", 100L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(20, "eve", 200L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(30, "frank", 300L,
"1970-01-01T08:00:00")));
+
+ // eq-delete on id: only id=10 is deleted; insert extra entries to exceed bloom threshold
+ Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1));
+ GenericRecord idRec = GenericRecord.create(idSchema);
+ List<Record> deleteRecords = new ArrayList<>();
+ deleteRecords.add(idRec.copy("id", 10));
+ // pad to exceed BLOOM_TRIGGER
+ deleteRecords.add(idRec.copy("id", 999));
+ deleteRecords.add(idRec.copy("id", 9999));
+ DeleteFile eqDeleteFile = writeEqDeleteFile(deleteRecords, idSchema);
+
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DeleteFile[] {},
+ new DeleteFile[]
{MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile, 2L)},
+ getMixedTable());
+
+ GenericCombinedIcebergDataReader reader = buildReader(input);
+ Assert.assertTrue("Bloom filter should be active",
reader.getDeleteFilter().isFilterEqDelete());
+
+ try (CloseableIterable<Record> surviving = reader.readData()) {
+ List<Record> result = Lists.newArrayList(surviving);
+ Assert.assertEquals("id=20 and id=30 should survive", 2, result.size());
+ }
+
+ reader.close();
+ }
+
+ // ---------------------------------------------------------------------------
+ // Test 4: bloom filter inactive (below threshold) — non-bloom code path
+ // should also work correctly after the refactor
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Resets the threshold above the delete-record count so the bloom filter is not activated.
+ * Confirms the non-bloom code path still correctly applies equality deletes.
+ */
+ @Test
+ public void testEqualityDeleteWithoutBloomFilter() throws IOException {
+ // Set threshold high so bloom filter is NOT activated
+ CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 1_000_000L;
+
+ DataFile dataFile =
+ writeDataFile(
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(1, "alice", 10L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(2, "bob", 20L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(3, "carol", 30L,
"1970-01-01T08:00:00")));
+
+ Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA,
Sets.newHashSet(1));
+ GenericRecord idRec = GenericRecord.create(idSchema);
+ DeleteFile eqDeleteFile =
+ writeEqDeleteFile(Collections.singletonList(idRec.copy("id", 2)),
idSchema);
+
+ RewriteFilesInput input =
+ new RewriteFilesInput(
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile,
1L)},
+ new DeleteFile[] {},
+ new DeleteFile[]
{MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile, 2L)},
+ getMixedTable());
+
+ GenericCombinedIcebergDataReader reader = buildReader(input);
+ Assert.assertFalse(
+ "Bloom filter should NOT be active",
reader.getDeleteFilter().isFilterEqDelete());
+
+ try (CloseableIterable<Record> surviving = reader.readData()) {
+ List<Record> result = Lists.newArrayList(surviving);
+ Assert.assertEquals("id=1 and id=3 should survive", 2, result.size());
+ }
+
+ reader.close();
+ }
+}