This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ed9ea0ead908 fix(flink): Trigger a failover after pending instants
recommitted for both global and partitioned RLI (#18793)
ed9ea0ead908 is described below
commit ed9ea0ead908e3765b6c938da121e96ace13ba38
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jun 2 08:47:25 2026 +0800
fix(flink): Trigger a failover after pending instants recommitted for both
global and partitioned RLI (#18793)
---
.../apache/hudi/configuration/OptionsResolver.java | 4 ----
.../hudi/sink/StreamWriteOperatorCoordinator.java | 10 +++++-----
.../sink/TestStreamWriteOperatorCoordinator.java | 23 ++++++++++++++++++++++
.../hudi/sink/utils/BulkInsertFunctionWrapper.java | 4 +---
.../hudi/sink/utils/InsertFunctionWrapper.java | 20 +++++++++++++++----
.../sink/utils/StreamWriteFunctionWrapper.java | 3 ---
6 files changed, 45 insertions(+), 19 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 66e6e2815a5a..74c5257cf44f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -232,10 +232,6 @@ public class OptionsResolver {
return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
}
- public static boolean isRLIWithBootstrap(Configuration conf) {
- return isGlobalRecordLevelIndex(conf) &&
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
- }
-
/**
* Returns whether it is a MERGE_ON_READ table, and updates by bucket index.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 0a10d6d7b88d..7b9095258903 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -330,7 +330,7 @@ public class StreamWriteOperatorCoordinator
// resetToCheckpoint() is called in two cases:
// 1. The job is restarted from state, start() will be called later.
// 2. The job is recovered from global failover. The coordinator is
already started, and start() will not be called again.
- if (executor != null && tableState.isRLIWithBootstrap) {
+ if (executor != null && tableState.isRecordLevelIndex) {
// use sync execution here to make sure the recommitting finishes
before RLI bootstrapping
this.executor.executeSync(this::restoreEvents, "Recommit pending
instants on resetting to checkpoint: %s.", checkpointID);
}
@@ -373,7 +373,7 @@ public class StreamWriteOperatorCoordinator
@Override
public void subtaskReset(int i, long resetCkpId) {
// There exists pending instants waiting for recommiting.
- if (tableState.isRLIWithBootstrap &&
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
+ if (tableState.isRecordLevelIndex &&
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
// use sync execution here to make sure the recommitting finishes before
RLI bootstrapping
executor.executeSync(() -> commitInstants(resetCkpId), "Recommit pending
instants on resetting subtask %s to checkpoint: %s.", i, resetCkpId);
}
@@ -565,7 +565,7 @@ public class StreamWriteOperatorCoordinator
if (eventBuffer.allBootstrapEventsReceived()) {
// start to recommit the instant.
boolean committed = recommitInstant(event.getCheckpointId(),
event.getInstantTime(), eventBuffer);
- if (committed && tableState.isRLIWithBootstrap) {
+ if (committed && tableState.isRecordLevelIndex) {
context.failJob(new HoodieException("There are pending instants
recommitted on job restart,"
+ "triggering a global failover so that RLI bootstrap operator can
load the record level index completely."));
}
@@ -759,7 +759,7 @@ public class StreamWriteOperatorCoordinator
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;
final boolean isStreamingIndexWriteEnabled;
- final boolean isRLIWithBootstrap;
+ final boolean isRecordLevelIndex;
private TableState(Configuration conf) {
this.operationType =
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -773,7 +773,7 @@ public class StreamWriteOperatorCoordinator
this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
this.isStreamingIndexWriteEnabled =
OptionsResolver.isStreamingIndexWriteEnabled(conf);
- this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
+ this.isRecordLevelIndex = OptionsResolver.isGlobalRecordLevelIndex(conf)
|| OptionsResolver.isRecordLevelIndex(conf);
}
public static TableState create(Configuration conf) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 981cacfc54bb..f8205f57f342 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -72,6 +72,7 @@ import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -307,6 +308,18 @@ public class TestStreamWriteOperatorCoordinator {
assertThat("Recommits the instant with partial uncommitted events",
lastCompleted, is(instant));
}
+ @ParameterizedTest
+ @ValueSource(strings = {"GLOBAL_RECORD_LEVEL_INDEX", "RECORD_LEVEL_INDEX"})
+ public void testRecordLevelIndexFlag(String indexType) throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(new File(tempFile,
indexType).getAbsolutePath());
+ conf.set(FlinkOptions.INDEX_TYPE, indexType);
+ conf.set(FlinkOptions.INDEX_WRITE_TASKS, 1);
+
+ try (StreamWriteOperatorCoordinator coordinator = createCoordinator(conf,
1)) {
+ assertTrue(getRecordLevelIndexFlag(coordinator));
+ }
+ }
+
@Test
public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws
Exception {
// reset
@@ -776,6 +789,16 @@ public class TestStreamWriteOperatorCoordinator {
assertTrue(context.isJobFailed(), message);
}
+ private static boolean
getRecordLevelIndexFlag(StreamWriteOperatorCoordinator coordinator) throws
Exception {
+ Field tableStateField =
StreamWriteOperatorCoordinator.class.getDeclaredField("tableState");
+ tableStateField.setAccessible(true);
+ Object tableState = tableStateField.get(coordinator);
+
+ Field recordLevelIndexField =
tableState.getClass().getDeclaredField("isRecordLevelIndex");
+ recordLevelIndexField.setAccessible(true);
+ return recordLevelIndexField.getBoolean(tableState);
+ }
+
// -------------------------------------------------------------------------
// Kafka Offset Tests
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 26f79d870d66..159ead016fb5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -167,9 +167,7 @@ public class BulkInsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
public void coordinatorFails() throws Exception {
- this.coordinator.close();
- this.coordinator.start();
- this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ // Do nothing since there is no state recovery for bulk insert.
}
public void restartCoordinator() throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 8c0369a889c7..9a8262155973 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -49,6 +49,8 @@ import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
/**
@@ -70,6 +72,7 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private final boolean asyncClustering;
private ClusteringFunctionWrapper clusteringFunctionWrapper;
+ private final TreeMap<Long, byte[]> coordinatorStateStore;
/**
* Append write function.
@@ -97,6 +100,7 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
this.coordinatorContext = new MockOperatorCoordinatorContext(new
OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
this.stateInitializationContext = new MockStateInitializationContext();
+ this.coordinatorStateStore = new TreeMap<>();
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
StreamConfig streamConfig = new StreamConfig(conf);
@@ -142,8 +146,10 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
public void checkpointFunction(long checkpointId) throws Exception {
+ CompletableFuture<byte[]> completableFuture = new CompletableFuture<>();
// checkpoint the coordinator first
- this.coordinator.checkpointCoordinator(checkpointId, new
CompletableFuture<>());
+ this.coordinator.checkpointCoordinator(checkpointId, completableFuture);
+ this.coordinatorStateStore.put(checkpointId, completableFuture.get());
writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
stateInitializationContext.checkpointBegin(checkpointId);
@@ -167,9 +173,15 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
public void coordinatorFails() throws Exception {
- this.coordinator.close();
- this.coordinator.start();
- this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ resetCoordinatorToCheckpoint();
+ }
+
+ private void resetCoordinatorToCheckpoint() {
+ if (coordinatorStateStore.isEmpty()) {
+ return;
+ }
+ Map.Entry<Long, byte[]> latestState =
this.coordinatorStateStore.lastEntry();
+ this.coordinator.resetToCheckpoint(latestState.getKey(),
latestState.getValue());
}
public void restartCoordinator() throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 52b8e800cfd3..baab42555c1e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -368,10 +368,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
public void coordinatorFails() throws Exception {
- this.coordinator.close();
resetCoordinatorToCheckpoint();
- this.coordinator.start();
- this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
}
public void restartCoordinator() throws Exception {