This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8887df60e [cdc] Improve pulsar-cdc (#2535)
8887df60e is described below
commit 8887df60ee45e69f0ee62fdf054ae15c4a7b6e1b
Author: yuzelin <[email protected]>
AuthorDate: Wed Dec 20 12:46:31 2023 +0800
[cdc] Improve pulsar-cdc (#2535)
---
docs/content/cdc-ingestion/pulsar-cdc.md | 91 ++++++++++++++++++++++
.../flink/action/cdc/pulsar/PulsarActionUtils.java | 51 +++---------
.../action/cdc/pulsar/PulsarSchemaITCase.java | 2 +-
.../cdc/pulsar/PulsarSyncDatabaseActionITCase.java | 2 +-
4 files changed, 104 insertions(+), 42 deletions(-)
diff --git a/docs/content/cdc-ingestion/pulsar-cdc.md
b/docs/content/cdc-ingestion/pulsar-cdc.md
index b65d68e40..b89151358 100644
--- a/docs/content/cdc-ingestion/pulsar-cdc.md
+++ b/docs/content/cdc-ingestion/pulsar-cdc.md
@@ -228,3 +228,94 @@ Synchronization from multiple Pulsar topics to Paimon
database.
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
```
+
+## Additional pulsar_config
+
+Some options that are useful for building the Flink Pulsar Source are not
documented by the flink-pulsar-connector. They are:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Key</th>
+ <th class="text-left">Default</th>
+ <th class="text-left">Type</th>
+ <th class="text-left">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>value.format</td>
+ <td>(none)</td>
+ <td>String</td>
+ <td>Defines the format identifier for encoding value data.</td>
+ </tr>
+ <tr>
+ <td>topic</td>
+ <td>(none)</td>
+ <td>String</td>
+ <td>Topic name(s) from which the data is read. It also supports
a list of topics separated by semicolons,
+ like 'topic-1;topic-2'. Note: only one of "topic-pattern" and
"topic" can be specified.
+ </td>
+ </tr>
+ <tr>
+ <td>pulsar.startCursor.fromMessageId</td>
+ <td>EARLIEST</td>
+ <td>String</td>
+ <td>Uses the unique identifier of a single message to seek the start
position. The common format is a triple
+ '&lt;long&gt;ledgerId,&lt;long&gt;entryId,&lt;int&gt;partitionIndex'.
In particular, you can set it to
+ EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE,
-1).
+ </td>
+ </tr>
+ <tr>
+ <td>pulsar.startCursor.fromPublishTime</td>
+ <td>(none)</td>
+ <td>Long</td>
+ <td>Uses the message publish time to seek the start position.</td>
+ </tr>
+ <tr>
+ <td>pulsar.startCursor.fromMessageIdInclusive</td>
+ <td>true</td>
+ <td>Boolean</td>
+ <td>Whether to include the given message id. This option only works
when the message id is not EARLIEST or LATEST.</td>
+ </tr>
+ <tr>
+ <td>pulsar.stopCursor.atMessageId</td>
+ <td>(none)</td>
+ <td>String</td>
+ <td>Stop consuming when the message id is equal to or greater than the
specified message id. Messages whose id is equal
+ to the specified message id will not be consumed. The common
format is a triple
'&lt;long&gt;ledgerId,&lt;long&gt;entryId,&lt;int&gt;partitionIndex'.
+ In particular, you can set it to LATEST (Long.MAX_VALUE,
Long.MAX_VALUE, -1).
+ </td>
+ </tr>
+ <tr>
+ <td>pulsar.stopCursor.afterMessageId</td>
+ <td>(none)</td>
+ <td>String</td>
+ <td>Stop consuming when the message id is greater than the specified
message id. Messages whose id is equal to the
+ specified message id will be consumed. The common format is a
triple '&lt;long&gt;ledgerId,&lt;long&gt;entryId,&lt;int&gt;partitionIndex'.
+ In particular, you can set it to LATEST (Long.MAX_VALUE,
Long.MAX_VALUE, -1).
+ </td>
+ </tr>
+ <tr>
+ <td>pulsar.stopCursor.atEventTime</td>
+ <td>(none)</td>
+ <td>Long</td>
+ <td>Stop consuming when the message event time is greater than or equal to
the specified timestamp.
+ Messages whose event time is equal to the specified timestamp will
not be consumed.
+ </td>
+ </tr>
+ <tr>
+ <td>pulsar.stopCursor.afterEventTime</td>
+ <td>(none)</td>
+ <td>Long</td>
+ <td>Stop consuming when the message event time is greater than the
specified timestamp.
+ Messages whose event time is equal to the specified timestamp will
be consumed.
+ </td>
+ </tr>
+ <tr>
+ <td>pulsar.source.unbounded</td>
+ <td>true</td>
+ <td>Boolean</td>
+ <td>To specify the boundedness of a stream.</td>
+ </tr>
+ </tbody>
+</table>
+
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 11e979c8b..0f8a17625 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
-import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
@@ -44,23 +43,20 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.internal.DefaultImplementation;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIENT_CONFIG_PREFIX;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
import static
org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
-import static
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;
@@ -73,19 +69,15 @@ public class PulsarActionUtils {
.noDefaultValue()
.withDescription("Defines the format identifier for
encoding value data.");
- public static final ConfigOption<String> TOPIC =
+ public static final ConfigOption<List<String>> TOPIC =
ConfigOptions.key("topic")
.stringType()
+ .asList()
.noDefaultValue()
.withDescription(
- "Topic names from which the table is read. Either
'topic' or 'topic-pattern' must be set for source. "
- + "Option 'topic' is required for sink.");
-
- static final ConfigOption<String> PULSAR_AUTH_PARAM_MAP =
- ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap")
- .stringType()
- .noDefaultValue()
- .withDescription("Parameters for the authentication
plugin.");
+ "Topic name(s) from which the data is read. It
also supports topic list by separating topic "
+ + "by semicolon like 'topic-1;topic-2'.
Note, only one of \"topic-pattern\" and \"topic\" "
+ + "can be specified.");
static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID =
ConfigOptions.key("pulsar.startCursor.fromMessageId")
@@ -151,9 +143,7 @@ public class PulsarActionUtils {
.defaultValue(true)
.withDescription("To specify the boundedness of a
stream.");
- public static PulsarSource<String> buildPulsarSource(Configuration
rawConfig) {
- Configuration pulsarConfig = preprocessPulsarConfig(rawConfig);
-
+ public static PulsarSource<String> buildPulsarSource(Configuration
pulsarConfig) {
PulsarSourceBuilder<String> pulsarSourceBuilder =
PulsarSource.builder();
// the minimum setup
@@ -161,10 +151,7 @@ public class PulsarActionUtils {
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
- .setTopics(
- Arrays.stream(pulsarConfig.get(TOPIC).split(","))
- .map(String::trim)
- .collect(Collectors.toList()))
+ .setTopics(pulsarConfig.get(TOPIC))
.setDeserializationSchema(new SimpleStringSchema());
// other settings
@@ -231,8 +218,7 @@ public class PulsarActionUtils {
String authPluginClassName =
pulsarConfig.get(PULSAR_AUTH_PLUGIN_CLASS_NAME);
if (authPluginClassName != null) {
String authParamsString = pulsarConfig.get(PULSAR_AUTH_PARAMS);
- Map<String, String> authParamsMap =
- pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAM_MAP);
+ Map<String, String> authParamsMap =
pulsarConfig.get(PULSAR_AUTH_PARAM_MAP);
checkArgument(
authParamsString != null || authParamsMap != null,
@@ -278,21 +264,6 @@ public class PulsarActionUtils {
}
}
- static SourceConfiguration toSourceConfiguration(Configuration rawConfig) {
- return new SourceConfiguration(preprocessPulsarConfig(rawConfig));
- }
-
- private static Configuration preprocessPulsarConfig(Configuration
rawConfig) {
- Configuration cloned = new Configuration(rawConfig);
- if (cloned.contains(PULSAR_AUTH_PARAM_MAP)) {
- Map<String, String> authParamsMap =
-
parseCommaSeparatedKeyValues(cloned.get(PULSAR_AUTH_PARAM_MAP));
- cloned.removeConfig(PULSAR_AUTH_PARAM_MAP);
- cloned.set(PulsarOptions.PULSAR_AUTH_PARAM_MAP, authParamsMap);
- }
- return cloned;
- }
-
public static DataFormat getDataFormat(Configuration pulsarConfig) {
return DataFormat.fromConfigString(pulsarConfig.get(VALUE_FORMAT));
}
@@ -301,7 +272,7 @@ public class PulsarActionUtils {
public static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(
Configuration pulsarConfig) {
try {
- SourceConfiguration pulsarSourceConfiguration =
toSourceConfiguration(pulsarConfig);
+ SourceConfiguration pulsarSourceConfiguration = new
SourceConfiguration(pulsarConfig);
PulsarClient pulsarClient =
PulsarClientFactory.createClient(pulsarSourceConfiguration);
ConsumerBuilder<String> consumerBuilder =
@@ -313,7 +284,7 @@ public class PulsarActionUtils {
// The default position is Latest
consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
- String topic =
pulsarConfig.get(PulsarActionUtils.TOPIC).split(",")[0];
+ String topic = pulsarConfig.get(PulsarActionUtils.TOPIC).get(0);
TopicPartition topicPartition = new TopicPartition(topic);
consumerBuilder.topic(topicPartition.getFullTopicName());
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
index 98dfea01a..33d854fb8 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSchemaITCase.java
@@ -51,7 +51,7 @@ public class PulsarSchemaITCase extends
PulsarActionITCaseBase {
sendMessages(topic, messages);
Configuration pulsarConfig =
Configuration.fromMap(getBasicPulsarConfig());
- pulsarConfig.set(TOPIC, topic);
+ pulsarConfig.setString(TOPIC.key(), topic);
pulsarConfig.set(VALUE_FORMAT, "canal-json");
Schema pulsarSchema =
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
index 946a1f6e4..dbd3db5d5 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
@@ -67,7 +67,7 @@ public class PulsarSyncDatabaseActionITCase extends
PulsarActionITCaseBase {
Map<String, String> pulsarConfig = getBasicPulsarConfig();
pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
- pulsarConfig.put(TOPIC.key(), String.join(",", topics));
+ pulsarConfig.put(TOPIC.key(), String.join(";", topics));
PulsarSyncDatabaseAction action =
syncDatabaseActionBuilder(pulsarConfig)
.withTableConfig(getBasicTableConfig())