This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 93e334ce042d feat(flink): Support dynamic bucket for flink streaming
with partitio… (#18640)
93e334ce042d is described below
commit 93e334ce042d72418c77abaa3246231b96a5615c
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat May 9 20:29:41 2026 +0800
feat(flink): Support dynamic bucket for flink streaming with partitio…
(#18640)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 5 +
.../apache/hudi/client/HoodieFlinkWriteClient.java | 3 +-
.../apache/hudi/index/FlinkHoodieIndexFactory.java | 1 +
.../apache/hudi/configuration/FlinkOptions.java | 9 +
.../apache/hudi/configuration/OptionsResolver.java | 15 +-
.../sink/partitioner/BucketAssignFunction.java | 4 +-
.../hudi/sink/partitioner/BucketAssigner.java | 2 +
.../partitioner/DynamicBucketAssignFunction.java | 164 +++++++++
.../partitioner/DynamicBucketAssignOperator.java | 64 ++++
...oner.java => GlobalRecordIndexPartitioner.java} | 40 ++-
.../sink/partitioner/RecordIndexPartitioner.java | 89 ++++-
.../index/DummyPartitionedIndexBackend.java | 46 +++
.../partitioner/index/FlinkStateIndexBackend.java | 4 +-
.../{IndexBackend.java => GlobalIndexBackend.java} | 37 +-
...end.java => GlobalRecordLevelIndexBackend.java} | 30 +-
.../hudi/sink/partitioner/index/IndexBackend.java | 21 +-
.../partitioner/index/IndexBackendFactory.java | 17 +-
.../sink/partitioner/index/IndexWriteFunction.java | 15 +-
.../partitioner/index/MinibatchIndexBackend.java | 18 +-
.../partitioner/index/PartitionedIndexBackend.java | 47 +++
.../partitioner/index/RecordLevelIndexBackend.java | 380 +++++++++++++++++----
.../partitioner/index/RocksDBIndexBackend.java | 4 +-
.../java/org/apache/hudi/sink/utils/Pipelines.java | 29 +-
.../SamplingActionExecutor.java} | 36 +-
.../apache/hudi/source/stats/RecordLevelIndex.java | 12 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 38 ++-
.../hudi/configuration/TestOptionsResolver.java | 19 ++
....java => TestGlobalRecordIndexPartitioner.java} | 10 +-
.../partitioner/TestRecordIndexPartitioner.java | 110 +++---
...java => TestGlobalRecordLevelIndexBackend.java} | 66 ++--
.../index/TestRecordLevelIndexBackend.java | 258 +++++++-------
.../org/apache/hudi/sink/utils/TestWriteBase.java | 6 +-
.../hudi/table/ITTestDynamicBucketStreamWrite.java | 269 +++++++++++++++
33 files changed, 1445 insertions(+), 423 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4c1c90fe2791..db30f301d97a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -29,6 +29,7 @@ import org.apache.hudi.client.RunsTableService;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
@@ -1478,6 +1479,10 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
*/
protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
tagRecordsWithLocationForStreamingWrites(HoodieData<HoodieRecord>
untaggedRecords,
Set<String> enabledMetadataPartitions) {
+ // no need to tag if the incoming records are empty.
+ if (untaggedRecords instanceof HoodieListData &&
untaggedRecords.isEmpty()) {
+ return Pair.of(Collections.emptyList(), untaggedRecords);
+ }
List<HoodieFileGroupId> updatedFileGroupIds = new ArrayList<>();
// Fetch latest file slices for all enabled MDT partitions
Map<String, List<FileSlice>> partitionToLatestFileSlices = new HashMap<>();
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 1e5a691833d0..228209cf1df6 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -173,7 +173,8 @@ public class HoodieFlinkWriteClient<T>
protected void writeToMetadataTable(boolean
skipStreamingWritesToMetadataTable, HoodieTable table, String instantTime,
List<HoodieWriteStat> partialMetadataTableWriteStats,
HoodieCommitMetadata metadata) {
if (!skipStreamingWritesToMetadataTable
- && isStreamingWriteToMetadataEnabled(table)) {
+ && isStreamingWriteToMetadataEnabled(table)
+ &&
WriteOperationType.streamingWritesToMetadataSupported(getOperationType())) {
streamingMetadataWriteHandler.commitToMetadataTable(table, instantTime,
metadata, partialMetadataTableWriteStats);
} else {
super.writeToMetadataTable(skipStreamingWritesToMetadataTable, table,
instantTime, partialMetadataTableWriteStats, metadata);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index feca357bc493..a1daae0aca01 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -48,6 +48,7 @@ public final class FlinkHoodieIndexFactory {
// instantiates an in-memory HoodieIndex component as a placeholder.
case GLOBAL_RECORD_LEVEL_INDEX:
// todo: aligned with flink state index currently, may need further
improving.
+ case RECORD_LEVEL_INDEX:
case INMEMORY:
return new FlinkInMemoryStateIndex(context, config);
case BLOOM:
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 767a7457e1e7..4910c721e09c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -308,6 +308,15 @@ public class FlinkOptions extends HoodieConfig {
+ "The memory size of each individual cache within a checkpoint
interval is dynamically calculated based on the \n"
+ "average memory size of caches for historical checkpoints.");
+ @AdvancedConfig
+ public static final ConfigOption<Integer>
INDEX_RLI_CACHE_CONCURRENT_PARTITIONS_NUM = ConfigOptions
+ .key("index.rli.cache.concurrent.partitions.num")
+ .intType()
+ .defaultValue(2)
+ .withDescription("Expected number of partitions whose partitioned RLI
caches are updated concurrently. "
+ + "Used to infer the initial memory size for each partition cache as
INDEX_RLI_CACHE_SIZE / concurrency "
+ + "when historical cache usage is unavailable.");
+
@AdvancedConfig
public static final ConfigOption<Integer> INDEX_RLI_LOOKUP_MINIBATCH_SIZE =
ConfigOptions
.key("index.rli.lookup.minibatch.size")
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 398ca329dbb4..59f5847df4e5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -198,15 +198,23 @@ public class OptionsResolver {
}
/**
- * Returns whether {@link
org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction} should be used
for bucket assigning.
+ * Returns whether partitioned record level index is used for bucket
assigning.
*/
public static boolean isRecordLevelIndex(Configuration conf) {
+ HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+ return indexType == HoodieIndex.IndexType.RECORD_LEVEL_INDEX;
+ }
+
+ /**
+ * Returns whether the table uses metadata-table record level index.
+ */
+ public static boolean isGlobalRecordLevelIndex(Configuration conf) {
HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
}
public static boolean isRLIWithBootstrap(Configuration conf) {
- return isRecordLevelIndex(conf) &&
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+ return isGlobalRecordLevelIndex(conf) &&
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
}
/**
@@ -459,7 +467,8 @@ public class OptionsResolver {
*/
public static boolean isStreamingIndexWriteEnabled(Configuration conf) {
return conf.get(FlinkOptions.METADATA_ENABLED)
- && OptionsResolver.getIndexType(conf) ==
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX
+ && (OptionsResolver.getIndexType(conf) ==
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX
+ || OptionsResolver.getIndexType(conf) ==
HoodieIndex.IndexType.RECORD_LEVEL_INDEX)
&&
WriteOperationType.streamingWritesToMetadataSupported(WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 6ec7c389751f..c74bf0d0dd2b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -31,7 +31,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.sink.event.Correspondent;
-import org.apache.hudi.sink.partitioner.index.IndexBackend;
+import org.apache.hudi.sink.partitioner.index.GlobalIndexBackend;
import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.FlinkTaskContextSupplier;
@@ -84,7 +84,7 @@ public class BucketAssignFunction
* </ul>
*/
@Getter
- private transient IndexBackend indexBackend;
+ private transient GlobalIndexBackend indexBackend;
/**
* Bucket assigner to assign new bucket IDs or reuse existing ones.
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 819db7439816..fe848d7bcc20 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -233,6 +233,8 @@ public class BucketAssigner implements AutoCloseable {
public void close() {
reset();
+ smallFileAssignMap.clear();
+ newFileAssignStates.clear();
WriteProfiles.clean(config.getBasePath());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignFunction.java
new file mode 100644
index 000000000000..953d837af1f2
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignFunction.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.adapter.KeyedProcessFunctionAdapter;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.partitioner.index.DummyPartitionedIndexBackend;
+import org.apache.hudi.sink.partitioner.index.RecordLevelIndexBackend;
+import org.apache.hudi.sink.partitioner.index.PartitionedIndexBackend;
+import org.apache.hudi.sink.partitioner.profile.WriteProfile;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.util.FlinkTaskContextSupplier;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.Setter;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Assigns Flink streaming records to dynamic bucket file groups.
+ *
+ * <p>This function first checks the partition-scoped RLI backend for an
existing
+ * {@code recordKey -> fileGroupId} mapping. Existing keys are routed as
updates to
+ * the recorded file group; new keys are assigned by {@link BucketAssigner}
and then
+ * written back to the backend so the streaming metadata writer can persist
the assignment to RLI.
+ */
+public class DynamicBucketAssignFunction
+ extends KeyedProcessFunctionAdapter<String, HoodieFlinkInternalRow,
HoodieFlinkInternalRow>
+ implements CheckpointedFunction, CheckpointListener {
+
+ private final Configuration conf;
+ private final boolean isInsertOverwrite;
+
+ private transient PartitionedIndexBackend indexBackend;
+ private transient BucketAssigner bucketAssigner;
+
+ @Setter
+ protected transient Correspondent correspondent;
+
+ private transient int maxParallelism;
+ private transient int numTasks;
+ private transient int taskId;
+
+ /**
+ * Creates the dynamic bucket assign function for one bucket assign operator.
+ *
+ * @param conf Flink write configuration
+ */
+ public DynamicBucketAssignFunction(Configuration conf) {
+ this.conf = conf;
+ this.isInsertOverwrite = OptionsResolver.isInsertOverwrite(conf);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(this.conf,
!OptionsResolver.isIncrementalJobGraph(conf));
+ HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+
HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHadoopConf(this.conf)),
+ new FlinkTaskContextSupplier(getRuntimeContext()));
+ boolean delta =
HoodieTableType.valueOf(conf.get(FlinkOptions.TABLE_TYPE)).equals(HoodieTableType.MERGE_ON_READ);
+ WriteProfile writeProfile = WriteProfiles.singleton(isInsertOverwrite,
delta, writeConfig, context);
+ this.bucketAssigner = new BucketAssigner(
+ RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
+
RuntimeContextUtils.getMaxNumberOfParallelSubtasks(getRuntimeContext()),
+ RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+ writeProfile,
+ writeConfig);
+ this.maxParallelism =
RuntimeContextUtils.getMaxNumberOfParallelSubtasks(getRuntimeContext());
+ this.numTasks =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+ this.taskId =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) {
+ this.indexBackend = isInsertOverwrite
+ ? new DummyPartitionedIndexBackend()
+ : new RecordLevelIndexBackend(conf, (partitionPath, recordKey, fileId)
-> isRecordKeyOfThisTask(recordKey));
+ }
+
+ private boolean isRecordKeyOfThisTask(String recordKey) {
+ return KeyGroupRangeAssignment.assignKeyToParallelOperator(recordKey,
maxParallelism, numTasks) == taskId;
+ }
+
+ @Override
+ public void processElement(HoodieFlinkInternalRow record, Context ctx,
Collector<HoodieFlinkInternalRow> out) throws Exception {
+ String partitionPath = record.getPartitionPath();
+ String recordKey = record.getRecordKey();
+ String fileGroupId = indexBackend.get(partitionPath, recordKey);
+
+ BucketInfo bucketInfo;
+ // `operationType` in the record is used to generate index record in the
following writer operator.
+ // Currently, we only emit INSERT index record and ignore DELETE index
record, because we cannot be
+ // certain whether data with the same key in storage will actually be
deleted. It's possible that
+ // data in storage is deleted, but the record level index data remains.
+ // todo: support ordering value in record level index metadata payload,
since the efficiency of location
+ // tagging by merging lookup is intolerable in flink streaming writing
scenario.
+ if (fileGroupId != null) {
+ bucketInfo = bucketAssigner.addUpdate(partitionPath, fileGroupId);
+ } else {
+ bucketInfo = bucketAssigner.addInsert(partitionPath);
+ indexBackend.update(partitionPath, recordKey,
bucketInfo.getFileIdPrefix());
+ record.setOperationType("I");
+ }
+
+ String instantTime = bucketInfo.getBucketType() == BucketType.INSERT ? "I"
: "U";
+ record.setInstantTime(instantTime);
+ record.setFileId(bucketInfo.getFileIdPrefix());
+ out.collect(record);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) {
+ this.bucketAssigner.reset();
+ this.indexBackend.onCheckpoint(context.getCheckpointId());
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ this.bucketAssigner.reload(checkpointId);
+ this.indexBackend.onCheckpointComplete(this.correspondent, checkpointId);
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.indexBackend.close();
+ if (this.bucketAssigner != null) {
+ this.bucketAssigner.close();
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignOperator.java
new file mode 100644
index 000000000000..b26ef25d77a6
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.sink.event.Correspondent;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Operator wrapper that wires the dynamic bucket assign function to the data
write coordinator.
+ */
+public class DynamicBucketAssignOperator extends KeyedProcessOperator<String,
HoodieFlinkInternalRow, HoodieFlinkInternalRow> {
+
+ /**
+ * The dynamic bucket assign function.
+ */
+ private final DynamicBucketAssignFunction bucketAssignFunction;
+
+ /**
+ * OperatorId for the data write operator.
+ */
+ private final OperatorID dataWriteOperatorId;
+
+ /**
+ * Creates an operator wrapper for dynamic bucket assignment.
+ *
+ * @param bucketAssignFunction function that performs partitioned-RLI-backed
assignment
+ * @param dataWriteOperatorId operator id of the downstream data write
operator coordinator
+ */
+ public DynamicBucketAssignOperator(DynamicBucketAssignFunction
bucketAssignFunction, OperatorID dataWriteOperatorId) {
+ super(bucketAssignFunction);
+ this.bucketAssignFunction = bucketAssignFunction;
+ this.dataWriteOperatorId = dataWriteOperatorId;
+ }
+
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+ super.setup(containingTask, config, output);
+
this.bucketAssignFunction.setCorrespondent(Correspondent.getInstance(dataWriteOperatorId,
+
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway()));
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
similarity index 65%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
copy to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
index e80b7bba25be..8b6f1ae22ad9 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -31,12 +32,13 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
/**
- * Record index input partitioner, which is aligned with the mapping of record
key
- * to the file group of record index partition in metadata table. It prevents
multiple
- * index write subtasks from writing the same record index file group, thereby
effectively
- * reducing the number of small files.
+ * Global record index input partitioner.
+ *
+ * <p>The partitioner is aligned with the record-key to metadata-table
file-group mapping used by
+ * global RLI. This prevents multiple index write subtasks from writing the
same record-index file
+ * group and reduces small files in the metadata table.
*/
-public class RecordIndexPartitioner implements Partitioner<HoodieKey> {
+public class GlobalRecordIndexPartitioner implements Partitioner<HoodieKey> {
private final Configuration conf;
/**
* The number of file groups for record index partition in metadata data
table. The number
@@ -45,19 +47,30 @@ public class RecordIndexPartitioner implements
Partitioner<HoodieKey> {
*/
private int numFileGroupsForRecordIndexPartition = -1;
- public RecordIndexPartitioner(Configuration conf) {
+ /**
+ * Creates a partitioner for global RLI index writes.
+ *
+ * @param conf Flink write configuration
+ */
+ public GlobalRecordIndexPartitioner(Configuration conf) {
this.conf = conf;
}
+ /**
+ * Routes an index row to the writer responsible for its global RLI file
group.
+ *
+ * @param recordKey index row key, where the partition path is ignored for
global RLI
+ * @param numPartitions downstream index writer parallelism
+ * @return downstream subtask index
+ */
@Override
public int partition(HoodieKey recordKey, int numPartitions) {
// initialize numFileGroupsForRecordIndexPartition lazily.
if (numFileGroupsForRecordIndexPartition < 0) {
numFileGroupsForRecordIndexPartition =
getNumFileGroupsForRecordIndexPartition();
}
- // note: the hashing is in line with GLOBAL_RECORD_LEVEL_INDEX currently.
- // if partitioned record level index is supported, the partition path
should be considered here as well.
- int fgIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey.getRecordKey(),
numFileGroupsForRecordIndexPartition);
+ int fgIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(
+ recordKey.getRecordKey(), numFileGroupsForRecordIndexPartition);
return fgIndex % numPartitions;
}
@@ -66,11 +79,14 @@ public class RecordIndexPartitioner implements
Partitioner<HoodieKey> {
*/
private int getNumFileGroupsForRecordIndexPartition() {
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
- HoodieTableMetadata metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
+ try (HoodieTableMetadata metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
HoodieFlinkEngineContext.DEFAULT,
metaClient.getStorage(),
StreamerUtil.metadataConfig(conf),
- conf.get(FlinkOptions.PATH));
- return
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ conf.get(FlinkOptions.PATH))) {
+ return
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get file group count for global
record index partition.", e);
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
index e80b7bba25be..a4597e00f099 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
@@ -19,9 +19,13 @@
package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -30,47 +34,96 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
/**
- * Record index input partitioner, which is aligned with the mapping of record
key
+ * Partitioned record index input partitioner, which is aligned with the
mapping of record key
* to the file group of record index partition in metadata table. It prevents
multiple
* index write subtasks from writing the same record index file group, thereby
effectively
* reducing the number of small files.
+ *
+ * <p>Partitioned RLI stores record-index file groups under the corresponding
data table
+ * partition. Since RLI file groups for different data partitions can have the
same bucket id,
+ * shuffling only by bucket id only would route those equally numbered buckets
to the same
+ * downstream task and can easily cause data skew. Therefore an index row must
be routed by
+ * its data partition first, then by the record-key hash within that
partition's RLI file
+ * group range.
*/
public class RecordIndexPartitioner implements Partitioner<HoodieKey> {
private final Configuration conf;
+ private Map<String, Integer> partitionedRLIFileGroupCounts;
+ private Functions.Function3<Integer, String, Integer, Integer>
partitionIndexFunc;
+
/**
- * The number of file groups for record index partition in metadata data
table. The number
- * cannot be calculated during compiling the writing pipeline, since the
hoodie table may
- * not be created yet, so the number is lazily calculated during job running.
+ * Creates a partitioner for partitioned RLI index writes.
+ *
+ * @param conf Flink write configuration
*/
- private int numFileGroupsForRecordIndexPartition = -1;
-
public RecordIndexPartitioner(Configuration conf) {
this.conf = conf;
}
+ /**
+ * Routes an index row by data partition and record-key-derived partitioned
RLI file group.
+ *
+ * @param recordKey index row key containing both record key and data
partition path
+ * @param numPartitions downstream index writer parallelism
+ * @return downstream subtask index
+ */
@Override
public int partition(HoodieKey recordKey, int numPartitions) {
- // initialize numFileGroupsForRecordIndexPartition lazily.
- if (numFileGroupsForRecordIndexPartition < 0) {
- numFileGroupsForRecordIndexPartition =
getNumFileGroupsForRecordIndexPartition();
+ if (partitionedRLIFileGroupCounts == null) {
+ partitionedRLIFileGroupCounts = getPartitionedRLIFileGroupCounts();
+ }
+ int fileGroupCount =
getFileGroupCountForPartitionedRLI(recordKey.getPartitionPath());
+ int fgIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey.getRecordKey(),
fileGroupCount);
+ if (partitionIndexFunc == null) {
+ partitionIndexFunc =
BucketIndexUtil.getPartitionIndexFunc(numPartitions);
}
- // note: the hashing is in line with GLOBAL_RECORD_LEVEL_INDEX currently.
- // if partitioned record level index is supported, the partition path
should be considered here as well.
- int fgIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey.getRecordKey(),
numFileGroupsForRecordIndexPartition);
- return fgIndex % numPartitions;
+ return partitionIndexFunc.apply(fileGroupCount,
recordKey.getPartitionPath(), fgIndex);
}
/**
- * Get the number of file groups for record index partition in metadata
table.
+ * Get the file group count for each data partition in the partitioned
record index.
*/
- private int getNumFileGroupsForRecordIndexPartition() {
+ private Map<String, Integer> getPartitionedRLIFileGroupCounts() {
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
- HoodieTableMetadata metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
+ if
(!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX))
{
+ return Collections.emptyMap();
+ }
+ try (HoodieTableMetadata metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
HoodieFlinkEngineContext.DEFAULT,
metaClient.getStorage(),
StreamerUtil.metadataConfig(conf),
- conf.get(FlinkOptions.PATH));
- return
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ conf.get(FlinkOptions.PATH))) {
+ Map<String, Integer> fileGroupCounts = new HashMap<>();
+
metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX)
+ .forEach((partitionPath, fileSlices) ->
fileGroupCounts.put(partitionPath, fileSlices.size()));
+ return fileGroupCounts;
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get file group counts for
partitioned record index.", e);
+ }
+ }
+
+ /**
+ * Get the partitioned record index file group count for the given data
partition.
+ */
+ private int getFileGroupCountForPartitionedRLI(String partitionPath) {
+ int fileGroupCount =
partitionedRLIFileGroupCounts.getOrDefault(partitionPath, 0);
+ // HoodieBackedTableMetadataWriter initializes record-index file groups
for a newly seen
+ // data partition with RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP, so
the writer-side
+ // partitioner should use the same count before that partition appears in
the MDT view.
+ return fileGroupCount > 0 ? fileGroupCount :
getMinFileGroupCountForPartitionedRLI();
+ }
+
+ /**
+ * Get the minimum file group count used to initialize newly seen
partitioned record index partitions.
+ */
+ private int getMinFileGroupCountForPartitionedRLI() {
+ return Integer.parseInt(conf.getString(
+
HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
+
HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.defaultValue().toString()));
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/DummyPartitionedIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/DummyPartitionedIndexBackend.java
new file mode 100644
index 000000000000..2fe8c4784294
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/DummyPartitionedIndexBackend.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import java.io.IOException;
+
+/**
+ * A no-op implementation of {@link PartitionedIndexBackend} that does not
persist any index state.
+ *
+ * <p>Used as a placeholder when partitioned record-level indexing is disabled
or not applicable.
+ * All lookups return {@code null} (record not found), and writes are silently
discarded.
+ */
+public class DummyPartitionedIndexBackend implements PartitionedIndexBackend {
+
+ @Override
+ public String get(String partitionPath, String recordKey) {
+ return null;
+ }
+
+ @Override
+ public void update(String partitionPath, String recordKey, String fileId) {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
index c952f8f047d1..b4ea70949232 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.state.ValueState;
import java.io.IOException;
/**
- * An implementation of {@link IndexBackend} based on flink keyed value state.
+ * An implementation of {@link GlobalIndexBackend} based on flink keyed value
state.
*/
-public class FlinkStateIndexBackend implements IndexBackend {
+public class FlinkStateIndexBackend implements GlobalIndexBackend {
private final ValueState<HoodieRecordGlobalLocation> indexState;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalIndexBackend.java
similarity index 56%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
copy to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalIndexBackend.java
index 636ca4e6b0d3..12fab0631b03 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalIndexBackend.java
@@ -19,18 +19,13 @@
package org.apache.hudi.sink.partitioner.index;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
-import org.apache.hudi.sink.event.Correspondent;
-import org.apache.flink.metrics.MetricGroup;
-
-import java.io.Closeable;
import java.io.IOException;
/**
- * An interface that provides an abstraction for managing record location
mappings in the index system.
- * It serves as the backend for storing and retrieving the location of records
identified by their unique keys.
+ * Index backend for managing global record location mappings keyed by record
key.
*/
-public interface IndexBackend extends Closeable {
+public interface GlobalIndexBackend extends IndexBackend {
/**
* Retrieves the global location of a record based on its key.
@@ -47,32 +42,4 @@ public interface IndexBackend extends Closeable {
* @param recordGlobalLocation the new global location of the record
*/
void update(String recordKey, HoodieRecordGlobalLocation
recordGlobalLocation) throws IOException;
-
- /**
- * Listener method called when the bucket assign operator finishes the
checkpoint with {@code checkpointId}.
- *
- * @param checkpointId checkpoint id.
- */
- default void onCheckpoint(long checkpointId) {
- // do nothing.
- }
-
- /**
- * Listener method called when the bucket assign operator receives a notify
checkpoint complete event.
- *
- * @param correspondent The Correspondent used to get inflight
instants from the coordinator.
- * @param completedCheckpointId The latest completed checkpoint id
- */
- default void onCheckpointComplete(Correspondent correspondent, long
completedCheckpointId) {
- // do nothing.
- }
-
- /**
- * Registers metrics for this backend.
- *
- * @param metricGroup flink metric group
- */
- default void registerMetrics(MetricGroup metricGroup) {
- // do nothing.
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
similarity index 80%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
copy to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index 6fdecb0f22c4..cef722c87a4b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -44,10 +44,14 @@ import java.util.List;
import java.util.Map;
/**
- * An implementation of {@link IndexBackend} based on the record level index
in metadata table.
+ * Global record-level-index backend backed by the metadata table.
+ *
+ * <p>The backend serves the global RLI path: lookups are keyed by record key
only and return
+ * {@link HoodieRecordGlobalLocation}. A checkpoint-aware {@link
RecordIndexCache} keeps recently
+ * accessed locations locally and falls back to metadata table reads for cache
misses.
*/
@Slf4j
-public class RecordLevelIndexBackend implements MinibatchIndexBackend {
+public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
@VisibleForTesting
@Getter
private final RecordIndexCache recordIndexCache;
@@ -55,7 +59,13 @@ public class RecordLevelIndexBackend implements
MinibatchIndexBackend {
private final HoodieTableMetaClient metaClient;
private HoodieTableMetadata metadataTable;
- public RecordLevelIndexBackend(Configuration conf, long initCheckpointId) {
+ /**
+ * Creates a global RLI backend with a checkpoint-aware cache.
+ *
+ * @param conf Flink write configuration
+ * @param initCheckpointId restored checkpoint id used to seed cache
eviction state, or {@code -1}
+ */
+ public GlobalRecordLevelIndexBackend(Configuration conf, long
initCheckpointId) {
this.metaClient = StreamerUtil.createMetaClient(conf);
this.conf = conf;
this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
@@ -102,11 +112,25 @@ public class RecordLevelIndexBackend implements
MinibatchIndexBackend {
recordKeysAndLocations.forEach(keyAndLocation ->
recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue()));
}
+ /**
+ * Starts a new checkpoint generation in the local record-index cache.
+ *
+ * @param checkpointId checkpoint id from the bucket assign operator
+ */
@Override
public void onCheckpoint(long checkpointId) {
recordIndexCache.addCheckpointCache(checkpointId);
}
+ /**
+ * Marks older cache generations evictable after a checkpoint completes.
+ *
+ * <p>The minimum inflight checkpoint is used as the retention boundary. If
no instant is inflight,
+ * the completed checkpoint id is safe because the writer coordinator has
already advanced past it.
+ *
+ * @param correspondent writer coordinator correspondent used to query
inflight instants
+ * @param completedCheckpointID completed checkpoint id reported by Flink
+ */
@Override
public void onCheckpointComplete(Correspondent correspondent, long
completedCheckpointID) {
Map<Long, String> inflightInstants =
correspondent.requestInflightInstants();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
index 636ca4e6b0d3..d0ec6eb6a7c4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -18,36 +18,17 @@
package org.apache.hudi.sink.partitioner.index;
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.flink.metrics.MetricGroup;
import java.io.Closeable;
-import java.io.IOException;
/**
- * An interface that provides an abstraction for managing record location
mappings in the index system.
- * It serves as the backend for storing and retrieving the location of records
identified by their unique keys.
+ * An interface that provides common lifecycle hooks for index backends.
*/
public interface IndexBackend extends Closeable {
- /**
- * Retrieves the global location of a record based on its key.
- *
- * @param recordKey the unique key identifying the record
- * @return the global location of the record, or null if the record is not
found in the index
- */
- HoodieRecordGlobalLocation get(String recordKey) throws IOException;
-
- /**
- * Updates the global location of a record in the index.
- *
- * @param recordKey the unique key identifying the record
- * @param recordGlobalLocation the new global location of the record
- */
- void update(String recordKey, HoodieRecordGlobalLocation
recordGlobalLocation) throws IOException;
-
/**
* Listener method called when the bucket assign operator finishes the
checkpoint with {@code checkpointId}.
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
index 4e9df8d9fcbd..01a2e04224a5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -39,10 +39,21 @@ import
org.apache.flink.runtime.state.FunctionInitializationContext;
import java.util.stream.StreamSupport;
/**
- * Factory to create an {@link IndexBackend} based on the configured index
type.
+ * Factory to create a {@link GlobalIndexBackend} based on the configured
index type.
*/
public class IndexBackendFactory {
- public static IndexBackend create(Configuration conf,
FunctionInitializationContext context, RuntimeContext runtimeContext) throws
Exception {
+ /**
+ * Creates the global index backend used by the legacy bucket assign
function.
+ *
+ * <p>The Flink state index stores locations in keyed state. The global RLI either
uses the bootstrap
+ * RocksDB cache or a metadata-table-backed backend with checkpoint-aware
cache eviction.
+ *
+ * @param conf Flink write configuration
+ * @param context Flink function initialization context
+ * @param runtimeContext Flink runtime context for job and attempt metadata
+ * @return global index backend for record-key lookups
+ */
+ public static GlobalIndexBackend create(Configuration conf,
FunctionInitializationContext context, RuntimeContext runtimeContext) throws
Exception {
HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
switch (indexType) {
case FLINK_STATE:
@@ -74,7 +85,7 @@ public class IndexBackendFactory {
// set the jobId state with current job id.
jobIdState.clear();
jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
- return new RecordLevelIndexBackend(conf, initCheckpointId);
+ return new GlobalRecordLevelIndexBackend(conf, initCheckpointId);
}
default:
throw new UnsupportedOperationException("Index type " + indexType + "
is not supported for bucket assigning yet.");
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index ad13e54bd86e..8e64d58a97b3 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -51,6 +51,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
/**
* A Flink stream writing function that handles writing index records to the
metadata table.
@@ -83,6 +84,8 @@ public class IndexWriteFunction extends
AbstractStreamWriteFunction<RowData> {
*/
private transient HoodieFlinkTable<?> flinkTable;
+ private transient Function<RowData, String> dedupKeyExtractor;
+
public IndexWriteFunction(Configuration conf) {
super(conf, true);
}
@@ -95,6 +98,8 @@ public class IndexWriteFunction extends
AbstractStreamWriteFunction<RowData> {
config,
config.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE) * 1024 * 1024);
this.indexDataBuffer =
BufferUtils.createBuffer(IndexRowUtils.INDEX_ROW_TYPE, memorySegmentPool);
+ this.dedupKeyExtractor =
this.writeClient.getConfig().isRecordLevelIndexEnabled()
+ ? IndexWriteFunction::getPartitionedDedupKey :
IndexRowUtils::getRecordKey;
}
@Override
@@ -171,13 +176,19 @@ public class IndexWriteFunction extends
AbstractStreamWriteFunction<RowData> {
Map<String, HoodieRecord> keyAndRecordMap = new LinkedHashMap<>();
while (rowItr.hasNext()) {
RowData indexRow = rowItr.next();
- String recordKey = IndexRowUtils.getRecordKey(indexRow);
- keyAndRecordMap.put(recordKey,
IndexRowUtils.convertToHoodieRecord(this.currentInstant, indexRow,
writeConfig));
+ keyAndRecordMap.put(
+ dedupKeyExtractor.apply(indexRow),
+ IndexRowUtils.convertToHoodieRecord(this.currentInstant, indexRow,
writeConfig));
dataPartitions.add(IndexRowUtils.getPartition(indexRow));
}
return Pair.of(new ArrayList<>(keyAndRecordMap.values()), dataPartitions);
}
+ private static String getPartitionedDedupKey(RowData indexRow) {
+ String recordKey = IndexRowUtils.getRecordKey(indexRow);
+ return IndexRowUtils.getPartition(indexRow) + "/" + recordKey;
+ }
+
@Override
public void endInput() {
super.endInput();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
index 0b79ee10bcdf..cdfceb5cd49c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
@@ -28,9 +28,23 @@ import java.util.Map;
/**
* Index delegator which supports mini-batch index operations.
*/
-public interface MinibatchIndexBackend extends IndexBackend {
+public interface MinibatchIndexBackend extends GlobalIndexBackend {
- Map<String, HoodieRecordGlobalLocation> get(List<String> recordKey) throws
IOException;
+ /**
+ * Retrieves locations for a batch of record keys.
+ *
+ * <p>The returned map should contain one entry for each requested key and
preserve the request
+ * order when the implementation can do so. Missing keys should map to
{@code null}.
+ *
+ * @param recordKeys record keys to look up
+ * @return record-key to global-location mapping
+ */
+ Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws
IOException;
+ /**
+ * Updates locations for a batch of record keys.
+ *
+ * @param recordKeysAndLocations record-key and location pairs to write into
the backend
+ */
void update(List<Pair<String, HoodieRecordGlobalLocation>>
recordKeysAndLocations) throws IOException;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/PartitionedIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/PartitionedIndexBackend.java
new file mode 100644
index 000000000000..3a2e583f41d9
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/PartitionedIndexBackend.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+/**
+ * Index backend for partition-scoped record-to-file-group mappings.
+ *
+ * <p>Unlike {@link GlobalIndexBackend}, this backend is addressed by both
data partition and record
+ * key. It is used by the partitioned RLI based dynamic bucket assignment
path, where the persisted
+ * index value is the file group id selected by {@code BucketAssigner}.
+ */
+public interface PartitionedIndexBackend extends IndexBackend {
+
+ /**
+ * Retrieves the file group id for a record in the given partition.
+ *
+ * @param partitionPath the partition path of the record
+ * @param recordKey the unique key identifying the record
+ * @return the file group id, or null if the record is not found in the index
+ */
+ String get(String partitionPath, String recordKey);
+
+ /**
+ * Updates the file group id for a record in the given partition.
+ *
+ * @param partitionPath the partition path of the record
+ * @param recordKey the unique key identifying the record
+ * @param fileId the file group id, usually the file id prefix produced by
bucket assignment
+ */
+ void update(String partitionPath, String recordKey, String fileId);
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index 6fdecb0f22c4..cc44a3e154f5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,108 +19,201 @@
package org.apache.hudi.sink.partitioner.index;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.utils.SamplingActionExecutor;
+import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * An implementation of {@link IndexBackend} based on the record level index
in metadata table.
+ * Partition-scoped index backend backed by partitioned record level index.
+ *
+ * <p>The backend keeps a lazy cache per data partition. Each cache maps
record keys owned by the
+ * current Flink assign subtask to the file group id stored in the partitioned
RLI. New records are
+ * added to the cache after bucket assignment, while existing records keep
their original RLI routing.
+ *
+ * <p>Partition caches are evicted lazily: checkpoint completion only advances
the safe eviction
+ * watermark, and cleanup happens when the total in-memory cache size exceeds
+ * {@link FlinkOptions#INDEX_RLI_CACHE_SIZE}.
*/
@Slf4j
-public class RecordLevelIndexBackend implements MinibatchIndexBackend {
- @VisibleForTesting
- @Getter
- private final RecordIndexCache recordIndexCache;
+public class RecordLevelIndexBackend implements PartitionedIndexBackend {
+
private final Configuration conf;
+ private final HoodieWriteConfig writeConfig;
private final HoodieTableMetaClient metaClient;
+ private final long maxCacheSizeInBytes;
+ private final BootstrapFilter bootstrapFilter;
private HoodieTableMetadata metadataTable;
- public RecordLevelIndexBackend(Configuration conf, long initCheckpointId) {
- this.metaClient = StreamerUtil.createMetaClient(conf);
+ @Getter
+ private final Map<String, BucketCache> partitionBucketCaches = new
LinkedHashMap<>(16, 0.75f, true);
+ private long currentCheckpointId = -1L;
+ private long minRetainedCheckpointId = Long.MIN_VALUE;
+ private final SamplingActionExecutor cleanExecutor = new
SamplingActionExecutor();
+
+ /**
+ * Creates a partitioned RLI backend with custom ownership filtering.
+ *
+ * <p>This constructor is used by the simple bucket index, whose write ownership
is determined by
+ * partition and bucket file id rather than Flink record-key key groups.
+ *
+ * @param conf Flink write configuration
+ * @param bootstrapFilter filter for deciding whether a bootstrapped RLI
record belongs to this task
+ */
+ public RecordLevelIndexBackend(Configuration conf, BootstrapFilter
bootstrapFilter) {
this.conf = conf;
- this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
+ this.writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false,
false);
+ this.metaClient = StreamerUtil.createMetaClient(conf);
+ this.maxCacheSizeInBytes = conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE) *
1024 * 1024;
+ this.bootstrapFilter = bootstrapFilter;
reloadMetadataTable();
}
@Override
- public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
- return get(Collections.singletonList(recordKey)).get(recordKey);
+ public String get(String partitionPath, String recordKey) {
+ BucketCache cache = getOrBootstrapPartition(partitionPath);
+ return cache.getFileGroupId(recordKey);
}
@Override
- public void update(String recordKey, HoodieRecordGlobalLocation
recordGlobalLocation) {
- recordIndexCache.update(recordKey, recordGlobalLocation);
+ public void update(String partitionPath, String recordKey, String fileId) {
+ BucketCache cache = getOrBootstrapPartition(partitionPath);
+ cache.putRecordKey(recordKey, fileId);
+ cleanExecutor.runIfNecessary(() -> cleanIfNecessary(0L, partitionPath));
}
+ /**
+ * Records the latest checkpoint id seen by this assign subtask.
+ *
+ * <p>New record-key mappings written after this point are tagged with the
checkpoint id so lazy
+ * eviction can avoid removing partition caches that may still be needed for
recovery.
+ *
+ * @param checkpointId checkpoint id from the bucket assign operator
+ */
@Override
- public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys)
throws IOException {
- // use a linked hash map to keep the natural order.
- Map<String, HoodieRecordGlobalLocation> keysAndLocations = new
LinkedHashMap<>();
- List<String> missedKeys = new ArrayList<>();
- for (String key: recordKeys) {
- HoodieRecordGlobalLocation location = recordIndexCache.get(key);
- if (location == null) {
- missedKeys.add(key);
- }
- // insert anyway even the location is null to keep the natural order.
- keysAndLocations.put(key, location);
- }
- if (!missedKeys.isEmpty()) {
- HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
-
metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
- List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
- recordIndexLocations.forEach(keyAndLocation -> {
- recordIndexCache.update(keyAndLocation.getKey(),
keyAndLocation.getValue());
- keysAndLocations.put(keyAndLocation.getKey(),
keyAndLocation.getValue());
- });
- }
- return keysAndLocations;
+ public void onCheckpoint(long checkpointId) {
+ this.currentCheckpointId = checkpointId;
}
+ /**
+ * Advances the lazy eviction watermark after a completed checkpoint.
+ *
+ * <p>The watermark is derived from the minimum inflight checkpoint, so
caches that may still be
+ * needed for recovery are retained. Metadata table state is reloaded after
the coordinator has
+ * committed or advanced instants for the completed checkpoint.
+ *
+ * @param correspondent writer coordinator correspondent used to query
inflight instants
+ * @param completedCheckpointId completed checkpoint id reported by Flink
+ */
@Override
- public void update(List<Pair<String, HoodieRecordGlobalLocation>>
recordKeysAndLocations) throws IOException {
- recordKeysAndLocations.forEach(keyAndLocation ->
recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue()));
+ public void onCheckpointComplete(Correspondent correspondent, long
completedCheckpointId) {
+ Map<Long, String> inflightInstants =
correspondent.requestInflightInstants();
+
updateEvictableCkp(inflightInstants.keySet().stream().min(Long::compareTo).orElse(completedCheckpointId));
+ metaClient.reloadActiveTimeline();
+ reloadMetadataTable();
}
- @Override
- public void onCheckpoint(long checkpointId) {
- recordIndexCache.addCheckpointCache(checkpointId);
+ private BucketCache getOrBootstrapPartition(String partitionPath) {
+ BucketCache cache = partitionBucketCaches.get(partitionPath);
+ if (cache != null) {
+ return cache;
+ }
+
+ cache = bootstrapPartition(partitionPath);
+ partitionBucketCaches.put(partitionPath, cache);
+ return cache;
}
- @Override
- public void onCheckpointComplete(Correspondent correspondent, long
completedCheckpointID) {
- Map<Long, String> inflightInstants =
correspondent.requestInflightInstants();
- log.info("Inflight instants and the corresponding checkpoints: {},
notified completed checkpoints: {}",
- inflightInstants, completedCheckpointID);
- // if there are no inflight instants,
- // the latest completed checkpoint id is used as the minimum checkpoint id,
- // since the streaming write operator always uses previous checkpoint id
to request the new instant.
-
recordIndexCache.markAsEvictable(inflightInstants.keySet().stream().min(Long::compareTo).orElse(completedCheckpointID));
- this.metaClient.reloadActiveTimeline();
- reloadMetadataTable();
+ private BucketCache bootstrapPartition(String partitionPath) {
+ BucketCache cache = createBucketCache(partitionPath);
+ if
(!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX))
{
+ log.info("Record index is not available yet. Start partitioned RLI cache
empty for partition {}", partitionPath);
+ return cache;
+ }
+
+ try {
+ Map<String, List<FileSlice>> partitionedFileGroups =
+
metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX);
+ List<FileSlice> fileSlices = partitionedFileGroups.get(partitionPath);
+ if (fileSlices == null || fileSlices.isEmpty()) {
+ return cache;
+ }
+ HoodiePairData<String, HoodieRecordGlobalLocation> locations =
+ metadataTable.readRecordIndexLocations(fileSlicesToFilter ->
fileSlices);
+ final AtomicLong totalCnt = new AtomicLong();
+ locations.forEach(locationPair -> {
+ String recordKey = locationPair.getLeft();
+ String fileId = locationPair.getRight().getFileId();
+ if (bootstrapFilter.shouldLoad(partitionPath, recordKey, fileId)) {
+ cache.bootstrapRecordKey(recordKey, fileId);
+ }
+ totalCnt.incrementAndGet();
+ });
+ log.info("Bootstrapped partitioned RLI cache for partition {} with {}
owned records from total {} RLI records.",
+ partitionPath, cache.size(), totalCnt.get());
+ return cache;
+ } catch (Exception e) {
+ cache.close();
+ throw new HoodieException("Failed to bootstrap partitioned RLI cache for
partition " + partitionPath, e);
+ }
+ }
+
+ private BucketCache createBucketCache(String partitionPath) {
+ long inferredCacheSize = inferMemorySizeForCache();
+ cleanIfNecessary(inferredCacheSize, partitionPath);
+ return newBucketCache(createSpillableMap(partitionPath,
inferredCacheSize), Long.MIN_VALUE);
+ }
+
+ private ExternalSpillableMap<String, Integer> createSpillableMap(String
partitionPath, long inferredCacheSize) {
+ // preserve some memory for fileId dict, assuming the average file group
number as Short.MAX_VALUE, then the size will be about 1MB.
+ long maxInMemorySizeInBytes = Math.max(inferredCacheSize -
BucketCache.FILE_ID_DICT_ENTRY_SIZE * Short.MAX_VALUE, 1);
+ try {
+ return new ExternalSpillableMap<>(
+ maxInMemorySizeInBytes,
+ writeConfig.getSpillableMapBasePath(),
+ new DefaultSizeEstimator<>(),
+ new DefaultSizeEstimator<>(),
+ ExternalSpillableMap.DiskMapType.ROCKS_DB,
+ new DefaultSerializer<>(),
+
writeConfig.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED),
+ "PartitionedRLICache-" + partitionPath);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to create partitioned RLI cache for
partition " + partitionPath, e);
+ }
}
private void reloadMetadataTable() {
+ closeMetadataTable();
this.metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
HoodieFlinkEngineContext.DEFAULT,
metaClient.getStorage(),
@@ -128,16 +221,179 @@ public class RecordLevelIndexBackend implements
MinibatchIndexBackend {
conf.get(FlinkOptions.PATH));
}
+ private void updateEvictableCkp(long checkpointId) {
+ ValidationUtils.checkArgument(checkpointId >= minRetainedCheckpointId,
+ String.format("The minimum retained checkpoint id should be increased,
previous: %s, received: %s",
+ minRetainedCheckpointId, checkpointId));
+ minRetainedCheckpointId = checkpointId;
+ }
+
+ private long inferMemorySizeForCache() {
+ int concurrentPartitionsNum =
conf.get(FlinkOptions.INDEX_RLI_CACHE_CONCURRENT_PARTITIONS_NUM);
+ if (partitionBucketCaches.isEmpty()) {
+ return maxCacheSizeInBytes / concurrentPartitionsNum;
+ }
+ long averageMemorySize = getCurrentHeapSize() /
partitionBucketCaches.size();
+ if (averageMemorySize <= 0) {
+ return maxCacheSizeInBytes / concurrentPartitionsNum;
+ }
+ return averageMemorySize;
+ }
+
+ private long getCurrentHeapSize() {
+ return partitionBucketCaches.values().stream()
+ .map(BucketCache::getHeapSize)
+ .reduce(Long::sum)
+ .orElse(0L);
+ }
+
+ @VisibleForTesting
+ void cleanIfNecessary(long nextCacheSize, String protectedPartitionPath) {
+ while (getCurrentHeapSize() + nextCacheSize > maxCacheSizeInBytes) {
+ boolean cleaned = false;
+ Iterator<Map.Entry<String, BucketCache>> iterator =
partitionBucketCaches.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, BucketCache> entry = iterator.next();
+ if (entry.getKey().equals(protectedPartitionPath)) {
+ continue;
+ }
+ BucketCache cache = entry.getValue();
+ if (cache.lastUpdatedCheckpoint < minRetainedCheckpointId) {
+ cache.close();
+ iterator.remove();
+ cleaned = true;
+ log.info("Evict partitioned RLI cache for partition {}",
entry.getKey());
+ break;
+ }
+ }
+ if (!cleaned) {
+ // All remaining partition caches are either protected or too recent
to evict safely.
+ // Returning avoids retrying the same scan without making progress.
+ return;
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
- this.recordIndexCache.close();
- if (this.metadataTable == null) {
- return;
+ partitionBucketCaches.values().forEach(BucketCache::close);
+ partitionBucketCaches.clear();
+ closeMetadataTable();
+ }
+
+ private void closeMetadataTable() {
+ if (metadataTable != null) {
+ try {
+ metadataTable.close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close metadata table", e);
+ } finally {
+ metadataTable = null;
+ }
}
- try {
- this.metadataTable.close();
- } catch (Exception e) {
- throw new HoodieException("Exception happened during close metadata
table.", e);
+ }
+
+ /**
+ * Creates a bucket cache with an injected spillable map for tests.
+ *
+ * @param recordKeyToFileGroupIdCode encoded record-key to file-group-id
mapping
+ * @param lastUpdatedCheckpoint checkpoint id of the latest non-bootstrap
update
+ * @return bucket cache for one data partition
+ */
+ @VisibleForTesting
+ public BucketCache newBucketCache(
+ ExternalSpillableMap<String, Integer> recordKeyToFileGroupIdCode,
+ long lastUpdatedCheckpoint) {
+ return new BucketCache(recordKeyToFileGroupIdCode, lastUpdatedCheckpoint);
+ }
+
+ @FunctionalInterface
+ public interface BootstrapFilter {
+ boolean shouldLoad(String partitionPath, String recordKey, String fileId);
+ }
+
+ /**
+ * Bucket Cache of one data table partition.
+ *
+ * <p>The spillable map stores {@code recordKey -> fileGroupIdCode}. File
group ids are dictionary
+ * encoded inside the cache so hot partitions do not repeat UUID-style file
group id strings for
+ * every record key entry.
+ */
+ public class BucketCache implements Closeable {
+ @Getter
+ private final ExternalSpillableMap<String, Integer>
recordKeyToFileGroupIdCode;
+ // Keep file group ids in a partition-local dictionary so the spillable
record map stores
+ // compact integer codes instead of repeating long UUID-style file group
id strings.
+ private final Map<String, Integer> fileGroupIdToDictId;
+ private final List<String> dictIdToFileGroupId;
+ private long lastUpdatedCheckpoint;
+ // File group ids generated by bucket assign are UUID-style 36-character
strings.
+ // Each dictionary entry keeps one file group id and one 4-byte integer
code.
+ private static final long FILE_ID_DICT_ENTRY_SIZE = 36L + Integer.BYTES;
+
+ BucketCache(ExternalSpillableMap<String, Integer>
recordKeyToFileGroupIdCode, long lastUpdatedCheckpoint) {
+ this.recordKeyToFileGroupIdCode = recordKeyToFileGroupIdCode;
+ this.fileGroupIdToDictId = new HashMap<>();
+ this.dictIdToFileGroupId = new ArrayList<>();
+ this.lastUpdatedCheckpoint = lastUpdatedCheckpoint;
+ }
+
+ /**
+ * Returns the decoded file group id for the record key, or {@code null}
when the key is unknown.
+ */
+ String getFileGroupId(String recordKey) {
+ Integer fileGroupIdCode = this.recordKeyToFileGroupIdCode.get(recordKey);
+ return fileGroupIdCode == null ? null :
dictIdToFileGroupId.get(fileGroupIdCode);
+ }
+
+ /**
+ * Returns the number of cached record-key mappings in this partition.
+ */
+ int size() {
+ return this.recordKeyToFileGroupIdCode.size();
+ }
+
+ private long getHeapSize() {
+ return this.recordKeyToFileGroupIdCode.getCurrentInMemoryMapSize() +
getFileGroupIdDictHeapSize();
+ }
+
+ /**
+ * Adds a record key mapping from incoming new records.
+ */
+ void putRecordKey(String recordKey, String fileGroupId) {
+ this.recordKeyToFileGroupIdCode.put(recordKey,
getOrCreateFileGroupIdCode(fileGroupId));
+ this.lastUpdatedCheckpoint = currentCheckpointId;
+ }
+
+ /**
+ * Bootstrap a record key mapping from the RLI data.
+ */
+ void bootstrapRecordKey(String recordKey, String fileGroupId) {
+ this.recordKeyToFileGroupIdCode.put(recordKey,
getOrCreateFileGroupIdCode(fileGroupId));
+ }
+
+ private int getOrCreateFileGroupIdCode(String fileGroupId) {
+ // Encode each unique file group id once, and reuse the code for all
record keys routed to it.
+ Integer existingCode = fileGroupIdToDictId.get(fileGroupId);
+ if (existingCode != null) {
+ return existingCode;
+ }
+
+ int newCode = dictIdToFileGroupId.size();
+ fileGroupIdToDictId.put(fileGroupId, newCode);
+ dictIdToFileGroupId.add(fileGroupId);
+ return newCode;
+ }
+
+ private long getFileGroupIdDictHeapSize() {
+ return dictIdToFileGroupId.size() * FILE_ID_DICT_ENTRY_SIZE;
+ }
+
+ @Override
+ public void close() {
+ this.recordKeyToFileGroupIdCode.close();
+ this.fileGroupIdToDictId.clear();
+ this.dictIdToFileGroupId.clear();
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
index 86d383a3916c..ea45f7da9bd0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
@@ -32,10 +32,10 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
- * An implementation of {@link IndexBackend} based on RocksDB.
+ * An implementation of {@link GlobalIndexBackend} based on RocksDB.
*/
@Slf4j
-public class RocksDBIndexBackend implements IndexBackend {
+public class RocksDBIndexBackend implements GlobalIndexBackend {
private static final String COLUMN_FAMILY = "index_cache";
private final RocksDBDAO rocksDBDAO;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 47d1f8e9e716..44e9dfd335cc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -56,11 +56,14 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
+import org.apache.hudi.sink.partitioner.DynamicBucketAssignFunction;
+import org.apache.hudi.sink.partitioner.DynamicBucketAssignOperator;
import org.apache.hudi.sink.partitioner.MiniBatchBucketAssignOperator;
+import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
import org.apache.hudi.sink.partitioner.RecordIndexPartitioner;
+import org.apache.hudi.sink.partitioner.GlobalRecordIndexPartitioner;
import org.apache.hudi.sink.partitioner.index.IndexRowUtils;
import org.apache.hudi.sink.partitioner.index.IndexWriteOperator;
-import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
@@ -126,6 +129,10 @@ public class Pipelines {
final int PARALLELISM_VALUE = conf.get(FlinkOptions.WRITE_TASKS);
final boolean isBucketIndexType = OptionsResolver.isBucketIndexType(conf);
+ if (OptionsResolver.isRecordLevelIndex(conf)) {
+ throw new HoodieException(
+ "Record level index does not work with bulk insert using FLINK
engine.");
+ }
if (isBucketIndexType) {
// TODO support bulk insert for consistent bucket index
if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
@@ -285,7 +292,7 @@ public class Pipelines {
DataStream<HoodieFlinkInternalRow> dataStream1 =
rowDataToHoodieRecord(conf, rowType, dataStream);
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
- boolean isRliBootstrap = OptionsResolver.isRecordLevelIndex(conf);
+ boolean isRliBootstrap = OptionsResolver.isGlobalRecordLevelIndex(conf);
dataStream1 = dataStream1
.transform(
"index_bootstrap",
@@ -424,7 +431,11 @@ public class Pipelines {
if (isStreamingIndexWriteEnabled) {
// index writing pipeline
SingleOutputStreamOperator<RowData> indexWriteDatastream =
writeDatastream
- .partitionCustom(new RecordIndexPartitioner(conf),
IndexRowUtils::getHoodieKey)
+ .partitionCustom(
+ OptionsResolver.isRecordLevelIndex(conf)
+ ? new RecordIndexPartitioner(conf)
+ : new GlobalRecordIndexPartitioner(conf),
+ IndexRowUtils::getHoodieKey)
.transform(
opName("index_write", conf),
TypeInformation.of(RowData.class),
@@ -450,15 +461,23 @@ public class Pipelines {
private static DataStream<HoodieFlinkInternalRow> createBucketAssignStream(
DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf,
RowType rowType, String writeOperatorUid) {
String assignerOperatorName = "bucket_assigner";
- if (OptionsResolver.isRecordLevelIndex(conf) &&
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ if (OptionsResolver.isGlobalRecordLevelIndex(conf) &&
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
return inputStream
- .partitionCustom(new RecordIndexPartitioner(conf), row -> new
HoodieKey(row.getRecordKey(), row.getPartitionPath()))
.transform(
assignerOperatorName,
new HoodieFlinkInternalRowTypeInfo(rowType),
new MiniBatchBucketAssignOperator(new
MinibatchBucketAssignFunction(conf),
OperatorIDGenerator.fromUid(writeOperatorUid)))
.uid(opUID(assignerOperatorName, conf))
.setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));
+ } else if (OptionsResolver.isRecordLevelIndex(conf)) {
+ return inputStream
+ .keyBy(HoodieFlinkInternalRow::getRecordKey)
+ .transform(
+ assignerOperatorName,
+ new HoodieFlinkInternalRowTypeInfo(rowType),
+ new DynamicBucketAssignOperator(new
DynamicBucketAssignFunction(conf),
OperatorIDGenerator.fromUid(writeOperatorUid)))
+ .uid(opUID(assignerOperatorName, conf))
+ .setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));
} else {
return inputStream
// Key-by record key, to avoid multiple subtasks write to a bucket
at the same time
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/SamplingActionExecutor.java
similarity index 56%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
copy to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/SamplingActionExecutor.java
index 0b79ee10bcdf..31addf73cd87 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/SamplingActionExecutor.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,21 +16,29 @@
* limitations under the License.
*/
-package org.apache.hudi.sink.partitioner.index;
-
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+package org.apache.hudi.sink.utils;
/**
- * Index delegator which supports mini-batch index operations.
+ * Executes an action after a fixed number of record updates.
*/
-public interface MinibatchIndexBackend extends IndexBackend {
+public class SamplingActionExecutor {
+ static final int DEFAULT_RECORD_INTERVAL = 10000;
+
+ private final int stepSize;
+ private long recordCnt;
+
+ public SamplingActionExecutor() {
+ this(DEFAULT_RECORD_INTERVAL);
+ }
- Map<String, HoodieRecordGlobalLocation> get(List<String> recordKey) throws
IOException;
+ public SamplingActionExecutor(int stepSize) {
+ this.stepSize = stepSize;
+ }
- void update(List<Pair<String, HoodieRecordGlobalLocation>>
recordKeysAndLocations) throws IOException;
-}
+ public void runIfNecessary(Runnable action) {
+ if ((++recordCnt) % stepSize == 0) {
+ action.run();
+ recordCnt = 0L;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
index 2b0d21baf047..67e20e9e2282 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
@@ -39,6 +39,7 @@ import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.source.ExpressionEvaluators;
import org.apache.hudi.util.StreamerUtil;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
@@ -59,6 +60,7 @@ import java.util.stream.Collectors;
/**
* An index support implementation that leverages Record Level Index to prune
file slices.
*/
+@Slf4j
public class RecordLevelIndex implements FlinkMetadataIndex {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(RecordLevelIndex.class);
@@ -107,16 +109,20 @@ public class RecordLevelIndex implements
FlinkMetadataIndex {
if (!isIndexAvailable()) {
return fileSlices;
}
- HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
-
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
+ HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData = null;
try {
+ recordIndexData =
+
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
Set<String> fileIds = recordIndexLocations.stream()
.map(pair ->
pair.getValue().getFileId()).collect(Collectors.toSet());
return fileSlices.stream().filter(fileSlice ->
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
+ } catch (Throwable e) {
+ log.error("Failed to read metadata index: {} for data skipping",
getIndexPartitionName(), e);
+ return fileSlices;
} finally {
// Clean up the RDD to avoid memory leaks
- recordIndexData.unpersistWithDependencies();
+
Option.ofNullable(recordIndexData).ifPresent(HoodiePairData::unpersistWithDependencies);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a08f3f56aacb..e87a2680f185 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -196,16 +196,26 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
HoodieIndexConfig.INDEX_TYPE.checkValues(indexTypeStr);
}
HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
- if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
- ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
- String.format("Metadata table should be enabled when %s is %s.",
FlinkOptions.INDEX_TYPE.key(),
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
-
ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
- String.format("Partition level index updating is not supported for
GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.",
FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
-
- boolean deferredRLI = Boolean.parseBoolean(conf.getString(
- HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(),
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
- ValidationUtils.checkArgument(!deferredRLI,
- String.format("Deferred RLI initialization is not supported for
flink ingestion, please set '%s' = 'false'.",
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key()));
+ switch (indexType) {
+ case GLOBAL_RECORD_LEVEL_INDEX:
+ ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
+ String.format("Metadata table should be enabled when %s is %s.",
FlinkOptions.INDEX_TYPE.key(),
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
+
ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
+ String.format("Partition level index updating is not supported for
GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.",
FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
+
+ boolean deferredRLI = Boolean.parseBoolean(conf.getString(
+ HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(),
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
+ ValidationUtils.checkArgument(!deferredRLI,
+ String.format("Deferred RLI initialization is not supported for
flink ingestion, please set '%s' = 'false'.",
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key()));
+ break;
+ case RECORD_LEVEL_INDEX:
+ ValidationUtils.checkArgument(OptionsResolver.isUpsertOperation(conf)
|| OptionsResolver.isInsertOverwrite(conf),
+ "Partitioned record level index supports only Flink streaming
upsert and insert overwrite.");
+
ValidationUtils.checkArgument(!OptionsResolver.isNonBlockingConcurrencyControl(conf),
+ "Partitioned record level index does not support non-blocking
concurrency control.");
+ break;
+ default:
+ break;
}
}
@@ -445,6 +455,14 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
conf.set(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE,
OptionsResolver.getWriteBufferSizeInBytes(conf) / 1024 / 1024 / 4);
}
+ } else if (indexType == HoodieIndex.IndexType.RECORD_LEVEL_INDEX) {
+
conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+ conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
+ conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(),
"true");
+ conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
+ if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
+ conf.set(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE,
OptionsResolver.getWriteBufferSizeInBytes(conf) / 1024 / 1024 / 4);
+ }
} else {
conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(),
"false");
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
index 39a4ee93d7ec..1383722eb0a8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
@@ -20,6 +20,7 @@ package org.apache.hudi.configuration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
@@ -52,6 +53,24 @@ public class TestOptionsResolver {
assertEquals(HoodieIndex.IndexType.BLOOM,
OptionsResolver.getIndexType(conf));
}
+ @Test
+ void testRecordLevelIndexStreamingWrite() {
+ Configuration conf = getConf();
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name());
+
+ assertTrue(OptionsResolver.isRecordLevelIndex(conf));
+ assertTrue(OptionsResolver.isStreamingIndexWriteEnabled(conf));
+
+ conf.set(FlinkOptions.OPERATION,
WriteOperationType.INSERT_OVERWRITE.value());
+ assertFalse(OptionsResolver.isStreamingIndexWriteEnabled(conf));
+
+ conf.set(FlinkOptions.OPERATION, WriteOperationType.UPSERT.value());
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ assertFalse(OptionsResolver.isRecordLevelIndex(conf));
+ assertFalse(OptionsResolver.isStreamingIndexWriteEnabled(conf));
+ }
+
@Test
void testIsLazyFailedWritesCleanPolicy() {
Configuration conf = new Configuration();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
similarity index 94%
copy from
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
copy to
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
index 54bc3a4e8ccb..32163baf38d2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
@@ -34,11 +34,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Test cases for {@link RecordIndexPartitioner}.
+ * Test cases for {@link GlobalRecordIndexPartitioner}.
*/
-public class TestRecordIndexPartitioner {
+public class TestGlobalRecordIndexPartitioner {
- private static RecordIndexPartitioner partitioner;
+ private static GlobalRecordIndexPartitioner partitioner;
@TempDir
static File tempFile;
@@ -49,7 +49,7 @@ public class TestRecordIndexPartitioner {
Configuration conf = TestConfigurations.getDefaultConf(basePath);
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
TestData.writeData(TestData.DATA_SET_INSERT, conf);
- partitioner = new RecordIndexPartitioner(conf);
+ partitioner = new GlobalRecordIndexPartitioner(conf);
}
@Test
@@ -104,4 +104,4 @@ public class TestRecordIndexPartitioner {
int partition = partitioner.partition(key, numPartitions);
assertTrue(partition >= 0 && partition < numPartitions);
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
index 54bc3a4e8ccb..59cd3d93fb0b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
@@ -20,88 +20,90 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
-import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Test cases for {@link RecordIndexPartitioner}.
+ * Test cases for partitioned record level index routing in {@link
RecordIndexPartitioner}.
*/
public class TestRecordIndexPartitioner {
-
- private static RecordIndexPartitioner partitioner;
+ private static final int FILE_GROUP_COUNT = 3;
+ private static final int NUM_PARTITIONS = 4;
@TempDir
- static File tempFile;
-
- @BeforeAll
- public static void beforeAll() throws Exception {
- final String basePath = tempFile.getAbsolutePath();
- Configuration conf = TestConfigurations.getDefaultConf(basePath);
-
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
- TestData.writeData(TestData.DATA_SET_INSERT, conf);
- partitioner = new RecordIndexPartitioner(conf);
- }
+ File tempFile;
@Test
- void testPartitionMethod() {
- // Test partitioning with different record keys
- HoodieKey key1 = new HoodieKey("record_key_1", "partition_path");
- HoodieKey key2 = new HoodieKey("record_key_2", "partition_path");
- HoodieKey key3 = new HoodieKey("another_record_key", "partition_path");
-
- int numPartitions = 10;
-
- // Test that partitioning works consistently
- int partition1 = partitioner.partition(key1, numPartitions);
- int partition2 = partitioner.partition(key2, numPartitions);
- int partition3 = partitioner.partition(key3, numPartitions);
-
- // Each partition should be within the range [0, numPartitions)
- assertTrue(partition1 >= 0 && partition1 < numPartitions);
- assertTrue(partition2 >= 0 && partition2 < numPartitions);
- assertTrue(partition3 >= 0 && partition3 < numPartitions);
-
- // Same key should always map to the same partition
- assertEquals(partitioner.partition(key1, numPartitions),
partitioner.partition(key1, numPartitions));
+ void testNewPartitionUsesMinFileGroupCount() throws Exception {
+ RecordIndexPartitioner partitioner = newPartitioner();
+ String recordKey = "record_key";
+
+ assertEquals(
+ expectedPartition(recordKey, "par1"),
+ partitioner.partition(new HoodieKey(recordKey, "par1"),
NUM_PARTITIONS));
+ assertEquals(
+ expectedPartition(recordKey, "par2"),
+ partitioner.partition(new HoodieKey(recordKey, "par2"),
NUM_PARTITIONS));
}
@Test
- void testPartitionConsistency() {
+ void testPartitionConsistency() throws Exception {
+ RecordIndexPartitioner partitioner = newPartitioner();
HoodieKey key = new HoodieKey("consistent_test_key", "partition_path");
- int numPartitions = 5;
-
- // Test that the same key always maps to the same partition
- int expectedPartition = partitioner.partition(key, numPartitions);
+
+ int expectedPartition = partitioner.partition(key, NUM_PARTITIONS);
for (int i = 0; i < 10; i++) {
- assertEquals(expectedPartition, partitioner.partition(key,
numPartitions));
+ assertEquals(expectedPartition, partitioner.partition(key,
NUM_PARTITIONS));
}
}
@Test
- void testEdgeCaseSinglePartition() {
- HoodieKey key = new HoodieKey("any_key", "partition_path");
- int numPartitions = 1; // Single partition
-
- // With single partition, everything should go to partition 0
- assertEquals(0, partitioner.partition(key, numPartitions));
+ void testPartitionWithinRange() throws Exception {
+ RecordIndexPartitioner partitioner = newPartitioner();
+
+ int partition1 = partitioner.partition(new HoodieKey("record_key",
"par1"), NUM_PARTITIONS);
+ int partition2 = partitioner.partition(new HoodieKey("record_key",
"par2"), NUM_PARTITIONS);
+ int partition3 = partitioner.partition(new HoodieKey("record_key",
"par3"), NUM_PARTITIONS);
+
+ assertTrue(partition1 >= 0 && partition1 < NUM_PARTITIONS);
+ assertTrue(partition2 >= 0 && partition2 < NUM_PARTITIONS);
+ assertTrue(partition3 >= 0 && partition3 < NUM_PARTITIONS);
+
+ assertNotEquals(partition1, partition2);
+ assertNotEquals(partition1, partition3);
}
@Test
- void testLargeNumberOfPartitions() {
- HoodieKey key = new HoodieKey("test_key_for_large_partition",
"partition_path");
- int numPartitions = 100; // Large number of partitions
-
- int partition = partitioner.partition(key, numPartitions);
- assertTrue(partition >= 0 && partition < numPartitions);
+ void testSinglePartition() throws Exception {
+ RecordIndexPartitioner partitioner = newPartitioner();
+
+ assertEquals(0, partitioner.partition(new HoodieKey("any_key", "par1"),
1));
+ }
+
+ private RecordIndexPartitioner newPartitioner() throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name());
+
conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
String.valueOf(FILE_GROUP_COUNT));
+ StreamerUtil.initTableIfNotExists(conf);
+ return new RecordIndexPartitioner(conf);
+ }
+
+ private int expectedPartition(String recordKey, String partitionPath) {
+ int recordIndexFileGroup =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey,
FILE_GROUP_COUNT);
+ return
BucketIndexUtil.getPartitionIndexFunc(NUM_PARTITIONS).apply(FILE_GROUP_COUNT,
partitionPath, recordIndexFileGroup);
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
similarity index 69%
copy from
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
copy to
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 70011b514ee7..396a57185cec 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -49,9 +49,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * Test cases for {@link RecordLevelIndexBackend}.
+ * Test cases for {@link GlobalRecordLevelIndexBackend}.
*/
-public class TestRecordLevelIndexBackend {
+public class TestGlobalRecordLevelIndexBackend {
private Configuration conf;
@@ -72,34 +72,34 @@ public class TestRecordLevelIndexBackend {
String firstCommitTime =
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
- try (RecordLevelIndexBackend recordLevelIndexBackend = new
RecordLevelIndexBackend(conf, -1)) {
+ try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
// get record location
- HoodieRecordGlobalLocation location = recordLevelIndexBackend.get("id1");
+ HoodieRecordGlobalLocation location =
globalRecordLevelIndexBackend.get("id1");
assertNotNull(location);
assertEquals("par1", location.getPartitionPath());
assertEquals(firstCommitTime, location.getInstantTime());
// get record location with non existed key
- location = recordLevelIndexBackend.get("new_key");
+ location = globalRecordLevelIndexBackend.get("new_key");
assertNull(location);
// get records locations for multiple record keys
- Map<String, HoodieRecordGlobalLocation> locations =
recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
+ Map<String, HoodieRecordGlobalLocation> locations =
globalRecordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
assertEquals(3, locations.size());
locations.values().forEach(Assertions::assertNotNull);
// get records locations for multiple record keys with unexisted key
- locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2",
"new_key"));
+ locations = globalRecordLevelIndexBackend.get(Arrays.asList("id1",
"id2", "new_key"));
assertEquals(3, locations.size());
assertNull(locations.get("new_key"));
// new checkpoint
- recordLevelIndexBackend.onCheckpoint(1);
+ globalRecordLevelIndexBackend.onCheckpoint(1);
// update record location
HoodieRecordGlobalLocation newLocation = new
HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
- recordLevelIndexBackend.update("new_key", newLocation);
- location = recordLevelIndexBackend.get("new_key");
+ globalRecordLevelIndexBackend.update("new_key", newLocation);
+ location = globalRecordLevelIndexBackend.get("new_key");
assertEquals(newLocation, location);
// previous instant commit success, clean
@@ -107,12 +107,12 @@ public class TestRecordLevelIndexBackend {
Map<Long, String> inflightInstants = new HashMap<>();
inflightInstants.put(1L, "0001");
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
- assertEquals(2,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+ globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+ assertEquals(2,
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
// the cache contains 'new_key', and other old locations
- location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
+ location =
globalRecordLevelIndexBackend.getRecordIndexCache().get("new_key");
assertEquals(newLocation, location);
- location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
+ location =
globalRecordLevelIndexBackend.getRecordIndexCache().get("id1");
assertNotNull(location);
}
}
@@ -122,69 +122,69 @@ public class TestRecordLevelIndexBackend {
// set a small value for RLI cache
conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
- try (RecordLevelIndexBackend recordLevelIndexBackend = new
RecordLevelIndexBackend(conf, -1)) {
+ try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
for (int i = 0; i < 1500; i++) {
- recordLevelIndexBackend.update("id1_" + i,
+ globalRecordLevelIndexBackend.update("id1_" + i,
new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
}
// new checkpoint
- recordLevelIndexBackend.onCheckpoint(1);
+ globalRecordLevelIndexBackend.onCheckpoint(1);
Correspondent correspondent = mock(Correspondent.class);
Map<Long, String> inflightInstants = new HashMap<>();
inflightInstants.put(1L, "0001");
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+ globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
for (int i = 0; i < 2000; i++) {
- recordLevelIndexBackend.update("id2_" + i,
+ globalRecordLevelIndexBackend.update("id2_" + i,
new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
}
// new checkpoint
- recordLevelIndexBackend.onCheckpoint(2);
+ globalRecordLevelIndexBackend.onCheckpoint(2);
correspondent = mock(Correspondent.class);
inflightInstants = new HashMap<>();
inflightInstants.put(2L, "0002");
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
+ globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
for (int i = 0; i < 2000; i++) {
- recordLevelIndexBackend.update("id3_" + i,
+ globalRecordLevelIndexBackend.update("id3_" + i,
new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
}
// the cache for the first instant is evicted
- assertEquals(2,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+ assertEquals(2,
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
// new checkpoint
- recordLevelIndexBackend.onCheckpoint(3);
+ globalRecordLevelIndexBackend.onCheckpoint(3);
correspondent = mock(Correspondent.class);
inflightInstants = new HashMap<>();
inflightInstants.put(3L, "0003");
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
+ globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
for (int i = 0; i < 500; i++) {
- recordLevelIndexBackend.update("id4_" + i,
+ globalRecordLevelIndexBackend.update("id4_" + i,
new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
}
- assertEquals(3,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+ assertEquals(3,
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
// insert another batch of records, which will trigger the cleaning of
the cache.
// another cache clean is triggered
for (int i = 500; i < 1500; i++) {
- recordLevelIndexBackend.update("id4_" + i,
+ globalRecordLevelIndexBackend.update("id4_" + i,
new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
}
- assertEquals(3,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+ assertEquals(3,
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
// cache for the oldest ckp id will be cleaned
-
assertNull(recordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
+
assertNull(globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
// caches for the latest 3 ckp id still in the cache
- assertEquals("par1",
recordLevelIndexBackend.get("id2_0").getPartitionPath());
- assertEquals("par1",
recordLevelIndexBackend.get("id3_0").getPartitionPath());
- assertEquals("par1",
recordLevelIndexBackend.get("id4_0").getPartitionPath());
+ assertEquals("par1",
globalRecordLevelIndexBackend.get("id2_0").getPartitionPath());
+ assertEquals("par1",
globalRecordLevelIndexBackend.get("id3_0").getPartitionPath());
+ assertEquals("par1",
globalRecordLevelIndexBackend.get("id4_0").getPartitionPath());
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index 70011b514ee7..c29c256bbaa9 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -18,34 +18,27 @@
package org.apache.hudi.sink.partitioner.index;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
-import org.apache.hudi.utils.TestData;
-import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
-import static org.mockito.Mockito.mock;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
/**
@@ -53,138 +46,145 @@ import static org.mockito.Mockito.when;
*/
public class TestRecordLevelIndexBackend {
+ private static final long ONE_MB = 1024 * 1024;
+
private Configuration conf;
@TempDir
File tempFile;
@BeforeEach
- void beforeEach() throws IOException {
+ public void before() throws Exception {
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
- conf.set(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE.name());
-
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+ conf.setString("hadoop.fs.defaultFS", "file:///");
+ conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
StreamerUtil.initTableIfNotExists(conf);
}
@Test
- void testRecordLevelIndexBackend() throws Exception {
- TestData.writeData(TestData.DATA_SET_INSERT, conf);
-
- String firstCommitTime =
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
-
- try (RecordLevelIndexBackend recordLevelIndexBackend = new
RecordLevelIndexBackend(conf, -1)) {
- // get record location
- HoodieRecordGlobalLocation location = recordLevelIndexBackend.get("id1");
- assertNotNull(location);
- assertEquals("par1", location.getPartitionPath());
- assertEquals(firstCommitTime, location.getInstantTime());
-
- // get record location with non existed key
- location = recordLevelIndexBackend.get("new_key");
- assertNull(location);
-
- // get records locations for multiple record keys
- Map<String, HoodieRecordGlobalLocation> locations =
recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
- assertEquals(3, locations.size());
- locations.values().forEach(Assertions::assertNotNull);
-
- // get records locations for multiple record keys with unexisted key
- locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2",
"new_key"));
- assertEquals(3, locations.size());
- assertNull(locations.get("new_key"));
-
- // new checkpoint
- recordLevelIndexBackend.onCheckpoint(1);
-
- // update record location
- HoodieRecordGlobalLocation newLocation = new
HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
- recordLevelIndexBackend.update("new_key", newLocation);
- location = recordLevelIndexBackend.get("new_key");
- assertEquals(newLocation, location);
-
- // previous instant commit success, clean
- Correspondent correspondent = mock(Correspondent.class);
- Map<Long, String> inflightInstants = new HashMap<>();
- inflightInstants.put(1L, "0001");
-
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
- assertEquals(2,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
- // the cache contains 'new_key', and other old locations
- location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
- assertEquals(newLocation, location);
- location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
- assertNotNull(location);
+ public void testCheckpointCompleteDoesNotEagerEvict() throws Exception {
+ try (RecordLevelIndexBackend backend = createBackend()) {
+ backend.getPartitionBucketCaches().put("par1",
cacheWithHeapSize(backend, 2 * ONE_MB, 1L));
+
+ backend.onCheckpointComplete(new
TestCorrespondent(Collections.emptyMap()), 2L);
+
+ assertTrue(backend.getPartitionBucketCaches().containsKey("par1"));
+ }
+ }
+
+ @Test
+ public void testLazyEvictOldPartitionsWhenHeapExceedsLimit() throws
Exception {
+ try (RecordLevelIndexBackend backend = createBackend()) {
+ backend.getPartitionBucketCaches().put("par1",
cacheWithHeapSize(backend, 800 * 1024, 1L));
+ backend.getPartitionBucketCaches().put("par2",
cacheWithHeapSize(backend, 800 * 1024, 1L));
+ backend.onCheckpointComplete(new
TestCorrespondent(Collections.emptyMap()), 2L);
+ backend.getPartitionBucketCaches().put("par3",
cacheWithHeapSize(backend, 1L, 2L));
+
+ backend.cleanIfNecessary(0L, "par3");
+
+ assertFalse(backend.getPartitionBucketCaches().containsKey("par1"));
+ assertTrue(backend.getPartitionBucketCaches().containsKey("par2"));
+ assertTrue(backend.getPartitionBucketCaches().containsKey("par3"));
}
}
@Test
- void testRecordLevelIndexCacheClean() throws Exception {
- // set a small value for RLI cache
- conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
+ public void testLazyEvictKeepsRecentPartition() throws Exception {
+ try (RecordLevelIndexBackend backend = createBackend()) {
+ backend.getPartitionBucketCaches().put("old", cacheWithHeapSize(backend,
800 * 1024, 1L));
+ backend.getPartitionBucketCaches().put("recent",
cacheWithHeapSize(backend, 800 * 1024, 2L));
+ backend.onCheckpointComplete(new
TestCorrespondent(Collections.emptyMap()), 2L);
+
+ backend.cleanIfNecessary(0L, null);
+
+ assertFalse(backend.getPartitionBucketCaches().containsKey("old"));
+ assertTrue(backend.getPartitionBucketCaches().containsKey("recent"));
+ }
+ }
+
+ @Test
+ public void testGetDoesNotRefreshLastUpdatedCheckpoint() throws Exception {
+ try (RecordLevelIndexBackend backend = createBackend()) {
+ backend.getPartitionBucketCaches().put("old", cacheWithHeapSize(backend,
2 * ONE_MB, 1L));
+ backend.onCheckpoint(2L);
+
+ backend.get("old", "key1");
+ backend.onCheckpointComplete(new
TestCorrespondent(Collections.emptyMap()), 2L);
+ backend.cleanIfNecessary(0L, null);
+
+ assertFalse(backend.getPartitionBucketCaches().containsKey("old"));
+ }
+ }
+
+ @Test
+ public void testPartitionCacheDictionaryEncodesFileGroupId() throws
Exception {
+ try (RecordLevelIndexBackend backend = createBackend()) {
+ ExternalSpillableMap<String, Integer> recordKeyToFileGroupIdCode =
mapWithStorage(0L);
+ RecordLevelIndexBackend.BucketCache cache =
backend.newBucketCache(recordKeyToFileGroupIdCode, 1L);
+
+ cache.putRecordKey("key1", "file-group-id-000000000000000000000001");
+ cache.putRecordKey("key2", "file-group-id-000000000000000000000001");
+ cache.putRecordKey("key3", "file-group-id-000000000000000000000002");
+
+ assertEquals("file-group-id-000000000000000000000001",
cache.getFileGroupId("key1"));
+ assertEquals("file-group-id-000000000000000000000001",
cache.getFileGroupId("key2"));
+ assertEquals("file-group-id-000000000000000000000002",
cache.getFileGroupId("key3"));
+ assertEquals(Integer.valueOf(0), recordKeyToFileGroupIdCode.get("key1"));
+ assertEquals(Integer.valueOf(0), recordKeyToFileGroupIdCode.get("key2"));
+ assertEquals(Integer.valueOf(1), recordKeyToFileGroupIdCode.get("key3"));
+ }
+ }
+
+ @Test
+ public void testLazyEvictUsesAccessOrder() throws Exception {
+ try (RecordLevelIndexBackend backend = createBackend()) {
+ backend.getPartitionBucketCaches().put("par1",
cacheWithHeapSize(backend, 500 * 1024, 1L));
+ backend.getPartitionBucketCaches().put("par2",
cacheWithHeapSize(backend, 500 * 1024, 1L));
+ backend.getPartitionBucketCaches().get("par1");
+ backend.getPartitionBucketCaches().put("par3",
cacheWithHeapSize(backend, 100 * 1024, 2L));
+ backend.onCheckpointComplete(new
TestCorrespondent(Collections.emptyMap()), 2L);
+
+ backend.cleanIfNecessary(0L, "par3");
+
+ assertTrue(backend.getPartitionBucketCaches().containsKey("par1"));
+ assertFalse(backend.getPartitionBucketCaches().containsKey("par2"));
+ assertTrue(backend.getPartitionBucketCaches().containsKey("par3"));
+ }
+ }
+
+ private RecordLevelIndexBackend createBackend() {
+ return new RecordLevelIndexBackend(conf, (partitionPath, recordKey,
fileId) -> true);
+ }
+
+ private RecordLevelIndexBackend.BucketCache cacheWithHeapSize(
+ RecordLevelIndexBackend indexBackend,
+ long heapSize,
+ long lastUpdatedCheckpoint) {
+ return indexBackend.newBucketCache(mapWithStorage(heapSize),
lastUpdatedCheckpoint);
+ }
+
+ private ExternalSpillableMap<String, Integer> mapWithStorage(long heapSize) {
+ Map<String, Integer> storage = new HashMap<>();
+ ExternalSpillableMap<String, Integer> recordKeyToFileGroupIdCode =
Mockito.mock(ExternalSpillableMap.class);
+
when(recordKeyToFileGroupIdCode.getCurrentInMemoryMapSize()).thenReturn(heapSize);
+ when(recordKeyToFileGroupIdCode.size()).thenAnswer(invocation ->
storage.size());
+ when(recordKeyToFileGroupIdCode.get(Mockito.anyString()))
+ .thenAnswer(invocation -> storage.get(invocation.getArgument(0)));
+ doAnswer(invocation -> storage.put(invocation.getArgument(0),
invocation.getArgument(1)))
+ .when(recordKeyToFileGroupIdCode).put(Mockito.anyString(),
Mockito.anyInt());
+ return recordKeyToFileGroupIdCode;
+ }
+
+ private static class TestCorrespondent extends Correspondent {
+ private final Map<Long, String> inflightInstants;
+
+ TestCorrespondent(Map<Long, String> inflightInstants) {
+ this.inflightInstants = inflightInstants;
+ }
- try (RecordLevelIndexBackend recordLevelIndexBackend = new
RecordLevelIndexBackend(conf, -1)) {
-
- for (int i = 0; i < 1500; i++) {
- recordLevelIndexBackend.update("id1_" + i,
- new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
- }
- // new checkpoint
- recordLevelIndexBackend.onCheckpoint(1);
- Correspondent correspondent = mock(Correspondent.class);
- Map<Long, String> inflightInstants = new HashMap<>();
- inflightInstants.put(1L, "0001");
-
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
-
- for (int i = 0; i < 2000; i++) {
- recordLevelIndexBackend.update("id2_" + i,
- new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
- }
-
- // new checkpoint
- recordLevelIndexBackend.onCheckpoint(2);
- correspondent = mock(Correspondent.class);
- inflightInstants = new HashMap<>();
- inflightInstants.put(2L, "0002");
-
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
-
- for (int i = 0; i < 2000; i++) {
- recordLevelIndexBackend.update("id3_" + i,
- new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
- }
-
- // the cache for the first instant is evicted
- assertEquals(2,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
-
- // new checkpoint
- recordLevelIndexBackend.onCheckpoint(3);
- correspondent = mock(Correspondent.class);
- inflightInstants = new HashMap<>();
- inflightInstants.put(3L, "0003");
-
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
-
- for (int i = 0; i < 500; i++) {
- recordLevelIndexBackend.update("id4_" + i,
- new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
- }
-
- assertEquals(3,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
-
- // insert another batch of records, which will trigger the cleaning of
the cache.
- // another cache clean is triggered
- for (int i = 500; i < 1500; i++) {
- recordLevelIndexBackend.update("id4_" + i,
- new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
- }
- assertEquals(3,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
- // cache for the oldest ckp id will be cleaned
-
assertNull(recordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
- // caches for the latest 3 ckp id still in the cache
- assertEquals("par1",
recordLevelIndexBackend.get("id2_0").getPartitionPath());
- assertEquals("par1",
recordLevelIndexBackend.get("id3_0").getPartitionPath());
- assertEquals("par1",
recordLevelIndexBackend.get("id4_0").getPartitionPath());
+ @Override
+ public Map<Long, String> requestInflightInstants() {
+ return inflightInstants;
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 5318e4381bc3..a383d66dbaec 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -36,7 +36,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.index.IndexBackend;
-import org.apache.hudi.sink.partitioner.index.RecordLevelIndexBackend;
+import org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -366,8 +366,8 @@ public class TestWriteBase {
*/
public TestHarness assertInflightCachesOfBucketAssigner(int expected) {
IndexBackend indexBackend =
pipeline.getBucketAssignFunction().getIndexBackend();
- if (indexBackend instanceof RecordLevelIndexBackend) {
- assertEquals(expected, ((RecordLevelIndexBackend)
indexBackend).getRecordIndexCache().getCaches().size());
+ if (indexBackend instanceof GlobalRecordLevelIndexBackend) {
+ assertEquals(expected, ((GlobalRecordLevelIndexBackend)
indexBackend).getRecordIndexCache().getCaches().size());
}
return this;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
new file mode 100644
index 000000000000..83b5b7f73cdf
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+import org.apache.hudi.utils.TestTableEnvs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.utils.TestConfigurations.sql;
+import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for Flink streaming writes with dynamic bucket index.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestDynamicBucketStreamWrite {
+ private TableEnvironment streamTableEnv;
+ private TableEnvironment batchTableEnv;
+
+ @TempDir
+ File tempFile;
+
+ @BeforeEach
+ void beforeEach() {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+ streamTableEnv = TableEnvironmentImpl.create(settings);
+ streamTableEnv.getConfig().getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
4);
+ Configuration execConf = streamTableEnv.getConfig().getConfiguration();
+ execConf.setString("execution.checkpointing.interval", "2s");
+ execConf.setString("restart-strategy", "fixed-delay");
+ execConf.setString("restart-strategy.fixed-delay.attempts", "0");
+
+ batchTableEnv = TestTableEnvs.getBatchTableEnv();
+ batchTableEnv.getConfig().getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
4);
+ }
+
+ @ParameterizedTest
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
+ void testWriteAndBootstrapFromRecordIndex(HoodieTableType tableType, boolean
partitioned) {
+ streamTableEnv.executeSql(getTableDDL("t1", tableType, partitioned));
+
+ final String insertInto1 = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2')";
+ execInsertSql(streamTableEnv, insertInto1);
+
+ Map<String, String> initialFileIds = collectFileIds(streamTableEnv, "uuid
in ('id1', 'id2')");
+ assertThat(initialFileIds).containsOnlyKeys("id1", "id2");
+ assertRecordIndexMetadataPartitionExists();
+
+ final String insertInto2 = "insert into t1 values\n"
+ + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:05','par1'),\n"
+ + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:06','par1'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:07','par2')";
+ execInsertSql(streamTableEnv, insertInto2);
+
+ List<Row> result = execSelectSql(streamTableEnv, "select uuid, name, age,
ts, `partition` from t1");
+ assertRowsEquals(result, "["
+ + "+I[id1, Danny, 24, 1970-01-01T00:00:05, par1], "
+ + "+I[id2, Stephen, 34, 1970-01-01T00:00:06, par1], "
+ + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
+ + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
+ + "+I[id5, Sophia, 18, 1970-01-01T00:00:07, par2]]");
+
+ Map<String, String> updatedFileIds = collectFileIds(streamTableEnv, "uuid
in ('id1', 'id2')");
+ assertEquals(initialFileIds, updatedFileIds);
+ }
+
+  /**
+   * Exercises static-partition, dynamic-partition and whole-table {@code insert overwrite}. Each
+   * overwrite statement is executed twice, and the table content must match the same expectation
+   * after both runs.
+   */
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testInsertOverwrite(HoodieTableType tableType) {
+    batchTableEnv.executeSql(getTableDDL("t1", tableType));
+    execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+    final String physicalColumnsQuery = "select uuid, name, age, ts, `partition` from t1";
+
+    // overwrite of the static partition par1 (partition value comes from the PARTITION clause)
+    final String staticPartitionOverwrite = "insert overwrite t1 partition(`partition`='par1') values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n";
+    for (int round = 0; round < 2; round++) {
+      execInsertSql(batchTableEnv, staticPartitionOverwrite);
+      assertRowsEquals(execSelectSql(batchTableEnv, physicalColumnsQuery),
+          TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
+    }
+
+    // overwrite with write.partition.overwrite.mode=dynamic set through a query hint
+    final String dynamicPartitionOverwrite =
+        "insert overwrite t1 /*+ OPTIONS('write.partition.overwrite.mode'='dynamic') */ values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
+    for (int round = 0; round < 2; round++) {
+      execInsertSql(batchTableEnv, dynamicPartitionOverwrite);
+      assertRowsEquals(execSelectSql(batchTableEnv, physicalColumnsQuery),
+          TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION);
+    }
+
+    // plain overwrite of the whole table: only the two inserted rows are expected to remain
+    final String tableOverwrite = "insert overwrite t1 values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
+    final String tableOverwriteExpected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]";
+    for (int round = 0; round < 2; round++) {
+      execInsertSql(batchTableEnv, tableOverwrite);
+      assertRowsEquals(execSelectSql(batchTableEnv, physicalColumnsQuery), tableOverwriteExpected);
+    }
+  }
+
+  /**
+   * Writes eight keys into the single partition {@code par_scale} through two separate inserts,
+   * with {@code COPY_ON_WRITE_INSERT_SPLIT_SIZE} set to 1. Checks that all eight keys are
+   * readable and that they are spread over more than two distinct file ids.
+   */
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) {
+    Map<String, String> insertSplitSizeOfOne =
+        Collections.singletonMap(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1");
+    streamTableEnv.executeSql(getTableDDL("t1", tableType, insertSplitSizeOfOne, true));
+
+    execInsertSql(streamTableEnv, "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par_scale'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par_scale'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par_scale'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par_scale')");
+
+    execInsertSql(streamTableEnv, "insert into t1 values\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par_scale'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par_scale'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par_scale'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par_scale')");
+
+    Map<String, String> fileIdByKey = collectFileIds(streamTableEnv, "`partition` = 'par_scale'");
+    assertThat(fileIdByKey).hasSize(8);
+    Set<String> distinctFileIds = new HashSet<>(fileIdByKey.values());
+    assertThat(distinctFileIds).hasSizeGreaterThan(2);
+  }
+
+  /**
+   * Supplies every combination of {@link HoodieTableType} (COPY_ON_WRITE, then MERGE_ON_READ)
+   * with a boolean flag (false, then true). The table type varies slowest, giving four argument
+   * sets in total.
+   */
+  private static Stream<Arguments> tableTypeAndBooleanTrueFalseParams() {
+    return Stream.of(HoodieTableType.COPY_ON_WRITE, HoodieTableType.MERGE_ON_READ)
+        .flatMap(tableType -> Stream.of(false, true).map(flag -> Arguments.of(tableType, flag)));
+  }
+
+  /** Builds the DDL of a partitioned test table without any extra options. */
+  private String getTableDDL(String tableName, HoodieTableType tableType) {
+    return getTableDDL(tableName, tableType, true);
+  }
+
+ private String getTableDDL(String tableName, HoodieTableType tableType,
boolean partitioned) {
+ return getTableDDL(tableName, tableType, Collections.emptyMap(),
partitioned);
+ }
+
+  /**
+   * Builds the DDL of the test table: a {@code _hoodie_file_name} virtual metadata column plus
+   * uuid/name/age/ts/partition fields, using the record level index with two bucket-assign,
+   * write and index-write tasks.
+   *
+   * @param tableName    name of the table to declare
+   * @param tableType    Hudi table type written into the table options
+   * @param extraOptions additional options, applied after the record-key/ordering defaults and
+   *                     before the path/type/index/parallelism options
+   * @param partitioned  when false, {@code noPartition()} is applied to the builder
+   * @return the string produced by the SQL builder's {@code end()}
+   */
+  private String getTableDDL(
+      String tableName, HoodieTableType tableType, Map<String, String> extraOptions, boolean partitioned) {
+    TestConfigurations.Sql tableSql = sql(tableName)
+        .field("_hoodie_file_name STRING METADATA VIRTUAL")
+        .field("uuid varchar(20)")
+        .field("name varchar(10)")
+        .field("age int")
+        .field("ts timestamp(3)")
+        .field("`partition` varchar(20)")
+        .options(getDefaultKeys())
+        .options(extraOptions)
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name())
+        .option(FlinkOptions.BUCKET_ASSIGN_TASKS, 2)
+        .option(FlinkOptions.WRITE_TASKS, 2)
+        .option(FlinkOptions.INDEX_WRITE_TASKS, 2);
+    if (partitioned) {
+      return tableSql.end();
+    }
+    tableSql.noPartition();
+    return tableSql.end();
+  }
+
+  /**
+   * Returns the base table options as a string map: {@code uuid} as the record key field and
+   * {@code ts} as the ordering field.
+   */
+  private static Map<String, String> getDefaultKeys() {
+    Configuration keyOptions = new Configuration();
+    keyOptions.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    keyOptions.set(FlinkOptions.ORDERING_FIELDS, "ts");
+    return keyOptions.toMap();
+  }
+
+ private void assertRecordIndexMetadataPartitionExists() {
+ org.apache.hadoop.conf.Configuration hadoopConf = new
org.apache.hadoop.conf.Configuration();
+ String mdtBasePath =
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
+ assertTrue(StreamerUtil.tableExists(mdtBasePath, hadoopConf),
+ "Metadata table should exist for table with dynamic bucket index");
+ assertTrue(StreamerUtil.partitionExists(mdtBasePath,
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), hadoopConf),
+ "RECORD_INDEX partition should exist for table with dynamic bucket
index");
+ }
+
+ private Map<String, String> collectFileIds(TableEnvironment tableEnv, String
condition) {
+ List<Row> rows = execSelectSql(tableEnv, "select uuid, _hoodie_file_name
from t1 where " + condition);
+ return rows.stream().collect(Collectors.toMap(
+ row -> row.getField(0).toString(),
+ row -> FSUtils.getFileId(row.getField(1).toString())));
+ }
+
+  /**
+   * Runs {@code select} and drains every resulting row into a list. The query only starts when
+   * the list conversion asks the iterable for its iterator.
+   */
+  private List<Row> execSelectSql(TableEnvironment tableEnv, String select) {
+    Iterable<Row> resultRows = () -> tableEnv.sqlQuery(select).execute().collect();
+    return CollectionUtil.iterableToList(resultRows);
+  }
+
+  /**
+   * Executes an insert statement and waits on {@code TableResult#await()} for it to finish.
+   *
+   * @param tableEnv environment that runs the statement
+   * @param insert   the insert SQL text
+   * @throws AssertionError if the job fails or the wait is interrupted; on interruption the
+   *                        thread's interrupt status is restored before the error is thrown
+   */
+  private void execInsertSql(TableEnvironment tableEnv, String insert) {
+    TableResult tableResult = tableEnv.executeSql(insert);
+    try {
+      tableResult.await();
+    } catch (InterruptedException ex) {
+      // catching InterruptedException clears the flag; re-set it so the test runner can still see it
+      Thread.currentThread().interrupt();
+      throw new AssertionError("Failed to execute insert SQL: " + insert, ex);
+    } catch (ExecutionException ex) {
+      throw new AssertionError("Failed to execute insert SQL: " + insert, ex);
+    }
+  }
+}