This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4ffad38863f0 feat: Support data skipping based on record index for
flink reader (#17490)
4ffad38863f0 is described below
commit 4ffad38863f03e753e9f9a6fc5bcfbc79a125ae5
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Dec 17 10:39:07 2025 +0800
feat: Support data skipping based on record index for flink reader (#17490)
---
.../client/common/HoodieFlinkEngineContext.java | 11 +-
.../apache/hudi/configuration/FlinkOptions.java | 12 +
.../org/apache/hudi/sink/bulk/RowDataKeyGen.java | 2 +-
.../apache/hudi/source/ExpressionEvaluators.java | 16 +
.../java/org/apache/hudi/source/FileIndex.java | 34 ++-
.../apache/hudi/source/prune/ColumnStatsProbe.java | 4 +
.../apache/hudi/source/prune/PartitionPruners.java | 16 +-
.../apache/hudi/source/stats/ColumnStatsIndex.java | 9 +-
.../apache/hudi/source/stats/FileStatsIndex.java | 30 +-
...lumnStatsIndex.java => FlinkMetadataIndex.java} | 30 +-
.../hudi/source/stats/PartitionStatsIndex.java | 6 +
.../apache/hudi/source/stats/RecordLevelIndex.java | 277 +++++++++++++++++
.../org/apache/hudi/table/HoodieTableSource.java | 99 +++---
.../java/org/apache/hudi/source/TestFileIndex.java | 216 +++++++++++++
.../hudi/source/TestIncrementalInputSplits.java | 3 +-
.../hudi/source/stats/TestRecordLevelIndex.java | 338 +++++++++++++++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 30 ++
.../apache/hudi/table/TestHoodieTableSource.java | 7 +-
.../org/apache/hudi/utils/TestConfigurations.java | 22 ++
.../test/java/org/apache/hudi/utils/TestData.java | 8 +
20 files changed, 1068 insertions(+), 102 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index edbdc1ef0703..e993d3ccfa52 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -37,9 +37,11 @@ import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ClosableSortingIterator;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -236,7 +238,14 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
SerializableFunction<Iterator<V>, Iterator<R>> processFunc,
List<K> keySpace,
boolean preservesPartitioning) {
- throw new UnsupportedOperationException("processKeyGroups() is not
supported in FlinkEngineContext");
+ // Group values by key and apply the function to each group in parallel
+ List<Iterable<V>> groupedValues =
data.groupByKey().values().collectAsList();
+ // Process each group in parallel using parallel stream
+ List<R> results = executeParallelStream(
+ groupedValues.parallelStream(),
+ stream -> stream.map(values ->
throwingMapWrapper(processFunc).apply(new
ClosableSortingIterator<>(values.iterator()))),
+
groupedValues.size()).flatMap(CollectionUtils::toStream).collect(Collectors.toList());
+ return HoodieListData.eager(results);
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7ba63278497e..960e45f16f7e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -417,6 +417,18 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Enables data-skipping allowing queries to leverage
indexes to reduce the search space by "
+ "skipping over files");
+  @AdvancedConfig
+  public static final ConfigOption<Integer> READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM = ConfigOptions
+      .key("read.data.skipping.rli.keys.max.num")
+      .intType()
+      .defaultValue(8)
+      .withDescription("Record level index (RLI) locations will be read from the metadata table (MDT) for data skipping optimization,\n"
+          + "and currently the index lookup is performed by a single process. This config constrains the maximum\n"
+          + "number of hoodie keys that can be looked up from the MDT without sacrificing performance. If the number of hoodie keys\n"
+          + "derived from the query predicate is greater than the maximum value, the query falls back to skipping the record level index filtering.\n"
+          + "E.g., given query: SELECT * FROM T WHERE `uuid` IN (1,2,3,4,5,6,7,8,9), the number of hoodie keys is 9 and\n"
+          + "the maximum value is 8, so the source will not perform record level index filtering.");
+
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index 8f0a213040a7..866e0ffae773 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -221,7 +221,7 @@ public class RowDataKeyGen implements Serializable {
}
// reference: org.apache.hudi.keygen.KeyGenUtils.getRecordKey
- public static String getRecordKey(Object recordKeyValue, String
recordKeyField,boolean consistentLogicalTimestampEnabled) {
+ public static String getRecordKey(Object recordKeyValue, String
recordKeyField, boolean consistentLogicalTimestampEnabled) {
recordKeyValue = getTimestampValue(consistentLogicalTimestampEnabled,
recordKeyValue);
String recordKey = StringUtils.objToString(recordKeyValue);
if (recordKey == null || recordKey.isEmpty()) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
index d6a472b2b7ad..be51a1391f4b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
@@ -194,6 +194,10 @@ public class ExpressionEvaluators {
"Can not find column " + this.name);
return columnStats;
}
+
+ /**
+ * Returns the name of the column referenced by this evaluator.
+ */
+ public String getName() {
+ return this.name;
+ }
}
/**
@@ -243,6 +247,10 @@ public class ExpressionEvaluators {
}
return compare(maxVal, val, type) >= 0;
}
+
+ /**
+ * Returns the value {@code val} that this evaluator compares the column statistics against.
+ */
+ public Object getVal() {
+ return this.val;
+ }
}
/**
@@ -424,6 +432,10 @@ public class ExpressionEvaluators {
public void bindVals(Object... vals) {
this.vals = vals;
}
+
+ /**
+ * Returns the values set via {@code bindVals}; the internal array is returned without copying.
+ */
+ public Object[] getVals() {
+ return this.vals;
+ }
}
/**
@@ -521,6 +533,10 @@ public class ExpressionEvaluators {
this.evaluators = evaluators;
return this;
}
+
+ /**
+ * Returns the child evaluators bound to this evaluator; the internal array is returned without copying.
+ */
+ public Evaluator[] getEvaluators() {
+ return this.evaluators;
+ }
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index adae1d8df614..d44695b7cbd8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -23,12 +23,14 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.stats.FileStatsIndex;
+import org.apache.hudi.source.stats.RecordLevelIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.StreamerUtil;
@@ -57,7 +59,7 @@ import java.util.stream.Collectors;
*
* <p>It caches the partition paths to avoid redundant look up.
*/
-public class FileIndex implements Serializable {
+public class FileIndex implements Serializable, AutoCloseable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);
@@ -66,11 +68,12 @@ public class FileIndex implements Serializable {
private final boolean tableExists;
private final HoodieMetadataConfig metadataConfig;
private final org.apache.hadoop.conf.Configuration hadoopConf;
- private final PartitionPruners.PartitionPruner partitionPruner; // for
partition pruning
+ private final Option<PartitionPruners.PartitionPruner> partitionPruner; //
for partition pruning
private final ColumnStatsProbe colStatsProbe; // for
probing column stats
private final Function<String, Integer> partitionBucketIdFunc; // for
bucket pruning
private List<String> partitionPaths; // cache of
partition paths
private final FileStatsIndex fileStatsIndex; // for data
skipping
+ private final Option<RecordLevelIndex> recordLevelIndex;
private final HoodieTableMetaClient metaClient;
private FileIndex(
@@ -86,9 +89,11 @@ public class FileIndex implements Serializable {
this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
this.metadataConfig = StreamerUtil.metadataConfig(conf);
this.colStatsProbe =
isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ?
colStatsProbe : null;
- this.partitionPruner = partitionPruner;
+ this.partitionPruner = Option.ofNullable(partitionPruner);
this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType, conf,
metaClient);
this.partitionBucketIdFunc = partitionBucketIdFunc;
+ List<ExpressionEvaluators.Evaluator> evaluators =
Option.ofNullable(colStatsProbe).map(ColumnStatsProbe::getEvaluators).orElse(Collections.emptyList());
+ this.recordLevelIndex = RecordLevelIndex.create(path.toString(), conf,
metaClient, evaluators, rowType);
this.metaClient = metaClient;
}
@@ -184,8 +189,15 @@ public class FileIndex implements Serializable {
return Collections.emptyList();
}
+ // data skipping based on record index
+ if (recordLevelIndex.isPresent()) {
+ int prevSize = filteredFileSlices.size();
+ filteredFileSlices =
recordLevelIndex.get().computeCandidateFileSlices(filteredFileSlices);
+ logPruningMsg(prevSize, filteredFileSlices.size(), "record level index
pruning");
+ }
+
// data skipping based on column stats
- List<String> allFiles =
fileSlices.stream().map(FileSlice::getAllFileNames).flatMap(List::stream).collect(Collectors.toList());
+ List<String> allFiles =
filteredFileSlices.stream().map(FileSlice::getAllFileNames).flatMap(List::stream).collect(Collectors.toList());
Set<String> candidateFiles =
fileStatsIndex.computeCandidateFiles(colStatsProbe, allFiles);
if (candidateFiles == null) {
// no need to filter by col stats or error occurs.
@@ -237,12 +249,7 @@ public class FileIndex implements Serializable {
}
List<String> allPartitionPaths = this.tableExists ?
FSUtils.getAllPartitionPaths(new HoodieFlinkEngineContext(hadoopConf),
metaClient, metadataConfig)
: Collections.emptyList();
- if (this.partitionPruner == null) {
- this.partitionPaths = allPartitionPaths;
- } else {
- Set<String> prunedPartitionPaths =
this.partitionPruner.filter(allPartitionPaths);
- this.partitionPaths = new ArrayList<>(prunedPartitionPaths);
- }
+ this.partitionPaths = partitionPruner.map(pruner ->
pruner.filter(allPartitionPaths).stream().collect(Collectors.toList())).orElse(allPartitionPaths);
return this.partitionPaths;
}
@@ -278,6 +285,13 @@ public class FileIndex implements Serializable {
return (total - left) / total;
}
+ @Override
+ public void close() {
+ this.fileStatsIndex.close();
+ this.recordLevelIndex.ifPresent(RecordLevelIndex::close);
+ this.partitionPruner.ifPresent(PartitionPruners.PartitionPruner::close);
+ }
+
// -------------------------------------------------------------------------
// Inner class
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
index a4b481940027..11a71b9c576e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
@@ -73,6 +73,10 @@ public class ColumnStatsProbe implements Serializable {
return referencedCols;
}
+ /**
+ * Returns the evaluators held by this probe (the internal list, not a copy).
+ */
+ public List<ExpressionEvaluators.Evaluator> getEvaluators() {
+ return evaluators;
+ }
+
@Nullable
public static ColumnStatsProbe newInstance(List<ResolvedExpression> filters)
{
if (filters.isEmpty()) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
index f80ba96bda88..492117515a59 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
@@ -50,12 +50,16 @@ import java.util.stream.Stream;
*/
public class PartitionPruners {
- public interface PartitionPruner extends Serializable {
+ public interface PartitionPruner extends Serializable, AutoCloseable {
/**
* Applies partition pruning on the given partition list, return remained
partitions.
*/
Set<String> filter(Collection<String> partitions);
+
+ /**
+ * Releases resources held by this pruner; the default implementation does nothing.
+ * Declared without a throws clause so callers need not handle the checked exception of AutoCloseable#close().
+ */
+ default void close() {
+ // do nothing.
+ }
}
/**
@@ -163,6 +167,11 @@ public class PartitionPruners {
}
return
partitions.stream().filter(candidatePartitions::contains).collect(Collectors.toSet());
}
+
+ /**
+ * Closes the underlying partition stats index.
+ */
+ @Override
+ public void close() {
+ this.partitionStatsIndex.close();
+ }
}
/**
@@ -183,6 +192,11 @@ public class PartitionPruners {
}
return new HashSet<>(partitions);
}
+
+ @Override
+ public void close() {
+ pruners.forEach(PartitionPruner::close);
+ }
}
public static Builder builder() {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
index 1d73e167e33e..d54df6dc13ff 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
@@ -20,7 +20,6 @@ package org.apache.hudi.source.stats;
import org.apache.hudi.source.prune.ColumnStatsProbe;
-import java.io.Serializable;
import java.util.List;
import java.util.Set;
@@ -28,13 +27,7 @@ import java.util.Set;
* Base support that leverages Metadata Table's indexes, such as Column Stats
Index
* and Partition Stats Index, to prune files and partitions.
*/
-public interface ColumnStatsIndex extends Serializable {
-
- /**
- * Returns the partition name of the index.
- */
- String getIndexPartitionName();
-
+public interface ColumnStatsIndex extends FlinkMetadataIndex {
/**
* Computes the filtered files with given candidates.
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index b19f4dcb155e..2af046d33300 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -84,7 +84,7 @@ public class FileStatsIndex implements ColumnStatsIndex {
private final RowType rowType;
private final String basePath;
private final Configuration conf;
- private HoodieTableMetaClient metaClient;
+ protected HoodieTableMetaClient metaClient;
private HoodieTableMetadata metadataTable;
public FileStatsIndex(
@@ -103,28 +103,34 @@ public class FileStatsIndex implements ColumnStatsIndex {
return HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
}
+ /**
+ * Returns true when the metadata table is available and contains the column stats partition.
+ */
+ @Override
+ public boolean isIndexAvailable() {
+ return getMetaClient().getTableConfig().isMetadataTableAvailable()
+ &&
getMetaClient().getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+ }
+
+ /**
+ * Returns the metadata table reader, creating it on first access (the meta client is created lazily too).
+ */
public HoodieTableMetadata getMetadataTable() {
// initialize the metadata table lazily
if (this.metadataTable == null) {
- initMetaClient();
- this.metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
+ this.metadataTable =
getMetaClient().getTableFormat().getMetadataFactory().create(
HoodieFlinkEngineContext.DEFAULT,
- metaClient.getStorage(),
+ getMetaClient().getStorage(),
StreamerUtil.metadataConfig(conf),
basePath);
}
return this.metadataTable;
}
- private void initMetaClient() {
+ /**
+ * Returns the meta client, creating it from the Flink configuration on first access.
+ */
+ protected HoodieTableMetaClient getMetaClient() {
if (this.metaClient == null) {
this.metaClient = StreamerUtil.createMetaClient(conf);
}
+ return this.metaClient;
}
@Override
public Set<String> computeCandidateFiles(ColumnStatsProbe probe,
List<String> allFiles) {
- if (probe == null) {
+ if (probe == null || !isIndexAvailable()) {
return null;
}
try {
@@ -414,4 +420,16 @@ public class FileStatsIndex implements ColumnStatsIndex {
).collect(Collectors.toList());
return projectNestedColStatsColumns(rows);
}
+
+  /**
+   * Closes the metadata table reader if it has been created.
+   *
+   * <p>The reference is cleared even when closing fails, so a later {@link #getMetadataTable()} call
+   * lazily creates a fresh reader instead of returning a closed one.
+   *
+   * @throws HoodieException if closing the metadata table fails
+   */
+  @Override
+  public void close() {
+    if (this.metadataTable == null) {
+      return;
+    }
+    try {
+      this.metadataTable.close();
+    } catch (Exception e) {
+      throw new HoodieException("Exception happened during close metadata table.", e);
+    } finally {
+      this.metadataTable = null;
+    }
+  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FlinkMetadataIndex.java
similarity index 50%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
copy to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FlinkMetadataIndex.java
index 1d73e167e33e..0911d43ede10 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FlinkMetadataIndex.java
@@ -18,40 +18,20 @@
package org.apache.hudi.source.stats;
-import org.apache.hudi.source.prune.ColumnStatsProbe;
-
import java.io.Serializable;
-import java.util.List;
-import java.util.Set;
/**
- * Base support that leverages Metadata Table's indexes, such as Column Stats
Index
- * and Partition Stats Index, to prune files and partitions.
+ * Base support that leverages Metadata Table's indexes, such as Column Stats Index,
+ * Partition Stats Index and Record Level Index, to prune files, file slices and partitions.
*/
-public interface ColumnStatsIndex extends Serializable {
-
+public interface FlinkMetadataIndex extends Serializable, AutoCloseable {
/**
* Returns the partition name of the index.
*/
String getIndexPartitionName();
/**
- * Computes the filtered files with given candidates.
- *
- * @param columnStatsProbe The utility to filter the column stats metadata.
- * @param allFile The file name list of the candidate files.
- *
- * @return The set of filtered file names
- */
- Set<String> computeCandidateFiles(ColumnStatsProbe columnStatsProbe,
List<String> allFile);
-
- /**
- * Computes the filtered partition paths with given candidates.
- *
- * @param columnStatsProbe The utility to filter the column stats metadata.
- * @param allPartitions The <strong>relative</strong> partition path list
of the candidate partitions.
- *
- * @return The set of filtered relative partition paths
+ * Returns whether the index partition is available in the metadata table of the current table.
*/
- Set<String> computeCandidatePartitions(ColumnStatsProbe columnStatsProbe,
List<String> allPartitions);
+ boolean isIndexAvailable();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
index 1facae5671d9..4ab47c7ef28f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
@@ -50,6 +50,12 @@ public class PartitionStatsIndex extends FileStatsIndex {
return HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
}
+ /**
+ * Returns true when the metadata table is available and contains the partition stats partition.
+ */
+ @Override
+ public boolean isIndexAvailable() {
+ return getMetaClient().getTableConfig().isMetadataTableAvailable()
+ &&
getMetaClient().getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS);
+ }
+
@Override
public Set<String> computeCandidateFiles(ColumnStatsProbe probe,
List<String> allFiles) {
throw new UnsupportedOperationException("This method is not supported by "
+ this.getClass().getSimpleName());
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
new file mode 100644
index 000000000000..2b0d21baf047
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An index support implementation that leverages Record Level Index to prune
file slices.
+ */
+public class RecordLevelIndex implements FlinkMetadataIndex {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(RecordLevelIndex.class);
+
+ private final String basePath;
+ private final Configuration conf;
+ private final List<String> hoodieKeysFromFilter;
+ private final HoodieTableMetaClient metaClient;
+ private HoodieTableMetadata metadataTable;
+
+ private RecordLevelIndex(
+ String basePath,
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ List<String> hoodieKeysFromFilter) {
+ this.basePath = basePath;
+ this.conf = conf;
+ this.metaClient = metaClient;
+ this.hoodieKeysFromFilter = hoodieKeysFromFilter;
+ }
+
+ @Override
+ public String getIndexPartitionName() {
+ return HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX;
+ }
+
+ @Override
+ public boolean isIndexAvailable() {
+ return metaClient.getTableConfig().isMetadataTableAvailable()
+ &&
metaClient.getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+ }
+
+ public HoodieTableMetadata getMetadataTable() {
+ // initialize the metadata table lazily
+ if (this.metadataTable == null) {
+ this.metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
+ HoodieFlinkEngineContext.DEFAULT,
+ metaClient.getStorage(),
+ StreamerUtil.metadataConfig(conf),
+ basePath);
+ }
+ return this.metadataTable;
+ }
+
+ public List<FileSlice> computeCandidateFileSlices(List<FileSlice>
fileSlices) {
+ if (!isIndexAvailable()) {
+ return fileSlices;
+ }
+ HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
+ try {
+ List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+ Set<String> fileIds = recordIndexLocations.stream()
+ .map(pair ->
pair.getValue().getFileId()).collect(Collectors.toSet());
+ return fileSlices.stream().filter(fileSlice ->
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
+ } finally {
+ // Clean up the RDD to avoid memory leaks
+ recordIndexData.unpersistWithDependencies();
+ }
+ }
+
+ public static Option<RecordLevelIndex> create(
+ String basePath,
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ List<ExpressionEvaluators.Evaluator> evaluators,
+ RowType rowType) {
+ if (evaluators.isEmpty() ||
!FlinkOptions.QUERY_TYPE_SNAPSHOT.equalsIgnoreCase(conf.get(FlinkOptions.QUERY_TYPE)))
{
+ return Option.empty();
+ }
+ if (metaClient == null) {
+ metaClient = StreamerUtil.createMetaClient(conf);
+ }
+ // disallow RLI for new encoding with complex key gen when the table
version is lower than NINE.
+ if
(KeyGenUtils.mayUseNewEncodingForComplexKeyGen(metaClient.getTableConfig())) {
+ return Option.empty();
+ }
+
+ String[] recordKeyFields =
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
+ if (recordKeyFields.length == 0) {
+ LOG.warn("The table do not have record keys, skipping the rli pruning.");
+ return Option.empty();
+ }
+ boolean consistentLogicalTimestampEnabled =
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
+ List<String> hoodieKeysFromFilter = computeHoodieKeyFromFilters(conf,
metaClient, evaluators, recordKeyFields, rowType,
consistentLogicalTimestampEnabled);
+ if (hoodieKeysFromFilter.isEmpty()) {
+ LOG.warn("The number of keys from query predicate is empty, skipping the
rli pruning.");
+ return Option.empty();
+ }
+ int maxKeyNum = conf.get(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM);
+ if (hoodieKeysFromFilter.size() > maxKeyNum) {
+ LOG.warn("The number of keys from query predicate: {} exceeds the upper
threshold: {}, skipping the rli pruning, the keys: {}",
+ hoodieKeysFromFilter.size(), maxKeyNum, hoodieKeysFromFilter);
+ return Option.empty();
+ }
+ return Option.of(new RecordLevelIndex(basePath, conf, metaClient,
hoodieKeysFromFilter));
+ }
+
+ /**
+ * Given query filters, it filters the EqualTo, IN and OR queries on record
key columns and
+ * returns the list of record key literals present in the query, for example:
+ * <p>
+ * filter1: `key1` = 'val1', returns {"val1"}
+ * filter2: `key1` in ('val1', 'val2', 'val3'), returns {"val1", "vale",
"val3"}
+ * filter3: `key1` = 'val1' OR `key1` = 'val2' or `key1` = 'val3', returns
{"val1", "vale", "val3"}
+ * filter4: `key1` = 'val1' AND `key2` in ('val2', 'val3'), returns
{"key1:val1,key2:val2", "key1:val1,key2:val3"}
+ */
+ @VisibleForTesting
+ public static List<String> computeHoodieKeyFromFilters(
+ Configuration conf,
+ HoodieTableMetaClient metaClient,
+ List<ExpressionEvaluators.Evaluator> evaluators,
+ String[] keyFields,
+ RowType rowType,
+ boolean consistentLogicalTimestampEnabled) {
+ String[] partitionFields =
metaClient.getTableConfig().getPartitionFields().orElse(new String[0]);
+ // align with the check logic in RowDataKeyGen
+ boolean isComplexRecordKey = keyFields.length > 1 ||
partitionFields.length > 1 &&
!OptionsResolver.useComplexKeygenNewEncoding(conf);
+ List<String> hoodieKeys = new ArrayList<>();
+ List<String> fieldNames = rowType.getFieldNames();
+ for (String keyField: keyFields) {
+ List<String> recordKeys = new ArrayList<>();
+ LogicalType fieldType = rowType.getTypeAt(fieldNames.indexOf(keyField));
+ for (ExpressionEvaluators.Evaluator evaluator: evaluators) {
+ // if there exists multiple ref fields in an evaluator, ignore this
evaluator, e.g., key = 'key1' or age = 20
+ List<Object> literals = collectLiterals(evaluator, keyField);
+ literals.forEach(val -> recordKeys.add(isComplexRecordKey
+ ? keyField + KeyGenerator.DEFAULT_COLUMN_VALUE_SEPARATOR +
normalizeLiteral(val, keyField, fieldType, consistentLogicalTimestampEnabled)
+ : normalizeLiteral(val, keyField, fieldType,
consistentLogicalTimestampEnabled)));
+ }
+ if (recordKeys.isEmpty()) {
+ LOG.info("No literals found for the record key: {}, therefore
filtering can not be performed", keyField);
+ return Collections.emptyList();
+ } else if (!isComplexRecordKey || hoodieKeys.isEmpty()) {
+ hoodieKeys = recordKeys;
+ } else {
+ // Combine literals for this configured record key with literals for
the other configured record keys
+ // If there are two literals for rk1, rk2, rk3 each. A total of 8
combinations will be generated
+ List<String> tmpHoodieKeys = new ArrayList<>();
+ for (String compositeKey: hoodieKeys) {
+ for (String recordKey: recordKeys) {
+ tmpHoodieKeys.add(compositeKey +
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + recordKey);
+ }
+ }
+ hoodieKeys = tmpHoodieKeys;
+ }
+ }
+ return hoodieKeys;
+ }
+
+ /**
+ * Collect literal values for record key fields from the predicate.
+ */
+ private static List<Object> collectLiterals(ExpressionEvaluators.Evaluator
evaluator, String refName) {
+ if (evaluator instanceof ExpressionEvaluators.LeafEvaluator
+ && !((ExpressionEvaluators.LeafEvaluator)
evaluator).getName().equalsIgnoreCase(refName)) {
+ return Collections.emptyList();
+ }
+ if (evaluator instanceof ExpressionEvaluators.EqualTo) {
+ Object valueLiteral = ((ExpressionEvaluators.EqualTo)
evaluator).getVal();
+ return valueLiteral == null ? Collections.emptyList() :
Collections.singletonList(valueLiteral);
+ } else if (evaluator instanceof ExpressionEvaluators.In) {
+ Object[] valueLiterals = ((ExpressionEvaluators.In) evaluator).getVals();
+ if (valueLiterals.length < 1 ||
Arrays.stream(valueLiterals).anyMatch(Objects::isNull)) {
+ return Collections.emptyList();
+ }
+ return Arrays.stream(valueLiterals).collect(Collectors.toList());
+ } else if (evaluator instanceof ExpressionEvaluators.Or) {
+ List<List<Object>> literalsList =
Arrays.stream(((ExpressionEvaluators.Or) evaluator).getEvaluators())
+ .map(eval -> collectLiterals(eval,
refName)).collect(Collectors.toList());
+ // if any child expr do not contain predicate on the key, just return
empty list
+ if (literalsList.stream().anyMatch(List::isEmpty)) {
+ return Collections.emptyList();
+ }
+ return
literalsList.stream().flatMap(List::stream).distinct().collect(Collectors.toList());
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * Normalize literal values before used to get record index locations.
+ */
+ private static String normalizeLiteral(Object value, String keyField,
LogicalType fieldType, boolean consistentLogicalTimestampEnabled) {
+ switch (fieldType.getTypeRoot()) {
+ case DECIMAL:
+ // the scale of decimal data in predicate may not be aligned with that
in record index, padding 0 if necessary,
+ // e.g., 1.11 with target scale 5, return 1.11000
+ BigDecimal decimal = (BigDecimal) value;
+ int targetScale = ((DecimalType) fieldType).getScale();
+ value = decimal.scale() >= targetScale ? value :
decimal.setScale(targetScale);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ // the original value is extracted from literal by
ExpressionUtils#getValueFromLiteral, which is epoch millis
+ // convert it back to TimestampData before reusing key generating
logic in RowDataKeyGen.
+ value = TimestampData.fromEpochMillis((Long) value);
+ break;
+ default:
+ break;
+ }
+ // to align with the hoodie key generating logic in writer side.
+ return RowDataKeyGen.getRecordKey(value, keyField,
consistentLogicalTimestampEnabled);
+ }
+
+ @Override
+ public void close() {
+ if (this.metadataTable == null) {
+ return;
+ }
+ try {
+ this.metadataTable.close();
+ } catch (Exception e) {
+ throw new HoodieException("Exception happened during close metadata
table.", e);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 37e5bbbfece0..d78fb9dd4856 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -391,42 +391,45 @@ public class HoodieTableSource implements
}
private List<MergeOnReadInputSplit> buildInputSplits() {
- FileIndex fileIndex = getOrBuildFileIndex();
- List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
- if (relPartitionPaths.isEmpty()) {
- return Collections.emptyList();
- }
- List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
- if (pathInfoList.isEmpty()) {
- throw new HoodieException("No files found for reading in user provided
path.");
- }
-
- String latestCommit;
- List<FileSlice> allFileSlices;
- // file-slice after pending compaction-requested instant-time is also
considered valid
- try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
- metaClient,
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
pathInfoList)) {
- if (!fsView.getLastInstant().isPresent()) {
+    // The file index is opened with try-with-resources so it is closed once the splits are built.
+    // The finally block resets the cached field so that a closed instance is not handed out again
+    // (NOTE(review): assumes getOrBuildFileIndex() rebuilds the index when the field is null - confirm).
+ try (FileIndex fileIndex = getOrBuildFileIndex()) {
+ List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+ if (relPartitionPaths.isEmpty()) {
    return Collections.emptyList();
    }
- latestCommit = fsView.getLastInstant().get().requestedTime();
- allFileSlices = relPartitionPaths.stream()
- .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par,
latestCommit)).collect(Collectors.toList());
+ List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+ if (pathInfoList.isEmpty()) {
+ throw new HoodieException("No files found for reading in user provided
path.");
+ }
+
+ String latestCommit;
+ List<FileSlice> allFileSlices;
+ // file-slice after pending compaction-requested instant-time is also
considered valid
+ try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+ metaClient,
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
pathInfoList)) {
+ if (!fsView.getLastInstant().isPresent()) {
+ return Collections.emptyList();
+ }
+ latestCommit = fsView.getLastInstant().get().requestedTime();
+ allFileSlices = relPartitionPaths.stream()
+ .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par,
latestCommit)).collect(Collectors.toList());
+ }
+    // Prune the latest merged file slices with the data skipping configured on the file index
+    // (presumably including the record level index added by this change - confirm in FileIndex).
+ List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
+
+ final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
+ final AtomicInteger cnt = new AtomicInteger(0);
+ // generates one input split for each file group
+ return fileSlices.stream().map(fileSlice -> {
+ String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ .collect(Collectors.toList()));
+ return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths,
latestCommit,
+ metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
mergeType, null, fileSlice.getFileId());
+ }).collect(Collectors.toList());
+ } finally {
+      // Drop the cached file index (presumably the instance just closed by try-with-resources).
+ this.fileIndex = null;
    }
- List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
-
- final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
- final AtomicInteger cnt = new AtomicInteger(0);
- // generates one input split for each file group
- return fileSlices.stream().map(fileSlice -> {
- String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- .collect(Collectors.toList()));
- return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths,
latestCommit,
- metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
mergeType, null, fileSlice.getFileId());
- }).collect(Collectors.toList());
 }
public InputFormat<RowData, ?> getInputFormat() {
@@ -683,19 +686,23 @@ public class HoodieTableSource implements
*/
@VisibleForTesting
public List<FileSlice> getBaseFileOnlyFileSlices() {
- List<String> relPartitionPaths = getReadPartitions();
- if (relPartitionPaths.isEmpty()) {
- return Collections.emptyList();
- }
- List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
- try (HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient,
-
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
pathInfoList)) {
-
- List<FileSlice> allFileSlices = relPartitionPaths.stream()
- .flatMap(par -> fsView.getLatestBaseFiles(par)
- .map(baseFile -> new FileSlice(new HoodieFileGroupId(par,
baseFile.getFileId()), baseFile.getCommitTime(), baseFile,
Collections.emptyList())))
- .collect(Collectors.toList());
- return fileIndex.filterFileSlices(allFileSlices);
+    // Obtain the file index for the duration of this call and close it afterwards; the cached field
+    // is reset in the finally block below so the closed instance is not reused.
+ try (FileIndex fileIndex = getOrBuildFileIndex()) {
+      // NOTE(review): getReadPartitions() is not shown here; confirm it resolves partitions through the
+      // same cached index obtained above rather than building a second one.
+ List<String> relPartitionPaths = getReadPartitions();
+ if (relPartitionPaths.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+ try (HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient,
+
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
pathInfoList)) {
+
+        // Wrap each latest base file into a file slice without log files, then prune the slices with
+        // the data skipping configured on the file index.
+ List<FileSlice> allFileSlices = relPartitionPaths.stream()
+ .flatMap(par -> fsView.getLatestBaseFiles(par)
+ .map(baseFile -> new FileSlice(new HoodieFileGroupId(par,
baseFile.getFileId()), baseFile.getCommitTime(), baseFile,
Collections.emptyList())))
+ .collect(Collectors.toList());
+ return fileIndex.filterFileSlices(allFileSlices);
+ }
+ } finally {
+      // Drop the cached file index so it is not handed out again after being closed.
+ this.fileIndex = null;
    }
 }
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index fab10d7fd74c..ae7aaa022df3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -44,19 +44,30 @@ import
org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.hudi.configuration.FlinkOptions.HIVE_STYLE_PARTITIONING;
import static org.apache.hudi.configuration.FlinkOptions.KEYGEN_CLASS_NAME;
@@ -217,6 +228,211 @@ public class TestFileIndex {
assertEquals(Arrays.asList("par3"), p);
}
+  /**
+   * Verifies that data skipping with the record level index keeps only the file slices that hold the
+   * record keys derived from the predicates in {@code probe}.
+   *
+   * @param recordFields comma separated record key field(s) of the table
+   * @param probe        predicates pushed down to the file index
+   * @param maxKeyCnt    value of {@link FlinkOptions#READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM}
+   * @param expectedCnt  expected number of file slices after data skipping
+   */
+  @ParameterizedTest
+  @MethodSource("filtersAndResults")
+  void testFileListingWithRecordLevelIndex(String recordFields, ColumnStatsProbe probe, int maxKeyCnt, int expectedCnt) throws Exception {
+    DataType dataType = TestConfigurations.ROW_DATA_TYPE_WITH_ATOMIC_TYPES;
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), dataType);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, recordFields);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM, maxKeyCnt);
+    // Enable record level index specifically for this test
+    conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+
+    // Write test data
+    TestData.writeData(TestData.DATA_SET_WITH_ATOMIC_TYPES, conf);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    // The probe carries predicates on the parameterized record key field(s), which is what allows the
+    // record level index to be used. FileIndex is AutoCloseable, so close it once the slices are
+    // computed instead of leaking it across parameterized runs.
+    try (FileIndex fileIndex = FileIndex.builder()
+        .path(new StoragePath(tempFile.getAbsolutePath()))
+        .conf(conf)
+        .rowType((RowType) dataType.getLogicalType())
+        .metaClient(metaClient)
+        .columnStatsProbe(probe)
+        .build()) {
+      // Get filtered file slices - this should use record-level index data skipping
+      List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+      assertThat(fileSlices.size(), is(expectedCnt));
+    }
+  }
+
+  /**
+   * Arguments for {@code testFileListingWithRecordLevelIndex}: the record key field(s), the predicate
+   * probe, the max number of record keys allowed for a record level index lookup, and the expected
+   * number of file slices after data skipping.
+   */
+  private static Stream<Arguments> filtersAndResults() {
+    // single-field EQUALS predicates, one per atomic key type
+    CallExpression equalTinyInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_tinyint", DataTypes.TINYINT(), 0, 0),
+            new ValueLiteralExpression((byte) 1, DataTypes.TINYINT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalSmallInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_smallint", DataTypes.SMALLINT(), 0, 0),
+            new ValueLiteralExpression((short) 11, DataTypes.SMALLINT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0),
+            new ValueLiteralExpression(111, DataTypes.INT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalBigInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_bigint", DataTypes.BIGINT(), 0, 0),
+            new ValueLiteralExpression(1111L, DataTypes.BIGINT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalFloat = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_float", DataTypes.FLOAT(), 0, 0),
+            new ValueLiteralExpression(10.11f, DataTypes.FLOAT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalDouble = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_double", DataTypes.DOUBLE(), 0, 0),
+            new ValueLiteralExpression(11.111, DataTypes.DOUBLE().notNull())
+        ),
+        DataTypes.BOOLEAN());
+
+    // predicates on the STRING column f_str
+    CallExpression equalExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_str", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("str1", DataTypes.STRING().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression inExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.IN,
+        Arrays.asList(
+            new FieldReferenceExpression("f_str", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("str2", DataTypes.STRING().notNull()),
+            new ValueLiteralExpression("str3", DataTypes.STRING().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression orExpression = CallExpression.permanent(BuiltInFunctionDefinitions.OR, Arrays.asList(inExpr, equalExpr), DataTypes.BOOLEAN());
+
+    // IN predicate on the INT column f_int, combined with f_str predicates for the composite key cases
+    CallExpression inExpr1 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.IN,
+        Arrays.asList(
+            new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0),
+            new ValueLiteralExpression(333, DataTypes.INT().notNull()),
+            new ValueLiteralExpression(222, DataTypes.INT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+
+    // f_str = 'str1' (EQUALS), number of filtered file slices is 1.
+    ColumnStatsProbe probe1 = ColumnStatsProbe.newInstance(Collections.singletonList(equalExpr));
+    // f_str IN ('str2', 'str3'), number of filtered file slices is 2.
+    ColumnStatsProbe probe2 = ColumnStatsProbe.newInstance(Collections.singletonList(inExpr));
+    // f_str IN ('str2', 'str3') OR f_str = 'str1', number of filtered file slices is 3.
+    ColumnStatsProbe probe3 = ColumnStatsProbe.newInstance(Collections.singletonList(orExpression));
+
+    // predicates on both fields of the composite record key (f_str, f_int)
+    // f_str = 'str1' AND f_int IN (333, 222), number of filtered file slices is 0.
+    ColumnStatsProbe probe4 = ColumnStatsProbe.newInstance(Arrays.asList(equalExpr, inExpr1));
+    // f_str = 'str1' AND f_int = 111, number of filtered file slices is 1.
+    ColumnStatsProbe probe5 = ColumnStatsProbe.newInstance(Arrays.asList(equalExpr, equalInt));
+    // f_str IN ('str2', 'str3') AND f_int IN (333, 222), number of filtered file slices is 2.
+    ColumnStatsProbe probe6 = ColumnStatsProbe.newInstance(Arrays.asList(inExpr, inExpr1));
+
+    ColumnStatsProbe probeTinyInt = ColumnStatsProbe.newInstance(Collections.singletonList(equalTinyInt));
+    ColumnStatsProbe probeSmallInt = ColumnStatsProbe.newInstance(Collections.singletonList(equalSmallInt));
+    ColumnStatsProbe probeInt = ColumnStatsProbe.newInstance(Collections.singletonList(equalInt));
+    ColumnStatsProbe probeBigInt = ColumnStatsProbe.newInstance(Collections.singletonList(equalBigInt));
+    ColumnStatsProbe probeFloat = ColumnStatsProbe.newInstance(Collections.singletonList(equalFloat));
+    ColumnStatsProbe probeDouble = ColumnStatsProbe.newInstance(Collections.singletonList(equalDouble));
+
+    // TIMESTAMP key: f_timestamp = 1970-01-01T00:00:00.001 (epoch millis 1 in UTC)
+    CallExpression equalExprTimestamp = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_timestamp", DataTypes.TIMESTAMP(3), 0, 0),
+            new ValueLiteralExpression(LocalDateTime.ofInstant(Instant.ofEpochMilli(1), ZoneId.of("UTC")), DataTypes.TIMESTAMP(3).notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeTimestamp = ColumnStatsProbe.newInstance(Collections.singletonList(equalExprTimestamp));
+
+    // TIME key: f_time = 00:00:01
+    CallExpression equalExprTime = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_time", DataTypes.TIME(), 0, 0),
+            new ValueLiteralExpression(LocalTime.ofSecondOfDay(1), DataTypes.TIME().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeTime = ColumnStatsProbe.newInstance(Collections.singletonList(equalExprTime));
+
+    // DATE key: f_date = 1970-01-02 (epoch day 1)
+    CallExpression equalExprDate = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_date", DataTypes.DATE(), 0, 0),
+            new ValueLiteralExpression(LocalDate.ofEpochDay(1), DataTypes.DATE().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeDate = ColumnStatsProbe.newInstance(Collections.singletonList(equalExprDate));
+
+    // DECIMAL key: the literal 1.11 has scale 2 while the declared type has scale 18, so the literal
+    // must be padded to the target scale before the record index lookup.
+    CallExpression equalExprDecimal = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_decimal", DataTypes.DECIMAL(38, 18), 2, 2),
+            new ValueLiteralExpression(new BigDecimal("1.11"), DataTypes.DECIMAL(38, 18).notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeDecimal = ColumnStatsProbe.newInstance(Collections.singletonList(equalExprDecimal));
+
+    Object[][] data = new Object[][] {
+        {"f_str", probe1, 8, 1},
+        {"f_str", probe2, 8, 2},
+        {"f_str", probe3, 8, 3},
+        {"f_str,f_int", probe4, 8, 0},
+        {"f_str,f_int", probe5, 8, 1},
+        {"f_str,f_int", probe6, 8, 2},
+        // the number of hoodie keys inferred from query predicate is 2, which exceeds the configured max
+        // number (1) of hoodie keys for record index, thus fallback to not using record index.
+        {"f_str", probe2, 1, 3},
+        // with the composite key (f_str, f_int) this predicate constrains only f_str, so no complete
+        // record key can be derived and the record index is not used regardless of the max key number.
+        {"f_str,f_int", probe2, 1, 3},
+        // key type is TINYINT
+        {"f_tinyint", probeTinyInt, 8, 1},
+        // key type is SMALLINT
+        {"f_smallint", probeSmallInt, 8, 1},
+        // key type is INT
+        {"f_int", probeInt, 8, 1},
+        // key type is BIGINT
+        {"f_bigint", probeBigInt, 8, 1},
+        // key type is FLOAT
+        {"f_float", probeFloat, 8, 1},
+        // key type is DOUBLE
+        {"f_double", probeDouble, 8, 1},
+        // key type is TIMESTAMP
+        {"f_timestamp", probeTimestamp, 8, 1},
+        // key type is TIME
+        {"f_time", probeTime, 8, 1},
+        // key type is DATE
+        {"f_date", probeDate, 8, 1},
+        // key type is DECIMAL
+        {"f_decimal", probeDecimal, 8, 1},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
private List<FileSlice> getFilteredFileSlices(HoodieTableMetaClient
metaClient, FileIndex fileIndex) {
List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index fb9275c2fc35..31343c8e3af2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -40,6 +40,7 @@ import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -366,8 +367,8 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
// which will be used to construct partition stats then.
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(),
"true");
}
- metaClient = HoodieTestUtils.init(basePath, tableType);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ metaClient = StreamerUtil.createMetaClient(conf);
// uuid > 'id5' and age < 30, only column stats of 'par3' matches the
filter.
ColumnStatsProbe columnStatsProbe =
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
new file mode 100644
index 000000000000..a23a5247139c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link RecordLevelIndex}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestRecordLevelIndex {
+  // Mocked meta client and table config; each test stubs the record key / partition fields it needs.
+ private final HoodieTableMetaClient metaClient =
mock(HoodieTableMetaClient.class);
+ private final HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+  // Row schema with the composite record key (key1, key2), used by the complex record key tests.
+ private static final DataType ROW_DATA_TYPE_MULTI_KEYS = DataTypes.ROW(
+ DataTypes.FIELD("key1", DataTypes.VARCHAR(20)),// record key
+ DataTypes.FIELD("key2", DataTypes.VARCHAR(20)),// record key
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull();
+  // Logical row type of ROW_DATA_TYPE_MULTI_KEYS; must stay declared after it (static init order).
+ private static final RowType ROW_TYPE_MULTI_KEYS = (RowType)
ROW_DATA_TYPE_MULTI_KEYS.getLogicalType();
+
+  /**
+   * Builds evaluators for the single call expression {@code func(refName, vals...)}, where
+   * {@code refName} is a STRING field reference and every value is a non-null STRING literal.
+   *
+   * @param func    the built-in function to apply, e.g. EQUALS or IN
+   * @param refName the referenced field name
+   * @param vals    literal operands placed after the field reference, in order
+   * @return evaluators created by {@link ExpressionEvaluators#fromExpression}
+   */
+  private List<ExpressionEvaluators.Evaluator> createColumnStatsProbe(BuiltInFunctionDefinition func, String refName, List<String> vals) {
+    // Collectors.toList() gives no mutability guarantee, so collect into an explicit ArrayList
+    // before inserting the field reference in front of the literals.
+    List<ResolvedExpression> args = vals.stream()
+        .<ResolvedExpression>map(val -> new ValueLiteralExpression(val, DataTypes.STRING().notNull()))
+        .collect(Collectors.toCollection(ArrayList::new));
+    args.add(0, new FieldReferenceExpression(refName, DataTypes.STRING(), 0, 0));
+    CallExpression callExpression = CallExpression.permanent(func, args, DataTypes.BOOLEAN());
+    return ExpressionEvaluators.fromExpression(Collections.singletonList(callExpression));
+  }
+
+  /**
+   * Builds evaluators for a chain of EQUALS predicates on {@code refName} joined by OR, nested to the
+   * right: {@code refName = v0 OR (refName = v1 OR (... OR refName = vN))}. A single value yields a
+   * plain EQUALS predicate.
+   *
+   * @param refName the referenced STRING field name
+   * @param vals    the literal values, at least one
+   * @return evaluators created by {@link ExpressionEvaluators#fromExpression}
+   * @throws IllegalArgumentException if {@code vals} is empty
+   */
+  private List<ExpressionEvaluators.Evaluator> createOrColumnStatsProbe(String refName, List<String> vals) {
+    if (vals.isEmpty()) {
+      throw new IllegalArgumentException("At least one literal is required to build an OR predicate");
+    }
+    // Fold from the last value backwards, so three values give OR(eq(v0), OR(eq(v1), eq(v2))).
+    CallExpression expression = createEqualsExpression(refName, vals.get(vals.size() - 1));
+    for (int i = vals.size() - 2; i >= 0; i--) {
+      expression = CallExpression.permanent(
+          BuiltInFunctionDefinitions.OR,
+          Arrays.asList(createEqualsExpression(refName, vals.get(i)), expression),
+          DataTypes.BOOLEAN());
+    }
+    return ExpressionEvaluators.fromExpression(Collections.singletonList(expression));
+  }
+
+  /**
+   * Builds {@code refName = val} with a STRING field reference and a non-null STRING literal.
+   */
+  private static CallExpression createEqualsExpression(String refName, String val) {
+    return CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression(refName, DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression(val, DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithSimpleRecordKey() {
+    // table keyed by the single field "uuid"
+    String[] recordKeyFields = {"uuid"};
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    // uuid = 'id1'
+    List<ExpressionEvaluators.Evaluator> equalToKey =
+        createColumnStatsProbe(BuiltInFunctionDefinitions.EQUALS, "uuid", Collections.singletonList("id1"));
+
+    List<String> hoodieKeys = RecordLevelIndex.computeHoodieKeyFromFilters(
+        new Configuration(), metaClient, equalToKey, recordKeyFields, TestConfigurations.ROW_TYPE, false);
+    assertEquals(Collections.singletonList("id1"), hoodieKeys, "Should return the simple record key value");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithInOperator() {
+    // table keyed by the single field "uuid"
+    String[] recordKeyFields = {"uuid"};
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    // uuid IN ('id1', 'id2', 'id3')
+    List<ExpressionEvaluators.Evaluator> inOnKey =
+        createColumnStatsProbe(BuiltInFunctionDefinitions.IN, "uuid", Arrays.asList("id1", "id2", "id3"));
+
+    List<String> hoodieKeys = RecordLevelIndex.computeHoodieKeyFromFilters(
+        new Configuration(), metaClient, inOnKey, recordKeyFields, TestConfigurations.ROW_TYPE, false);
+    List<String> expectedKeys = Arrays.asList("id1", "id2", "id3");
+    assertEquals(expectedKeys, hoodieKeys, "Should return all the IN operator values");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithOrOperator() {
+    // Setup mock table config with single record key field
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    Configuration conf = new Configuration();
+
+    // uuid = 'id1' OR (uuid = 'id2' OR uuid = 'id3'): the literals of every OR branch on the record
+    // key are expected to be collected
+    List<ExpressionEvaluators.Evaluator> evaluators = createOrColumnStatsProbe("uuid", Arrays.asList("id1", "id2", "id3"));
+    String[] recordKeyFields = {"uuid"};
+    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, evaluators, recordKeyFields, TestConfigurations.ROW_TYPE, false);
+    // each of the three OR branches contributes its literal, in order
+    assertEquals(Arrays.asList("id1", "id2", "id3"), result, "Should return values from OR operator");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithComplexRecordKey() {
+    // table keyed by the composite record key (key1, key2)
+    String[] recordKeyFields = {"key1", "key2"};
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"key1", "key2"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    // key1 = 'val1' AND key2 = 'val2'
+    CallExpression key1EqualsVal1 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("key1", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("val1", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression key2EqualsVal2 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("key2", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("val2", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    List<ResolvedExpression> conjuncts = Arrays.asList(key1EqualsVal1, key2EqualsVal2);
+
+    List<String> hoodieKeys = RecordLevelIndex.computeHoodieKeyFromFilters(
+        new Configuration(), metaClient, ExpressionEvaluators.fromExpression(conjuncts), recordKeyFields, ROW_TYPE_MULTI_KEYS, false);
+
+    // a complex key renders as field:value pairs joined by the record key parts separator
+    String expectedKey = "key1:val1" + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "key2:val2";
+    assertEquals(Collections.singletonList(expectedKey), hoodieKeys,
+        "Should return composite key with complex record keys");
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testComputeHoodieKeyFromFiltersWithSpecialTypeRecordKey(boolean consistentLogicalTimestampEnabled) {
+    // table keyed by the composite record key (f_timestamp, f_decimal) of non-string types
+    String[] recordKeyFields = {"f_timestamp", "f_decimal"};
+    RowType rowType = (RowType) ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getLogicalType();
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"f_timestamp", "f_decimal"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    // f_timestamp = 1970-01-01T00:00:00.001 (epoch millis 1 in UTC)
+    CallExpression timestampEquals = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_timestamp", DataTypes.TIMESTAMP(3), 0, 0),
+            new ValueLiteralExpression(LocalDateTime.ofInstant(Instant.ofEpochMilli(1), ZoneId.of("UTC")), DataTypes.TIMESTAMP(3).notNull())),
+        DataTypes.BOOLEAN());
+    // f_decimal = 1.1, whose scale (1) is below the declared scale (2)
+    CallExpression decimalEquals = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_decimal", DataTypes.DECIMAL(3, 2), 0, 2),
+            new ValueLiteralExpression(new BigDecimal("1.1"), DataTypes.DECIMAL(3, 2).notNull())),
+        DataTypes.BOOLEAN());
+    List<ResolvedExpression> conjuncts = Arrays.asList(timestampEquals, decimalEquals);
+
+    List<String> hoodieKeys = RecordLevelIndex.computeHoodieKeyFromFilters(
+        new Configuration(), metaClient, ExpressionEvaluators.fromExpression(conjuncts), recordKeyFields, rowType, consistentLogicalTimestampEnabled);
+
+    // the timestamp part is rendered the same way the writer-side key generator renders it
+    String timestampKeyPart = RowDataKeyGen.getRecordKey(TimestampData.fromEpochMillis(1), "f_timestamp", consistentLogicalTimestampEnabled);
+    String expectedKey = "f_timestamp:" + timestampKeyPart + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "f_decimal:1.10";
+    assertEquals(Collections.singletonList(expectedKey), hoodieKeys,
+        "Should return composite key with complex record keys");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithMultipleSimpleKeys() {
+    // table keyed by the single field "uuid"
+    String[] recordKeyFields = {"uuid"};
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    // uuid = 'id1' AND uuid = 'id2': two conjuncts on the same key field. Not a typical query, but
+    // it pins down that the literals of both predicates are collected.
+    CallExpression uuidEqualsId1 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("id1", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression uuidEqualsId2 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("id2", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    List<ResolvedExpression> conjuncts = Arrays.asList(uuidEqualsId1, uuidEqualsId2);
+
+    List<String> hoodieKeys = RecordLevelIndex.computeHoodieKeyFromFilters(
+        new Configuration(), metaClient, ExpressionEvaluators.fromExpression(conjuncts), recordKeyFields, TestConfigurations.ROW_TYPE, false);
+    assertEquals(Arrays.asList("id1", "id2"), hoodieKeys, "Should return multiple values for same field");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithEmptyResult() {
+    // table keyed by the single field "uuid"
+    String[] recordKeyFields = {"uuid"};
+    Configuration conf = new Configuration();
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    // case 1: nonKeyField = 'val1' does not touch the record key at all
+    List<ExpressionEvaluators.Evaluator> nonKeyPredicate = createColumnStatsProbe(
+        BuiltInFunctionDefinitions.EQUALS, "nonKeyField", Collections.singletonList("val1"));
+    List<String> keysFromNonKeyPredicate = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, nonKeyPredicate, recordKeyFields, TestConfigurations.ROW_TYPE, false);
+    assertEquals(Collections.emptyList(), keysFromNonKeyPredicate, "Should return empty list when filtering on non-record key field");
+
+    // case 2: uuid = 'key1' OR name = 'Bob' mixes a key field and a non-key field in one disjunction
+    CallExpression uuidEqualsKey1 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("key1", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression nameEqualsBob = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("name", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("Bob", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression mixedDisjunction = CallExpression.permanent(
+        BuiltInFunctionDefinitions.OR,
+        Arrays.asList(uuidEqualsKey1, nameEqualsBob),
+        DataTypes.BOOLEAN());
+
+    List<String> keysFromMixedOr = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, ExpressionEvaluators.fromExpression(Collections.singletonList(mixedDisjunction)),
+        recordKeyFields, TestConfigurations.ROW_TYPE, false);
+    assertEquals(Collections.emptyList(), keysFromMixedOr, "Should return empty list when filtering on or predicate including multiple fields");
+  }
+
+ /**
+ * IN predicates on both fields of a complex record key ("key1", "key2") should expand to
+ * the cartesian product of their values. Each key is rendered as {@code "field:value"}
+ * parts joined by {@code KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR}.
+ *
+ * <p>The assertion compares lists, so it also pins the ordering: "key1" values vary
+ * slowest and "key2" values fastest.
+ */
+ @Test
+ public void
testComputeHoodieKeyFromFiltersWithComplexRecordKeyMultipleValues() {
+ // Setup mock table config with multiple record key fields
+ when(metaClient.getTableConfig()).thenReturn(tableConfig);
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"key1", "key2"}));
+ when(tableConfig.getPartitionFields()).thenReturn(Option.of(new
String[]{"partition"}));
+
+ Configuration conf = new Configuration();
+
+ // Test with multiple values for each record key field to test combinations
+ // key1 in ['val1', 'val2'] AND key2 in ['val3', 'val4'] should produce 4
combinations
+ List<ResolvedExpression> expressions = Arrays.asList(
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.IN,
+ Arrays.asList(new FieldReferenceExpression("key1",
DataTypes.STRING(), 0, 0),
+ new ValueLiteralExpression("val1",
DataTypes.STRING().notNull()),
+ new ValueLiteralExpression("val2",
DataTypes.STRING().notNull())),
+ DataTypes.BOOLEAN()),
+ CallExpression.permanent(
+ BuiltInFunctionDefinitions.IN,
+ Arrays.asList(new FieldReferenceExpression("key2",
DataTypes.STRING(), 0, 0),
+ new ValueLiteralExpression("val3",
DataTypes.STRING().notNull()),
+ new ValueLiteralExpression("val4",
DataTypes.STRING().notNull())),
+ DataTypes.BOOLEAN())
+ );
+ String[] recordKeyFields = {"key1", "key2"};
+ List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+ conf, metaClient, ExpressionEvaluators.fromExpression(expressions),
recordKeyFields, ROW_TYPE_MULTI_KEYS, false);
+ // Should have 4 combinations: (val1,val3), (val1,val4), (val2,val3),
(val2,val4)
+ List<String> expected = Arrays.asList(
+ "key1:val1" + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR +
"key2:val3",
+ "key1:val1" + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR +
"key2:val4",
+ "key1:val2" + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR +
"key2:val3",
+ "key1:val2" + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR +
"key2:val4"
+ );
+ assertEquals(expected, result, "Should return all combinations of complex
record keys");
+ }
+}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 8aaaf3f58711..c12dd314fe06 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -526,6 +526,36 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, expected, true);
}
+ /**
+ * Writes {@code TestSQL.INSERT_T1} into a COPY_ON_WRITE table with the metadata table,
+ * read-side data skipping and the global record-level index enabled. It then checks that
+ * lookups on {@code uuid} return exactly the matching rows via {@code =}, {@code IN}, and
+ * an {@code OR} of equalities. {@code uuid} is presumably the table's record key field;
+ * confirm against {@code sql("t1")} defaults.
+ *
+ * <p>NOTE(review): only query results are asserted. The test does not verify that files
+ * were actually pruned by the record-level index.
+ */
+ @Test
+ void testDataSkippingWithRecordLevelIndex() throws Exception {
+ TableEnvironment tableEnv = batchTableEnv;
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.METADATA_ENABLED, true)
+ .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+
.option(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), true)
+ .option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ // single equality predicate on uuid
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where uuid =
'id1'").execute().collect());
+ assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01,
par1]]");
+ // IN predicate on uuid
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where uuid in ('id7',
'id8')").execute().collect());
+ assertRowsEquals(result2, "["
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+ // disjunction of equality predicates on uuid, spanning two partitions
+ List<Row> result3 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where uuid = 'id1' or uuid =
'id7' or uuid = 'id8'").execute().collect());
+ assertRowsEquals(result3, "["
+ + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+ }
+
@ParameterizedTest
@MethodSource("tableTypeAndBooleanTrueFalseParams")
void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean
hiveStylePartitioning) throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index e70b9f31863f..3e307d08d624 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -304,7 +304,7 @@ public class TestHoodieTableSource {
// test timestamp filtering
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE,
conf1);
- HoodieTableSource tableSource1 = createHoodieTableSource(conf1);
+ HoodieTableSource tableSource1 = createHoodieTableSource(conf1,
TestConfigurations.TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE);
tableSource1.applyFilters(Collections.singletonList(
createLitEquivalenceExpr(f1, 0, DataTypes.TIMESTAMP(3).notNull(),
LocalDateTime.ofInstant(Instant.ofEpochMilli(1),
ZoneId.of("UTC")))));
@@ -321,7 +321,7 @@ public class TestHoodieTableSource {
conf2.set(FlinkOptions.RECORD_KEY_FIELD, f2);
conf2.set(FlinkOptions.ORDERING_FIELDS, f2);
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE,
conf2);
- HoodieTableSource tableSource2 = createHoodieTableSource(conf2);
+ HoodieTableSource tableSource2 = createHoodieTableSource(conf2,
TestConfigurations.TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE);
tableSource2.applyFilters(Collections.singletonList(
createLitEquivalenceExpr(f2, 1, DataTypes.DATE().notNull(),
LocalDate.ofEpochDay(1))));
@@ -337,7 +337,8 @@ public class TestHoodieTableSource {
conf3.set(FlinkOptions.RECORD_KEY_FIELD, f3);
conf3.set(FlinkOptions.ORDERING_FIELDS, f3);
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE,
conf3);
- HoodieTableSource tableSource3 = createHoodieTableSource(conf3);
+ HoodieTableSource tableSource3 = createHoodieTableSource(conf3,
TestConfigurations.TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE);
+
tableSource3.applyFilters(Collections.singletonList(
createLitEquivalenceExpr(f3, 1, DataTypes.DECIMAL(3, 2).notNull(),
new BigDecimal("1.11"))));
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 38fd4fd4a756..0ae06bc91af5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -184,6 +184,28 @@ public class TestConfigurations {
public static final RowType ROW_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE =
(RowType) ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getLogicalType();
+ /**
+ * Resolved schema built from the field names of
+ * {@link #ROW_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE} and the child data types of
+ * {@link #ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE}.
+ */
+ public static final ResolvedSchema TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE =
SchemaBuilder.instance()
+ .fields(ROW_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getFieldNames(),
ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getChildren())
+ .build();
+
+ /**
+ * Row type with one column per atomic Flink type: BOOLEAN, TINYINT, SMALLINT, INT,
+ * BIGINT, FLOAT, DOUBLE, TIMESTAMP(3), TIME, DATE, DECIMAL(38, 18) and STRING.
+ * These are followed by a VARCHAR(10) {@code partition} column. The columns are
+ * declared without {@code notNull()}; only the row type itself is {@code notNull()}.
+ */
+ public static final DataType ROW_DATA_TYPE_WITH_ATOMIC_TYPES = DataTypes.ROW(
+ DataTypes.FIELD("f_bool", DataTypes.BOOLEAN()),
+ DataTypes.FIELD("f_tinyint", DataTypes.TINYINT()),
+ DataTypes.FIELD("f_smallint", DataTypes.SMALLINT()),
+ DataTypes.FIELD("f_int", DataTypes.INT()),
+ DataTypes.FIELD("f_bigint", DataTypes.BIGINT()),
+ DataTypes.FIELD("f_float", DataTypes.FLOAT()),
+ DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
+ DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("f_time", DataTypes.TIME()),
+ DataTypes.FIELD("f_date", DataTypes.DATE()),
+ DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(38, 18)),
+ DataTypes.FIELD("f_str", DataTypes.STRING()),
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull();
+
+ /** Logical {@link RowType} of {@link #ROW_DATA_TYPE_WITH_ATOMIC_TYPES}. */
+ public static final RowType ROW_TYPE_WITH_ATOMIC_TYPES = (RowType)
ROW_DATA_TYPE_WITH_ATOMIC_TYPES.getLogicalType();
+
public static final RowType ROW_TYPE_EVOLUTION_AFTER = (RowType)
ROW_DATA_TYPE_EVOLUTION_AFTER.getLogicalType();
public static final DataType ROW_DATA_TYPE_BIGINT = DataTypes.ROW(
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 3ed961b8d5bf..9020073f2a37 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -426,6 +426,14 @@ public class TestData {
TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
);
+ /**
+ * Three insert rows for {@code TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES}, one per
+ * partition ("par1", "par2", "par3"). The numeric, temporal, decimal and string values
+ * scale with the row number (1, 2, 3), while {@code f_bool} is {@code true} in every row.
+ *
+ * <p>The TIME column is supplied as an int (1000, 2000, 3000) and the DATE column as an
+ * int (1, 2, 3). These are presumably Flink's internal millis-of-day and
+ * days-since-epoch encodings; confirm.
+ */
+ public static List<RowData> DATA_SET_WITH_ATOMIC_TYPES = Arrays.asList(
+ insertRow(TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES, true, (byte) 1,
(short) 11, 111, 1111L, 10.11f, 11.111, TimestampData.fromEpochMillis(1),
+ 1000, 1, DecimalData.fromBigDecimal(new BigDecimal("1.11"), 38, 18),
StringData.fromString("str1"), StringData.fromString("par1")),
+ insertRow(TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES, true, (byte) 2,
(short) 22, 222, 2222L, 20.22f, 22.222, TimestampData.fromEpochMillis(2),
+ 2000, 2, DecimalData.fromBigDecimal(new BigDecimal("2.22"), 38, 18),
StringData.fromString("str2"), StringData.fromString("par2")),
+ insertRow(TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES, true, (byte) 3,
(short) 33, 333, 3333L, 30.33f, 33.333, TimestampData.fromEpochMillis(3),
+ 3000, 3, DecimalData.fromBigDecimal(new BigDecimal("3.33"), 38, 18),
StringData.fromString("str3"), StringData.fromString("par3")));
+
// data types handled specifically for Hoodie Key
public static List<RowData> DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE =
new ArrayList<>();