This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 001cba06f [FLINK-38737][common] Fix potential backwards 
incompatibility with fallback keys (#4185)
001cba06f is described below

commit 001cba06f58014a627931ac81fb64934d34fa191
Author: Kunni <[email protected]>
AuthorDate: Fri Dec 5 18:01:25 2025 +0800

    [FLINK-38737][common] Fix potential backwards incompatibility with fallback 
keys (#4185)
---
 .../flink/cdc/common/factories/FactoryHelper.java  | 19 +++++++---
 .../cdc/common/factories/FactoryHelperTests.java   | 23 ++++++++++--
 .../paimon/sink/PaimonDataSinkFactoryTest.java     | 42 ++++++++++++++++++++++
 3 files changed, 77 insertions(+), 7 deletions(-)

diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
index 8fa928883..8e4ff0227 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.common.factories;
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.configuration.FallbackKey;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
@@ -32,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /** A helper for working with {@link Factory}. */
 @PublicEvolving
@@ -70,7 +72,7 @@ public class FactoryHelper {
         final List<String> missingRequiredOptions =
                 requiredOptions.stream()
                         .filter(option -> configuration.get(option) == null)
-                        .map(ConfigOption::key)
+                        .flatMap(FactoryHelper::allKeys)
                         .sorted()
                         .collect(Collectors.toList());
 
@@ -109,8 +111,8 @@ public class FactoryHelper {
     public void validate() {
         Set<String> allOptionKeys =
                 Stream.concat(
-                                
factory.requiredOptions().stream().map(ConfigOption::key),
-                                
factory.optionalOptions().stream().map(ConfigOption::key))
+                                
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
+                                
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
                         .collect(Collectors.toSet());
 
         validateFactoryOptions(factory, context.getFactoryConfiguration());
@@ -135,8 +137,8 @@ public class FactoryHelper {
 
         Set<String> allOptionKeys =
                 Stream.concat(
-                                
factory.requiredOptions().stream().map(ConfigOption::key),
-                                
factory.optionalOptions().stream().map(ConfigOption::key))
+                                
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
+                                
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
                         .collect(Collectors.toSet());
 
         Set<String> filteredOptionKeys =
@@ -148,6 +150,13 @@ public class FactoryHelper {
         validateUnconsumedKeys(factory.identifier(), filteredOptionKeys, 
allOptionKeys);
     }
 
+    private static Stream<String> allKeys(ConfigOption<?> option) {
+        return Stream.concat(
+                Stream.of(option.key()),
+                StreamSupport.stream(option.fallbackKeys().spliterator(), 
false)
+                        .map(FallbackKey::getKey));
+    }
+
     public ReadableConfig getFormatConfig(String formatPrefix) {
         final String prefix = formatPrefix + ".";
         Map<String, String> formatConfigMap = new HashMap<>();
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
index 4b356ba7f..a7be9c3f5 100644
--- 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
@@ -46,7 +46,10 @@ class FactoryHelperTests {
             @Override
             public Set<ConfigOption<?>> requiredOptions() {
                 return Sets.newHashSet(
-                        ConfigOptions.key("id").intType().noDefaultValue(),
+                        ConfigOptions.key("id")
+                                .intType()
+                                .noDefaultValue()
+                                .withFallbackKeys("id_fallback"),
                         
ConfigOptions.key("name").stringType().noDefaultValue(),
                         
ConfigOptions.key("age").doubleType().noDefaultValue());
             }
@@ -54,7 +57,10 @@ class FactoryHelperTests {
             @Override
             public Set<ConfigOption<?>> optionalOptions() {
                 return Sets.newHashSet(
-                        
ConfigOptions.key("hobby").stringType().noDefaultValue(),
+                        ConfigOptions.key("hobby")
+                                .stringType()
+                                .noDefaultValue()
+                                .withFallbackKeys("hobby_fallback"),
                         
ConfigOptions.key("location").stringType().defaultValue("Everywhere"),
                         ConfigOptions.key("misc")
                                 .mapType()
@@ -79,6 +85,19 @@ class FactoryHelperTests {
                                 Configuration.fromMap(configurations), null, 
null));
 
         factoryHelper.validate();
+
+        // Validation for fallback keys.
+        configurations.clear();
+        configurations.put("id_fallback", "2");
+        configurations.put("name", "Bob");
+        configurations.put("age", "18");
+        configurations.put("hobby_fallback", "Swimming");
+        factoryHelper =
+                FactoryHelper.createFactoryHelper(
+                        getDummyFactory(),
+                        new FactoryHelper.DefaultContext(
+                                Configuration.fromMap(configurations), null, 
null));
+        factoryHelper.validate();
     }
 
     @Test
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
index 2e6311d6b..dcbbdd682 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
@@ -22,7 +22,9 @@ import 
org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.factories.DataSinkFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -30,6 +32,8 @@ import 
org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.util.HashMap;
@@ -38,6 +42,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.assertj.core.api.InstanceOfAssertFactories.type;
+
 /** Tests for {@link PaimonDataSinkFactory}. */
 class PaimonDataSinkFactoryTest {
 
@@ -164,4 +170,40 @@ class PaimonDataSinkFactoryTest {
                                 conf, conf, 
Thread.currentThread().getContextClassLoader()));
         Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
     }
+
+    @ParameterizedTest(name = "{0}")
+    @ValueSource(strings = {"commit.user", "commit.user-prefix"})
+    void testSpecifyingCommitUser(String commitUserKey) {
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", 
DataSinkFactory.class);
+        
Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class);
+        Configuration conf =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put(PaimonDataSinkOptions.METASTORE.key(), 
"filesystem")
+                                .put(
+                                        PaimonDataSinkOptions.WAREHOUSE.key(),
+                                        new File(
+                                                        
temporaryFolder.toFile(),
+                                                        
UUID.randomUUID().toString())
+                                                .toString())
+                                .put(commitUserKey, "yux")
+                                .build());
+
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, 
Thread.currentThread().getContextClassLoader()));
+        Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
+        
Assertions.assertThat(dataSink).extracting("commitUser").isEqualTo("yux");
+        Assertions.assertThat(dataSink.getEventSinkProvider())
+                .isInstanceOf(FlinkSinkProvider.class)
+                .asInstanceOf(type(FlinkSinkProvider.class))
+                .extracting(FlinkSinkProvider::getSink)
+                .isExactlyInstanceOf(PaimonEventSink.class)
+                .extracting("commitUser")
+                .asString()
+                .hasSize(39) // 3 ("yux") + 36 (Random UUID)
+                .startsWith("yux");
+    }
 }

Reply via email to