exceptionfactory commented on code in PR #8005:
URL: https://github.com/apache/nifi/pull/8005#discussion_r1416003586


##########
nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/schema/access/JsonSchema.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import java.util.Objects;
+
+public class JsonSchema {
+    private final SchemaVersion schemaDraftVersion;

Review Comment:
   Recommend renaming this to `schemaVersion`, dropping `draft` from the name.
   ```suggestion
       private final SchemaVersion schemaVersion;
   ```



##########
nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/schema/access/JsonSchema.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import java.util.Objects;
+
+public class JsonSchema {
+    private final SchemaVersion schemaDraftVersion;
+    private final String schemaText;
+
+    public JsonSchema(SchemaVersion schemaDraftVersion, String schemaText) {
+        Objects.requireNonNull(schemaDraftVersion, "Schema draft version 
cannot be null");
+        Objects.requireNonNull(schemaText, "The text of the schema cannot be 
null");
+        this.schemaDraftVersion = schemaDraftVersion;
+        this.schemaText = schemaText;
+    }
+
+    public SchemaVersion getSchemaDraftVersion() {

Review Comment:
   ```suggestion
       public SchemaVersion getSchemaVersion() {
   ```



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 
2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema name", value = "Schema Content",
+        description = "Adds a named schema using the JSON string 
representation of a JSON schema",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class InMemoryJsonSchemaRegistry extends AbstractControllerService 
implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.singletonList(SCHEMA_VERSION);
+
+    private final ConcurrentMap<String, JsonSchema> jsonSchemas;
+    private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> 
schemaFactories;
+    private volatile SchemaVersion schemaVersion;
+
+    public InMemoryJsonSchemaRegistry() {
+        jsonSchemas = new ConcurrentHashMap<>();
+        schemaFactories = Arrays.stream(SchemaVersion.values())
+                .collect(Collectors.toConcurrentMap(Function.identity(),
+                        schemaDraftVersion -> 
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
+        schemaVersion = 
SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && 
!newValue.equals(oldValue)) {
+            schemaVersion = SchemaVersion.valueOf(newValue);
+        } else if(descriptor.isDynamic() && isBlank(newValue)) {
+            jsonSchemas.remove(descriptor.getName());
+        } else if (descriptor.isDynamic() && isNotBlank(newValue)) {
+            try {
+                final String schemaName = descriptor.getName();
+                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                jsonSchemaFactory.getSchema(newValue);
+                jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, 
newValue));
+            } catch (final Exception e) {
+                // not a problem - the service won't be valid and the 
validation message will indicate what is wrong.

Review Comment:
   Perhaps log this at the debug level for potential troubleshooting.
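
   A minimal sketch of what that could look like. To stay self-contained it uses `java.util.logging`, where `Level.FINE` stands in for debug; in the actual service the call would presumably go through the component logger returned by `getLogger()`. The class `DebugLoggingSketch` and the helper `parseSchema` are hypothetical stand-ins, not code from this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the registry; only the catch block matters here.
class DebugLoggingSketch {
    private static final Logger LOGGER = Logger.getLogger(DebugLoggingSketch.class.getName());

    private final Map<String, String> schemas = new ConcurrentHashMap<>();

    // Hypothetical placeholder for jsonSchemaFactory.getSchema(newValue).
    static void parseSchema(final String schemaText) {
        if (!schemaText.trim().startsWith("{")) {
            throw new IllegalArgumentException("Schema text is not a JSON object");
        }
    }

    // Returns true when the schema was parsed and stored, false otherwise.
    public boolean register(final String schemaName, final String schemaText) {
        try {
            parseSchema(schemaText);
            schemas.put(schemaName, schemaText);
            return true;
        } catch (final Exception e) {
            // Still not fatal: validation reports the problem to the user.
            // Logging at debug (FINE) keeps the cause available for troubleshooting.
            LOGGER.log(Level.FINE, "Failed to parse schema [" + schemaName + "]", e);
            return false;
        }
    }

    public boolean contains(final String schemaName) {
        return schemas.containsKey(schemaName);
    }
}
```

   The behavior stays the same as the current catch block: the schema is not stored and nothing is rethrown. The only difference is that the exception is recorded instead of being dropped silently.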



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 
2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema name", value = "Schema Content",

Review Comment:
   ```suggestion
   @DynamicProperty(name = "Schema Name", value = "Schema Content",
   ```



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 
2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema name", value = "Schema Content",
+        description = "Adds a named schema using the JSON string 
representation of a JSON schema",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class InMemoryJsonSchemaRegistry extends AbstractControllerService 
implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.singletonList(SCHEMA_VERSION);
+
+    private final ConcurrentMap<String, JsonSchema> jsonSchemas;
+    private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> 
schemaFactories;
+    private volatile SchemaVersion schemaVersion;
+
+    public InMemoryJsonSchemaRegistry() {
+        jsonSchemas = new ConcurrentHashMap<>();
+        schemaFactories = Arrays.stream(SchemaVersion.values())
+                .collect(Collectors.toConcurrentMap(Function.identity(),
+                        schemaDraftVersion -> 
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
+        schemaVersion = 
SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && 
!newValue.equals(oldValue)) {
+            schemaVersion = SchemaVersion.valueOf(newValue);
+        } else if(descriptor.isDynamic() && isBlank(newValue)) {
+            jsonSchemas.remove(descriptor.getName());
+        } else if (descriptor.isDynamic() && isNotBlank(newValue)) {
+            try {
+                final String schemaName = descriptor.getName();
+                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                jsonSchemaFactory.getSchema(newValue);
+                jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, 
newValue));
+            } catch (final Exception e) {
+                // not a problem - the service won't be valid and the 
validation message will indicate what is wrong.
+            }
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final Set<ValidationResult> results = new HashSet<>();
+
+        final boolean noEnteredSchema = 
validationContext.getProperties().keySet().stream()
+                .noneMatch(PropertyDescriptor::isDynamic);
+        if(noEnteredSchema) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Supported Dynamic Property Descriptor")
+                    .valid(false)
+                    .explanation("There must be at least one JSON schema 
specified")
+                    .build());
+        } else {
+            // Iterate over dynamic properties, validating only newly added 
schemas, and adding results
+            schemaVersion = 
SchemaVersion.valueOf(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION).getValue());
+            validationContext.getProperties().entrySet().stream()
+                    .filter(entry -> entry.getKey().isDynamic() && 
!jsonSchemas.containsKey(entry.getKey().getName()))
+                    .forEach(entry -> {
+                        String subject = entry.getKey().getName();
+                        String input = entry.getValue();
+                        if (isNotBlank(input)) {
+                            try {
+                                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                                jsonSchemaFactory.getSchema(input);
+                            } catch (Exception e) {
+                                results.add(new ValidationResult.Builder()
+                                        .input(input)
+                                        .subject(subject)
+                                        .valid(false)
+                                        .explanation("Not a valid JSON Schema: 
" + e.getMessage())
+                                        .build());
+                            }
+                        }
+                    });
+        }
+
+        return results;
+    }
+
+    @Override
+    public JsonSchema retrieveSchema(final String schemaName) throws 
SchemaNotFoundException {
+        if(!jsonSchemas.containsKey(schemaName)) {
+            throw new SchemaNotFoundException("Unable to find schema with name 
'" + schemaName + "'");
+        }
+        return jsonSchemas.get(schemaName);

Review Comment:
   This could be simplified: call `jsonSchemas.get(schemaName)` once and throw 
the exception if the return value is `null`. It is minor, but it needs just 
one call to the Map.
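
   A rough sketch of the single-lookup version, written against plain JDK types so it compiles on its own. `SingleLookupRegistry` and the local `SchemaNotFoundException` are hypothetical stand-ins for the classes in this PR, and the map holds `String` values instead of `JsonSchema` for brevity.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for org.apache.nifi.schema.access.SchemaNotFoundException.
class SchemaNotFoundException extends Exception {
    SchemaNotFoundException(final String message) {
        super(message);
    }
}

// Hypothetical stand-in for the registry, reduced to the lookup path.
class SingleLookupRegistry {
    private final ConcurrentMap<String, String> jsonSchemas = new ConcurrentHashMap<>();

    public void register(final String schemaName, final String schemaText) {
        jsonSchemas.put(schemaName, schemaText);
    }

    public String retrieveSchema(final String schemaName) throws SchemaNotFoundException {
        // One call to the Map: ConcurrentHashMap never stores null values,
        // so a null result can only mean the name is not registered.
        final String jsonSchema = jsonSchemas.get(schemaName);
        if (jsonSchema == null) {
            throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
        }
        return jsonSchema;
    }
}
```

   A side benefit, assuming the map can be modified concurrently through `onPropertyModified`: with `containsKey` followed by `get`, an entry removed between the two calls would make the method return `null`, which the single `get` avoids.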



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 
2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema name", value = "Schema Content",
+        description = "Adds a named schema using the JSON string 
representation of a JSON schema",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class InMemoryJsonSchemaRegistry extends AbstractControllerService 
implements JsonSchemaRegistry, JsonSchemaRegistryComponent {

Review Comment:
   It seems like a different name would be better for this implementation. The 
JSON Schema definitions are available in memory, but they persist in the flow 
configuration based on properties. One option might be 
`DynamicPropertyJsonSchemaRegistry`, but that may not be optimal. Perhaps 
`PropertySourceJsonSchemaRegistry`, `PropertyValueJsonSchemaRegistry`, or 
`PropertyConfiguredJsonSchemaRegistry`? Any other ideas?



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "

Review Comment:
   ```suggestion
           + "The registry is heterogeneous as it can store schemas of different schema draft versions. "
   ```



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml:
##########
@@ -52,5 +57,15 @@ language governing permissions and limitations under the 
License. -->
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.networknt</groupId>
+            <artifactId>json-schema-validator</artifactId>
+            <version>1.0.86</version>

Review Comment:
   The latest version is now 1.0.87.
   ```suggestion
               <version>1.0.87</version>
   ```



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 
2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema name", value = "Schema Content",
+        description = "Adds a named schema using the JSON string 
representation of a JSON schema",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class InMemoryJsonSchemaRegistry extends AbstractControllerService 
implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.singletonList(SCHEMA_VERSION);
+
+    private final ConcurrentMap<String, JsonSchema> jsonSchemas;
+    private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> 
schemaFactories;
+    private volatile SchemaVersion schemaVersion;
+
+    public InMemoryJsonSchemaRegistry() {
+        jsonSchemas = new ConcurrentHashMap<>();
+        schemaFactories = Arrays.stream(SchemaVersion.values())
+                .collect(Collectors.toConcurrentMap(Function.identity(),
+                        schemaDraftVersion -> 
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
+        schemaVersion = 
SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && 
!newValue.equals(oldValue)) {
+            schemaVersion = SchemaVersion.valueOf(newValue);
+        } else if(descriptor.isDynamic() && isBlank(newValue)) {
+            jsonSchemas.remove(descriptor.getName());
+        } else if (descriptor.isDynamic() && isNotBlank(newValue)) {
+            try {
+                final String schemaName = descriptor.getName();
+                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                jsonSchemaFactory.getSchema(newValue);
+                jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, 
newValue));
+            } catch (final Exception e) {
+                // not a problem - the service won't be valid and the 
validation message will indicate what is wrong.
+            }
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final Set<ValidationResult> results = new HashSet<>();
+
+        final boolean noEnteredSchema = 
validationContext.getProperties().keySet().stream()
+                .noneMatch(PropertyDescriptor::isDynamic);
+        if(noEnteredSchema) {

Review Comment:
   Spacing: add a space between `if` and the opening parenthesis.
   ```suggestion
           if (noEnteredSchema) {
   ```



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java:
##########
@@ -221,4 +292,28 @@ private static String getFileContent(final String 
filename) {
     private static String getRelativeResourcePath(final String filename) {
         return String.format("/%s/%s", TestValidateJson.class.getSimpleName(), 
filename);
     }
+
+    private static class SampleJsonSchemaRegistry extends 
AbstractControllerService implements JsonSchemaRegistry {
+        private final String identifier;
+        private final String schemaName;
+
+        public SampleJsonSchemaRegistry(String identifier, String schemaName) {
+            this.identifier = identifier;
+            this.schemaName = schemaName;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return identifier;
+        }
+
+        @Override
+        public JsonSchema retrieveSchema(String schemaName) throws 
SchemaNotFoundException {
+            if(StringUtils.isNotBlank(schemaName) && 
this.schemaName.equals(schemaName)) {

Review Comment:
   The `isNotBlank()` check looks unnecessary: `this.schemaName` is always 
   defined, and `equals()` already returns `false` for a `null` argument.
   ```suggestion
               if (this.schemaName.equals(schemaName)) {
   ```
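
   As a rough illustration of the reasoning, here is a minimal standalone 
   sketch. `SchemaNameMatchSketch` and `matches` are hypothetical names and 
   not part of the PR. It assumes the stored name is non-null and non-blank, 
   as it is in this test.

   ```java
   import java.util.Objects;

   public class SchemaNameMatchSketch {
       // Hypothetical stand-in for the test registry's stored schema name
       private final String schemaName;

       public SchemaNameMatchSketch(final String schemaName) {
           // Assumption: the name is always supplied, mirroring the test constructor
           this.schemaName = Objects.requireNonNull(schemaName);
       }

       // String.equals() returns false for null, and a non-blank stored name
       // can never equal a blank argument, so no separate blank check is needed
       public boolean matches(final String requestedName) {
           return this.schemaName.equals(requestedName);
       }
   }
   ```

   If the stored name could itself be blank, a blank request would match, so 
   the simplification relies on the test always passing a real name.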



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "

Review Comment:
   ```suggestion
           + "representation of the actual schema following the syntax and 
semantics of the JSON Schema format. "
   ```



##########
nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.components.PropertyDescriptor;
+
+public interface JsonSchemaRegistryComponent {
+    PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor
+        .Builder().name("Schema Version")
+        .displayName("JSON Draft Schema Version")

Review Comment:
   The `name` and `displayName` values should match.
   ```suggestion
           .Builder()
           .name("JSON Schema Version")
           .displayName("JSON Schema Version")
   ```



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 
2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema name", value = "Schema Content",
+        description = "Adds a named schema using the JSON string 
representation of a JSON schema",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class InMemoryJsonSchemaRegistry extends AbstractControllerService 
implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.singletonList(SCHEMA_VERSION);
+
+    private final ConcurrentMap<String, JsonSchema> jsonSchemas;
+    private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> 
schemaFactories;
+    private volatile SchemaVersion schemaVersion;
+
+    public InMemoryJsonSchemaRegistry() {
+        jsonSchemas = new ConcurrentHashMap<>();
+        schemaFactories = Arrays.stream(SchemaVersion.values())
+                .collect(Collectors.toConcurrentMap(Function.identity(),
+                        schemaDraftVersion -> 
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
+        schemaVersion = 
SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && 
!newValue.equals(oldValue)) {
+            schemaVersion = SchemaVersion.valueOf(newValue);
+        } else if(descriptor.isDynamic() && isBlank(newValue)) {
+            jsonSchemas.remove(descriptor.getName());
+        } else if (descriptor.isDynamic() && isNotBlank(newValue)) {
+            try {
+                final String schemaName = descriptor.getName();
+                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                jsonSchemaFactory.getSchema(newValue);
+                jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, 
newValue));
+            } catch (final Exception e) {
+                // not a problem - the service won't be valid and the 
validation message will indicate what is wrong.
+            }
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final Set<ValidationResult> results = new HashSet<>();
+
+        final boolean noEnteredSchema = 
validationContext.getProperties().keySet().stream()

Review Comment:
   Perhaps name this `noSchemasConfigured`?
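
   For context, a small standalone sketch of what the flag expresses. The 
   `Descriptor` class below is a hypothetical stand-in for 
   `PropertyDescriptor`, reduced to the `isDynamic()` flag; it is not the 
   NiFi class.

   ```java
   import java.util.List;

   public class NoSchemasConfiguredSketch {
       // Hypothetical, simplified stand-in for PropertyDescriptor
       public static final class Descriptor {
           private final String name;
           private final boolean dynamic;

           public Descriptor(final String name, final boolean dynamic) {
               this.name = name;
               this.dynamic = dynamic;
           }

           public String getName() {
               return name;
           }

           public boolean isDynamic() {
               return dynamic;
           }
       }

       // True when no dynamic property (that is, no schema) is present
       public static boolean noSchemasConfigured(final List<Descriptor> descriptors) {
           return descriptors.stream().noneMatch(Descriptor::isDynamic);
       }
   }
   ```

   The suggested name reads as a statement about the configuration as a 
   whole, which is what the `noneMatch` call checks.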



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "

Review Comment:
   ```suggestion
   @CapabilityDescription("Provides a service for registering and accessing 
JSON schemas. One can register a schema "
   ```



##########
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/InMemoryJsonSchemaRegistry.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing 
schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
+        + "representation of the actual schema following the syntax and 
semantics of JSON's Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not 
acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of 
multiple schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 
2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema name", value = "Schema Content",
+        description = "Adds a named schema using the JSON string 
representation of a JSON schema",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class InMemoryJsonSchemaRegistry extends AbstractControllerService 
implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.singletonList(SCHEMA_VERSION);
+
+    private final ConcurrentMap<String, JsonSchema> jsonSchemas;
+    private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> 
schemaFactories;
+    private volatile SchemaVersion schemaVersion;
+
+    public InMemoryJsonSchemaRegistry() {
+        jsonSchemas = new ConcurrentHashMap<>();
+        schemaFactories = Arrays.stream(SchemaVersion.values())
+                .collect(Collectors.toConcurrentMap(Function.identity(),
+                        schemaDraftVersion -> 
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
+        schemaVersion = 
SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && 
!newValue.equals(oldValue)) {
+            schemaVersion = SchemaVersion.valueOf(newValue);
+        } else if(descriptor.isDynamic() && isBlank(newValue)) {
+            jsonSchemas.remove(descriptor.getName());
+        } else if (descriptor.isDynamic() && isNotBlank(newValue)) {
+            try {
+                final String schemaName = descriptor.getName();
+                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                jsonSchemaFactory.getSchema(newValue);
+                jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, 
newValue));
+            } catch (final Exception e) {
+                // not a problem - the service won't be valid and the 
validation message will indicate what is wrong.
+            }
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final Set<ValidationResult> results = new HashSet<>();
+
+        final boolean noEnteredSchema = 
validationContext.getProperties().keySet().stream()
+                .noneMatch(PropertyDescriptor::isDynamic);
+        if(noEnteredSchema) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Supported Dynamic Property Descriptor")
+                    .valid(false)
+                    .explanation("There must be at least one JSON schema 
specified")
+                    .build());
+        } else {
+            // Iterate over dynamic properties, validating only newly added 
schemas, and adding results
+            schemaVersion = 
SchemaVersion.valueOf(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION).getValue());
+            validationContext.getProperties().entrySet().stream()
+                    .filter(entry -> entry.getKey().isDynamic() && 
!jsonSchemas.containsKey(entry.getKey().getName()))
+                    .forEach(entry -> {
+                        String subject = entry.getKey().getName();
+                        String input = entry.getValue();
+                        if (isNotBlank(input)) {
+                            try {
+                                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                                jsonSchemaFactory.getSchema(input);
+                            } catch (Exception e) {
+                                results.add(new ValidationResult.Builder()
+                                        .input(input)
+                                        .subject(subject)
+                                        .valid(false)
+                                        .explanation("Not a valid JSON Schema: 
" + e.getMessage())
+                                        .build());
+                            }
+                        }
+                    });
+        }
+
+        return results;
+    }
+
+    @Override
+    public JsonSchema retrieveSchema(final String schemaName) throws 
SchemaNotFoundException {
+        if(!jsonSchemas.containsKey(schemaName)) {

Review Comment:
   ```suggestion
           if (!jsonSchemas.containsKey(schemaName)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
