This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new bb178f3 NIFI-6576 Added Basic Authentication support to ConfluentSchemaRegistry bb178f3 is described below commit bb178f371b879adff823517917426532627c5872 Author: exceptionfactory <exceptionfact...@gmail.com> AuthorDate: Tue Jan 5 16:51:22 2021 -0500 NIFI-6576 Added Basic Authentication support to ConfluentSchemaRegistry Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #4743. This closes #4508. This closes #4224. --- .../nifi-confluent-schema-registry-service/pom.xml | 6 ++ .../schemaregistry/ConfluentSchemaRegistry.java | 70 ++++++++++++++++-- .../schemaregistry/client/AuthenticationType.java | 26 +++++++ .../client/RestSchemaRegistryClient.java | 13 +++- .../ConfluentSchemaRegistryTest.java | 84 ++++++++++++++++++++++ 5 files changed, 194 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml index 1d14956..be05e30 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml @@ -61,6 +61,12 @@ <artifactId>nifi-web-utils</artifactId> <version>1.13.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.13.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> <repositories> diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java index cbf0fce..0762daf 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java @@ -20,22 +20,26 @@ package org.apache.nifi.confluent.schemaregistry; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.net.ssl.SSLContext; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType; import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient; import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient; import org.apache.nifi.confluent.schemaregistry.client.SchemaRegistryClient; @@ -112,6 +116,34 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement .required(true) .build(); + static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder() + .name("authentication-type") + .displayName("Authentication Type") + .description("HTTP Client Authentication Type for Confluent Schema Registry") + .required(false) + .allowableValues(AuthenticationType.values()) + .defaultValue(AuthenticationType.NONE.toString()) + .build(); + + static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("username") + .displayName("Username") + .description("Username for authentication to Confluent Schema Registry") + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) + .required(false) + .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()) + .build(); + + static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("password") + .displayName("Password") + .description("Password for authentication to Confluent Schema Registry") + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) + .required(false) + .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()) + .sensitive(true) + .build(); + private volatile SchemaRegistryClient client; @@ -123,6 +155,9 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement properties.add(TIMEOUT); properties.add(CACHE_SIZE); properties.add(CACHE_EXPIRATION); + properties.add(AUTHENTICATION_TYPE); + properties.add(USERNAME); + properties.add(PASSWORD); return properties; } @@ -139,7 +174,9 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); } - final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, getLogger()); + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, username, password, getLogger()); final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue(); @@ -149,6 +186,8 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement @Override protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + final boolean sslContextSet = validationContext.getProperty(SSL_CONTEXT).isSet(); if (sslContextSet) { final List<String> baseUrls = getBaseURLs(validationContext); @@ -157,7 +196,7 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement .collect(Collectors.toList()); if (!insecure.isEmpty()) { - return Collections.singleton(new ValidationResult.Builder() + results.add(new ValidationResult.Builder() .subject(SCHEMA_REGISTRY_URLS.getDisplayName()) .input(insecure.get(0)) .valid(false) @@ -166,7 +205,30 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement } } - return Collections.emptyList(); + final PropertyValue authenticationTypeProperty = validationContext.getProperty(AUTHENTICATION_TYPE); + if (authenticationTypeProperty.isSet()) { + final AuthenticationType authenticationType = AuthenticationType.valueOf(authenticationTypeProperty.getValue()); + if (AuthenticationType.BASIC.equals(authenticationType)) { + final String username = validationContext.getProperty(USERNAME).getValue(); + if (StringUtils.isBlank(username)) { + results.add(new ValidationResult.Builder() + .subject(USERNAME.getDisplayName()) + .valid(false) + .explanation("Username is required for Basic Authentication") + .build()); + } + final String password = validationContext.getProperty(PASSWORD).getValue(); + if (StringUtils.isBlank(password)) { + results.add(new ValidationResult.Builder() + .subject(PASSWORD.getDisplayName()) + .valid(false) + .explanation("Password is required for Basic Authentication") + .build()); + } + } + } + + return results; } private List<String> getBaseURLs(final PropertyContext context) { diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/AuthenticationType.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/AuthenticationType.java new file mode 100644 index 0000000..1200575 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/AuthenticationType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry.client; + +/** + * Client Authentication Type for Confluent Schema Registry + */ +public enum AuthenticationType { + BASIC, + + NONE +} diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index 1bca29c..4ff3f2a 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -19,6 +19,7 @@ package org.apache.nifi.confluent.schemaregistry.client; import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -30,6 +31,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import javax.net.ssl.SSLContext; import javax.ws.rs.client.Client; @@ -68,7 +70,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { private static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; - public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) { + public RestSchemaRegistryClient(final List<String> baseUrls, + final int timeoutMillis, + final SSLContext sslContext, + final String username, + final String password, + final ComponentLog logger) { this.baseUrls = new ArrayList<>(baseUrls); final ClientConfig clientConfig = new ClientConfig(); @@ -76,6 +83,10 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMillis); client = WebUtils.createClient(clientConfig, sslContext); + if (StringUtils.isNoneBlank(username, password)) { + client.register(HttpAuthenticationFeature.basic(username, password)); + } + this.logger = logger; } diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java new file mode 100644 index 0000000..58789a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class ConfluentSchemaRegistryTest { + + private static final String SERVICE_ID = ConfluentSchemaRegistry.class.getSimpleName(); + + private TestRunner runner; + + private ConfluentSchemaRegistry registry; + + @Before + public void setUp() throws InitializationException { + registry = new ConfluentSchemaRegistry(); + final Processor processor = Mockito.mock(Processor.class); + runner = TestRunners.newTestRunner(processor); + runner.addControllerService(SERVICE_ID, registry); + } + + @Test + public void testValidateAuthenticationTypeBasicMissingUsernameAndPassword() { + runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()); + runner.assertNotValid(registry); + } + + @Test + public void testValidateAuthenticationTypeBasicMissingUsername() { + runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()); + runner.setProperty(registry, ConfluentSchemaRegistry.PASSWORD, String.class.getName()); + runner.assertNotValid(registry); + } + + @Test + public void testValidateAuthenticationTypeBasicMissingPassword() { + runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()); + runner.setProperty(registry, ConfluentSchemaRegistry.USERNAME, String.class.getSimpleName()); + runner.assertNotValid(registry); + } + + @Test + public void testValidateAuthenticationTypeNoneValid() { + runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.NONE.toString()); + runner.assertValid(registry); + } + + @Test + public void testValidateAndEnableDefaultProperties() { + runner.assertValid(registry); + runner.enableControllerService(registry); + } + + @Test + public void testValidateAndEnableAuthenticationTypeBasic() { + runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()); + runner.setProperty(registry, ConfluentSchemaRegistry.USERNAME, String.class.getSimpleName()); + runner.setProperty(registry, ConfluentSchemaRegistry.PASSWORD, String.class.getName()); + runner.assertValid(registry); + runner.enableControllerService(registry); + } +}