This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new ecea18f796 NIFI-12733 Make Apicurio's groupId optional and configurable and use artifactId instead of name as key ecea18f796 is described below commit ecea18f79655c0e34949d94609c8909aeb2d093e Author: Juldrixx <juldr...@gmail.com> AuthorDate: Fri Feb 2 17:44:37 2024 +0100 NIFI-12733 Make Apicurio's groupId optional and configurable and use artifactId instead of name as key Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #8351. --- .../schemaregistry/ApicurioSchemaRegistry.java | 24 +++++++---- .../client/ApicurioSchemaRegistryClient.java | 39 ++++++------------ .../client/CachingSchemaRegistryClient.java | 21 ++-------- .../client/SchemaRegistryApiClient.java | 37 +++++++---------- .../client/SchemaRegistryClient.java | 6 +-- .../apicurio/schemaregistry/util/SchemaUtils.java | 44 ++++++--------------- .../client/ApicurioSchemaRegistryClientTest.java | 46 +++++++++------------- .../client/CachingSchemaRegistryClientTest.java | 26 ++++++------ .../client/SchemaRegistryApiClientTest.java | 36 ++++++----------- .../schemaregistry/util/SchemaUtilsTest.java | 23 +---------- .../src/test/resources/metadata_response.json | 15 ------- .../test/resources/schema_response_version_3.json | 15 +++++++ .../resources/schema_response_version_latest.json | 15 +++++++ .../src/test/resources/search_response.json | 16 -------- 14 files changed, 135 insertions(+), 228 deletions(-) diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java index 63a68949ec..d36b34c21b 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java @@ -26,6 +26,7 @@ import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryClient; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -43,7 +44,7 @@ import java.util.concurrent.TimeUnit; @Tags({"schema", "registry", "apicurio", "avro"}) @CapabilityDescription("Provides a Schema Registry that interacts with the Apicurio Schema Registry so that those Schemas that are stored in the Apicurio Schema " - + "Registry can be used in NiFi. When a Schema is looked up by name by this registry, it will find a Schema in the Apicurio Schema Registry with their names.") + + "Registry can be used in NiFi. When a Schema is looked up by name by this registry, it will find a Schema in the Apicurio Schema Registry with their artifact identifiers.") public class ApicurioSchemaRegistry extends AbstractControllerService implements SchemaRegistry { private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, @@ -56,6 +57,15 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements .addValidator(StandardValidators.URL_VALIDATOR) .required(true) .build(); + static final PropertyDescriptor SCHEMA_GROUP_ID = new PropertyDescriptor.Builder() + .name("Schema Group ID") + .displayName("Schema Group ID") + .description("The artifact Group ID for the schemas") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .defaultValue("default") + .required(true) + .build(); static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() .name("Cache Size") .displayName("Cache Size") @@ -86,6 +96,7 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements private static final List<PropertyDescriptor> PROPERTIES = List.of( SCHEMA_REGISTRY_URL, + SCHEMA_GROUP_ID, CACHE_SIZE, CACHE_EXPIRATION, WEB_CLIENT_PROVIDER @@ -102,29 +113,26 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements @OnEnabled public void onEnabled(final ConfigurationContext context) { final String schemaRegistryUrl = context.getProperty(SCHEMA_REGISTRY_URL).getValue(); + final String schemaGroupId = context.getProperty(SCHEMA_GROUP_ID).getValue(); final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS); final WebClientServiceProvider webClientServiceProvider = context.getProperty(WEB_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class); - final SchemaRegistryApiClient apiClient = new SchemaRegistryApiClient(webClientServiceProvider, schemaRegistryUrl); + final SchemaRegistryApiClient apiClient = new SchemaRegistryApiClient(webClientServiceProvider, schemaRegistryUrl, schemaGroupId); final SchemaRegistryClient schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient); client = new CachingSchemaRegistryClient(schemaRegistryClient, cacheSize, cacheExpiration); } @Override public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { - final String schemaName = schemaIdentifier.getName().orElseThrow( + final String schemaId = schemaIdentifier.getName().orElseThrow( () -> new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present") ); final OptionalInt version = schemaIdentifier.getVersion(); - if (version.isPresent()) { - return client.getSchema(schemaName, version.getAsInt()); - } else { - return client.getSchema(schemaName); - } + return client.getSchema(schemaId, version); } @Override diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java index 793113705a..e7c0947156 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java @@ -17,13 +17,13 @@ package org.apache.nifi.apicurio.schemaregistry.client; import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils; -import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.OptionalInt; public class ApicurioSchemaRegistryClient implements SchemaRegistryClient { private final SchemaRegistryApiClient apiClient; @@ -33,37 +33,20 @@ public class ApicurioSchemaRegistryClient implements SchemaRegistryClient { } @Override - public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { - final ResultAttributes attributes = getAttributesForSchemaName(schemaName); - final int version = getVersionAttributeFromMetadata(attributes); - return createRecordSchemaForAttributes(attributes, version); + public RecordSchema getSchema(final String schemaId, final OptionalInt version) throws IOException, SchemaNotFoundException { + return createRecordSchemaForAttributes( + schemaId, + version + ); } - @Override - public RecordSchema getSchema(final String schemaName, final int version) throws IOException, SchemaNotFoundException { - final ResultAttributes attributes = getAttributesForSchemaName(schemaName); - return createRecordSchemaForAttributes(attributes, version); - } - - private ResultAttributes getAttributesForSchemaName(String schemaName) throws IOException { - final URI searchUri = apiClient.buildSearchUri(schemaName); - try (final InputStream searchResultStream = apiClient.retrieveResponse(searchUri)) { - return SchemaUtils.getResultAttributes(searchResultStream); - } - } - - private int getVersionAttributeFromMetadata(final ResultAttributes attributes) throws IOException { - final URI metaDataUri = apiClient.buildMetaDataUri(attributes.groupId(), attributes.artifactId()); - try (final InputStream metadataResultStream = apiClient.retrieveResponse(metaDataUri)) { - return SchemaUtils.extractVersionAttributeFromStream(metadataResultStream); - } - } - - private RecordSchema createRecordSchemaForAttributes(ResultAttributes attributes, int version) throws IOException, SchemaNotFoundException { - final URI schemaUri = apiClient.buildSchemaVersionUri(attributes.groupId(), attributes.artifactId(), version); + private RecordSchema createRecordSchemaForAttributes(final String artifactId, final OptionalInt version) throws IOException, SchemaNotFoundException { + final URI schemaUri = version.isPresent() + ? apiClient.buildSchemaVersionUri(artifactId, version.getAsInt()) : + apiClient.buildSchemaArtifactUri(artifactId); try (final InputStream schemaResultStream = apiClient.retrieveResponse(schemaUri)) { - return SchemaUtils.createRecordSchema(schemaResultStream, attributes.name(), version); + return SchemaUtils.createRecordSchema(schemaResultStream, artifactId, version); } } } diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java index cfb8aff1dd..ba5d74cb1a 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java @@ -23,10 +23,11 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.serialization.record.RecordSchema; import java.time.Duration; +import java.util.OptionalInt; public class CachingSchemaRegistryClient implements SchemaRegistryClient { private final SchemaRegistryClient client; - private final LoadingCache<Pair<String, Integer>, RecordSchema> schemaCache; + private final LoadingCache<Pair<String, OptionalInt>, RecordSchema> schemaCache; public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int cacheSize, final long expirationNanos) { this.client = toWrap; @@ -34,24 +35,10 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient { schemaCache = Caffeine.newBuilder() .maximumSize(cacheSize) .expireAfterWrite(Duration.ofNanos(expirationNanos)) - .build(key -> { - if (key.getRight() == -1) { - // If the version in the key is -1, fetch the schema by name only. - return client.getSchema(key.getLeft()); - } else { - // If a specific version is provided in the key, fetch the schema with that version. - return client.getSchema(key.getLeft(), key.getRight()); - } - }); + .build(key -> client.getSchema(key.getLeft(), key.getRight())); } - - @Override - public RecordSchema getSchema(final String schemaName) { - return schemaCache.get(Pair.of(schemaName, -1)); - } - @Override - public RecordSchema getSchema(final String schemaName, final int version) { + public RecordSchema getSchema(final String schemaName, final OptionalInt version) { return schemaCache.get(Pair.of(schemaName, version)); } } diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java index 57488794a8..8d030e29d8 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java @@ -26,10 +26,12 @@ public class SchemaRegistryApiClient { private final WebClientServiceProvider webClientServiceProvider; private final String baseUrl; + private final String groupId; - public SchemaRegistryApiClient(final WebClientServiceProvider webClientServiceProvider, final String baseUrl) { + public SchemaRegistryApiClient(final WebClientServiceProvider webClientServiceProvider, final String baseUrl, final String groupId) { this.webClientServiceProvider = webClientServiceProvider; this.baseUrl = baseUrl; + this.groupId = groupId; } public InputStream retrieveResponse(final URI uri) { @@ -51,37 +53,26 @@ public class SchemaRegistryApiClient { .addPathSegment("v2"); } - public URI buildSearchUri(final String schemaName) { + private HttpUriBuilder buildBaseSchemaUri() { return buildBaseUri() - .addPathSegment("search") - .addPathSegment("artifacts") - .addQueryParameter("name", schemaName) - .addQueryParameter("limit", "1") - .build(); + .addPathSegment("groups") + .addPathSegment(this.groupId); } - public URI buildMetaDataUri(final String groupId, final String artifactId) { - return buildGroupArtifactsUri(groupId, artifactId) - .addPathSegment("meta") - .build(); + private HttpUriBuilder buildBaseSchemaArtifactUri(final String artifactId) { + return buildBaseSchemaUri() + .addPathSegment("artifacts") + .addPathSegment(artifactId); } - public URI buildSchemaUri(final String groupId, final String artifactId) { - return buildGroupArtifactsUri(groupId, artifactId).build(); + public URI buildSchemaArtifactUri(final String artifactId) { + return buildBaseSchemaArtifactUri(artifactId).build(); } - public URI buildSchemaVersionUri(final String groupId, final String artifactId, final int version) { - return buildGroupArtifactsUri(groupId, artifactId) + public URI buildSchemaVersionUri(final String artifactId, final int version) { + return buildBaseSchemaArtifactUri(artifactId) .addPathSegment("versions") .addPathSegment(String.valueOf(version)) .build(); } - - private HttpUriBuilder buildGroupArtifactsUri(final String groupId, final String artifactId) { - return buildBaseUri() - .addPathSegment("groups") - .addPathSegment(groupId) - .addPathSegment("artifacts") - .addPathSegment(artifactId); - } } diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java index ba21b50734..39a6cf24a2 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java @@ -20,12 +20,10 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import java.io.IOException; +import java.util.OptionalInt; public interface SchemaRegistryClient { - RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException; - - RecordSchema getSchema(final String schemaName, final int version) throws IOException, SchemaNotFoundException; - + RecordSchema getSchema(final String schemaId, final OptionalInt version) throws IOException, SchemaNotFoundException; } diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java index 5202a9e14e..4f77104e63 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; +import java.util.OptionalInt; public class SchemaUtils { @@ -39,50 +39,28 @@ public class SchemaUtils { private SchemaUtils() { } - public static RecordSchema createRecordSchema(final InputStream schemaStream, final String name, final int version) throws SchemaNotFoundException, IOException { + public static RecordSchema createRecordSchema(final InputStream schemaStream, final String artifactId, final OptionalInt version) throws SchemaNotFoundException, IOException { final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaStream); final String schemaText = schemaNode.toString(); try { final Schema avroSchema = new Schema.Parser().parse(schemaText); - final SchemaIdentifier schemaId = SchemaIdentifier.builder() - .name(name) - .version(version) - .build(); + final SchemaIdentifier.Builder schemaIdBuilder = SchemaIdentifier.builder() + .name(artifactId); + + if (version.isPresent()) { + schemaIdBuilder.version(version.getAsInt()); + } + + final SchemaIdentifier schemaId = schemaIdBuilder.build(); return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId); } catch (final SchemaParseException e) { final String errorMessage = String.format("Obtained Schema with name [%s] from Apicurio Schema Registry but the Schema Text " + - "that was returned is not a valid Avro Schema", name); + "that was returned is not a valid Avro Schema", artifactId); throw new SchemaNotFoundException(errorMessage, e); } } - public static int extractVersionAttributeFromStream(InputStream in) { - final JsonNode metadataNode; - try { - metadataNode = OBJECT_MAPPER.readTree(in); - } catch (IOException e) { - throw new UncheckedIOException("Failed to read version from HTTP response stream", e); - } - return Integer.parseInt(metadataNode.get("version").asText()); - } - - public static ResultAttributes getResultAttributes(InputStream in) { - final JsonNode jsonNode; - try { - jsonNode = OBJECT_MAPPER.readTree(in); - } catch (IOException e) { - throw new UncheckedIOException("Failed to read result attributes from HTTP response stream", e); - } - final JsonNode artifactNode = jsonNode.get("artifacts").get(0); - final String groupId = artifactNode.get("groupId").asText(); - final String artifactId = artifactNode.get("id").asText(); - final String name = artifactNode.get("name").asText(); - return new ResultAttributes(groupId, artifactId, name); - } - - public record ResultAttributes(String groupId, String artifactId, String name) { - } static ObjectMapper setObjectMapper() { return new ObjectMapper(); diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java index 671297d23f..9cbbdedbf3 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java @@ -19,7 +19,6 @@ package org.apache.nifi.apicurio.schemaregistry.client; import org.apache.commons.io.IOUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -29,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.charset.Charset; +import java.util.OptionalInt; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.doReturn; @@ -39,55 +39,45 @@ import static org.mockito.Mockito.verify; class ApicurioSchemaRegistryClientTest { private static final String TEST_URL = "http://test.apicurio-schema-registry.com:8888"; - private static final String SCHEMA_NAME = "schema1"; + private static final String ARTIFACT_ID = "artifactId1"; private static final int VERSION = 3; - private static final String SEARCH_URL = TEST_URL + "/search"; - private static final String METADATA_URL = TEST_URL + "/meta"; + private static final String SCHEMA_ARTIFACT_URL = TEST_URL + "/schema/" + ARTIFACT_ID; private static final String SCHEMA_VERSION_URL = TEST_URL + "/schema/versions/" + VERSION; - private static final String GROUP_ID = "groupId1"; - private static final String ARTIFACT_ID = "artifactId1"; @Mock private SchemaRegistryApiClient apiClient; private ApicurioSchemaRegistryClient schemaRegistryClient; - @BeforeEach - void setup() { - doReturn(URI.create(SEARCH_URL)).when(apiClient).buildSearchUri(SCHEMA_NAME); - doReturn(URI.create(SCHEMA_VERSION_URL)).when(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION); - doReturn(getResource("search_response.json")).when(apiClient).retrieveResponse(URI.create(SEARCH_URL)); - doReturn(getResource("schema_response.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_VERSION_URL)); - } - @Test - void testGetSchemaWithNameInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException { - doReturn(URI.create(METADATA_URL)).when(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID); - doReturn(getResource("metadata_response.json")).when(apiClient).retrieveResponse(URI.create(METADATA_URL)); + void testGetSchemaWithIdInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException { + doReturn(URI.create(SCHEMA_ARTIFACT_URL)).when(apiClient).buildSchemaArtifactUri(ARTIFACT_ID); + doReturn(getResource("schema_response_version_latest.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_ARTIFACT_URL)); schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient); - final RecordSchema schema = schemaRegistryClient.getSchema(SCHEMA_NAME); + final RecordSchema schema = schemaRegistryClient.getSchema(ARTIFACT_ID, OptionalInt.empty()); - verify(apiClient).buildSearchUri(SCHEMA_NAME); - verify(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID); - verify(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION); + verify(apiClient).buildSchemaArtifactUri(ARTIFACT_ID); + verify(apiClient, never()).buildSchemaVersionUri(ARTIFACT_ID, VERSION); - final String expectedSchemaText = IOUtils.toString(getResource("schema_response.json"), Charset.defaultCharset()) + final String expectedSchemaText = IOUtils.toString(getResource("schema_response_version_latest.json"), Charset.defaultCharset()) .replace("\n", "") .replaceAll(" +", ""); assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty"))); } @Test - void testGetSchemaWithNameAndVersionInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException { + void testGetSchemaWithIdAndVersionInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException { + doReturn(URI.create(SCHEMA_VERSION_URL)).when(apiClient).buildSchemaVersionUri(ARTIFACT_ID, VERSION); + doReturn(getResource("schema_response_version_3.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_VERSION_URL)); + schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient); - final RecordSchema schema = schemaRegistryClient.getSchema(SCHEMA_NAME, 3); + final RecordSchema schema = schemaRegistryClient.getSchema(ARTIFACT_ID, OptionalInt.of(VERSION)); - verify(apiClient).buildSearchUri(SCHEMA_NAME); - verify(apiClient, never()).buildMetaDataUri(GROUP_ID, ARTIFACT_ID); - verify(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION); + verify(apiClient, never()).buildSchemaArtifactUri(ARTIFACT_ID); + verify(apiClient).buildSchemaVersionUri(ARTIFACT_ID, VERSION); - final String expectedSchemaText = IOUtils.toString(getResource("schema_response.json"), Charset.defaultCharset()) + final String expectedSchemaText = IOUtils.toString(getResource("schema_response_version_3.json"), Charset.defaultCharset()) .replace("\n", "") .replaceAll(" +", ""); assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty"))); diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java index 24eda7a159..ef3f63dbe5 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java @@ -29,6 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; import java.util.Arrays; +import java.util.OptionalInt; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -60,35 +61,36 @@ class CachingSchemaRegistryClientTest { @Test void testGetSchemaWithNameInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException { - when(mockClient.getSchema(SCHEMA_NAME)).thenReturn(TEST_SCHEMA); + final OptionalInt version = OptionalInt.empty(); - RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME); - RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME); + when(mockClient.getSchema(SCHEMA_NAME, version)).thenReturn(TEST_SCHEMA); + + RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME, version); + RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME, version); assertEquals(TEST_SCHEMA, actualSchema1); assertEquals(TEST_SCHEMA, actualSchema2); - verify(mockClient).getSchema(SCHEMA_NAME); + verify(mockClient).getSchema(SCHEMA_NAME, version); } @Test void testGetSchemaWithNameAndVersionInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException { - String schemaName = "schema"; - int version = 1; + final OptionalInt version = OptionalInt.of(1); - when(mockClient.getSchema(schemaName, version)).thenReturn(TEST_SCHEMA); + when(mockClient.getSchema(SCHEMA_NAME, version)).thenReturn(TEST_SCHEMA); - RecordSchema actualSchema1 = cachingClient.getSchema(schemaName, version); - RecordSchema actualSchema2 = cachingClient.getSchema(schemaName, version); + RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME, version); + RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME, version); assertEquals(TEST_SCHEMA, actualSchema1); assertEquals(TEST_SCHEMA, actualSchema2); - verify(mockClient).getSchema(schemaName, version); + verify(mockClient).getSchema(SCHEMA_NAME, version); } @Test void testGetSchemaWithNameAndVersionDoesNotCacheDifferentVersions() throws IOException, SchemaNotFoundException { - int version1 = 1; - int version2 = 2; + final OptionalInt version1 = OptionalInt.of(1); + final OptionalInt version2 = OptionalInt.of(2); RecordSchema expectedSchema1 = TEST_SCHEMA; RecordSchema expectedSchema2 = TEST_SCHEMA_2; diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java index 8551c6fb58..486075a7d0 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java @@ -35,12 +35,12 @@ class SchemaRegistryApiClientTest { private static final String BASE_URL = "http://test.apicurio-schema-registry.com:8888"; private static final String API_PATH = "/apis/registry/v2"; - private static final String METADATA_PATH = "/meta"; private static final String GROUP_ID = "groupId1"; private static final String ARTIFACT_ID = "artifactId1"; - private static final String SCHEMA_PATH = String.format("/groups/%s/artifacts/%s", GROUP_ID, ARTIFACT_ID); - private static final String SCHEMA_NAME = "schema1"; - private static final String SEARCH_PATH = String.format("/search/artifacts?name=%s&limit=1", SCHEMA_NAME); + private static final int VERSION = 3; + private static final String GROUP_PATH = String.format("/groups/%s", GROUP_ID); + private static final String ARTIFACT_PATH = String.format("/artifacts/%s", ARTIFACT_ID); + private static final String VERSION_PATH = String.format("/versions/%d", VERSION); @Mock private WebClientServiceProvider webClientServiceProvider; @@ -53,7 +53,7 @@ class SchemaRegistryApiClientTest { @Test void testBuildBaseUrl() { - client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL); + client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID); final HttpUriBuilder httpUriBuilder = client.buildBaseUri(); @@ -61,30 +61,20 @@ class SchemaRegistryApiClientTest { } @Test - void testBuildSearchUri() { - client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL); + void testBuildSchemaArtifactUri() { + client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID); - final URI uri = client.buildSearchUri(SCHEMA_NAME); + final URI uri = client.buildSchemaArtifactUri(ARTIFACT_ID); - assertEquals(BASE_URL + API_PATH + SEARCH_PATH, uri.toString()); + assertEquals(BASE_URL + API_PATH + GROUP_PATH + ARTIFACT_PATH, uri.toString()); } @Test - void testBuildMetadataUri() { - client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL); + void testBuildSchemaVersionUri() { + client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID); - final URI uri = client.buildMetaDataUri(GROUP_ID, ARTIFACT_ID); + final URI uri = client.buildSchemaVersionUri(ARTIFACT_ID, VERSION); - assertEquals(BASE_URL + API_PATH + SCHEMA_PATH + METADATA_PATH, uri.toString()); + assertEquals(BASE_URL + API_PATH + GROUP_PATH + ARTIFACT_PATH + VERSION_PATH, uri.toString()); } - - @Test - void testBuildSchemaUri() { - client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL); - - final URI uri = client.buildSchemaUri(GROUP_ID, ARTIFACT_ID); - - assertEquals(BASE_URL + API_PATH + SCHEMA_PATH, uri.toString()); - } - } diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java index 7b745f6344..cded22eb7f 100644 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java @@ -17,7 +17,6 @@ package org.apache.nifi.apicurio.schemaregistry.util; import org.apache.commons.io.IOUtils; -import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import org.junit.jupiter.api.Test; @@ -25,6 +24,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; +import java.util.OptionalInt; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -34,7 +34,7 @@ class SchemaUtilsTest { void testCreateRecordSchema() throws SchemaNotFoundException, IOException { final InputStream in = getResource("schema_response.json"); - final RecordSchema schema = SchemaUtils.createRecordSchema(in, "schema1", 3); + final RecordSchema schema = SchemaUtils.createRecordSchema(in, "schema1", OptionalInt.of(3)); assertEquals("schema1", schema.getSchemaName().orElseThrow(() -> new AssertionError("Schema Name is empty"))); assertEquals("schema_namespace_1", schema.getSchemaNamespace().orElseThrow(() -> new AssertionError("Schema Namespace is empty"))); @@ -46,25 +46,6 @@ class SchemaUtilsTest { assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty"))); } - @Test - void testGetVersionAttribute() { - final InputStream in = getResource("metadata_response.json"); - - int version = SchemaUtils.extractVersionAttributeFromStream(in); - - assertEquals(3, version); - } - - @Test - void testGetResultAttributes() { - final InputStream in = getResource("search_response.json"); - - final ResultAttributes resultAttributes = SchemaUtils.getResultAttributes(in); - - final ResultAttributes expectedAttributes = new ResultAttributes("groupId1", "artifactId1", "schema1"); - assertEquals(expectedAttributes, resultAttributes); - } - private InputStream getResource(final String resourceName) { return this.getClass().getClassLoader().getResourceAsStream(resourceName); } diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/metadata_response.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/metadata_response.json deleted file mode 100644 index 96459721ca..0000000000 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/metadata_response.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "name": "schema1", - "createdBy": "", - "createdOn": "2023-10-16T14:19:21+0000", - "modifiedBy": "", - "modifiedOn": "2023-10-16T14:53:12+0000", - "id": "artifactId1", - "version": "3", - "type": "AVRO", - "globalId": 3, - "state": "ENABLED", - "groupId": "groupId1", - "contentId": 2, - "references": [] -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_3.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_3.json new file mode 100644 index 0000000000..9adfffeaa2 --- /dev/null +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_3.json @@ -0,0 +1,15 @@ +{ + "type": "record", + "namespace": "schema_namespace", + "name": "schema_version_3", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_latest.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_latest.json new file mode 100644 index 0000000000..87f1e60e8d --- /dev/null +++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_latest.json @@ -0,0 +1,15 @@ +{ + "type": "record", + "namespace": "schema_namespace", + "name": "schema_version_latest", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/search_response.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/search_response.json deleted file mode 100644 index 5db50a7a15..0000000000 --- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/search_response.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "artifacts": [ - { - "id": "artifactId1", - "name": "schema1", - "createdOn": "2023-10-16T14:19:21+0000", - "createdBy": "", - "type": "AVRO", - "state": "ENABLED", - "modifiedOn": "2023-10-16T14:19:31+0000", - "modifiedBy": "", - "groupId": "groupId1" - } - ], - "count": 1 -} \ No newline at end of file