This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 348e7f13a7fb feat(flink): Add validation to reject multiple writers 
for flink RLI writes (#18946)
348e7f13a7fb is described below

commit 348e7f13a7fb12240f5fe5ec781dd5e861655116
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jun 9 22:40:46 2026 +0800

    feat(flink): Add validation to reject multiple writers for flink RLI writes 
(#18946)
---
 .../apache/hudi/configuration/OptionsResolver.java | 10 +++++++
 .../org/apache/hudi/table/HoodieTableFactory.java  |  4 +++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 31 ++++++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index d2f8a623cda8..5089b4eea3c7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -603,6 +603,16 @@ public class OptionsResolver {
     return 
WriteConcurrencyMode.isNonBlockingConcurrencyControl(config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
 HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
   }
 
+  /**
+   * Returns whether this is optimistic concurrency control.
+   */
+  public static boolean isOptimisticConcurrencyControl(Configuration config) {
+    return WriteConcurrencyMode.valueOf(config.getString(
+        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+        
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()).toUpperCase(Locale.ROOT))
+        .isOptimisticConcurrencyControl();
+  }
+
   /**
    * Returns whether the cleaning for failed writes is enabled as lazy.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 4e0487c89940..2fb4031cd7b4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -198,6 +198,8 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
             String.format("Metadata table should be enabled when %s is %s.", 
FlinkOptions.INDEX_TYPE.key(), 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
         
ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
             String.format("Partition level index updating is not supported for 
GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.", 
FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
+        ValidationUtils.checkArgument(!OptionsResolver.isMultiWriter(conf),
+            "Flink global record level index does not support multiple 
writers, set hoodie.write.concurrency.mode=SINGLE_WRITER instead.");
 
         boolean deferredRLI = Boolean.parseBoolean(conf.getString(
             HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(), 
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
@@ -209,6 +211,8 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
             "Partitioned record level index supports only Flink streaming 
upsert and insert overwrite.");
         
ValidationUtils.checkArgument(!OptionsResolver.isNonBlockingConcurrencyControl(conf),
             "Partitioned record level index does not support non-blocking 
concurrency control.");
+        ValidationUtils.checkArgument(!OptionsResolver.isMultiWriter(conf),
+            "Flink record level index does not support multiple writers, set 
hoodie.write.concurrency.mode=SINGLE_WRITER instead.");
         break;
       default:
         break;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 1746355f1b36..8376a8ba3396 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -22,7 +22,9 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -48,6 +50,8 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -58,6 +62,7 @@ import java.util.Objects;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -278,6 +283,32 @@ public class TestHoodieTableFactory {
     assertThrows(IllegalArgumentException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext6));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieIndex.IndexType.class, names = 
{"GLOBAL_RECORD_LEVEL_INDEX", "RECORD_LEVEL_INDEX"})
+  void testRecordLevelIndexDoesNotSupportMultipleWriters(HoodieIndex.IndexType 
indexType) {
+    ResolvedSchema schema = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+
+    Configuration rliConf = new Configuration(this.conf);
+    rliConf.set(FlinkOptions.INDEX_TYPE, indexType.name());
+    rliConf.set(FlinkOptions.OPERATION, "upsert");
+    rliConf.set(FlinkOptions.METADATA_ENABLED, true);
+    rliConf.set(FlinkOptions.INDEX_GLOBAL_ENABLED,
+        indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX);
+    rliConf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+        WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+
+    IllegalArgumentException exception = assertThrows(
+        IllegalArgumentException.class,
+        () -> new 
HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(rliConf, 
schema, "")));
+    assertThat(exception.getMessage(),
+        containsString("record level index does not support multiple 
writers"));
+  }
+
   @Test
   void testTableTypeCheck() {
     ResolvedSchema schema = SchemaBuilder.instance()

Reply via email to