This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit 618daa266d483f4b0d3af03ffbb47315e872df3a Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Tue Oct 18 13:29:39 2022 +0800 [FLINK-29433][Connector/Pulsar] Support Auth through the builder pattern in Pulsar connector. (#21071) --- .../connector/pulsar/sink/PulsarSinkBuilder.java | 33 ++++++++++++++++++++++ .../pulsar/source/PulsarSourceBuilder.java | 33 ++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java index 332a27a..4e8f9b2 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java @@ -38,9 +38,13 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME; @@ -243,6 +247,35 @@ public class PulsarSinkBuilder<IN> { return this; } + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParamsString string which represents parameters for the Authentication-Plugin, + * e.g., "key1:val1,key2:val2" + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setAuthentication( + String authPluginClassName, String authParamsString) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString); + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParams map which represents parameters for the Authentication-Plugin + * @return this PulsarSinkBuilder. + */ + public PulsarSinkBuilder<IN> setAuthentication( + String authPluginClassName, Map<String, String> authParams) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams); + return this; + } + /** * Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found * in {@link PulsarSinkOptions} and {@link PulsarOptions}. diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 5309dd0..5aac3d2 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -44,11 +44,15 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; import static java.lang.Boolean.FALSE; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME; @@ -369,6 +373,35 @@ public final class PulsarSourceBuilder<OUT> { return self; } + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParamsString string which represents parameters for the Authentication-Plugin, + * e.g., "key1:val1,key2:val2" + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setAuthentication( + String authPluginClassName, String authParamsString) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString); + return this; + } + + /** + * Configure the authentication provider to use in the Pulsar client instance. + * + * @param authPluginClassName name of the Authentication-Plugin you want to use + * @param authParams map which represents parameters for the Authentication-Plugin + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<OUT> setAuthentication( + String authPluginClassName, Map<String, String> authParams) { + configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName); + configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams); + return this; + } + /** * Set an arbitrary property for the PulsarSource and Pulsar Consumer. The valid keys can be * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.