This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 147d2b1bff6a055c0f8c928af7afc01dd9aa048b Author: Lars Bachmann <lars.bachm...@sony.com> AuthorDate: Thu Apr 29 09:31:51 2021 +0200 [FLINK-21229][avro-confluent-registry] Add Confluent schema registry SSL support --- .../confluent/CachedSchemaCoderProvider.java | 21 ++- ...ConfluentRegistryAvroDeserializationSchema.java | 110 ++++++++++++++-- .../ConfluentRegistryAvroSerializationSchema.java | 65 +++++++-- .../confluent/RegistryAvroFormatFactory.java | 77 ++++++++++- .../registry/confluent/RegistryAvroOptions.java | 52 ++++++++ .../confluent/CachedSchemaCoderProviderTest.java | 145 +++++++++++++++++++++ .../confluent/RegistryAvroFormatFactoryTest.java | 92 +++++++++++++ .../src/test/resources/test-keystore.jks | Bin 0 -> 2327 bytes 8 files changed, 529 insertions(+), 33 deletions(-) diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java index b5f3200..53be09c 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java @@ -25,10 +25,11 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import javax.annotation.Nullable; +import java.util.Map; import java.util.Objects; /** - * A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry client underlying. * + * A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry client underneath. 
*/ @Internal class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { @@ -37,21 +38,28 @@ class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { private final String subject; private final String url; private final int identityMapCapacity; + private final @Nullable Map<String, ?> registryConfigs; CachedSchemaCoderProvider(String url, int identityMapCapacity) { - this(null, url, identityMapCapacity); + this(null, url, identityMapCapacity, null); } - CachedSchemaCoderProvider(@Nullable String subject, String url, int identityMapCapacity) { + CachedSchemaCoderProvider( + @Nullable String subject, + String url, + int identityMapCapacity, + @Nullable Map<String, ?> registryConfigs) { this.subject = subject; this.url = Objects.requireNonNull(url); this.identityMapCapacity = identityMapCapacity; + this.registryConfigs = registryConfigs; } @Override public SchemaCoder get() { return new ConfluentSchemaRegistryCoder( - this.subject, new CachedSchemaRegistryClient(url, identityMapCapacity)); + this.subject, + new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs)); } @Override @@ -65,11 +73,12 @@ class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { CachedSchemaCoderProvider that = (CachedSchemaCoderProvider) o; return identityMapCapacity == that.identityMapCapacity && Objects.equals(subject, that.subject) - && url.equals(that.url); + && url.equals(that.url) + && Objects.equals(registryConfigs, that.registryConfigs); } @Override public int hashCode() { - return Objects.hash(subject, url, identityMapCapacity); + return Objects.hash(subject, url, identityMapCapacity, registryConfigs); } } diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java index 7becf50..c448a91 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java @@ -28,6 +28,8 @@ import org.apache.avro.specific.SpecificRecord; import javax.annotation.Nullable; +import java.util.Map; + /** * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that * uses Confluent Schema Registry. @@ -60,8 +62,10 @@ public class ConfluentRegistryAvroDeserializationSchema<T> /** * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link - * GenericRecord} using provided reader schema and looks up writer schema in Confluent Schema - * Registry. + * GenericRecord} using the provided reader schema and looks up the writer schema in the + * Confluent Schema Registry. + * + * <p>By default, this method supports up to 1000 cached schema versions. * * @param schema schema of produced records * @param url url of schema registry to connect @@ -74,25 +78,65 @@ public class ConfluentRegistryAvroDeserializationSchema<T> /** * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link - * GenericRecord} using provided reader schema and looks up writer schema in Confluent Schema - * Registry. + * GenericRecord} using the provided reader schema and looks up the writer schema in the + * Confluent Schema Registry. 
* * @param schema schema of produced records * @param url url of schema registry to connect - * @param identityMapCapacity maximum number of cached schema versions (default: 1000) + * @param identityMapCapacity maximum number of cached schema versions * @return deserialized record in form of {@link GenericRecord} */ public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric( Schema schema, String url, int identityMapCapacity) { + return forGeneric(schema, url, identityMapCapacity, null); + } + + /** + * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link + * GenericRecord} using the provided reader schema and looks up the writer schema in the + * Confluent Schema Registry. + * + * <p>By default, this method supports up to 1000 cached schema versions. + * + * @param schema schema of produced records + * @param url URL of schema registry to connect + * @param registryConfigs map with additional schema registry configs (for example SSL + * properties) + * @return deserialized record in form of {@link GenericRecord} + */ + public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric( + Schema schema, String url, @Nullable Map<String, ?> registryConfigs) { + return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs); + } + + /** + * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link + * GenericRecord} using the provided reader schema and looks up the writer schema in the + * Confluent Schema Registry. 
+ * + * @param schema schema of produced records + * @param url URL of schema registry to connect + * @param identityMapCapacity maximum number of cached schema versions + * @param registryConfigs map with additional schema registry configs (for example SSL + * properties) + * @return deserialized record in form of {@link GenericRecord} + */ + public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric( + Schema schema, + String url, + int identityMapCapacity, + @Nullable Map<String, ?> registryConfigs) { return new ConfluentRegistryAvroDeserializationSchema<>( GenericRecord.class, schema, - new CachedSchemaCoderProvider(url, identityMapCapacity)); + new CachedSchemaCoderProvider(null, url, identityMapCapacity, registryConfigs)); } /** - * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro - * schema and looks up writer schema in Confluent Schema Registry. + * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro + * schema and looks up the writer schema in the Confluent Schema Registry. + * + * <p>By default, this method supports up to 1000 cached schema versions. * * @param tClass class of record to be produced * @param url url of schema registry to connect @@ -100,22 +144,62 @@ public class ConfluentRegistryAvroDeserializationSchema<T> */ public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass, String url) { - return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY); + return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY, null); } /** - * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro - * schema and looks up writer schema in Confluent Schema Registry. + * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro + * schema and looks up the writer schema in the Confluent Schema Registry. 
* * @param tClass class of record to be produced * @param url url of schema registry to connect - * @param identityMapCapacity maximum number of cached schema versions (default: 1000) + * @param identityMapCapacity maximum number of cached schema versions * @return deserialized record */ public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific( Class<T> tClass, String url, int identityMapCapacity) { + return forSpecific(tClass, url, identityMapCapacity, null); + } + + /** + * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro + * schema and looks up the writer schema in the Confluent Schema Registry. + * + * <p>By default, this method supports up to 1000 cached schema versions. + * + * @param tClass class of record to be produced + * @param url URL of schema registry to connect + * @param registryConfigs map with additional schema registry configs (for example SSL + * properties) + * @return deserialized record + */ + public static <T extends SpecificRecord> + ConfluentRegistryAvroDeserializationSchema<T> forSpecific( + Class<T> tClass, String url, @Nullable Map<String, ?> registryConfigs) { + return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs); + } + + /** + * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro + * schema and looks up the writer schema in the Confluent Schema Registry. 
+ * + * @param tClass class of record to be produced + * @param url URL of schema registry to connect + * @param identityMapCapacity maximum number of cached schema versions + * @param registryConfigs map with additional schema registry configs (for example SSL + * properties) + * @return deserialized record + */ + public static <T extends SpecificRecord> + ConfluentRegistryAvroDeserializationSchema<T> forSpecific( + Class<T> tClass, + String url, + int identityMapCapacity, + @Nullable Map<String, ?> registryConfigs) { return new ConfluentRegistryAvroDeserializationSchema<>( - tClass, null, new CachedSchemaCoderProvider(url, identityMapCapacity)); + tClass, + null, + new CachedSchemaCoderProvider(null, url, identityMapCapacity, registryConfigs)); } } diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java index f06193f..b3b574c 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchema.java @@ -26,6 +26,10 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificRecord; +import javax.annotation.Nullable; + +import java.util.Map; + /** * Serialization schema that serializes to Avro binary format that uses Confluent Schema Registry. * @@ -57,38 +61,83 @@ public class ConfluentRegistryAvroSerializationSchema<T> /** * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from - * avro schema and writes the writer schema to Confluent Schema Registry. 
+ * Avro schema and writes the writer schema to Confluent Schema Registry. * * @param tClass the type to be serialized * @param subject subject of schema registry to produce - * @param schemaRegistryUrl url of schema registry to connect - * @return Serialized record + * @param schemaRegistryUrl URL of schema registry to connect + * @return serialized record */ public static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific( Class<T> tClass, String subject, String schemaRegistryUrl) { + return forSpecific(tClass, subject, schemaRegistryUrl, null); + } + + /** + * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from + * Avro schema and writes the writer schema to Confluent Schema Registry. + * + * @param tClass the type to be serialized + * @param subject subject of schema registry to produce + * @param schemaRegistryUrl URL of schema registry to connect + * @param registryConfigs map with additional schema registry configs (for example SSL + * properties) + * @return serialized record + */ + public static <T extends SpecificRecord> + ConfluentRegistryAvroSerializationSchema<T> forSpecific( + Class<T> tClass, + String subject, + String schemaRegistryUrl, + @Nullable Map<String, ?> registryConfigs) { return new ConfluentRegistryAvroSerializationSchema<>( tClass, null, new CachedSchemaCoderProvider( - subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY)); + subject, + schemaRegistryUrl, + DEFAULT_IDENTITY_MAP_CAPACITY, + registryConfigs)); } /** * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from - * avro schema and writes the writer schema to Confluent Schema Registry. + * Avro schema and writes the writer schema to Confluent Schema Registry. 
* * @param subject subject of schema registry to produce * @param schema schema that will be used for serialization - * @param schemaRegistryUrl url of schema registry to connect - * @return Serialized record in form of byte array + * @param schemaRegistryUrl URL of schema registry to connect + * @return serialized record */ public static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric( String subject, Schema schema, String schemaRegistryUrl) { + return forGeneric(subject, schema, schemaRegistryUrl, null); + } + + /** + * Creates {@link AvroSerializationSchema} that produces byte arrays that were generated from + * Avro schema and writes the writer schema to Confluent Schema Registry. + * + * @param subject subject of schema registry to produce + * @param schema schema that will be used for serialization + * @param schemaRegistryUrl URL of schema registry to connect + * @param registryConfigs map with additional schema registry configs (for example SSL + * properties) + * @return serialized record + */ + public static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric( + String subject, + Schema schema, + String schemaRegistryUrl, + @Nullable Map<String, ?> registryConfigs) { return new ConfluentRegistryAvroSerializationSchema<>( GenericRecord.class, schema, new CachedSchemaCoderProvider( - subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY)); + subject, + schemaRegistryUrl, + DEFAULT_IDENTITY_MAP_CAPACITY, + registryConfigs)); } } diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java index cff1870..525cd60 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java +++ 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java @@ -42,12 +42,22 @@ import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BASIC_AUTH_CREDENTIALS_SOURCE; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BASIC_AUTH_USER_INFO; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BEARER_AUTH_CREDENTIALS_SOURCE; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BEARER_AUTH_TOKEN; import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SCHEMA_REGISTRY_SUBJECT; import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SCHEMA_REGISTRY_URL; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_KEYSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_KEYSTORE_PASSWORD; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_TRUSTSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_TRUSTSTORE_PASSWORD; /** * Table format factory for providing configured instances of Schema Registry Avro to RowData {@link @@ -64,6 +74,8 @@ public class RegistryAvroFormatFactory FactoryUtil.validateFactoryOptions(this, formatOptions); String schemaRegistryURL = formatOptions.get(SCHEMA_REGISTRY_URL); + Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions); + return new DecodingFormat<DeserializationSchema<RowData>>() { @Override public 
DeserializationSchema<RowData> createRuntimeDecoder( @@ -72,8 +84,14 @@ public class RegistryAvroFormatFactory final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(producedDataType); return new AvroRowDataDeserializationSchema( - ConfluentRegistryAvroDeserializationSchema.forGeneric( - AvroSchemaConverter.convertToSchema(rowType), schemaRegistryURL), + optionalPropertiesMap.isEmpty() + ? ConfluentRegistryAvroDeserializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(rowType), + schemaRegistryURL) + : ConfluentRegistryAvroDeserializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(rowType), + schemaRegistryURL, + optionalPropertiesMap), AvroToRowDataConverters.createRowConverter(rowType), rowDataTypeInfo); } @@ -92,6 +110,8 @@ public class RegistryAvroFormatFactory String schemaRegistryURL = formatOptions.get(SCHEMA_REGISTRY_URL); Optional<String> subject = formatOptions.getOptional(SCHEMA_REGISTRY_SUBJECT); + Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions); + if (!subject.isPresent()) { throw new ValidationException( String.format( @@ -106,10 +126,16 @@ public class RegistryAvroFormatFactory final RowType rowType = (RowType) consumedDataType.getLogicalType(); return new AvroRowDataSerializationSchema( rowType, - ConfluentRegistryAvroSerializationSchema.forGeneric( - subject.get(), - AvroSchemaConverter.convertToSchema(rowType), - schemaRegistryURL), + optionalPropertiesMap.isEmpty() + ? 
ConfluentRegistryAvroSerializationSchema.forGeneric( + subject.get(), + AvroSchemaConverter.convertToSchema(rowType), + schemaRegistryURL) + : ConfluentRegistryAvroSerializationSchema.forGeneric( + subject.get(), + AvroSchemaConverter.convertToSchema(rowType), + schemaRegistryURL, + optionalPropertiesMap), RowDataToAvroConverters.createConverter(rowType)); } @@ -136,6 +162,45 @@ public class RegistryAvroFormatFactory public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet<>(); options.add(SCHEMA_REGISTRY_SUBJECT); + options.add(SSL_KEYSTORE_LOCATION); + options.add(SSL_KEYSTORE_PASSWORD); + options.add(SSL_TRUSTSTORE_LOCATION); + options.add(SSL_TRUSTSTORE_PASSWORD); + options.add(BASIC_AUTH_CREDENTIALS_SOURCE); + options.add(BASIC_AUTH_USER_INFO); + options.add(BEARER_AUTH_CREDENTIALS_SOURCE); + options.add(BEARER_AUTH_TOKEN); return options; } + + private Map<String, String> buildOptionalPropertiesMap(ReadableConfig formatOptions) { + final Map<String, String> properties = new HashMap<>(); + + formatOptions + .getOptional(SSL_KEYSTORE_LOCATION) + .ifPresent(v -> properties.put("schema.registry.ssl.keystore.location", v)); + formatOptions + .getOptional(SSL_KEYSTORE_PASSWORD) + .ifPresent(v -> properties.put("schema.registry.ssl.keystore.password", v)); + formatOptions + .getOptional(SSL_TRUSTSTORE_LOCATION) + .ifPresent(v -> properties.put("schema.registry.ssl.truststore.location", v)); + formatOptions + .getOptional(SSL_TRUSTSTORE_PASSWORD) + .ifPresent(v -> properties.put("schema.registry.ssl.truststore.password", v)); + formatOptions + .getOptional(BASIC_AUTH_CREDENTIALS_SOURCE) + .ifPresent(v -> properties.put("basic.auth.credentials.source", v)); + formatOptions + .getOptional(BASIC_AUTH_USER_INFO) + .ifPresent(v -> properties.put("basic.auth.user.info", v)); + formatOptions + .getOptional(BEARER_AUTH_CREDENTIALS_SOURCE) + .ifPresent(v -> properties.put("bearer.auth.credentials.source", v)); + formatOptions + 
.getOptional(BEARER_AUTH_TOKEN) + .ifPresent(v -> properties.put("bearer.auth.token", v)); + + return properties; + } } diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java index c94ebc1..8132340 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java @@ -37,4 +37,56 @@ public class RegistryAvroOptions { .noDefaultValue() .withDescription( "Subject name to write to the Schema Registry service, required for sink"); + + // -------------------------------------------------------------------------------------------- + // Commonly used options maintained by Flink for convenience + // -------------------------------------------------------------------------------------------- + + public static final ConfigOption<String> SSL_KEYSTORE_LOCATION = + ConfigOptions.key("ssl.keystore.location") + .stringType() + .noDefaultValue() + .withDescription("Location / File of SSL keystore"); + + public static final ConfigOption<String> SSL_KEYSTORE_PASSWORD = + ConfigOptions.key("ssl.keystore.password") + .stringType() + .noDefaultValue() + .withDescription("Password for SSL keystore"); + + public static final ConfigOption<String> SSL_TRUSTSTORE_LOCATION = + ConfigOptions.key("ssl.truststore.location") + .stringType() + .noDefaultValue() + .withDescription("Location / File of SSL truststore"); + + public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD = + ConfigOptions.key("ssl.truststore.password") + .stringType() + .noDefaultValue() + .withDescription("Password for SSL truststore"); + + public static final ConfigOption<String> 
BASIC_AUTH_CREDENTIALS_SOURCE = + ConfigOptions.key("basic-auth.credentials-source") + .stringType() + .noDefaultValue() + .withDescription("Basic auth credentials source for Schema Registry"); + + public static final ConfigOption<String> BASIC_AUTH_USER_INFO = + ConfigOptions.key("basic-auth.user-info") + .stringType() + .noDefaultValue() + .withDescription("Basic auth user info for schema registry"); + + public static final ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE = + ConfigOptions.key("bearer-auth.credentials-source") + .stringType() + .noDefaultValue() + .withDescription("Bearer auth credentials source for Schema Registry"); + + public static final ConfigOption<String> BEARER_AUTH_TOKEN = + ConfigOptions.key("bearer-auth.token") + .stringType() + .noDefaultValue() + .withDescription("Bearer auth token for Schema Registry"); } diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java new file mode 100644 index 0000000..db877d8 --- /dev/null +++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
+import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for properties set by {@link RegistryAvroFormatFactory} in {@link
+ * CachedSchemaCoderProvider}.
+ */
+public class CachedSchemaCoderProviderTest {
+
+    @Test
+    public void testThatSslIsNotInitializedForNoSslProperties() {
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
+        SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
+
+        assertNull(sslSocketFactory);
+    }
+
+    @Test
+    public void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
+        String keystoreFile = getAbsolutePath("/test-keystore.jks");
+        String keystorePassword = "123456";
+        Map<String, String> configs = new HashMap<>();
+        configs.put("schema.registry.ssl.keystore.location", keystoreFile);
+        configs.put("schema.registry.ssl.keystore.password", keystorePassword);
+        configs.put("schema.registry.ssl.truststore.location", keystoreFile);
+        configs.put("schema.registry.ssl.truststore.password", keystorePassword);
+
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
+        SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
+
+        assertNotNull(sslSocketFactory);
+    }
+
+    @Test
+    public void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
+        BasicAuthCredentialProvider basicAuthCredentialProvider =
+                getBasicAuthFromProvider(provider);
+
+        assertNull(basicAuthCredentialProvider);
+    }
+
+    @Test
+    public void testThatBasicAuthIsInitializedForBasicAuthProperties() {
+        String userPassword = "user:pwd";
+        Map<String, String> configs = new HashMap<>();
+        configs.put("basic.auth.credentials.source", "USER_INFO");
+        configs.put("basic.auth.user.info", userPassword);
+
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
+        BasicAuthCredentialProvider basicAuthCredentialProvider =
+                getBasicAuthFromProvider(provider);
+
+        assertNotNull(basicAuthCredentialProvider);
+        // assertEquals takes (expected, actual); this order keeps failure messages correct.
+        assertEquals(userPassword, basicAuthCredentialProvider.getUserInfo(null));
+    }
+
+    @Test
+    public void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
+        BearerAuthCredentialProvider bearerAuthCredentialProvider =
+                getBearerAuthFromProvider(provider);
+
+        assertNull(bearerAuthCredentialProvider);
+    }
+
+    @Test
+    public void testThatBearerAuthIsInitializedForBearerAuthProperties() {
+        String token = "123456";
+        Map<String, String> configs = new HashMap<>();
+        configs.put("bearer.auth.credentials.source", "STATIC_TOKEN");
+        configs.put("bearer.auth.token", token);
+
+        CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
+        BearerAuthCredentialProvider bearerAuthCredentialProvider =
+                getBearerAuthFromProvider(provider);
+
+        assertNotNull(bearerAuthCredentialProvider);
+        // assertEquals takes (expected, actual); this order keeps failure messages correct.
+        assertEquals(token, bearerAuthCredentialProvider.getBearerToken(null));
+    }
+
+    /**
+     * Looks up {@code path} as a classpath resource of this test class and returns the path
+     * component of its URI.
+     *
+     * @param path resource name handed to {@link Class#getResource(String)}
+     * @return the path component of the resource's URI
+     * @throws URISyntaxException if the resource URL cannot be converted to a URI
+     */
+    private String getAbsolutePath(String path) throws URISyntaxException {
+        return CachedSchemaCoderProviderTest.class.getResource(path).toURI().getPath();
+    }
+
+    /**
+     * Builds the provider under test; subject, URL and identity map capacity are fixed dummy
+     * values, only the registry {@code config} varies between tests.
+     */
+    private CachedSchemaCoderProvider initCachedSchemaCoderProvider(Map<String, String> config) {
+        return new CachedSchemaCoderProvider("test", "someUrl", 1000, config);
+    }
+
+    private SSLSocketFactory getSslSocketFactoryFromProvider(CachedSchemaCoderProvider provider) {
+        return getInternalStateFromRestService("sslSocketFactory", provider);
+    }
+
+    private BasicAuthCredentialProvider getBasicAuthFromProvider(
+            CachedSchemaCoderProvider provider) {
+        return getInternalStateFromRestService("basicAuthCredentialProvider", provider);
+    }
+
+    private BearerAuthCredentialProvider getBearerAuthFromProvider(
+            CachedSchemaCoderProvider provider) {
+        return getInternalStateFromRestService("bearerAuthCredentialProvider", provider);
+    }
+
+    /**
+     * Reads the field called {@code name} from the {@link RestService} behind a freshly created
+     * coder. The fields are reached with {@link Whitebox#getInternalState}: first
+     * "schemaRegistryClient" on the object returned by {@code provider.get()}, then "restService"
+     * on that client, then {@code name} on the REST service.
+     *
+     * @param name field name to read from the {@link RestService}
+     * @param provider provider whose {@code get()} result is inspected
+     * @param <T> type the caller expects the field value to have
+     * @return the field value as returned by {@link Whitebox#getInternalState}
+     */
+    private <T> T getInternalStateFromRestService(String name, CachedSchemaCoderProvider provider) {
+        CachedSchemaRegistryClient cachedSchemaRegistryClient =
+                Whitebox.getInternalState(provider.get(), "schemaRegistryClient");
+        RestService restService =
+                Whitebox.getInternalState(cachedSchemaRegistryClient, "restService");
+        return Whitebox.getInternalState(restService, name);
+    }
+}
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
index 3171c82..21f9842 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
@@ -67,6 +67,19 @@ public class RegistryAvroFormatFactoryTest {
     private static final String SUBJECT = "test-subject";
     private static final String REGISTRY_URL = "http://localhost:8081";
 
+    private static final Map<String, String> EXPECTED_OPTIONAL_PROPERTIES = new HashMap<>();
+
+    static {
+        EXPECTED_OPTIONAL_PROPERTIES.put(
+                "schema.registry.ssl.keystore.location", getAbsolutePath("/test-keystore.jks"));
+        EXPECTED_OPTIONAL_PROPERTIES.put("schema.registry.ssl.keystore.password", "123456");
+        EXPECTED_OPTIONAL_PROPERTIES.put(
+                "schema.registry.ssl.truststore.location", getAbsolutePath("/test-keystore.jks"));
+        EXPECTED_OPTIONAL_PROPERTIES.put("schema.registry.ssl.truststore.password", "123456");
+        EXPECTED_OPTIONAL_PROPERTIES.put("basic.auth.credentials.source", "USER_INFO");
+        EXPECTED_OPTIONAL_PROPERTIES.put("basic.auth.user.info", "user:pwd");
+    }
+
     @Rule public ExpectedException thrown = ExpectedException.none();
 
     @Test
@@ -127,6 +140,52 @@ public class RegistryAvroFormatFactoryTest {
         createTableSink(SCHEMA, options);
     }
 
+    @Test
+    public void testDeserializationSchemaWithOptionalProperties() {
+        final AvroRowDataDeserializationSchema expectedDeser =
+                new AvroRowDataDeserializationSchema(
+                        ConfluentRegistryAvroDeserializationSchema.forGeneric(
+                                AvroSchemaConverter.convertToSchema(ROW_TYPE),
+                                REGISTRY_URL,
+                                EXPECTED_OPTIONAL_PROPERTIES),
+                        AvroToRowDataConverters.createRowConverter(ROW_TYPE),
+                        InternalTypeInfo.of(ROW_TYPE));
+
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, getOptionalProperties());
+        assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+        DeserializationSchema<RowData> actualDeser =
+                scanSourceMock.valueFormat.createRuntimeDecoder(
+                        ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+        assertEquals(expectedDeser, actualDeser);
+    }
+
+    @Test
+    public void testSerializationSchemaWithOptionalProperties() {
+        final AvroRowDataSerializationSchema expectedSer =
+                new AvroRowDataSerializationSchema(
+                        ROW_TYPE,
+                        ConfluentRegistryAvroSerializationSchema.forGeneric(
+                                SUBJECT,
+                                AvroSchemaConverter.convertToSchema(ROW_TYPE),
+                                REGISTRY_URL,
+                                EXPECTED_OPTIONAL_PROPERTIES),
+                        RowDataToAvroConverters.createConverter(ROW_TYPE));
+
+        final DynamicTableSink actualSink = createTableSink(SCHEMA, getOptionalProperties());
+        assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> actualSer =
+                sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
+
+        assertEquals(expectedSer, actualSer);
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
@@ -153,4 +212,37 @@ public class RegistryAvroFormatFactoryTest {
         options.put("avro-confluent.schema-registry.url", REGISTRY_URL);
         return options;
     }
+
+    private Map<String, String> getOptionalProperties() {
+        final Map<String, String> properties = new HashMap<>();
+        // defined via Flink maintained options
+        properties.put(
+                RegistryAvroOptions.SSL_KEYSTORE_LOCATION.key(),
+                getAbsolutePath("/test-keystore.jks"));
+        properties.put(RegistryAvroOptions.SSL_KEYSTORE_PASSWORD.key(), "123456");
+        properties.put(
+                RegistryAvroOptions.SSL_TRUSTSTORE_LOCATION.key(),
+                getAbsolutePath("/test-keystore.jks"));
+        properties.put(RegistryAvroOptions.SSL_TRUSTSTORE_PASSWORD.key(), "123456");
+        properties.put(RegistryAvroOptions.BASIC_AUTH_CREDENTIALS_SOURCE.key(), "USER_INFO");
+        properties.put(RegistryAvroOptions.BASIC_AUTH_USER_INFO.key(), "user:pwd");
+
+        return getModifiedOptions(
+                opts ->
+                        properties.forEach(
+                                (k, v) ->
+                                        opts.put(
+                                                String.format(
+                                                        "%s.%s",
+                                                        RegistryAvroFormatFactory.IDENTIFIER, k),
+                                                v)));
+    }
+
+    private static String getAbsolutePath(String path) {
+        try {
+            return CachedSchemaCoderProviderTest.class.getResource(path).toURI().getPath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
 }
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/resources/test-keystore.jks b/flink-formats/flink-avro-confluent-registry/src/test/resources/test-keystore.jks
new file mode 100644
index 0000000..b682158
Binary files /dev/null and b/flink-formats/flink-avro-confluent-registry/src/test/resources/test-keystore.jks differ