This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e1508d503d [cdc] pulsar cdc compatible with flink-connector-pulsar 
version 4.1.0 (#5646)
e1508d503d is described below

commit e1508d503d709c0ed16c5c752f1d90e7ad15d2e8
Author: Kerwin <[email protected]>
AuthorDate: Wed May 21 18:20:11 2025 +0800

    [cdc] pulsar cdc compatible with flink-connector-pulsar version 4.1.0 
(#5646)
---
 .../org/apache/paimon/flink/action/cdc/SyncJobHandler.java   |  1 -
 .../paimon/flink/action/cdc/pulsar/PulsarActionUtils.java    | 12 ++++++++++--
 .../flink/action/cdc/pulsar/PulsarActionITCaseBase.java      |  4 ++--
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index 147d19377b..323078aef6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -156,7 +156,6 @@ public class SyncJobHandler {
                         PULSAR_CONF,
                         PulsarActionUtils.VALUE_FORMAT,
                         PulsarOptions.PULSAR_SERVICE_URL,
-                        PulsarOptions.PULSAR_ADMIN_URL,
                         PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME);
                 checkOneRequiredOption(
                         cdcSourceConfig,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index ea5a715a3b..7d9672ae21 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -61,7 +61,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
@@ -166,6 +165,15 @@ public class PulsarActionUtils {
                     .defaultValue(true)
                     .withDescription("To specify the boundedness of a 
stream.");
 
+    // flink-connector-pulsar 4.1.0+ remove PulsarOptions#PULSAR_ADMIN_URL 
option, Compatible with
+    // lower version flink-connector-pulsar.
+    @Deprecated
+    static final ConfigOption<String> PULSAR_ADMIN_URL =
+            ConfigOptions.key("pulsar.admin.adminUrl")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The Pulsar service HTTP URL for the 
admin endpoint.");
+
     public static PulsarSource<CdcSourceRecord> buildPulsarSource(
             Configuration pulsarConfig,
             DeserializationSchema<CdcSourceRecord> deserializationSchema) {
@@ -174,10 +182,10 @@ public class PulsarActionUtils {
         // the minimum setup
         pulsarSourceBuilder
                 .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
-                .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
                 
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
                 .setDeserializationSchema(deserializationSchema);
 
+        
pulsarConfig.getOptional(PULSAR_ADMIN_URL).ifPresent(pulsarSourceBuilder::setAdminUrl);
         
pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
         
pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
index 1263754a1d..8c19f043e1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
@@ -53,7 +53,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
@@ -150,7 +149,8 @@ public class PulsarActionITCaseBase extends 
CdcActionITCaseBase {
     protected Map<String, String> getBasicPulsarConfig() {
         Map<String, String> config = new HashMap<>();
         config.put(PULSAR_SERVICE_URL.key(), 
PULSAR_CONTAINER.getPulsarBrokerUrl());
-        config.put(PULSAR_ADMIN_URL.key(), 
PULSAR_CONTAINER.getHttpServiceUrl());
+        // flink-connector-pulsar 4.1.0+ remove PulsarOptions#PULSAR_ADMIN_URL 
option.
+        config.put("pulsar.admin.adminUrl", 
PULSAR_CONTAINER.getHttpServiceUrl());
         config.put(PULSAR_SUBSCRIPTION_NAME.key(), "paimon-tests");
         return config;
     }

Reply via email to