hudi-agent commented on code in PR #19006:
URL: https://github.com/apache/hudi/pull/19006#discussion_r3411552626
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java:
##########
@@ -260,6 +260,77 @@ void testFileListingWithRecordLevelIndex(String
recordFields, ColumnStatsProbe p
assertThat(fileSlices.size(), is(expectedCnt));
}
+ @ParameterizedTest
+ @MethodSource("filtersAndResults")
+ void testFileListingWithPartitionedRecordLevelIndex(String recordFields,
ColumnStatsProbe probe, int maxKeyCnt, int expectedCnt) throws Exception {
+ DataType dataType = TestConfigurations.ROW_DATA_TYPE_WITH_ATOMIC_TYPES;
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), dataType);
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, recordFields);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM, maxKeyCnt);
Review Comment:
🤖 Naming-wise this reads as a partitioned-RLI test, but only
`RECORD_LEVEL_INDEX_ENABLE_PROP` is set —
`HoodieRecordIndex.IS_PARTITIONED_OPTION` isn't toggled, so
`BaseRecordLevelIndex.create(...)` will pick `GlobalRecordLevelIndex` (the
existing behavior), not the new `RecordLevelIndex`. Is that intended, or should
this test also flip the index option so the new partitioned path actually runs
end-to-end here? The mock-based tests do cover it, but it would be nice to have
an integration-level check too.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.record.HoodieRecordIndex;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.util.StreamerUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base index support that leverages Record Level Index to prune file slices.
+ */
+@Slf4j
+public abstract class BaseRecordLevelIndex implements FlinkMetadataIndex {
+ private static final long serialVersionUID = 1L;
+
+ private final String basePath;
+ protected final Configuration conf;
+ protected final List<String> hoodieKeysFromFilter;
+ private final HoodieTableMetaClient metaClient;
+ private HoodieTableMetadata metadataTable;
+
+ @VisibleForTesting
+ BaseRecordLevelIndex(
+ String basePath,
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ List<String> hoodieKeysFromFilter) {
+ this.basePath = basePath;
+ this.conf = conf;
+ this.metaClient = metaClient;
+ this.hoodieKeysFromFilter = hoodieKeysFromFilter;
+ }
+
+ @Override
+ public String getIndexPartitionName() {
+ return HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX;
+ }
+
+ @Override
+ public boolean isIndexAvailable() {
+ return metaClient.getTableConfig().isMetadataTableAvailable()
+ &&
metaClient.getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+ }
+
+ public HoodieTableMetadata getMetadataTable() {
+ // initialize the metadata table lazily
+ if (this.metadataTable == null) {
+ this.metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
+ HoodieFlinkEngineContext.DEFAULT,
+ metaClient.getStorage(),
+ StreamerUtil.metadataConfig(conf),
+ basePath);
+ }
+ return this.metadataTable;
+ }
+
+ public List<FileSlice> computeCandidateFileSlices(List<FileSlice>
fileSlices) {
+ if (!isIndexAvailable()) {
+ return fileSlices;
+ }
+
+ try {
Review Comment:
🤖 The previous `RecordLevelIndex.computeCandidateFileSlices` had a `finally
{ recordIndexData.unpersistWithDependencies(); }` cleanup. After the refactor
the `HoodiePairData` produced by `readRecordIndexLocationsWithKeys` (one per
candidate partition in the partitioned variant) is no longer unpersisted. For
the Flink list-backed implementation this is a no-op, so likely harmless today,
but was the cleanup dropped intentionally? If the metadata read ever returns
Spark/RDD-backed pair data on some path, it could reintroduce the original leak.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java:
##########
@@ -75,6 +93,167 @@ public class TestRecordLevelIndex {
.notNull();
private static final RowType ROW_TYPE_MULTI_KEYS = (RowType)
ROW_DATA_TYPE_MULTI_KEYS.getLogicalType();
+ @Test
+ void testPartitionedRliPrunesMultiplePartitions() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par1", "file2"),
+ fileSlice("par2", "file1"),
+ fileSlice("par2", "file2"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1"))))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2"))))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id2", new HoodieRecordGlobalLocation("par2", "001",
"file2")))));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertEquals(Arrays.asList(fileSlices.get(0), fileSlices.get(3)), result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ @Test
+ void testPartitionedRliPrunesAtPartitionThreshold() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"),
+ fileSlice("par3", "file3"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(), any()))
+ .thenAnswer(invocation -> {
+ String partition = invocation.<Option<String>>getArgument(1).get();
+ return HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation(
+ partition, "001", "file" + partition.substring(3)))));
+ });
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertEquals(fileSlices, result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par3")));
+ }
+
+ @Test
+ void testPartitionedRliSkipsPruningAbovePartitionThreshold() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"),
+ fileSlice("par3", "file3"),
+ fileSlice("par4", "file4"));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ @Test
+ void testPartitionedRliFallsBackWhenPartitionLookupFails() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1"))))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2"))))
+ .thenThrow(new RuntimeException("lookup failure"));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ }
+
+ @Test
+ void testPartitionedRliSkipsPruningForNoCandidateFiles() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Collections.emptyList();
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, -1})
+ void testRejectsNonPositivePartitionLookupThreshold(int maxPartitions) {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_RLI_PARTITIONS_MAX_NUM,
maxPartitions);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> new RecordLevelIndex("", conf, metaClient,
Collections.singletonList("id1")));
+ }
+
+ @Test
+ void testGlobalRliMatchesPartitionAndFileId() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ GlobalRecordLevelIndex recordLevelIndex =
createGlobalRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file1"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any()))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertEquals(Collections.singletonList(fileSlices.get(0)), result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ }
+
+ private RecordLevelIndex createRecordLevelIndex(HoodieTableMetadata
metadataTable) {
+ mockAvailableRecordIndex();
+ RecordLevelIndex recordLevelIndex = spy(
+ new RecordLevelIndex("", new Configuration(), metaClient,
Collections.singletonList("id1")));
+
lenient().doReturn(metadataTable).when(recordLevelIndex).getMetadataTable();
+ return recordLevelIndex;
+ }
+
+ private GlobalRecordLevelIndex
createGlobalRecordLevelIndex(HoodieTableMetadata metadataTable) {
+ mockAvailableRecordIndex();
+ GlobalRecordLevelIndex recordLevelIndex = spy(
+ new GlobalRecordLevelIndex("", new Configuration(), metaClient,
Collections.singletonList("id1")));
+
lenient().doReturn(metadataTable).when(recordLevelIndex).getMetadataTable();
+ return recordLevelIndex;
+ }
+
+ private void mockAvailableRecordIndex() {
+ when(metaClient.getTableConfig()).thenReturn(tableConfig);
+ when(tableConfig.isMetadataTableAvailable()).thenReturn(true);
+
when(tableConfig.getMetadataPartitions()).thenReturn(Collections.singleton(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX));
+ }
+
+ private static HoodieIndexDefinition indexDefinition(boolean partitioned) {
Review Comment:
🤖 nit: `indexDefinition(boolean partitioned)` doesn't appear to be called by
any of the new or pre-existing tests — could you remove it (and the
`HoodieIndexDefinition` import) to keep dead code out?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java:
##########
@@ -75,6 +93,167 @@ public class TestRecordLevelIndex {
.notNull();
private static final RowType ROW_TYPE_MULTI_KEYS = (RowType)
ROW_DATA_TYPE_MULTI_KEYS.getLogicalType();
+ @Test
+ void testPartitionedRliPrunesMultiplePartitions() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par1", "file2"),
+ fileSlice("par2", "file1"),
+ fileSlice("par2", "file2"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1"))))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2"))))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id2", new HoodieRecordGlobalLocation("par2", "001",
"file2")))));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertEquals(Arrays.asList(fileSlices.get(0), fileSlices.get(3)), result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ @Test
+ void testPartitionedRliPrunesAtPartitionThreshold() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"),
+ fileSlice("par3", "file3"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(), any()))
+ .thenAnswer(invocation -> {
+ String partition = invocation.<Option<String>>getArgument(1).get();
+ return HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation(
+ partition, "001", "file" + partition.substring(3)))));
+ });
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertEquals(fileSlices, result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par3")));
+ }
+
+ @Test
+ void testPartitionedRliSkipsPruningAbovePartitionThreshold() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"),
+ fileSlice("par3", "file3"),
+ fileSlice("par4", "file4"));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ @Test
+ void testPartitionedRliFallsBackWhenPartitionLookupFails() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file2"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1"))))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2"))))
+ .thenThrow(new RuntimeException("lookup failure"));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par1")));
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any(),
eq(Option.of("par2")));
+ }
+
+ @Test
+ void testPartitionedRliSkipsPruningForNoCandidateFiles() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Collections.emptyList();
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertSame(fileSlices, result);
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, -1})
+ void testRejectsNonPositivePartitionLookupThreshold(int maxPartitions) {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_RLI_PARTITIONS_MAX_NUM,
maxPartitions);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> new RecordLevelIndex("", conf, metaClient,
Collections.singletonList("id1")));
+ }
+
+ @Test
+ void testGlobalRliMatchesPartitionAndFileId() {
+ HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+ GlobalRecordLevelIndex recordLevelIndex =
createGlobalRecordLevelIndex(metadataTable);
+ List<FileSlice> fileSlices = Arrays.asList(
+ fileSlice("par1", "file1"),
+ fileSlice("par2", "file1"));
+ when(metadataTable.readRecordIndexLocationsWithKeys(any()))
+ .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+ Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001",
"file1")))));
+
+ List<FileSlice> result =
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+ assertEquals(Collections.singletonList(fileSlices.get(0)), result);
+ verify(metadataTable).readRecordIndexLocationsWithKeys(any());
+ verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(),
any());
+ }
+
+ private RecordLevelIndex createRecordLevelIndex(HoodieTableMetadata
metadataTable) {
+ mockAvailableRecordIndex();
+ RecordLevelIndex recordLevelIndex = spy(
+ new RecordLevelIndex("", new Configuration(), metaClient,
Collections.singletonList("id1")));
Review Comment:
🤖 nit: `lenient()` is needed here because some callers (e.g.
`testPartitionedRliSkipsPruningAbovePartitionThreshold`) never reach
`getMetadataTable()`. Have you considered splitting `createRecordLevelIndex`
into two helpers — one that stubs the metadata table and one that doesn't — so
strict stubbing can catch accidentally unused stubs?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java:
##########
@@ -704,6 +703,7 @@ public static void assertRowsEquals(List<Row> rows, String
expected, boolean wit
return rowStr;
}
})
+ .sorted()
Review Comment:
🤖 Changing the sort key from `toStringSafely(row.getField(0))` to the
formatted row string is a global behavior change for `assertRowsEquals`. When
`withChangeFlag` is true the leading `+I`/`-D` prefix dominates the comparison,
so rows are now grouped by change-flag first rather than by id. Was this needed
only for the new partitioned test, or did existing change-log tests already
match this ordering? Worth a sanity-check that no prior assertions were
silently masked by the new ordering.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.record.HoodieRecordIndex;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.util.StreamerUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base index support that leverages Record Level Index to prune file slices.
+ */
+@Slf4j
+public abstract class BaseRecordLevelIndex implements FlinkMetadataIndex {
+ private static final long serialVersionUID = 1L;
+
+ private final String basePath;
+ protected final Configuration conf;
+ protected final List<String> hoodieKeysFromFilter;
+ private final HoodieTableMetaClient metaClient;
+ private HoodieTableMetadata metadataTable;
+
+ @VisibleForTesting
+ BaseRecordLevelIndex(
+ String basePath,
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ List<String> hoodieKeysFromFilter) {
+ this.basePath = basePath;
+ this.conf = conf;
+ this.metaClient = metaClient;
+ this.hoodieKeysFromFilter = hoodieKeysFromFilter;
+ }
+
+ @Override
+ public String getIndexPartitionName() {
+ return HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX;
+ }
+
+ @Override
+ public boolean isIndexAvailable() {
+ return metaClient.getTableConfig().isMetadataTableAvailable()
+ &&
metaClient.getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+ }
+
+ public HoodieTableMetadata getMetadataTable() {
+ // initialize the metadata table lazily
+ if (this.metadataTable == null) {
+ this.metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
+ HoodieFlinkEngineContext.DEFAULT,
+ metaClient.getStorage(),
+ StreamerUtil.metadataConfig(conf),
+ basePath);
+ }
+ return this.metadataTable;
+ }
+
+ public List<FileSlice> computeCandidateFileSlices(List<FileSlice>
fileSlices) {
+ if (!isIndexAvailable()) {
+ return fileSlices;
+ }
+
+ try {
+ Option<Set<HoodieFileGroupId>> candidateFileGroupIds =
+ lookupCandidateFileGroupIds(fileSlices);
+ return candidateFileGroupIds.map(candidates -> fileSlices.stream()
+ .filter(fileSlice -> candidates.contains(fileSlice.getFileGroupId()))
+ .collect(Collectors.toList()))
+ .orElse(fileSlices);
+ } catch (Throwable e) {
+ log.error("Failed to read metadata index: {} for data skipping",
getIndexPartitionName(), e);
+ return fileSlices;
+ }
+ }
+
+ protected abstract Option<Set<HoodieFileGroupId>>
lookupCandidateFileGroupIds(List<FileSlice> fileSlices);
+
+ protected static Set<HoodieFileGroupId> getFileGroupIds(
+ HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData) {
+ List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
+ HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+ return recordIndexLocations.stream()
+ .map(pair -> new HoodieFileGroupId(pair.getValue().getPartitionPath(),
pair.getValue().getFileId()))
+ .collect(Collectors.toSet());
+ }
+
+ public static Option<BaseRecordLevelIndex> create(
+ String basePath,
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ List<ExpressionEvaluators.Evaluator> evaluators,
+ RowType rowType) {
+ if (evaluators.isEmpty() ||
!FlinkOptions.QUERY_TYPE_SNAPSHOT.equalsIgnoreCase(conf.get(FlinkOptions.QUERY_TYPE)))
{
+ return Option.empty();
+ }
+ if (metaClient == null) {
+ metaClient = StreamerUtil.createMetaClient(conf);
+ }
+ // disallow RLI for new encoding with complex key gen when the table
version is lower than NINE.
+ if
(KeyGenUtils.mayUseNewEncodingForComplexKeyGen(metaClient.getTableConfig())) {
+ return Option.empty();
+ }
+
+ String[] recordKeyFields =
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
+ if (recordKeyFields.length == 0) {
+ log.warn("The table do not have record keys, skipping the rli pruning.");
+ return Option.empty();
+ }
+ boolean consistentLogicalTimestampEnabled =
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
+ List<String> hoodieKeysFromFilter = computeHoodieKeyFromFilters(conf,
metaClient, evaluators, recordKeyFields, rowType,
consistentLogicalTimestampEnabled);
+ if (hoodieKeysFromFilter.isEmpty()) {
+ log.warn("The number of keys from query predicate is empty, skipping the
rli pruning.");
+ return Option.empty();
+ }
+ int maxKeyNum = conf.get(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM);
+ if (hoodieKeysFromFilter.size() > maxKeyNum) {
+ log.warn("The number of keys from query predicate: {} exceeds the upper
threshold: {}, skipping the rli pruning, the keys: {}",
+ hoodieKeysFromFilter.size(), maxKeyNum, hoodieKeysFromFilter);
+ return Option.empty();
+ }
+ if
(metaClient.getIndexForMetadataPartition(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX).isEmpty())
{
+ return Option.empty();
+ }
+ boolean isPartitioned =
HoodieRecordIndex.isPartitioned(metaClient.getIndexForMetadataPartition(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX).get());
+ return Option.of(isPartitioned
Review Comment:
🤖 nit: `getIndexForMetadataPartition(PARTITION_NAME_RECORD_INDEX)` is called
twice — once in the `if` guard just above (around line 171 of the new file) to
check `.isEmpty()` and again here to call `.get()`.
Could you store it in a local variable (e.g. `Option<HoodieIndexDefinition>
indexOpt = metaClient.getIndexForMetadataPartition(...)`) and reuse it?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]