This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 147d2b1bff6a055c0f8c928af7afc01dd9aa048b
Author: Lars Bachmann <lars.bachm...@sony.com>
AuthorDate: Thu Apr 29 09:31:51 2021 +0200

    [FLINK-21229][avro-confluent-registry] Add Confluent schema registry SSL support
---
 .../confluent/CachedSchemaCoderProvider.java       |  21 ++-
 ...ConfluentRegistryAvroDeserializationSchema.java | 110 ++++++++++++++--
 .../ConfluentRegistryAvroSerializationSchema.java  |  65 +++++++--
 .../confluent/RegistryAvroFormatFactory.java       |  77 ++++++++++-
 .../registry/confluent/RegistryAvroOptions.java    |  52 ++++++++
 .../confluent/CachedSchemaCoderProviderTest.java   | 145 +++++++++++++++++++++
 .../confluent/RegistryAvroFormatFactoryTest.java   |  92 +++++++++++++
 .../src/test/resources/test-keystore.jks           | Bin 0 -> 2327 bytes
 8 files changed, 529 insertions(+), 33 deletions(-)
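
For reference, the new public overloads introduced by this commit are (signatures reproduced from the diff below):

    ConfluentRegistryAvroDeserializationSchema.forGeneric(Schema schema, String url, @Nullable Map<String, ?> registryConfigs)
    ConfluentRegistryAvroDeserializationSchema.forGeneric(Schema schema, String url, int identityMapCapacity, @Nullable Map<String, ?> registryConfigs)
    ConfluentRegistryAvroDeserializationSchema.forSpecific(Class<T> tClass, String url, @Nullable Map<String, ?> registryConfigs)
    ConfluentRegistryAvroDeserializationSchema.forSpecific(Class<T> tClass, String url, int identityMapCapacity, @Nullable Map<String, ?> registryConfigs)
    ConfluentRegistryAvroSerializationSchema.forSpecific(Class<T> tClass, String subject, String schemaRegistryUrl, @Nullable Map<String, ?> registryConfigs)
    ConfluentRegistryAvroSerializationSchema.forGeneric(String subject, Schema schema, String schemaRegistryUrl, @Nullable Map<String, ?> registryConfigs)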

diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java
index b5f3200..53be09c 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java
@@ -25,10 +25,11 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
 import java.util.Objects;
 
 /**
- * A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry client underlying. *
+ * A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry client underneath.
  */
 @Internal
 class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
@@ -37,21 +38,28 @@ class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
     private final String subject;
     private final String url;
     private final int identityMapCapacity;
+    private final @Nullable Map<String, ?> registryConfigs;
 
     CachedSchemaCoderProvider(String url, int identityMapCapacity) {
-        this(null, url, identityMapCapacity);
+        this(null, url, identityMapCapacity, null);
     }
 
-    CachedSchemaCoderProvider(@Nullable String subject, String url, int identityMapCapacity) {
+    CachedSchemaCoderProvider(
+            @Nullable String subject,
+            String url,
+            int identityMapCapacity,
+            @Nullable Map<String, ?> registryConfigs) {
         this.subject = subject;
         this.url = Objects.requireNonNull(url);
         this.identityMapCapacity = identityMapCapacity;
+        this.registryConfigs = registryConfigs;
     }
 
     @Override
     public SchemaCoder get() {
         return new ConfluentSchemaRegistryCoder(
-                this.subject, new CachedSchemaRegistryClient(url, identityMapCapacity));
+                this.subject,
+                new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs));
     }
 
     @Override
@@ -65,11 +73,12 @@ class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
         CachedSchemaCoderProvider that = (CachedSchemaCoderProvider) o;
         return identityMapCapacity == that.identityMapCapacity
                 && Objects.equals(subject, that.subject)
-                && url.equals(that.url);
+                && url.equals(that.url)
+                && Objects.equals(registryConfigs, that.registryConfigs);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(subject, url, identityMapCapacity);
+        return Objects.hash(subject, url, identityMapCapacity, registryConfigs);
     }
 }
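
The provider hands the new config map straight to Confluent's CachedSchemaRegistryClient, which derives SSL and auth settings from it. A minimal sketch of the wiring, assuming code in the same package (the class is package-private) and placeholder URL, path, and password:

    Map<String, String> registryConfigs = new HashMap<>();
    registryConfigs.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
    registryConfigs.put("schema.registry.ssl.truststore.password", "secret");

    CachedSchemaCoderProvider provider =
            new CachedSchemaCoderProvider(null, "https://registry.example.com:8081", 1000, registryConfigs);
    // The returned SchemaCoder is backed by an SSL-configured CachedSchemaRegistryClient.
    SchemaCoder coder = provider.get();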
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
index 7becf50..c448a91 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
@@ -28,6 +28,8 @@ import org.apache.avro.specific.SpecificRecord;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
 /**
  * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that
  * uses Confluent Schema Registry.
@@ -60,8 +62,10 @@ public class ConfluentRegistryAvroDeserializationSchema<T>
 
     /**
      * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
-     * GenericRecord} using provided reader schema and looks up writer schema in Confluent Schema
-     * Registry.
+     * GenericRecord} using the provided reader schema and looks up the writer schema in the
+     * Confluent Schema Registry.
+     *
+     * <p>By default, this method supports up to 1000 cached schema versions.
      *
      * @param schema schema of produced records
      * @param url url of schema registry to connect
@@ -74,25 +78,65 @@ public class ConfluentRegistryAvroDeserializationSchema<T>
 
     /**
      * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
-     * GenericRecord} using provided reader schema and looks up writer schema in Confluent Schema
-     * Registry.
+     * GenericRecord} using the provided reader schema and looks up the writer schema in the
+     * Confluent Schema Registry.
      *
      * @param schema schema of produced records
      * @param url url of schema registry to connect
-     * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+     * @param identityMapCapacity maximum number of cached schema versions
      * @return deserialized record in form of {@link GenericRecord}
      */
     public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
             Schema schema, String url, int identityMapCapacity) {
+        return forGeneric(schema, url, identityMapCapacity, null);
+    }
+
+    /**
+     * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
+     * GenericRecord} using the provided reader schema and looks up the writer schema in the
+     * Confluent Schema Registry.
+     *
+     * <p>By default, this method supports up to 1000 cached schema versions.
+     *
+     * @param schema schema of produced records
+     * @param url URL of schema registry to connect
+     * @param registryConfigs map with additional schema registry configs (for example SSL
+     *     properties)
+     * @return deserialized record in form of {@link GenericRecord}
+     */
+    public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+            Schema schema, String url, @Nullable Map<String, ?> registryConfigs) {
+        return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs);
+    }
+
+    /**
+     * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
+     * GenericRecord} using the provided reader schema and looks up the writer schema in the
+     * Confluent Schema Registry.
+     *
+     * @param schema schema of produced records
+     * @param url URL of schema registry to connect
+     * @param identityMapCapacity maximum number of cached schema versions
+     * @param registryConfigs map with additional schema registry configs (for example SSL
+     *     properties)
+     * @return deserialized record in form of {@link GenericRecord}
+     */
+    public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+            Schema schema,
+            String url,
+            int identityMapCapacity,
+            @Nullable Map<String, ?> registryConfigs) {
         return new ConfluentRegistryAvroDeserializationSchema<>(
                 GenericRecord.class,
                 schema,
-                new CachedSchemaCoderProvider(url, identityMapCapacity));
+                new CachedSchemaCoderProvider(null, url, identityMapCapacity, registryConfigs));
     }
 
     /**
-     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
-     * schema and looks up writer schema in Confluent Schema Registry.
+     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
+     * schema and looks up the writer schema in the Confluent Schema Registry.
+     *
+     * <p>By default, this method supports up to 1000 cached schema versions.
      *
      * @param tClass class of record to be produced
      * @param url url of schema registry to connect
@@ -100,22 +144,62 @@ public class ConfluentRegistryAvroDeserializationSchema<T>
      */
     public static <T extends SpecificRecord>
             ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass, String url) {
-        return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+        return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY, null);
     }
 
     /**
-     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
-     * schema and looks up writer schema in Confluent Schema Registry.
+     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
+     * schema and looks up the writer schema in the Confluent Schema Registry.
      *
      * @param tClass class of record to be produced
      * @param url url of schema registry to connect
-     * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+     * @param identityMapCapacity maximum number of cached schema versions
      * @return deserialized record
      */
     public static <T extends SpecificRecord>
             ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
                     Class<T> tClass, String url, int identityMapCapacity) {
+        return forSpecific(tClass, url, identityMapCapacity, null);
+    }
+
+    /**
+     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
+     * schema and looks up the writer schema in the Confluent Schema Registry.
+     *
+     * <p>By default, this method supports up to 1000 cached schema versions.
+     *
+     * @param tClass class of record to be produced
+     * @param url URL of schema registry to connect
+     * @param registryConfigs map with additional schema registry configs (for example SSL
+     *     properties)
+     * @return deserialized record
+     */
+    public static <T extends SpecificRecord>
+            ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
+                    Class<T> tClass, String url, @Nullable Map<String, ?> registryConfigs) {
+        return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs);
+    }
+
+    /**
+     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
+     * schema and looks up the writer schema in the Confluent Schema Registry.
+     *
+     * @param tClass class of record to be produced
+     * @param url URL of schema registry to connect
+     * @param identityMapCapacity maximum number of cached schema versions
+     * @param registryConfigs map with additional schema registry configs (for example SSL
+     *     properties)
+     * @return deserialized record
+     */
+    public static <T extends SpecificRecord>
+            ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
+                    Class<T> tClass,
+                    String url,
+                    int identityMapCapacity,
+                    @Nullable Map<String, ?> registryConfigs) {
         return new ConfluentRegistryAvroDeserializationSchema<>(
-                tClass, null, new CachedSchemaCoderProvider(url, identityMapCapacity));
+                tClass,
+                null,
+                new CachedSchemaCoderProvider(null, url, identityMapCapacity, registryConfigs));
     }
 }
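
A usage sketch for the new deserialization overload, with placeholder URL and keystore paths (readerSchema stands in for an org.apache.avro.Schema obtained elsewhere; the schema.registry.ssl.* keys are the ones exercised by CachedSchemaCoderProviderTest below):

    Map<String, String> registryConfigs = new HashMap<>();
    registryConfigs.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");
    registryConfigs.put("schema.registry.ssl.keystore.password", "secret");
    registryConfigs.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
    registryConfigs.put("schema.registry.ssl.truststore.password", "secret");

    // Looks up the writer schema over HTTPS using the given SSL settings.
    DeserializationSchema<GenericRecord> deser =
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                    readerSchema, "https://registry.example.com:8081", registryConfigs);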
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java
index f06193f..b3b574c 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java
@@ -26,6 +26,10 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificRecord;
 
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
 /**
  * Serialization schema that serializes to Avro binary format that uses Confluent Schema Registry.
  *
@@ -57,38 +61,83 @@ public class ConfluentRegistryAvroSerializationSchema<T>
 
     /**
      * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from
-     * avro schema and writes the writer schema to Confluent Schema Registry.
+     * Avro schema and writes the writer schema to Confluent Schema Registry.
      *
      * @param tClass the type to be serialized
      * @param subject subject of schema registry to produce
-     * @param schemaRegistryUrl url of schema registry to connect
-     * @return Serialized record
+     * @param schemaRegistryUrl URL of schema registry to connect
+     * @return serialized record
      */
     public static <T extends SpecificRecord>
             ConfluentRegistryAvroSerializationSchema<T> forSpecific(
                     Class<T> tClass, String subject, String schemaRegistryUrl) {
+        return forSpecific(tClass, subject, schemaRegistryUrl, null);
+    }
+
+    /**
+     * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from
+     * Avro schema and writes the writer schema to Confluent Schema Registry.
+     *
+     * @param tClass the type to be serialized
+     * @param subject subject of schema registry to produce
+     * @param schemaRegistryUrl URL of schema registry to connect
+     * @param registryConfigs map with additional schema registry configs (for example SSL
+     *     properties)
+     * @return serialized record
+     */
+    public static <T extends SpecificRecord>
+            ConfluentRegistryAvroSerializationSchema<T> forSpecific(
+                    Class<T> tClass,
+                    String subject,
+                    String schemaRegistryUrl,
+                    @Nullable Map<String, ?> registryConfigs) {
         return new ConfluentRegistryAvroSerializationSchema<>(
                 tClass,
                 null,
                 new CachedSchemaCoderProvider(
-                        subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY));
+                        subject,
+                        schemaRegistryUrl,
+                        DEFAULT_IDENTITY_MAP_CAPACITY,
+                        registryConfigs));
     }
 
     /**
      * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from
-     * avro schema and writes the writer schema to Confluent Schema Registry.
+     * Avro schema and writes the writer schema to Confluent Schema Registry.
      *
      * @param subject subject of schema registry to produce
      * @param schema schema that will be used for serialization
-     * @param schemaRegistryUrl url of schema registry to connect
-     * @return Serialized record in form of byte array
+     * @param schemaRegistryUrl URL of schema registry to connect
+     * @return serialized record
      */
     public static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
             String subject, Schema schema, String schemaRegistryUrl) {
+        return forGeneric(subject, schema, schemaRegistryUrl, null);
+    }
+
+    /**
+     * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from
+     * Avro schema and writes the writer schema to Confluent Schema Registry.
+     *
+     * @param subject subject of schema registry to produce
+     * @param schema schema that will be used for serialization
+     * @param schemaRegistryUrl URL of schema registry to connect
+     * @param registryConfigs map with additional schema registry configs (for example SSL
+     *     properties)
+     * @return serialized record
+     */
+    public static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
+            String subject,
+            Schema schema,
+            String schemaRegistryUrl,
+            @Nullable Map<String, ?> registryConfigs) {
         return new ConfluentRegistryAvroSerializationSchema<>(
                 GenericRecord.class,
                 schema,
                 new CachedSchemaCoderProvider(
-                        subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY));
+                        subject,
+                        schemaRegistryUrl,
+                        DEFAULT_IDENTITY_MAP_CAPACITY,
+                        registryConfigs));
     }
 }
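
Correspondingly on the write path, e.g. with basic auth (subject, URL, and credentials are placeholders; the basic.auth.* keys match those exercised by CachedSchemaCoderProviderTest below):

    Map<String, String> registryConfigs = new HashMap<>();
    registryConfigs.put("basic.auth.credentials.source", "USER_INFO");
    registryConfigs.put("basic.auth.user.info", "user:pwd");

    // Registers/looks up the schema under the given subject using basic auth.
    SerializationSchema<GenericRecord> ser =
            ConfluentRegistryAvroSerializationSchema.forGeneric(
                    "my-topic-value", schema, "https://registry.example.com:8081", registryConfigs);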
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index cff1870..525cd60 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -42,12 +42,22 @@ import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BASIC_AUTH_USER_INFO;
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BEARER_AUTH_TOKEN;
 import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SCHEMA_REGISTRY_SUBJECT;
 import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SCHEMA_REGISTRY_URL;
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_KEYSTORE_LOCATION;
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_KEYSTORE_PASSWORD;
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_TRUSTSTORE_LOCATION;
+import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_TRUSTSTORE_PASSWORD;
 
 /**
  * Table format factory for providing configured instances of Schema Registry Avro to RowData {@link
@@ -64,6 +74,8 @@ public class RegistryAvroFormatFactory
         FactoryUtil.validateFactoryOptions(this, formatOptions);
 
         String schemaRegistryURL = formatOptions.get(SCHEMA_REGISTRY_URL);
+        Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
+
         return new DecodingFormat<DeserializationSchema<RowData>>() {
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
@@ -72,8 +84,14 @@ public class RegistryAvroFormatFactory
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
                 return new AvroRowDataDeserializationSchema(
-                        ConfluentRegistryAvroDeserializationSchema.forGeneric(
-                                AvroSchemaConverter.convertToSchema(rowType), schemaRegistryURL),
+                        optionalPropertiesMap.isEmpty()
+                                ? ConfluentRegistryAvroDeserializationSchema.forGeneric(
+                                        AvroSchemaConverter.convertToSchema(rowType),
+                                        schemaRegistryURL)
+                                : ConfluentRegistryAvroDeserializationSchema.forGeneric(
+                                        AvroSchemaConverter.convertToSchema(rowType),
+                                        schemaRegistryURL,
+                                        optionalPropertiesMap),
                         AvroToRowDataConverters.createRowConverter(rowType),
                         rowDataTypeInfo);
             }
@@ -92,6 +110,8 @@ public class RegistryAvroFormatFactory
 
         String schemaRegistryURL = formatOptions.get(SCHEMA_REGISTRY_URL);
         Optional<String> subject = formatOptions.getOptional(SCHEMA_REGISTRY_SUBJECT);
+        Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
+
         if (!subject.isPresent()) {
             throw new ValidationException(
                     String.format(
@@ -106,10 +126,16 @@ public class RegistryAvroFormatFactory
                 final RowType rowType = (RowType) consumedDataType.getLogicalType();
                 return new AvroRowDataSerializationSchema(
                         rowType,
-                        ConfluentRegistryAvroSerializationSchema.forGeneric(
-                                subject.get(),
-                                AvroSchemaConverter.convertToSchema(rowType),
-                                schemaRegistryURL),
+                        optionalPropertiesMap.isEmpty()
+                                ? ConfluentRegistryAvroSerializationSchema.forGeneric(
+                                        subject.get(),
+                                        AvroSchemaConverter.convertToSchema(rowType),
+                                        schemaRegistryURL)
+                                : ConfluentRegistryAvroSerializationSchema.forGeneric(
+                                        subject.get(),
+                                        AvroSchemaConverter.convertToSchema(rowType),
+                                        schemaRegistryURL,
+                                        optionalPropertiesMap),
                         RowDataToAvroConverters.createConverter(rowType));
             }
 
@@ -136,6 +162,45 @@ public class RegistryAvroFormatFactory
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(SCHEMA_REGISTRY_SUBJECT);
+        options.add(SSL_KEYSTORE_LOCATION);
+        options.add(SSL_KEYSTORE_PASSWORD);
+        options.add(SSL_TRUSTSTORE_LOCATION);
+        options.add(SSL_TRUSTSTORE_PASSWORD);
+        options.add(BASIC_AUTH_CREDENTIALS_SOURCE);
+        options.add(BASIC_AUTH_USER_INFO);
+        options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
+        options.add(BEARER_AUTH_TOKEN);
         return options;
     }
+
+    private Map<String, String> buildOptionalPropertiesMap(ReadableConfig formatOptions) {
+        final Map<String, String> properties = new HashMap<>();
+
+        formatOptions
+                .getOptional(SSL_KEYSTORE_LOCATION)
+                .ifPresent(v -> properties.put("schema.registry.ssl.keystore.location", v));
+        formatOptions
+                .getOptional(SSL_KEYSTORE_PASSWORD)
+                .ifPresent(v -> properties.put("schema.registry.ssl.keystore.password", v));
+        formatOptions
+                .getOptional(SSL_TRUSTSTORE_LOCATION)
+                .ifPresent(v -> properties.put("schema.registry.ssl.truststore.location", v));
+        formatOptions
+                .getOptional(SSL_TRUSTSTORE_PASSWORD)
+                .ifPresent(v -> properties.put("schema.registry.ssl.truststore.password", v));
+        formatOptions
+                .getOptional(BASIC_AUTH_CREDENTIALS_SOURCE)
+                .ifPresent(v -> properties.put("basic.auth.credentials.source", v));
+        formatOptions
+                .getOptional(BASIC_AUTH_USER_INFO)
+                .ifPresent(v -> properties.put("basic.auth.user.info", v));
+        formatOptions
+                .getOptional(BEARER_AUTH_CREDENTIALS_SOURCE)
+                .ifPresent(v -> properties.put("bearer.auth.credentials.source", v));
+        formatOptions
+                .getOptional(BEARER_AUTH_TOKEN)
+                .ifPresent(v -> properties.put("bearer.auth.token", v));
+
+        return properties;
+    }
 }
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java
index c94ebc1..8132340 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java
@@ -37,4 +37,56 @@ public class RegistryAvroOptions {
                     .noDefaultValue()
                     .withDescription(
                             "Subject name to write to the Schema Registry 
service, required for sink");
+
+    // --------------------------------------------------------------------------------------------
+    // Commonly used options maintained by Flink for convenience
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> SSL_KEYSTORE_LOCATION =
+            ConfigOptions.key("ssl.keystore.location")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Location / File of SSL keystore");
+
+    public static final ConfigOption<String> SSL_KEYSTORE_PASSWORD =
+            ConfigOptions.key("ssl.keystore.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password for SSL keystore");
+
+    public static final ConfigOption<String> SSL_TRUSTSTORE_LOCATION =
+            ConfigOptions.key("ssl.truststore.location")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Location / File of SSL truststore");
+
+    public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD =
+            ConfigOptions.key("ssl.truststore.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password for SSL truststore");
+
+    public static final ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE =
+            ConfigOptions.key("basic-auth.credentials-source")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Basic auth credentials source for Schema Registry");
+
+    public static final ConfigOption<String> BASIC_AUTH_USER_INFO =
+            ConfigOptions.key("basic-auth.user-info")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Basic auth user info for Schema Registry");
+
+    public static final ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE =
+            ConfigOptions.key("bearer-auth.credentials-source")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Bearer auth credentials source for Schema Registry");
+
+    public static final ConfigOption<String> BEARER_AUTH_TOKEN =
+            ConfigOptions.key("bearer-auth.token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Bearer auth token for Schema Registry");
 }
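
Put together, these options can now be set per table. A sketch via the Table API, with an illustrative Kafka connector setup and placeholder values (note that RegistryAvroFormatFactory maps the dash-separated Flink keys, e.g. basic-auth.user-info, onto Confluent's dot-separated client properties):

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
    tableEnv.executeSql(
            "CREATE TABLE user_behavior (id BIGINT, name STRING) WITH ("
                    + " 'connector' = 'kafka',"
                    + " 'topic' = 'user_behavior',"
                    + " 'properties.bootstrap.servers' = 'kafka:9092',"
                    + " 'format' = 'avro-confluent',"
                    + " 'avro-confluent.schema-registry.url' = 'https://registry.example.com:8081',"
                    + " 'avro-confluent.schema-registry.subject' = 'user_behavior-value',"
                    + " 'avro-confluent.ssl.truststore.location' = '/path/to/truststore.jks',"
                    + " 'avro-confluent.ssl.truststore.password' = 'secret',"
                    + " 'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',"
                    + " 'avro-confluent.basic-auth.user-info' = 'user:pwd')");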
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
new file mode 100644
index 0000000..db877d8
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
+import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for properties set by {@link RegistryAvroFormatFactory} in {@link
+ * CachedSchemaCoderProvider}.
+ */
+public class CachedSchemaCoderProviderTest {
+
+    @Test
+    public void testThatSslIsNotInitializedForNoSslProperties() {
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
+        SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
+
+        assertNull(sslSocketFactory);
+    }
+
+    @Test
+    public void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
+        String keystoreFile = getAbsolutePath("/test-keystore.jks");
+        String keystorePassword = "123456";
+        Map<String, String> configs = new HashMap<>();
+        configs.put("schema.registry.ssl.keystore.location", keystoreFile);
+        configs.put("schema.registry.ssl.keystore.password", keystorePassword);
+        configs.put("schema.registry.ssl.truststore.location", keystoreFile);
+        configs.put("schema.registry.ssl.truststore.password", 
keystorePassword);
+
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
+        SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
+
+        assertNotNull(sslSocketFactory);
+    }
+
+    @Test
+    public void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
+        BasicAuthCredentialProvider basicAuthCredentialProvider =
+                getBasicAuthFromProvider(provider);
+
+        assertNull(basicAuthCredentialProvider);
+    }
+
+    @Test
+    public void testThatBasicAuthIsInitializedForBasicAuthProperties() {
+        String userPassword = "user:pwd";
+        Map<String, String> configs = new HashMap<>();
+        configs.put("basic.auth.credentials.source", "USER_INFO");
+        configs.put("basic.auth.user.info", userPassword);
+
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
+        BasicAuthCredentialProvider basicAuthCredentialProvider =
+                getBasicAuthFromProvider(provider);
+
+        assertNotNull(basicAuthCredentialProvider);
+        assertEquals(basicAuthCredentialProvider.getUserInfo(null), userPassword);
+    }
+
+    @Test
+    public void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
+        BearerAuthCredentialProvider bearerAuthCredentialProvider =
+                getBearerAuthFromProvider(provider);
+
+        assertNull(bearerAuthCredentialProvider);
+    }
+
+    @Test
+    public void testThatBearerAuthIsInitializedForBearerAuthProperties() {
+        String token = "123456";
+        Map<String, String> configs = new HashMap<>();
+        configs.put("bearer.auth.credentials.source", "STATIC_TOKEN");
+        configs.put("bearer.auth.token", token);
+
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
+        BearerAuthCredentialProvider bearerAuthCredentialProvider =
+                getBearerAuthFromProvider(provider);
+
+        assertNotNull(bearerAuthCredentialProvider);
+        assertEquals(bearerAuthCredentialProvider.getBearerToken(null), token);
+    }
+
+    private String getAbsolutePath(String path) throws URISyntaxException {
+        return CachedSchemaCoderProviderTest.class.getResource(path).toURI().getPath();
+    }
+
+    private CachedSchemaCoderProvider initCachedSchemaCoderProvider(Map<String, String> config) {
+        return new CachedSchemaCoderProvider("test", "someUrl", 1000, config);
+    }
+
+    private SSLSocketFactory getSslSocketFactoryFromProvider(CachedSchemaCoderProvider provider) {
+        return getInternalStateFromRestService("sslSocketFactory", provider);
+    }
+
+    private BasicAuthCredentialProvider getBasicAuthFromProvider(
+            CachedSchemaCoderProvider provider) {
+        return getInternalStateFromRestService("basicAuthCredentialProvider", 
provider);
+    }
+
+    private BearerAuthCredentialProvider getBearerAuthFromProvider(
+            CachedSchemaCoderProvider provider) {
+        return getInternalStateFromRestService("bearerAuthCredentialProvider", 
provider);
+    }
+
+    private <T> T getInternalStateFromRestService(String name, CachedSchemaCoderProvider provider) {
+        CachedSchemaRegistryClient cachedSchemaRegistryClient =
+                Whitebox.getInternalState(provider.get(), "schemaRegistryClient");
+        RestService restService =
+                Whitebox.getInternalState(cachedSchemaRegistryClient, "restService");
+        return Whitebox.getInternalState(restService, name);
+    }
+}
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
index 3171c82..21f9842 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
@@ -67,6 +67,19 @@ public class RegistryAvroFormatFactoryTest {
     private static final String SUBJECT = "test-subject";
     private static final String REGISTRY_URL = "http://localhost:8081";
 
+    private static final Map<String, String> EXPECTED_OPTIONAL_PROPERTIES = new HashMap<>();
+
+    static {
+        EXPECTED_OPTIONAL_PROPERTIES.put(
+                "schema.registry.ssl.keystore.location", getAbsolutePath("/test-keystore.jks"));
+        EXPECTED_OPTIONAL_PROPERTIES.put("schema.registry.ssl.keystore.password", "123456");
+        EXPECTED_OPTIONAL_PROPERTIES.put(
+                "schema.registry.ssl.truststore.location", getAbsolutePath("/test-keystore.jks"));
+        EXPECTED_OPTIONAL_PROPERTIES.put("schema.registry.ssl.truststore.password", "123456");
+        EXPECTED_OPTIONAL_PROPERTIES.put("basic.auth.credentials.source", "USER_INFO");
+        EXPECTED_OPTIONAL_PROPERTIES.put("basic.auth.user.info", "user:pwd");
+    }
+
     @Rule public ExpectedException thrown = ExpectedException.none();
 
     @Test
@@ -127,6 +140,52 @@ public class RegistryAvroFormatFactoryTest {
         createTableSink(SCHEMA, options);
     }
 
+    @Test
+    public void testDeserializationSchemaWithOptionalProperties() {
+        final AvroRowDataDeserializationSchema expectedDeser =
+                new AvroRowDataDeserializationSchema(
+                        ConfluentRegistryAvroDeserializationSchema.forGeneric(
+                                AvroSchemaConverter.convertToSchema(ROW_TYPE),
+                                REGISTRY_URL,
+                                EXPECTED_OPTIONAL_PROPERTIES),
+                        AvroToRowDataConverters.createRowConverter(ROW_TYPE),
+                        InternalTypeInfo.of(ROW_TYPE));
+
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, getOptionalProperties());
+        assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+        DeserializationSchema<RowData> actualDeser =
+                scanSourceMock.valueFormat.createRuntimeDecoder(
+                        ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+        assertEquals(expectedDeser, actualDeser);
+    }
+
+    @Test
+    public void testSerializationSchemaWithOptionalProperties() {
+        final AvroRowDataSerializationSchema expectedSer =
+                new AvroRowDataSerializationSchema(
+                        ROW_TYPE,
+                        ConfluentRegistryAvroSerializationSchema.forGeneric(
+                                SUBJECT,
+                                AvroSchemaConverter.convertToSchema(ROW_TYPE),
+                                REGISTRY_URL,
+                                EXPECTED_OPTIONAL_PROPERTIES),
+                        RowDataToAvroConverters.createConverter(ROW_TYPE));
+
+        final DynamicTableSink actualSink = createTableSink(SCHEMA, getOptionalProperties());
+        assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> actualSer =
+                sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
+
+        assertEquals(expectedSer, actualSer);
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
@@ -153,4 +212,37 @@ public class RegistryAvroFormatFactoryTest {
         options.put("avro-confluent.schema-registry.url", REGISTRY_URL);
         return options;
     }
+
+    private Map<String, String> getOptionalProperties() {
+        final Map<String, String> properties = new HashMap<>();
+        // defined via Flink maintained options
+        properties.put(
+                RegistryAvroOptions.SSL_KEYSTORE_LOCATION.key(),
+                getAbsolutePath("/test-keystore.jks"));
+        properties.put(RegistryAvroOptions.SSL_KEYSTORE_PASSWORD.key(), "123456");
+        properties.put(
+                RegistryAvroOptions.SSL_TRUSTSTORE_LOCATION.key(),
+                getAbsolutePath("/test-keystore.jks"));
+        properties.put(RegistryAvroOptions.SSL_TRUSTSTORE_PASSWORD.key(), "123456");
+        properties.put(RegistryAvroOptions.BASIC_AUTH_CREDENTIALS_SOURCE.key(), "USER_INFO");
+        properties.put(RegistryAvroOptions.BASIC_AUTH_USER_INFO.key(), "user:pwd");
+
+        return getModifiedOptions(
+                opts ->
+                        properties.forEach(
+                                (k, v) ->
+                                        opts.put(
+                                                String.format(
+                                                        "%s.%s",
+                                                        RegistryAvroFormatFactory.IDENTIFIER, k),
+                                                v)));
+    }
+
+    private static String getAbsolutePath(String path) {
+        try {
+            return CachedSchemaCoderProviderTest.class.getResource(path).toURI().getPath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
 }
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/resources/test-keystore.jks b/flink-formats/flink-avro-confluent-registry/src/test/resources/test-keystore.jks
new file mode 100644
index 0000000..b682158
Binary files /dev/null and b/flink-formats/flink-avro-confluent-registry/src/test/resources/test-keystore.jks differ
