This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 001cba06f [FLINK-38737][common] Fix potential backwards
incompatibility with fallback keys (#4185)
001cba06f is described below
commit 001cba06f58014a627931ac81fb64934d34fa191
Author: Kunni <[email protected]>
AuthorDate: Fri Dec 5 18:01:25 2025 +0800
[FLINK-38737][common] Fix potential backwards incompatibility with fallback
keys (#4185)
---
.../flink/cdc/common/factories/FactoryHelper.java | 19 +++++++---
.../cdc/common/factories/FactoryHelperTests.java | 23 ++++++++++--
.../paimon/sink/PaimonDataSinkFactoryTest.java | 42 ++++++++++++++++++++++
3 files changed, 77 insertions(+), 7 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
index 8fa928883..8e4ff0227 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.common.factories;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.configuration.FallbackKey;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
@@ -32,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
/** A helper for working with {@link Factory}. */
@PublicEvolving
@@ -70,7 +72,7 @@ public class FactoryHelper {
final List<String> missingRequiredOptions =
requiredOptions.stream()
.filter(option -> configuration.get(option) == null)
- .map(ConfigOption::key)
+ .flatMap(FactoryHelper::allKeys)
.sorted()
.collect(Collectors.toList());
@@ -109,8 +111,8 @@ public class FactoryHelper {
public void validate() {
Set<String> allOptionKeys =
Stream.concat(
-
factory.requiredOptions().stream().map(ConfigOption::key),
-
factory.optionalOptions().stream().map(ConfigOption::key))
+
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
+
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
.collect(Collectors.toSet());
validateFactoryOptions(factory, context.getFactoryConfiguration());
@@ -135,8 +137,8 @@ public class FactoryHelper {
Set<String> allOptionKeys =
Stream.concat(
-
factory.requiredOptions().stream().map(ConfigOption::key),
-
factory.optionalOptions().stream().map(ConfigOption::key))
+
factory.requiredOptions().stream().flatMap(FactoryHelper::allKeys),
+
factory.optionalOptions().stream().flatMap(FactoryHelper::allKeys))
.collect(Collectors.toSet());
Set<String> filteredOptionKeys =
@@ -148,6 +150,13 @@ public class FactoryHelper {
validateUnconsumedKeys(factory.identifier(), filteredOptionKeys,
allOptionKeys);
}
+ private static Stream<String> allKeys(ConfigOption<?> option) {
+ return Stream.concat(
+ Stream.of(option.key()),
+ StreamSupport.stream(option.fallbackKeys().spliterator(),
false)
+ .map(FallbackKey::getKey));
+ }
+
public ReadableConfig getFormatConfig(String formatPrefix) {
final String prefix = formatPrefix + ".";
Map<String, String> formatConfigMap = new HashMap<>();
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
index 4b356ba7f..a7be9c3f5 100644
---
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java
@@ -46,7 +46,10 @@ class FactoryHelperTests {
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Sets.newHashSet(
- ConfigOptions.key("id").intType().noDefaultValue(),
+ ConfigOptions.key("id")
+ .intType()
+ .noDefaultValue()
+ .withFallbackKeys("id_fallback"),
ConfigOptions.key("name").stringType().noDefaultValue(),
ConfigOptions.key("age").doubleType().noDefaultValue());
}
@@ -54,7 +57,10 @@ class FactoryHelperTests {
@Override
public Set<ConfigOption<?>> optionalOptions() {
return Sets.newHashSet(
-
ConfigOptions.key("hobby").stringType().noDefaultValue(),
+ ConfigOptions.key("hobby")
+ .stringType()
+ .noDefaultValue()
+ .withFallbackKeys("hobby_fallback"),
ConfigOptions.key("location").stringType().defaultValue("Everywhere"),
ConfigOptions.key("misc")
.mapType()
@@ -79,6 +85,19 @@ class FactoryHelperTests {
Configuration.fromMap(configurations), null,
null));
factoryHelper.validate();
+
+ // Validation for fallback keys.
+ configurations.clear();
+ configurations.put("id_fallback", "2");
+ configurations.put("name", "Bob");
+ configurations.put("age", "18");
+ configurations.put("hobby_fallback", "Swimming");
+ factoryHelper =
+ FactoryHelper.createFactoryHelper(
+ getDummyFactory(),
+ new FactoryHelper.DefaultContext(
+ Configuration.fromMap(configurations), null,
null));
+ factoryHelper.validate();
}
@Test
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
index 2e6311d6b..dcbbdd682 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java
@@ -22,7 +22,9 @@ import
org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -30,6 +32,8 @@ import
org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.HashMap;
@@ -38,6 +42,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.assertj.core.api.InstanceOfAssertFactories.type;
+
/** Tests for {@link PaimonDataSinkFactory}. */
class PaimonDataSinkFactoryTest {
@@ -164,4 +170,40 @@ class PaimonDataSinkFactoryTest {
conf, conf,
Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
}
+
+ @ParameterizedTest(name = "{0}")
+ @ValueSource(strings = {"commit.user", "commit.user-prefix"})
+ void testSpecifyingCommitUser(String commitUserKey) {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("paimon",
DataSinkFactory.class);
+
Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class);
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put(PaimonDataSinkOptions.METASTORE.key(),
"filesystem")
+ .put(
+ PaimonDataSinkOptions.WAREHOUSE.key(),
+ new File(
+
temporaryFolder.toFile(),
+
UUID.randomUUID().toString())
+ .toString())
+ .put(commitUserKey, "yux")
+ .build());
+
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf,
Thread.currentThread().getContextClassLoader()));
+ Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class);
+
Assertions.assertThat(dataSink).extracting("commitUser").isEqualTo("yux");
+ Assertions.assertThat(dataSink.getEventSinkProvider())
+ .isInstanceOf(FlinkSinkProvider.class)
+ .asInstanceOf(type(FlinkSinkProvider.class))
+ .extracting(FlinkSinkProvider::getSink)
+ .isExactlyInstanceOf(PaimonEventSink.class)
+ .extracting("commitUser")
+ .asString()
+ .hasSize(39) // 3 ("yux") + 36 (Random UUID)
+ .startsWith("yux");
+ }
}