takraj commented on code in PR #7913:
URL: https://github.com/apache/nifi/pull/7913#discussion_r1372811693


##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,101 @@
+nifi-airtable-nar

Review Comment:
   ```suggestion
   nifi-apicurio-nar
   ```



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,101 @@
+nifi-airtable-nar
+Copyright 2014-2023 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+**************************
+Apache Software License v2
+**************************
+
+  (ASLv2) Jackson Annotations
+    The following NOTICE information applies:
+      Jackson Annotations
+      Copyright (c) 2019 Tatu Saloranta
+
+  (ASLv2) Jackson Core
+    The following NOTICE information applies:
+      Jackson Core
+      Copyright (c) Tatu Saloranta
+
+  (ASLv2) Jackson Databind
+    The following NOTICE information applies:
+      Jackson Databind
+      Copyright (c) 2007- Tatu Saloranta, tatu.salora...@iki.fi
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+  (ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:3.1.8 - 
https://github.com/ben-manes/caffeine)
+    The following NOTICE information applies:
+      Caffeine (caching library)
+      Copyright Ben Manes
+
+  (ASLv2) Error Prone Annotations
+    The following NOTICE information applies:
+      Error Prone Annotations
+      Copyright 2015 The Error Prone Authors
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2017 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Compress
+    The following NOTICE information applies:
+      Apache Commons Compress
+      Copyright 2002-2014 The Apache Software Foundation
+
+      The files in the package org.apache.commons.compress.archivers.sevenz
+      were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+      which has been placed in the public domain:
+
+      "LZMA SDK is placed in the public domain." 
(https://www.7-zip.org/sdk.html)
+
+  (ASLv2) Apache Commons CSV
+    The following NOTICE information applies:
+      Apache Commons CSV
+      Copyright 2005-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2017 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Commons Text
+    The following NOTICE information applies:
+      Apache Commons Text
+      Copyright 2001-2018 The Apache Software Foundation
+
+************************
+Eclipse Distribution License 1.0

Review Comment:
   Missing from `LICENSE` file.



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/util/SchemaUtilsTest.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils;
+import 
org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SchemaUtilsTest {
+
+    @Test
+    void testCreateRecordSchema() throws SchemaNotFoundException, IOException {
+        final InputStream in = getResource("schema_response.json");
+
+        final RecordSchema schema = SchemaUtils.createRecordSchema(in, 
"schema1", 3);
+
+        assertEquals("schema1", schema.getSchemaName().get());
+        assertEquals("schema_namespace_1", schema.getSchemaNamespace().get());
+        assertEquals("avro", schema.getSchemaFormat().get());
+        String expectedSchemaText = 
IOUtils.toString(getResource("schema_response.json"))

Review Comment:
   `IOUtils.toString(...)` without `Charset` seems to be deprecated.
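
For reference, Commons IO also provides an `IOUtils.toString(InputStream, Charset)` overload that takes the charset explicitly. The sketch below shows the same idea using only the JDK (Java 9+ for `readAllBytes`), so that it stays self-contained. The class and method names are hypothetical and not part of the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class ExplicitCharsetSketch {

    // Decodes the whole stream as UTF-8 instead of relying on the platform default charset.
    static String readUtf8(final InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) throws IOException {
        // Hypothetical payload standing in for schema_response.json.
        final byte[] json = "{\"name\": \"schema1\"}".getBytes(StandardCharsets.UTF_8);
        try (InputStream in = new ByteArrayInputStream(json)) {
            System.out.println(readUtf8(in));
        }
    }
}
```

Passing the charset explicitly keeps the test result independent of the default encoding of the machine running the build.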



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,101 @@
+nifi-airtable-nar
+Copyright 2014-2023 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+**************************
+Apache Software License v2
+**************************
+
+  (ASLv2) Jackson Annotations
+    The following NOTICE information applies:
+      Jackson Annotations
+      Copyright (c) 2019 Tatu Saloranta
+
+  (ASLv2) Jackson Core
+    The following NOTICE information applies:
+      Jackson Core
+      Copyright (c) Tatu Saloranta
+
+  (ASLv2) Jackson Databind
+    The following NOTICE information applies:
+      Jackson Databind
+      Copyright (c) 2007- Tatu Saloranta, tatu.salora...@iki.fi
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+  (ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:3.1.8 - 
https://github.com/ben-manes/caffeine)
+    The following NOTICE information applies:
+      Caffeine (caching library)
+      Copyright Ben Manes
+
+  (ASLv2) Error Prone Annotations
+    The following NOTICE information applies:
+      Error Prone Annotations
+      Copyright 2015 The Error Prone Authors
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2017 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Compress
+    The following NOTICE information applies:
+      Apache Commons Compress
+      Copyright 2002-2014 The Apache Software Foundation
+
+      The files in the package org.apache.commons.compress.archivers.sevenz
+      were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+      which has been placed in the public domain:
+
+      "LZMA SDK is placed in the public domain." 
(https://www.7-zip.org/sdk.html)
+
+  (ASLv2) Apache Commons CSV
+    The following NOTICE information applies:
+      Apache Commons CSV
+      Copyright 2005-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2017 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Commons Text
+    The following NOTICE information applies:
+      Apache Commons Text
+      Copyright 2001-2018 The Apache Software Foundation
+
+************************
+Eclipse Distribution License 1.0
+************************
+
+The following binary components are provided under the Eclipse Distribution 
License 1.0.
+
+    (EDL 1.0) Jakarta Activation API 
(jakarta.activation:jakarta.activation-api:jar:1.2.2)
+    (EDL 1.0) Jakarta XML Binding API 
(jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3)
+
+************************
+The MIT License

Review Comment:
   Missing from `LICENSE` file.



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/util/SchemaUtilsTest.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils;
+import 
org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SchemaUtilsTest {
+
+    @Test
+    void testCreateRecordSchema() throws SchemaNotFoundException, IOException {
+        final InputStream in = getResource("schema_response.json");
+
+        final RecordSchema schema = SchemaUtils.createRecordSchema(in, 
"schema1", 3);
+
+        assertEquals("schema1", schema.getSchemaName().get());

Review Comment:
   Applies to the entire file:
   
   Here you are calling `Optional.get()`, which throws `NoSuchElementException` when the value is absent. I don't have a strong opinion about this, especially since this is test code, but IMHO it would be much cleaner to at least add an `.orElse(...)`, or to assert on `isPresent()` before this line.
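
A minimal sketch of the two options, using only `java.util.Optional`. `lookupSchemaName` is a hypothetical stand-in for `schema.getSchemaName()`, and the class is illustrative rather than code from the PR.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

final class OptionalAssertionSketch {

    // Hypothetical stand-in for schema.getSchemaName(); not part of the PR.
    static Optional<String> lookupSchemaName(final boolean present) {
        return present ? Optional.of("schema1") : Optional.empty();
    }

    // Option 1: fall back to a default, so a missing value fails as a plain value mismatch.
    static String nameOrDefault(final Optional<String> name) {
        return name.orElse("<missing>");
    }

    // Option 2: check presence first, then unwrap.
    static String requireName(final Optional<String> name) {
        if (!name.isPresent()) {
            throw new AssertionError("Expected a schema name to be present");
        }
        return name.get();
    }

    public static void main(final String[] args) {
        System.out.println(nameOrDefault(lookupSchemaName(true)));
        System.out.println(nameOrDefault(lookupSchemaName(false)));
        System.out.println(requireName(lookupSchemaName(true)));
        try {
            lookupSchemaName(false).get();
        } catch (final NoSuchElementException e) {
            System.out.println("get() on an empty Optional throws NoSuchElementException");
        }
    }
}
```

In a JUnit test, option 1 would read roughly as `assertEquals("schema1", schema.getSchemaName().orElse(null))`, and option 2 as an `assertTrue(...isPresent())` followed by the existing `get()`.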



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistryClientTest.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.apicurio.schemaregistry;
+
+import org.apache.commons.io.IOUtils;
+import 
org.apache.nifi.apicurio.schemaregistry.client.ApicurioSchemaRegistryClient;
+import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryApiClient;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+class ApicurioSchemaRegistryClientTest {
+
+    private static final String TEST_URL = "http://test.apicurio-schema-registry.com:8888";
+    private static final String SCHEMA_NAME = "schema1";
+    private static final String SEARCH_URL = TEST_URL + "/search";
+    private static final String METADATA_URL = TEST_URL + "/meta";
+    private static final String SCHEMA_URL = TEST_URL + "/schema";
+    private static final String GROUP_ID = "groupId1";
+    private static final String ARTIFACT_ID = "artifactId1";
+    @Mock
+    private SchemaRegistryApiClient apiClient;
+    private ApicurioSchemaRegistryClient schemaRegistryClient;
+
+    @BeforeEach
+    void setup() {
+        
doReturn(URI.create(SEARCH_URL)).when(apiClient).buildSearchUri(SCHEMA_NAME);
+        
doReturn(URI.create(SCHEMA_URL)).when(apiClient).buildSchemaUri(GROUP_ID, 
ARTIFACT_ID);
+        
doReturn(getResource("search_response.json")).when(apiClient).retrieveResponse(URI.create(SEARCH_URL));
+        
doReturn(getResource("schema_response.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_URL));
+    }
+
+    @Test
+    void testGetSchemaWithNameInvokesApiClientAndReturnsRecordSchema() throws 
IOException, SchemaNotFoundException {
+        
doReturn(URI.create(METADATA_URL)).when(apiClient).buildMetaDataUri(GROUP_ID, 
ARTIFACT_ID);
+        
doReturn(getResource("metadata_response.json")).when(apiClient).retrieveResponse(URI.create(METADATA_URL));
+
+        schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
+
+        final RecordSchema schema = 
schemaRegistryClient.getSchema(SCHEMA_NAME);
+
+        verify(apiClient).buildSearchUri(SCHEMA_NAME);
+        verify(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
+        verify(apiClient).buildSchemaUri(GROUP_ID, ARTIFACT_ID);
+
+        final String expectedSchemaText = 
IOUtils.toString(getResource("schema_response.json"))

Review Comment:
   `IOUtils.toString(...)` without a `Charset` seems to be a deprecated interface.



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistryClientTest.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.apicurio.schemaregistry;
+
+import org.apache.commons.io.IOUtils;
+import 
org.apache.nifi.apicurio.schemaregistry.client.ApicurioSchemaRegistryClient;
+import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryApiClient;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+class ApicurioSchemaRegistryClientTest {
+
+    private static final String TEST_URL = "http://test.apicurio-schema-registry.com:8888";
+    private static final String SCHEMA_NAME = "schema1";
+    private static final String SEARCH_URL = TEST_URL + "/search";
+    private static final String METADATA_URL = TEST_URL + "/meta";
+    private static final String SCHEMA_URL = TEST_URL + "/schema";
+    private static final String GROUP_ID = "groupId1";
+    private static final String ARTIFACT_ID = "artifactId1";
+    @Mock
+    private SchemaRegistryApiClient apiClient;
+    private ApicurioSchemaRegistryClient schemaRegistryClient;
+
+    @BeforeEach
+    void setup() {
+        
doReturn(URI.create(SEARCH_URL)).when(apiClient).buildSearchUri(SCHEMA_NAME);
+        
doReturn(URI.create(SCHEMA_URL)).when(apiClient).buildSchemaUri(GROUP_ID, 
ARTIFACT_ID);
+        
doReturn(getResource("search_response.json")).when(apiClient).retrieveResponse(URI.create(SEARCH_URL));
+        
doReturn(getResource("schema_response.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_URL));
+    }
+
+    @Test
+    void testGetSchemaWithNameInvokesApiClientAndReturnsRecordSchema() throws 
IOException, SchemaNotFoundException {
+        
doReturn(URI.create(METADATA_URL)).when(apiClient).buildMetaDataUri(GROUP_ID, 
ARTIFACT_ID);
+        
doReturn(getResource("metadata_response.json")).when(apiClient).retrieveResponse(URI.create(METADATA_URL));
+
+        schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
+
+        final RecordSchema schema = 
schemaRegistryClient.getSchema(SCHEMA_NAME);
+
+        verify(apiClient).buildSearchUri(SCHEMA_NAME);
+        verify(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
+        verify(apiClient).buildSchemaUri(GROUP_ID, ARTIFACT_ID);
+
+        final String expectedSchemaText = 
IOUtils.toString(getResource("schema_response.json"))
+                .replace("\n", "")
+                .replaceAll(" +", "");
+        assertEquals(expectedSchemaText, schema.getSchemaText().get());
+        assertEquals(expectedSchemaText, schema.getSchemaText().get());
+    }
+
+    @Test
+    void 
testGetSchemaWithNameAndVersionInvokesApiClientAndReturnsRecordSchema() throws 
IOException, SchemaNotFoundException {
+        schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
+
+        final RecordSchema schema = 
schemaRegistryClient.getSchema(SCHEMA_NAME, 3);
+
+        verify(apiClient).buildSearchUri(SCHEMA_NAME);
+        verify(apiClient, never()).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
+        verify(apiClient).buildSchemaUri(GROUP_ID, ARTIFACT_ID);
+
+        final String expectedSchemaText = 
IOUtils.toString(getResource("schema_response.json"))

Review Comment:
   `IOUtils.toString(...)` without a `Charset` seems to be a deprecated interface.



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.apicurio.schemaregistry;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import 
org.apache.nifi.apicurio.schemaregistry.client.ApicurioSchemaRegistryClient;
+import 
org.apache.nifi.apicurio.schemaregistry.client.CachingSchemaRegistryClient;
+import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryApiClient;
+import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryClient;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"schema", "registry", "apicurio", "avro"})
+@CapabilityDescription("Provides a Schema Registry that interacts with the 
Apicurio Schema Registry so that those Schemas that are stored in the Apicurio 
Schema "
+        + "Registry can be used in NiFi. When a Schema is looked up by name by 
this registry, it will find a Schema in the Apicurio Schema Registry with their 
names.")
+public class ApicurioSchemaRegistry extends AbstractControllerService 
implements SchemaRegistry {
+
+    private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
+            SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, 
SchemaField.SCHEMA_VERSION);
+
+    static final PropertyDescriptor SCHEMA_REGISTRY_URL = new 
PropertyDescriptor.Builder()
+            .name("schema-registry-url")
+            .displayName("Schema Registry URL")
+            .description("The URL of the Schema Registry e.g. http://localhost:8080")
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .required(true)
+            .build();
+    static final PropertyDescriptor CACHE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("cache-size")
+            .displayName("Cache Size")
+            .description("Specifies how many Schemas should be cached from the 
Schema Registry")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)

Review Comment:
   AFAIK `NON_NEGATIVE_INTEGER_VALIDATOR` accepts zero (0). Is that really a valid value for 'Cache Size'? If yes, it might be worth mentioning its meaning in the description.



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.apicurio.schemaregistry.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+
+public class SchemaUtils {
+    private static final ObjectMapper OBJECT_MAPPER = setObjectMapper();
+
+    private SchemaUtils() {
+    }
+
+    public static RecordSchema createRecordSchema(final InputStream 
schemaStream, final String name, final int version) throws 
SchemaNotFoundException, IOException {
+        final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaStream);
+        final String schemaText = schemaNode.toString();
+
+        try {
+            final Schema avroSchema = new Schema.Parser().parse(schemaText);
+            final SchemaIdentifier schemaId = SchemaIdentifier.builder()
+                    .name(name)
+                    .version(version)
+                    .build();
+            return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
+        } catch (final SchemaParseException spe) {
+            throw new SchemaNotFoundException("Obtained Schema with name " + 
name
+                    + " from Apicurio Schema Registry but the Schema Text that 
was returned is not a valid Avro Schema");
+        }
+    }
+
+    public static int getVersionAttribute(InputStream in) {
+        final JsonNode metadataNode;
+        try {
+            metadataNode = OBJECT_MAPPER.readTree(in);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to read HTTP response input 
stream", e);

Review Comment:
   Please use `ProcessException` instead of generic, unchecked exceptions.



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.apicurio.schemaregistry.client;
+
+import 
org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import static 
org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.createRecordSchema;
+import static 
org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.getResultAttributes;
+import static 
org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.getVersionAttribute;
+
+public class ApicurioSchemaRegistryClient implements SchemaRegistryClient {
+    private final SchemaRegistryApiClient apiClient;
+
+    public ApicurioSchemaRegistryClient(SchemaRegistryApiClient apiClient) {
+        this.apiClient = apiClient;
+    }
+
+    @Override
+    public RecordSchema getSchema(String schemaName) throws IOException, 
SchemaNotFoundException {
+        final URI searchUri = apiClient.buildSearchUri(schemaName);
+        final InputStream searchResultStream = 
apiClient.retrieveResponse(searchUri);

Review Comment:
   `InputStream`s should be closed once they are no longer needed. Both of these methods work on a number of such streams, but they never explicitly close them.
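
One way to do this is try-with-resources. The sketch below uses only the JDK (Java 9+ for `readAllBytes`). `retrieveResponse()` is a hypothetical stand-in for `apiClient.retrieveResponse(uri)`, and `TrackingStream` exists only to make the `close()` call observable.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class CloseStreamSketch {

    // Records whether close() was called; used only to demonstrate the effect.
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed;

        TrackingStream(final String content) {
            super(content.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Hypothetical stand-in for apiClient.retrieveResponse(uri); not part of the PR.
    static TrackingStream retrieveResponse() {
        return new TrackingStream("{\"version\": \"3\"}");
    }

    // The stream is closed when the try block exits, whether normally or via an exception.
    static String readAndClose(final InputStream stream) throws IOException {
        try (InputStream in = stream) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(final String[] args) throws IOException {
        final TrackingStream response = retrieveResponse();
        System.out.println(readAndClose(response));
        System.out.println("closed: " + response.closed);
    }
}
```

The same pattern would apply to each stream returned by the API client, with one resource per `try` header.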



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.apicurio.schemaregistry.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+
+public class SchemaUtils {
+    private static final ObjectMapper OBJECT_MAPPER = setObjectMapper();
+
+    private SchemaUtils() {
+    }
+
+    public static RecordSchema createRecordSchema(final InputStream 
schemaStream, final String name, final int version) throws 
SchemaNotFoundException, IOException {
+        final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaStream);
+        final String schemaText = schemaNode.toString();
+
+        try {
+            final Schema avroSchema = new Schema.Parser().parse(schemaText);
+            final SchemaIdentifier schemaId = SchemaIdentifier.builder()
+                    .name(name)
+                    .version(version)
+                    .build();
+            return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
+        } catch (final SchemaParseException spe) {
+            throw new SchemaNotFoundException("Obtained Schema with name " + 
name
+                    + " from Apicurio Schema Registry but the Schema Text that 
was returned is not a valid Avro Schema");
+        }
+    }
+
+    public static int getVersionAttribute(InputStream in) {
+        final JsonNode metadataNode;
+        try {
+            metadataNode = OBJECT_MAPPER.readTree(in);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to read HTTP response input stream", e);
+        }
+        return Integer.parseInt(metadataNode.get("version").asText());
+    }
+
+    public static ResultAttributes getResultAttributes(InputStream in) {
+        final JsonNode jsonNode;
+        try {
+            jsonNode = OBJECT_MAPPER.readTree(in);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to read HTTP response input stream", e);

Review Comment:
   Please use `ProcessException` instead of generic, unchecked exceptions.
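
A minimal sketch of what this could look like, not the PR's actual code. `ProcessExceptionSketch` and `readResponse` are hypothetical names, and the nested `ProcessException` is only a stand-in for `org.apache.nifi.processor.exception.ProcessException` (which is likewise unchecked and accepts a message and a cause), so that the example compiles without NiFi on the classpath. The Jackson parsing is replaced by a plain byte read for the same reason.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ProcessExceptionSketch {

    // Stand-in for org.apache.nifi.processor.exception.ProcessException (assumption:
    // in the real module you would import the NiFi class instead of declaring this).
    static class ProcessException extends RuntimeException {
        ProcessException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: reads the whole response body and translates any
    // I/O failure into a ProcessException, keeping the original cause.
    static String readResponse(final InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (final IOException e) {
            throw new ProcessException("Failed to read HTTP response input stream", e);
        }
    }

    public static void main(final String[] args) {
        final InputStream ok = new ByteArrayInputStream(
                "{\"version\":\"3\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(readResponse(ok));

        // A stream that always fails, to exercise the error path.
        final InputStream broken = new InputStream() {
            @Override
            public int read() throws IOException {
                throw new IOException("connection reset");
            }
        };
        try {
            readResponse(broken);
        } catch (final ProcessException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

Passing the caught `IOException` as the cause keeps the original stack trace available to whoever handles the `ProcessException`.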



##########
nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistryClientTest.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.apicurio.schemaregistry;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.apicurio.schemaregistry.client.ApicurioSchemaRegistryClient;
+import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryApiClient;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+class ApicurioSchemaRegistryClientTest {
+
+    private static final String TEST_URL = "http://test.apicurio-schema-registry.com:8888";
+    private static final String SCHEMA_NAME = "schema1";
+    private static final String SEARCH_URL = TEST_URL + "/search";
+    private static final String METADATA_URL = TEST_URL + "/meta";
+    private static final String SCHEMA_URL = TEST_URL + "/schema";
+    private static final String GROUP_ID = "groupId1";
+    private static final String ARTIFACT_ID = "artifactId1";
+    @Mock
+    private SchemaRegistryApiClient apiClient;
+    private ApicurioSchemaRegistryClient schemaRegistryClient;
+
+    @BeforeEach
+    void setup() {
+        doReturn(URI.create(SEARCH_URL)).when(apiClient).buildSearchUri(SCHEMA_NAME);
+        doReturn(URI.create(SCHEMA_URL)).when(apiClient).buildSchemaUri(GROUP_ID, ARTIFACT_ID);
+        doReturn(getResource("search_response.json")).when(apiClient).retrieveResponse(URI.create(SEARCH_URL));
+        doReturn(getResource("schema_response.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_URL));
+    }
+
+    @Test
+    void testGetSchemaWithNameInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
+        doReturn(URI.create(METADATA_URL)).when(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
+        doReturn(getResource("metadata_response.json")).when(apiClient).retrieveResponse(URI.create(METADATA_URL));
+
+        schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
+
+        final RecordSchema schema = schemaRegistryClient.getSchema(SCHEMA_NAME);
+
+        verify(apiClient).buildSearchUri(SCHEMA_NAME);
+        verify(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
+        verify(apiClient).buildSchemaUri(GROUP_ID, ARTIFACT_ID);
+
+        final String expectedSchemaText = IOUtils.toString(getResource("schema_response.json"))
+                .replace("\n", "")
+                .replaceAll(" +", "");
+        assertEquals(expectedSchemaText, schema.getSchemaText().get());

Review Comment:
   See my comment on nullness check in `SchemaUtilsTest.java`




