This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e1508d503d [cdc] pulsar cdc compatible with flink-connector-pulsar
version 4.1.0 (#5646)
e1508d503d is described below
commit e1508d503d709c0ed16c5c752f1d90e7ad15d2e8
Author: Kerwin <[email protected]>
AuthorDate: Wed May 21 18:20:11 2025 +0800
[cdc] pulsar cdc compatible with flink-connector-pulsar version 4.1.0
(#5646)
---
.../org/apache/paimon/flink/action/cdc/SyncJobHandler.java | 1 -
.../paimon/flink/action/cdc/pulsar/PulsarActionUtils.java | 12 ++++++++++--
.../flink/action/cdc/pulsar/PulsarActionITCaseBase.java | 4 ++--
3 files changed, 12 insertions(+), 5 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index 147d19377b..323078aef6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -156,7 +156,6 @@ public class SyncJobHandler {
PULSAR_CONF,
PulsarActionUtils.VALUE_FORMAT,
PulsarOptions.PULSAR_SERVICE_URL,
- PulsarOptions.PULSAR_ADMIN_URL,
PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
checkOneRequiredOption(
cdcSourceConfig,
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index ea5a715a3b..7d9672ae21 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -61,7 +61,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
@@ -166,6 +165,15 @@ public class PulsarActionUtils {
.defaultValue(true)
.withDescription("To specify the boundedness of a
stream.");
+ // flink-connector-pulsar 4.1.0+ removed the PulsarOptions#PULSAR_ADMIN_URL option; it is
+ // redefined here for compatibility with lower versions of flink-connector-pulsar.
+ @Deprecated
+ static final ConfigOption<String> PULSAR_ADMIN_URL =
+ ConfigOptions.key("pulsar.admin.adminUrl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Pulsar service HTTP URL for the
admin endpoint.");
+
public static PulsarSource<CdcSourceRecord> buildPulsarSource(
Configuration pulsarConfig,
DeserializationSchema<CdcSourceRecord> deserializationSchema) {
@@ -174,10 +182,10 @@ public class PulsarActionUtils {
// the minimum setup
pulsarSourceBuilder
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
- .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
.setDeserializationSchema(deserializationSchema);
+
pulsarConfig.getOptional(PULSAR_ADMIN_URL).ifPresent(pulsarSourceBuilder::setAdminUrl);
pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
index 1263754a1d..8c19f043e1 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
@@ -53,7 +53,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
@@ -150,7 +149,8 @@ public class PulsarActionITCaseBase extends
CdcActionITCaseBase {
protected Map<String, String> getBasicPulsarConfig() {
Map<String, String> config = new HashMap<>();
config.put(PULSAR_SERVICE_URL.key(),
PULSAR_CONTAINER.getPulsarBrokerUrl());
- config.put(PULSAR_ADMIN_URL.key(),
PULSAR_CONTAINER.getHttpServiceUrl());
+ // flink-connector-pulsar 4.1.0+ removed the PulsarOptions#PULSAR_ADMIN_URL option.
+ config.put("pulsar.admin.adminUrl",
PULSAR_CONTAINER.getHttpServiceUrl());
config.put(PULSAR_SUBSCRIPTION_NAME.key(), "paimon-tests");
return config;
}