This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 348e7f13a7fb feat(flink): Add validation to reject multiple writers
for flink RLI writes (#18946)
348e7f13a7fb is described below
commit 348e7f13a7fb12240f5fe5ec781dd5e861655116
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jun 9 22:40:46 2026 +0800
feat(flink): Add validation to reject multiple writers for flink RLI writes
(#18946)
---
.../apache/hudi/configuration/OptionsResolver.java | 10 +++++++
.../org/apache/hudi/table/HoodieTableFactory.java | 4 +++
.../apache/hudi/table/TestHoodieTableFactory.java | 31 ++++++++++++++++++++++
3 files changed, 45 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index d2f8a623cda8..5089b4eea3c7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -603,6 +603,16 @@ public class OptionsResolver {
return
WriteConcurrencyMode.isNonBlockingConcurrencyControl(config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
}
+ /**
+ * Returns whether the write concurrency mode read from {@code config} (falling back to the option's default value when unset) is optimistic concurrency control; the configured value is upper-cased with {@code Locale.ROOT} before {@code WriteConcurrencyMode.valueOf} is applied.
+ */
+ public static boolean isOptimisticConcurrencyControl(Configuration config) {
+ return WriteConcurrencyMode.valueOf(config.getString(
+ HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()).toUpperCase(Locale.ROOT))
+ .isOptimisticConcurrencyControl();
+ }
+
/**
* Returns whether the cleaning for failed writes is enabled as lazy.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 4e0487c89940..2fb4031cd7b4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -198,6 +198,8 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
String.format("Metadata table should be enabled when %s is %s.",
FlinkOptions.INDEX_TYPE.key(),
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
String.format("Partition level index updating is not supported for
GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.",
FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
+ ValidationUtils.checkArgument(!OptionsResolver.isMultiWriter(conf),
+ "Flink global record level index does not support multiple
writers, set hoodie.write.concurrency.mode=SINGLE_WRITER instead.");
boolean deferredRLI = Boolean.parseBoolean(conf.getString(
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(),
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
@@ -209,6 +211,8 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
"Partitioned record level index supports only Flink streaming
upsert and insert overwrite.");
ValidationUtils.checkArgument(!OptionsResolver.isNonBlockingConcurrencyControl(conf),
"Partitioned record level index does not support non-blocking
concurrency control.");
+ ValidationUtils.checkArgument(!OptionsResolver.isMultiWriter(conf),
+ "Flink record level index does not support multiple writers, set
hoodie.write.concurrency.mode=SINGLE_WRITER instead.");
break;
default:
break;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 1746355f1b36..8376a8ba3396 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -22,7 +22,9 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -48,6 +50,8 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
@@ -58,6 +62,7 @@ import java.util.Objects;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -278,6 +283,32 @@ public class TestHoodieTableFactory {
assertThrows(IllegalArgumentException.class, () -> new
HoodieTableFactory().createDynamicTableSink(sourceContext6));
}
+ @ParameterizedTest // executed once per index type named in the @EnumSource below
+ @EnumSource(value = HoodieIndex.IndexType.class, names =
{"GLOBAL_RECORD_LEVEL_INDEX", "RECORD_LEVEL_INDEX"})
+ void testRecordLevelIndexDoesNotSupportMultipleWriters(HoodieIndex.IndexType
indexType) {
+ ResolvedSchema schema = SchemaBuilder.instance()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+
+ Configuration rliConf = new Configuration(this.conf);
+ rliConf.set(FlinkOptions.INDEX_TYPE, indexType.name());
+ rliConf.set(FlinkOptions.OPERATION, "upsert");
+ rliConf.set(FlinkOptions.METADATA_ENABLED, true);
+ rliConf.set(FlinkOptions.INDEX_GLOBAL_ENABLED,
+ indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX); // true only for the global index variant
+ rliConf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+ WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()); // the concurrency mode the factory is expected to reject
+
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> new
HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(rliConf,
schema, "")));
+ assertThat(exception.getMessage(),
+ containsString("record level index does not support multiple
writers")); // substring shared by both "multiple writers" messages added to HoodieTableFactory
+ }
+
@Test
void testTableTypeCheck() {
ResolvedSchema schema = SchemaBuilder.instance()