This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8887df60e [cdc] Improve pulsar-cdc (#2535)
8887df60e is described below

commit 8887df60ee45e69f0ee62fdf054ae15c4a7b6e1b
Author: yuzelin <[email protected]>
AuthorDate: Wed Dec 20 12:46:31 2023 +0800

    [cdc] Improve pulsar-cdc (#2535)
---
 docs/content/cdc-ingestion/pulsar-cdc.md           | 91 ++++++++++++++++++++++
 .../flink/action/cdc/pulsar/PulsarActionUtils.java | 51 +++---------
 .../action/cdc/pulsar/PulsarSchemaITCase.java      |  2 +-
 .../cdc/pulsar/PulsarSyncDatabaseActionITCase.java |  2 +-
 4 files changed, 104 insertions(+), 42 deletions(-)

diff --git a/docs/content/cdc-ingestion/pulsar-cdc.md 
b/docs/content/cdc-ingestion/pulsar-cdc.md
index b65d68e40..b89151358 100644
--- a/docs/content/cdc-ingestion/pulsar-cdc.md
+++ b/docs/content/cdc-ingestion/pulsar-cdc.md
@@ -228,3 +228,94 @@ Synchronization from multiple Pulsar topics to Paimon 
database.
     --table_conf changelog-producer=input \
     --table_conf sink.parallelism=4
 ```
+
+## Additional pulsar_config
+
+There are some useful options for building the Flink Pulsar Source that are not 
covered by the flink-pulsar-connector documentation. They are:
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Key</th>
+        <th class="text-left">Default</th>
+        <th class="text-left">Type</th>
+        <th class="text-left">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+        <tr>
+         <td>value.format</td>
+          <td>(none)</td>
+          <td>String</td>
+          <td>Defines the format identifier for encoding value data.</td>
+        </tr>
+        <tr>
+          <td>topic</td>
+          <td>(none)</td>
+          <td>String</td>
+          <td>Topic name(s) from which the data is read. It also supports a 
topic list separated by semicolons, 
+              like 'topic-1;topic-2'. Note, only one of "topic-pattern" and 
"topic" can be specified.
+          </td>
+        </tr>
+        <tr>
+          <td>pulsar.startCursor.fromMessageId</td>
+          <td>EARLIEST</td>
+          <td>String</td>
+          <td>Using a unique identifier of a single message to seek the start 
position. The common format is a triple 
+              '&lt;long&gt;ledgerId,&lt;long&gt;entryId,&lt;int&gt;partitionIndex'. 
In particular, you can set it to 
+              EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE, 
-1).
+          </td>
+        </tr>
+        <tr>
+          <td>pulsar.startCursor.fromPublishTime</td>
+          <td>(none)</td>
+          <td>Long</td>
+          <td>Using the message publish time to seek the start position.</td>
+        </tr>
+        <tr>
+          <td>pulsar.startCursor.fromMessageIdInclusive</td>
+          <td>true</td>
+          <td>Boolean</td>
+          <td>Whether to include the given message id. This option only works 
when the message id is not EARLIEST or LATEST.</td>
+        </tr>
+        <tr>
+          <td>pulsar.stopCursor.atMessageId</td>
+          <td>(none)</td>
+          <td>String</td>
+          <td>Stop consuming when the message id is equal to or greater than the 
specified message id. The message whose id is equal 
+              to the specified message id will not be consumed. The common 
format is a triple 
'&lt;long&gt;ledgerId,&lt;long&gt;entryId,&lt;int&gt;partitionIndex'. 
+              In particular, you can set it to LATEST (Long.MAX_VALUE, 
Long.MAX_VALUE, -1).
+          </td>
+        </tr>
+        <tr>
+          <td>pulsar.stopCursor.afterMessageId</td>
+          <td>(none)</td>
+          <td>String</td>
+          <td>Stop consuming when the message id is greater than the specified 
message id. The message whose id is equal to the 
+              specified message id will be consumed. The common format is a 
triple '&lt;long&gt;ledgerId,&lt;long&gt;entryId,&lt;int&gt;partitionIndex'. 
+              In particular, you can set it to LATEST (Long.MAX_VALUE, 
Long.MAX_VALUE, -1).
+          </td>
+        </tr>
+        <tr>
+          <td>pulsar.stopCursor.atEventTime</td>
+          <td>(none)</td>
+          <td>Long</td>
+          <td>Stop consuming when the message event time is greater than or equal to 
the specified timestamp. 
+              A message whose event time is equal to the specified timestamp will 
not be consumed.
+          </td>
+        </tr>
+        <tr>
+          <td>pulsar.stopCursor.afterEventTime</td>
+          <td>(none)</td>
+          <td>Long</td>
+          <td>Stop consuming when the message event time is greater than the 
specified timestamp. 
+              A message whose event time is equal to the specified timestamp will 
be consumed.
+          </td>
+        </tr>
+        <tr>
+          <td>pulsar.source.unbounded</td>
+          <td>true</td>
+          <td>Boolean</td>
+          <td>To specify the boundedness of a stream.</td>
+        </tr>
+    </tbody>
+</table>
+
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 11e979c8b..0f8a17625 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
-import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.source.PulsarSource;
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
@@ -44,23 +43,20 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIENT_CONFIG_PREFIX;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
-import static 
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;
 
@@ -73,19 +69,15 @@ public class PulsarActionUtils {
                     .noDefaultValue()
                     .withDescription("Defines the format identifier for 
encoding value data.");
 
-    public static final ConfigOption<String> TOPIC =
+    public static final ConfigOption<List<String>> TOPIC =
             ConfigOptions.key("topic")
                     .stringType()
+                    .asList()
                     .noDefaultValue()
                     .withDescription(
-                            "Topic names from which the table is read. Either 
'topic' or 'topic-pattern' must be set for source. "
-                                    + "Option 'topic' is required for sink.");
-
-    static final ConfigOption<String> PULSAR_AUTH_PARAM_MAP =
-            ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Parameters for the authentication 
plugin.");
+                            "Topic name(s) from which the data is read. It 
also supports topic list by separating topic "
+                                    + "by semicolon like 'topic-1;topic-2'. 
Note, only one of \"topic-pattern\" and \"topic\" "
+                                    + "can be specified.");
 
     static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID =
             ConfigOptions.key("pulsar.startCursor.fromMessageId")
@@ -151,9 +143,7 @@ public class PulsarActionUtils {
                     .defaultValue(true)
                     .withDescription("To specify the boundedness of a 
stream.");
 
-    public static PulsarSource<String> buildPulsarSource(Configuration 
rawConfig) {
-        Configuration pulsarConfig = preprocessPulsarConfig(rawConfig);
-
+    public static PulsarSource<String> buildPulsarSource(Configuration 
pulsarConfig) {
         PulsarSourceBuilder<String> pulsarSourceBuilder = 
PulsarSource.builder();
 
         // the minimum setup
@@ -161,10 +151,7 @@ public class PulsarActionUtils {
                 .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
                 .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
                 
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
-                .setTopics(
-                        Arrays.stream(pulsarConfig.get(TOPIC).split(","))
-                                .map(String::trim)
-                                .collect(Collectors.toList()))
+                .setTopics(pulsarConfig.get(TOPIC))
                 .setDeserializationSchema(new SimpleStringSchema());
 
         // other settings
@@ -231,8 +218,7 @@ public class PulsarActionUtils {
         String authPluginClassName = 
pulsarConfig.get(PULSAR_AUTH_PLUGIN_CLASS_NAME);
         if (authPluginClassName != null) {
             String authParamsString = pulsarConfig.get(PULSAR_AUTH_PARAMS);
-            Map<String, String> authParamsMap =
-                    pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAM_MAP);
+            Map<String, String> authParamsMap = 
pulsarConfig.get(PULSAR_AUTH_PARAM_MAP);
 
             checkArgument(
                     authParamsString != null || authParamsMap != null,
@@ -278,21 +264,6 @@ public class PulsarActionUtils {
         }
     }
 
-    static SourceConfiguration toSourceConfiguration(Configuration rawConfig) {
-        return new SourceConfiguration(preprocessPulsarConfig(rawConfig));
-    }
-
-    private static Configuration preprocessPulsarConfig(Configuration 
rawConfig) {
-        Configuration cloned = new Configuration(rawConfig);
-        if (cloned.contains(PULSAR_AUTH_PARAM_MAP)) {
-            Map<String, String> authParamsMap =
-                    
parseCommaSeparatedKeyValues(cloned.get(PULSAR_AUTH_PARAM_MAP));
-            cloned.removeConfig(PULSAR_AUTH_PARAM_MAP);
-            cloned.set(PulsarOptions.PULSAR_AUTH_PARAM_MAP, authParamsMap);
-        }
-        return cloned;
-    }
-
     public static DataFormat getDataFormat(Configuration pulsarConfig) {
         return DataFormat.fromConfigString(pulsarConfig.get(VALUE_FORMAT));
     }
@@ -301,7 +272,7 @@ public class PulsarActionUtils {
     public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(
             Configuration pulsarConfig) {
         try {
-            SourceConfiguration pulsarSourceConfiguration = 
toSourceConfiguration(pulsarConfig);
+            SourceConfiguration pulsarSourceConfiguration = new 
SourceConfiguration(pulsarConfig);
             PulsarClient pulsarClient = 
PulsarClientFactory.createClient(pulsarSourceConfiguration);
 
             ConsumerBuilder<String> consumerBuilder =
@@ -313,7 +284,7 @@ public class PulsarActionUtils {
             // The default position is Latest
             
consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
 
-            String topic = 
pulsarConfig.get(PulsarActionUtils.TOPIC).split(",")[0];
+            String topic = pulsarConfig.get(PulsarActionUtils.TOPIC).get(0);
             TopicPartition topicPartition = new TopicPartition(topic);
             consumerBuilder.topic(topicPartition.getFullTopicName());
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
index 98dfea01a..33d854fb8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
@@ -51,7 +51,7 @@ public class PulsarSchemaITCase extends 
PulsarActionITCaseBase {
         sendMessages(topic, messages);
 
         Configuration pulsarConfig = 
Configuration.fromMap(getBasicPulsarConfig());
-        pulsarConfig.set(TOPIC, topic);
+        pulsarConfig.setString(TOPIC.key(), topic);
         pulsarConfig.set(VALUE_FORMAT, "canal-json");
 
         Schema pulsarSchema =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
index 946a1f6e4..dbd3db5d5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
@@ -67,7 +67,7 @@ public class PulsarSyncDatabaseActionITCase extends 
PulsarActionITCaseBase {
         Map<String, String> pulsarConfig = getBasicPulsarConfig();
         pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
         pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
-        pulsarConfig.put(TOPIC.key(), String.join(",", topics));
+        pulsarConfig.put(TOPIC.key(), String.join(";", topics));
         PulsarSyncDatabaseAction action =
                 syncDatabaseActionBuilder(pulsarConfig)
                         .withTableConfig(getBasicTableConfig())

Reply via email to