This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new db460814d365 fix(flink): Fix minibatch lookup for global record level index (#18759)
db460814d365 is described below
commit db460814d365d0a08bbf6bd0940cd88fc9e8be65
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon May 18 15:51:17 2026 +0800
fix(flink): Fix minibatch lookup for global record level index (#18759)
---
.../sink/partitioner/BucketAssignFunction.java | 126 ++++++++++++++-------
.../partitioner/MinibatchBucketAssignFunction.java | 67 +++++++++--
.../index/GlobalRecordLevelIndexBackend.java | 16 +--
.../TestMinibatchBucketAssignFunction.java | 4 +-
.../index/TestGlobalRecordLevelIndexBackend.java | 17 +--
.../sink/utils/StreamWriteFunctionWrapper.java | 67 +++++++++--
.../apache/hudi/table/ITTestHoodieDataSource.java | 7 ++
7 files changed, 221 insertions(+), 83 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index c74bf0d0dd2b..7faff523e689 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
import java.util.Objects;
/**
@@ -101,6 +102,11 @@ public class BucketAssignFunction
@Setter
protected transient Correspondent correspondent;
+ /**
+ * Processor for data records selected by the write operation type.
+ */
+ private transient Processor recordProcessor;
+
/**
* If the index is global, update the index for the old partition path
* if same key record with different partition path came in.
@@ -132,6 +138,7 @@ public class BucketAssignFunction
HoodieTableType.valueOf(conf.get(FlinkOptions.TABLE_TYPE)),
context,
writeConfig);
+ this.recordProcessor = initRecordProcessor();
}
@Override
@@ -148,67 +155,102 @@ public class BucketAssignFunction
@Override
public void processElement(HoodieFlinkInternalRow value, Context ctx,
Collector<HoodieFlinkInternalRow> out) throws Exception {
- processRecord(value, ctx.getCurrentKey(), out);
+ if (value.isIndexRecord()) {
+ processIndexRecord(value, ctx.getCurrentKey());
+ } else {
+ recordProcessor.process(value, out);
+ }
}
- protected void processRecord(HoodieFlinkInternalRow record, String
recordKey, Collector<HoodieFlinkInternalRow> out) throws Exception {
- if (record.isIndexRecord()) {
- indexBackend.update(
- recordKey, new HoodieRecordGlobalLocation(record.getPartitionPath(),
record.getInstantTime(), record.getFileId()));
- return;
- }
+ protected void processIndexRecord(
+ HoodieFlinkInternalRow record,
+ String recordKey) throws Exception {
+ indexBackend.update(
+ recordKey, new HoodieRecordGlobalLocation(record.getPartitionPath(),
record.getInstantTime(), record.getFileId()));
+ }
+
+ protected void processChangingRecord(
+ HoodieFlinkInternalRow record,
+ String recordKey,
+ Collector<HoodieFlinkInternalRow> out,
+ HoodieRecordGlobalLocation prefetchedOldLoc,
+ boolean prefetched) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just
send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner
then send it out.
final String partitionPath = record.getPartitionPath();
final HoodieRecordLocation location;
- if (isChangingRecords) {
- // Only changing records need looking up the index for the location,
- // append only records are always recognized as INSERT.
- // Structured as Tuple(partition, fileId, instantTime).
- HoodieRecordGlobalLocation oldLoc = indexBackend.get(recordKey);
- if (oldLoc != null) {
- // Set up the instant time as "U" to mark the bucket as an update
bucket.
- String partitionFromState = oldLoc.getPartitionPath();
- String fileIdFromState = oldLoc.getFileId();
- if (!Objects.equals(partitionFromState, partitionPath)) {
- if (globalIndex) {
- // if partition path changes, emit a delete record for old
partition path,
- // then update the index state using location with new partition
path.
- RowData row = record.getRowData();
- RowKind orginalRowKind = row.getRowKind();
- row.setRowKind(RowKind.DELETE);
- // the operationType field is used as the index operation type,
and only 'I' and 'D' index operation will be written to the metadata table.
- // for record key, whose partition path is updated, we simply
ignore the DELETE index record, and the location for this key will be updated
- // by the following INSERT index record.
- HoodieFlinkInternalRow deleteRecord =
- new HoodieFlinkInternalRow(record.getRecordKey(),
partitionFromState, fileIdFromState, "U", "-U", false, row);
- out.collect(deleteRecord);
- row.setRowKind(orginalRowKind);
- }
- location = getNewRecordLocation(partitionPath);
- } else {
- location = oldLoc.toLocal("U");
- this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
+ // Only changing records need looking up the index for the location,
+ // append only records are always recognized as INSERT.
+ // Structured as Tuple(partition, fileId, instantTime).
+ HoodieRecordGlobalLocation oldLoc = prefetched ? prefetchedOldLoc :
indexBackend.get(recordKey);
+ if (oldLoc != null) {
+ // Set up the instant time as "U" to mark the bucket as an update bucket.
+ String partitionFromState = oldLoc.getPartitionPath();
+ String fileIdFromState = oldLoc.getFileId();
+ if (!Objects.equals(partitionFromState, partitionPath)) {
+ if (globalIndex) {
+ // if partition path changes, emit a delete record for old partition
path,
+ // then update the index state using location with new partition
path.
+ RowData row = record.getRowData();
+ RowKind orginalRowKind = row.getRowKind();
+ row.setRowKind(RowKind.DELETE);
+ // the operationType field is used as the index operation type, and
only 'I' and 'D' index operation will be written to the metadata table.
+ // for record key, whose partition path is updated, we simply ignore
the DELETE index record, and the location for this key will be updated
+ // by the following INSERT index record.
+ HoodieFlinkInternalRow deleteRecord =
+ new HoodieFlinkInternalRow(record.getRecordKey(),
partitionFromState, fileIdFromState, "U", "-U", false, row);
+ out.collect(deleteRecord);
+ row.setRowKind(orginalRowKind);
}
- } else {
location = getNewRecordLocation(partitionPath);
- }
- // refresh the index only when the location is updated.
- if (oldLoc == null || !oldLoc.getFileId().equals(location.getFileId())) {
- record.setOperationType("I");
- this.indexBackend.update(recordKey,
HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
+ } else {
+ location = oldLoc.toLocal("U");
+ this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
- log.warn("This branch should not be reached.");
location = getNewRecordLocation(partitionPath);
}
+ // refresh the index only when the location is updated.
+ if (oldLoc == null || !oldLoc.getFileId().equals(location.getFileId())) {
+ record.setOperationType("I");
+ this.indexBackend.update(recordKey,
HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
+ }
record.setFileId(location.getFileId());
record.setInstantTime(location.getInstantTime());
out.collect(record);
}
+ protected void processInsertRecord(
+ HoodieFlinkInternalRow record,
+ Collector<HoodieFlinkInternalRow> out) {
+ // Record is an INSERT, decide the location using the BucketAssigner then
send it out.
+ final String partitionPath = record.getPartitionPath();
+ final HoodieRecordLocation location = getNewRecordLocation(partitionPath);
+ record.setFileId(location.getFileId());
+ record.setInstantTime(location.getInstantTime());
+ out.collect(record);
+ }
+
+ /**
+ * Initializes the processor for non-index records based on whether the
write operation needs index lookup.
+ */
+ private Processor initRecordProcessor() {
+ if (isChangingRecords) {
+ return (value, out) -> processChangingRecord(value,
value.getRecordKey(), out, null, false);
+ } else {
+ return this::processInsertRecord;
+ }
+ }
+
+ /**
+ * Processes regular data records after index records have been handled by
the caller.
+ */
+ private interface Processor extends Serializable {
+ void process(HoodieFlinkInternalRow value,
Collector<HoodieFlinkInternalRow> out) throws Exception;
+ }
+
protected HoodieRecordLocation getNewRecordLocation(String partitionPath) {
final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
final HoodieRecordLocation location;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
index 70b09621aa4d..cdf30c932eb7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.adapter.ProcessFunctionAdapter;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.configuration.FlinkOptions;
@@ -36,8 +37,10 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -58,6 +61,8 @@ public class MinibatchBucketAssignFunction
* extending the BucketAssignFunction here, because BucketAssignFunction is a
* KeyedProcessFunction while MinibatchBucketAssignFunction is a
ProcessFunction.
*/
+ @VisibleForTesting
+ @Getter
private final BucketAssignFunction delegateFunction;
/**
@@ -76,11 +81,21 @@ public class MinibatchBucketAssignFunction
private transient Collector<HoodieFlinkInternalRow> outCollector;
+ /**
+ * Processor for buffered data records selected by the write operation type.
+ */
+ private transient Processor minibatchProcessor;
+
public MinibatchBucketAssignFunction(Configuration conf) {
+ this(conf,
Math.max(conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE),
FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue()));
+ }
+
+ @VisibleForTesting
+ public MinibatchBucketAssignFunction(Configuration conf, int miniBatchSize) {
this.delegateFunction = new BucketAssignFunction(conf);
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
- this.miniBatchSize =
Math.max(conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE),
FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue());
+ this.miniBatchSize = miniBatchSize;
}
@Override
@@ -88,6 +103,7 @@ public class MinibatchBucketAssignFunction
super.open(parameters);
delegateFunction.open(parameters);
this.recordBuffer = new ArrayList<>();
+ this.minibatchProcessor = initRecordProcessor();
}
@Override
@@ -102,7 +118,7 @@ public class MinibatchBucketAssignFunction
}
if (record.isIndexRecord()) {
// handle index records immediately, do not need buffering
- delegateFunction.processRecord(record, record.getRecordKey(),
outCollector);
+ delegateFunction.processIndexRecord(record, record.getRecordKey());
} else {
// Add data records to the buffer
recordBuffer.add(record);
@@ -120,20 +136,57 @@ public class MinibatchBucketAssignFunction
if (recordBuffer.isEmpty()) {
return;
}
-
// process batch of records.
+ minibatchProcessor.process(recordBuffer, out);
+ // Clear the buffer after processing
+ recordBuffer.clear();
+ }
+
+ /**
+ * Initializes the processor for buffered data records based on whether
minibatch index lookup is needed.
+ */
+ private Processor initRecordProcessor() {
if (isChangingRecords) {
- List<String> recordKeys =
recordBuffer.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
- MinibatchIndexBackend minibatchIndexBackend = (MinibatchIndexBackend)
delegateFunction.getIndexBackend();
- // load the record location mapping into the record index cache.
- minibatchIndexBackend.get(recordKeys);
- }
- for (HoodieFlinkInternalRow record: recordBuffer) {
- delegateFunction.processRecord(record, record.getRecordKey(), out);
+ return new Processor() {
+ @Override
+ public void process(List<HoodieFlinkInternalRow> records,
Collector<HoodieFlinkInternalRow> out) throws Exception {
+ List<String> recordKeys =
records.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
+ MinibatchIndexBackend minibatchIndexBackend =
(MinibatchIndexBackend) delegateFunction.getIndexBackend();
+ // get record locations by minibatch
+ Map<String, HoodieRecordGlobalLocation> recordLocations =
minibatchIndexBackend.get(recordKeys);
+ for (HoodieFlinkInternalRow record: records) {
+ String recordKey = record.getRecordKey();
+ HoodieRecordGlobalLocation oldLoc = recordLocations.get(recordKey);
+ delegateFunction.processChangingRecord(record, recordKey, out, oldLoc, true);
+ // A key may occur more than once in one batch (e.g. repeated upserts), so
+ // publish the location just assigned to later records of the same key,
+ // mirroring the index refresh done in processChangingRecord.
+ // NOTE: assumes the backend returns a fresh mutable map per call, as
+ // GlobalRecordLevelIndexBackend#get(List) does.
+ if (oldLoc == null || !oldLoc.getFileId().equals(record.getFileId())) {
+ recordLocations.put(recordKey, new HoodieRecordGlobalLocation(
+ record.getPartitionPath(), record.getInstantTime(), record.getFileId()));
+ }
+ }
+ }
+ };
+ } else {
+ return new Processor() {
+ @Override
+ public void process(List<HoodieFlinkInternalRow> records,
Collector<HoodieFlinkInternalRow> out) throws Exception {
+ for (HoodieFlinkInternalRow record: records) {
+ delegateFunction.processInsertRecord(record, out);
+ }
+ }
+ };
}
+ }
- // Clear the buffer after processing
- recordBuffer.clear();
+ /**
+ * Processes buffered data records while keeping index records out of the
minibatch buffer.
+ */
+ private interface Processor extends Serializable {
+ void process(List<HoodieFlinkInternalRow> records,
Collector<HoodieFlinkInternalRow> out) throws Exception;
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index cef722c87a4b..a95e18e863a1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.HoodieDataUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
@@ -38,8 +37,7 @@ import org.apache.flink.configuration.Configuration;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,7 +72,7 @@ public class GlobalRecordLevelIndexBackend implements
MinibatchIndexBackend {
@Override
public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
- return get(Collections.singletonList(recordKey)).get(recordKey);
+ throw new UnsupportedOperationException(this.getClass().getSimpleName() +
" doesn't support lookup with a single key.");
}
@Override
@@ -84,22 +82,20 @@ public class GlobalRecordLevelIndexBackend implements
MinibatchIndexBackend {
@Override
public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys)
throws IOException {
- // use a linked hash map to keep the natural order.
- Map<String, HoodieRecordGlobalLocation> keysAndLocations = new
LinkedHashMap<>();
+ Map<String, HoodieRecordGlobalLocation> keysAndLocations = new HashMap<>();
List<String> missedKeys = new ArrayList<>();
for (String key: recordKeys) {
HoodieRecordGlobalLocation location = recordIndexCache.get(key);
if (location == null) {
missedKeys.add(key);
+ } else {
+ keysAndLocations.put(key, location);
}
- // insert anyway even the location is null to keep the natural order.
- keysAndLocations.put(key, location);
}
if (!missedKeys.isEmpty()) {
HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
- List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
- recordIndexLocations.forEach(keyAndLocation -> {
+ recordIndexData.forEach(keyAndLocation -> {
recordIndexCache.update(keyAndLocation.getKey(),
keyAndLocation.getValue());
keysAndLocations.put(keyAndLocation.getKey(),
keyAndLocation.getValue());
});
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index 055a85de0695..a98a95529fc1 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -58,8 +58,8 @@ public class TestMinibatchBucketAssignFunction {
final String basePath = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(basePath);
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
- conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
}
@BeforeEach
@@ -244,4 +244,4 @@ public class TestMinibatchBucketAssignFunction {
// Close should not throw any exceptions
testHarness.close();
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 396a57185cec..3b3f72a9fcb5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -43,6 +44,7 @@ import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.mockito.Mockito.mock;
@@ -74,14 +76,15 @@ public class TestGlobalRecordLevelIndexBackend {
try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
// get record location
- HoodieRecordGlobalLocation location =
globalRecordLevelIndexBackend.get("id1");
+ HoodieRecordGlobalLocation location =
globalRecordLevelIndexBackend.get(Collections.singletonList("id1")).get("id1");
assertNotNull(location);
assertEquals("par1", location.getPartitionPath());
assertEquals(firstCommitTime, location.getInstantTime());
// get record location with non existed key
- location = globalRecordLevelIndexBackend.get("new_key");
+ location =
globalRecordLevelIndexBackend.get(Collections.singletonList("new_key")).get("new_key");
assertNull(location);
+ assertThrows(UnsupportedOperationException.class, () ->
globalRecordLevelIndexBackend.get("id1"));
// get records locations for multiple record keys
Map<String, HoodieRecordGlobalLocation> locations =
globalRecordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
@@ -90,7 +93,7 @@ public class TestGlobalRecordLevelIndexBackend {
// get records locations for multiple record keys with unexisted key
locations = globalRecordLevelIndexBackend.get(Arrays.asList("id1",
"id2", "new_key"));
- assertEquals(3, locations.size());
+ assertEquals(2, locations.size());
assertNull(locations.get("new_key"));
// new checkpoint
@@ -99,7 +102,7 @@ public class TestGlobalRecordLevelIndexBackend {
// update record location
HoodieRecordGlobalLocation newLocation = new
HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
globalRecordLevelIndexBackend.update("new_key", newLocation);
- location = globalRecordLevelIndexBackend.get("new_key");
+ location =
globalRecordLevelIndexBackend.get(Collections.singletonList("new_key")).get("new_key");
assertEquals(newLocation, location);
// previous instant commit success, clean
@@ -182,9 +185,9 @@ public class TestGlobalRecordLevelIndexBackend {
// cache for the oldest ckp id will be cleaned
assertNull(globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
// caches for the latest 3 ckp id still in the cache
- assertEquals("par1",
globalRecordLevelIndexBackend.get("id2_0").getPartitionPath());
- assertEquals("par1",
globalRecordLevelIndexBackend.get("id3_0").getPartitionPath());
- assertEquals("par1",
globalRecordLevelIndexBackend.get("id4_0").getPartitionPath());
+ assertEquals("par1",
globalRecordLevelIndexBackend.get(Collections.singletonList("id2_0")).get("id2_0").getPartitionPath());
+ assertEquals("par1",
globalRecordLevelIndexBackend.get(Collections.singletonList("id3_0")).get("id3_0").getPartitionPath());
+ assertEquals("par1",
globalRecordLevelIndexBackend.get(Collections.singletonList("id4_0")).get("id4_0").getPartitionPath());
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index dac07150295e..dec0936f05a6 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -35,6 +35,7 @@ import org.apache.hudi.sink.bootstrap.RLIBootstrapOperator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
import org.apache.hudi.sink.partitioner.index.IndexWriteFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.HoodieSchemaConverter;
@@ -109,6 +110,10 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
* Function that assigns bucket ID.
*/
private BucketAssignFunction bucketAssignerFunction;
+ /**
+ * Function that assigns bucket ID with minibatch RLI lookup.
+ */
+ private MinibatchBucketAssignFunction minibatchBucketAssignerFunction;
/**
* BucketAssignOperator context.
**/
@@ -177,11 +182,19 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
toHoodieFunction.setRuntimeContext(runtimeContext);
toHoodieFunction.open(conf);
- bucketAssignerFunction = new BucketAssignFunction(conf);
- bucketAssignerFunction.setRuntimeContext(runtimeContext);
- bucketAssignerFunction.setCorrespondent(correspondent);
- bucketAssignerFunction.open(conf);
- bucketAssignerFunction.initializeState(this.stateInitializationContext);
+ if (useMinibatchBucketAssignFunction()) {
+ minibatchBucketAssignerFunction = new
MinibatchBucketAssignFunction(conf, 1);
+ minibatchBucketAssignerFunction.setRuntimeContext(runtimeContext);
+ minibatchBucketAssignerFunction.setCorrespondent(correspondent);
+ minibatchBucketAssignerFunction.open(conf);
+
minibatchBucketAssignerFunction.initializeState(this.stateInitializationContext);
+ } else {
+ bucketAssignerFunction = new BucketAssignFunction(conf);
+ bucketAssignerFunction.setRuntimeContext(runtimeContext);
+ bucketAssignerFunction.setCorrespondent(correspondent);
+ bucketAssignerFunction.open(conf);
+ bucketAssignerFunction.initializeState(this.stateInitializationContext);
+ }
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator = isStreamingWriteIndexEnabled ? Mockito.spy(new
RLIBootstrapOperator(conf)) : new BootstrapOperator(conf);
@@ -216,7 +229,11 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
stateInitializationContext.getKeyedStateStore().setCurrentKey(hoodieRecord.getRecordKey());
RecordsCollector<HoodieFlinkInternalRow> collector =
RecordsCollector.getInstance(rowType);
when(context.getCurrentKey()).thenReturn(hoodieRecord.getRecordKey());
- bucketAssignerFunction.processElement(hoodieRecord, context, collector);
+ if (useMinibatchBucketAssignFunction()) {
+ minibatchBucketAssignerFunction.processElement(hoodieRecord, null,
collector);
+ } else {
+ bucketAssignerFunction.processElement(hoodieRecord, context, collector);
+ }
bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
RecordsCollector<RowData> indexRecordCollector =
RecordsCollector.getInstance();
for (HoodieFlinkInternalRow row: collector.getVal()) {
@@ -268,7 +285,12 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator.snapshotState(new
MockStateSnapshotContext(checkpointId));
}
- bucketAssignerFunction.snapshotState(new
MockFunctionSnapshotContext(checkpointId));
+ if (useMinibatchBucketAssignFunction()) {
+ minibatchBucketAssignerFunction.prepareSnapshotPreBarrier(checkpointId);
+ minibatchBucketAssignerFunction.snapshotState(new
MockFunctionSnapshotContext(checkpointId));
+ } else {
+ bucketAssignerFunction.snapshotState(new
MockFunctionSnapshotContext(checkpointId));
+ }
writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
if (isStreamingWriteIndexEnabled) {
@@ -294,6 +316,13 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
public void endInput() {
+ if (useMinibatchBucketAssignFunction()) {
+ try {
+ minibatchBucketAssignerFunction.endInput();
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
writeFunction.endInput();
if (isStreamingWriteIndexEnabled) {
indexWriteFunction.endInput();
@@ -304,7 +333,15 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
stateInitializationContext.checkpointSuccess(checkpointId);
indexStateInitializationContext.checkpointSuccess(checkpointId);
coordinator.notifyCheckpointComplete(checkpointId);
- this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+ if (useMinibatchBucketAssignFunction()) {
+ try {
+
this.minibatchBucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ } else {
+ this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+ }
if (asyncCompaction) {
try {
compactFunctionWrapper.compact(checkpointId);
@@ -367,7 +404,11 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
public void close() throws Exception {
coordinator.close();
ioManager.close();
- bucketAssignerFunction.close();
+ if (useMinibatchBucketAssignFunction()) {
+ minibatchBucketAssignerFunction.close();
+ } else {
+ bucketAssignerFunction.close();
+ }
writeFunction.close();
if (indexWriteFunction != null) {
indexWriteFunction.close();
@@ -388,7 +429,9 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
@Override
public BucketAssignFunction getBucketAssignFunction() {
- return this.bucketAssignerFunction;
+ return useMinibatchBucketAssignFunction()
+ ? this.minibatchBucketAssignerFunction.getDelegateFunction()
+ : this.bucketAssignerFunction;
}
public boolean isKeyInState(HoodieKey hoodieKey) {
@@ -425,6 +468,10 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
indexWriteFunction.open(conf);
}
+ private boolean useMinibatchBucketAssignFunction() {
+ return OptionsResolver.isGlobalRecordLevelIndex(conf) &&
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+ }
+
private void setupIndexBootstrapFunction() {
bootstrapOperator = Mockito.spy(new RLIBootstrapOperator(conf));
CollectOutputAdapter<HoodieFlinkInternalRow> output = new
CollectOutputAdapter<>();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 2da870f1ef8c..5d6a5daa061e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3167,6 +3167,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.options(getDefaultKeys())
.option(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
+ .option(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false)
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.TABLE_TYPE, tableType.name())
.end();
@@ -3176,6 +3177,12 @@ public class ITTestHoodieDataSource {
List<Row> result1 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+ // insert another batch of records, so that minibatch lookup results are
not empty
+ execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+ result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_MERGED);
}
@ParameterizedTest