This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 618daa266d483f4b0d3af03ffbb47315e872df3a
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Oct 18 13:29:39 2022 +0800

    [FLINK-29433][Connector/Pulsar] Support Auth through the builder pattern in Pulsar connector. (#21071)
---
 .../connector/pulsar/sink/PulsarSinkBuilder.java   | 33 ++++++++++++++++++++++
 .../pulsar/source/PulsarSourceBuilder.java         | 33 ++++++++++++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index 332a27a..4e8f9b2 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -38,9 +38,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
@@ -243,6 +247,35 @@ public class PulsarSinkBuilder<IN> {
         return this;
     }
 
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authPluginClassName name of the Authentication-Plugin you want to use
+     * @param authParamsString string which represents parameters for the Authentication-Plugin,
+     *     e.g., "key1:val1,key2:val2"
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setAuthentication(
+            String authPluginClassName, String authParamsString) {
+        configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+        configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authPluginClassName name of the Authentication-Plugin you want to use
+     * @param authParams map which represents parameters for the Authentication-Plugin
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setAuthentication(
+            String authPluginClassName, Map<String, String> authParams) {
+        configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+        configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
+        return this;
+    }
+
     /**
      * Set an arbitrary property for the PulsarSink and Pulsar Producer. The valid keys can be found
      * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 5309dd0..5aac3d2 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -44,11 +44,15 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
 import static java.lang.Boolean.FALSE;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
@@ -369,6 +373,35 @@ public final class PulsarSourceBuilder<OUT> {
         return self;
     }
 
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authPluginClassName name of the Authentication-Plugin you want to use
+     * @param authParamsString string which represents parameters for the Authentication-Plugin,
+     *     e.g., "key1:val1,key2:val2"
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setAuthentication(
+            String authPluginClassName, String authParamsString) {
+        configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+        configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authPluginClassName name of the Authentication-Plugin you want to use
+     * @param authParams map which represents parameters for the Authentication-Plugin
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setAuthentication(
+            String authPluginClassName, Map<String, String> authParams) {
+        configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+        configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
+        return this;
+    }
+
     /**
      * Set an arbitrary property for the PulsarSource and Pulsar Consumer. The valid keys can be
      * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.

Reply via email to