This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bca79cb37 NIFI-11627 Added JsonSchemaRegistry for ValidateJson
6bca79cb37 is described below

commit 6bca79cb3720395352bfa587449422e09bec0233
Author: dan-s1 <dsti...@gmail.com>
AuthorDate: Fri Nov 10 17:36:42 2023 +0000

    NIFI-11627 Added JsonSchemaRegistry for ValidateJson
    
    - Added nifi-json-schema-api to nifi-commons
    - Added StandardJsonSchemaRegistry implementation of JsonSchemaRegistry
    - Added strategy configuration properties to ValidateJson
    
    This closes #8005
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 nifi-commons/nifi-json-schema-api/pom.xml          |  33 ++++
 .../org/apache/nifi/json/schema/JsonSchema.java    |  39 ++++
 .../org/apache/nifi/json/schema/SchemaVersion.java |  56 ++++++
 nifi-commons/nifi-json-schema-shared/pom.xml       |  37 ++++
 .../schema/access/JsonSchemaRegistryComponent.java |  32 ++++
 nifi-commons/pom.xml                               |   2 +
 .../nifi-registry-service/pom.xml                  |  20 ++
 .../services/StandardJsonSchemaRegistry.java       | 158 ++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   3 +-
 .../services/TestStandardJsonSchemaRegistry.java   | 123 ++++++++++++
 .../nifi-standard-processors/pom.xml               |  10 +
 .../nifi/processors/standard/ValidateJson.java     | 210 +++++++++++++++------
 .../nifi/processors/standard/TestValidateJson.java | 118 ++++++++++--
 .../nifi-schema-registry-service-api/pom.xml       |   5 +
 .../services/JsonSchemaRegistry.java               |  37 ++++
 15 files changed, 815 insertions(+), 68 deletions(-)

diff --git a/nifi-commons/nifi-json-schema-api/pom.xml 
b/nifi-commons/nifi-json-schema-api/pom.xml
new file mode 100644
index 0000000000..eacabf87a7
--- /dev/null
+++ b/nifi-commons/nifi-json-schema-api/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-json-schema-api</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/JsonSchema.java
 
b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/JsonSchema.java
new file mode 100644
index 0000000000..94b1fd18a3
--- /dev/null
+++ 
b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/JsonSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json.schema;
+
+import java.util.Objects;
+
+public class JsonSchema {
+    private final SchemaVersion schemaVersion;
+    private final String schemaText;
+
+    public JsonSchema(SchemaVersion schemaVersion, String schemaText) {
+        Objects.requireNonNull(schemaVersion, "Schema version cannot be null");
+        Objects.requireNonNull(schemaText, "The text of the schema cannot be null");
+        this.schemaVersion = schemaVersion;
+        this.schemaText = schemaText;
+    }
+
+    public SchemaVersion getSchemaVersion() {
+        return schemaVersion;
+    }
+
+    public String getSchemaText() {
+        return schemaText;
+    }
+}
diff --git 
a/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/SchemaVersion.java
 
b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/SchemaVersion.java
new file mode 100644
index 0000000000..2f386a63e9
--- /dev/null
+++ 
b/nifi-commons/nifi-json-schema-api/src/main/java/org/apache/nifi/json/schema/SchemaVersion.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json.schema;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum SchemaVersion implements DescribedValue {
+    DRAFT_4("Draft Version 4", "Draft 4", "https://json-schema.org/draft-04/schema"),
+    DRAFT_6("Draft Version 6", "Draft 6", "https://json-schema.org/draft-06/schema"),
+    DRAFT_7("Draft Version 7", "Draft 7", "https://json-schema.org/draft-07/schema"),
+    DRAFT_2019_09("Draft Version 2019-09", "Draft 2019-09", "https://json-schema.org/draft/2019-09/schema"),
+    DRAFT_2020_12("Draft Version 2020-12", "Draft 2020-12", "https://json-schema.org/draft/2020-12/schema");
+
+    private final String description;
+    private final String displayName;
+    private final String uri;
+
+    SchemaVersion(String description, String displayName, String uri) {
+        this.description = description;
+        this.displayName = displayName;
+        this.uri = uri;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+}
diff --git a/nifi-commons/nifi-json-schema-shared/pom.xml 
b/nifi-commons/nifi-json-schema-shared/pom.xml
new file mode 100644
index 0000000000..35149bd83f
--- /dev/null
+++ b/nifi-commons/nifi-json-schema-shared/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-json-schema-shared</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-schema-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java
 
b/nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java
new file mode 100644
index 0000000000..0fc53bed30
--- /dev/null
+++ 
b/nifi-commons/nifi-json-schema-shared/src/main/java/org/apache/nifi/schema/access/JsonSchemaRegistryComponent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.json.schema.SchemaVersion;
+
+public interface JsonSchemaRegistryComponent {
+    PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor
+        .Builder()
+        .name("JSON Schema Version")
+        .displayName("JSON Schema Version")
+        .description("The JSON schema specification")
+        .required(true)
+        .allowableValues(SchemaVersion.class)
+        .defaultValue(SchemaVersion.DRAFT_2020_12.getValue())
+        .build();
+}
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index dddf9b7d86..034b39f123 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -30,6 +30,8 @@
         <module>nifi-flowfile-packager</module>
         <module>nifi-flow-encryptor</module>
         <module>nifi-hl7-query-language</module>
+        <module>nifi-json-schema-api</module>
+        <module>nifi-json-schema-shared</module>
         <module>nifi-json-utils</module>
         <module>nifi-jetty-configuration</module>
         <module>nifi-kubernetes-client</module>
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
index b879530b07..be9603ff9b 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
@@ -44,6 +44,11 @@ language governing permissions and limitations under the License. -->
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-schema-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -52,5 +57,20 @@ language governing permissions and limitations under the License. -->
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.networknt</groupId>
+            <artifactId>json-schema-validator</artifactId>
+            <version>1.0.87</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-schema-shared</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/StandardJsonSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/StandardJsonSchemaRegistry.java
new file mode 100644
index 0000000000..3e877ef8e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/StandardJsonSchemaRegistry.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import com.networknt.schema.JsonSchemaFactory;
+import com.networknt.schema.SpecVersion;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.json.schema.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.json.schema.SchemaVersion;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+@Tags({"schema", "registry", "json"})
+@CapabilityDescription("Provides a service for registering and accessing JSON schemas. One can register a schema "
+        + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
+        + "representation of the actual schema following the syntax and semantics of the JSON Schema format. "
+        + "Empty schemas and schemas only consisting of whitespace are not acceptable schemas."
+        + "The registry is heterogeneous registry as it can store schemas of different schema draft versions. "
+        + "By default the registry is configured to store schemas of Draft 2020-12. When a schema is added, the version "
+        + "which is currently is set, is what the schema is saved as.")
+@DynamicProperty(name = "Schema Name", value = "Schema Content",
+        description = "Adds a named schema using the JSON string representation of a JSON schema",
+        expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class StandardJsonSchemaRegistry extends AbstractControllerService 
implements JsonSchemaRegistry, JsonSchemaRegistryComponent {
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.singletonList(SCHEMA_VERSION);
+
+    private final ConcurrentMap<String, JsonSchema> jsonSchemas;
+    private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> 
schemaFactories;
+    private volatile SchemaVersion schemaVersion;
+
+    public StandardJsonSchemaRegistry() {
+        jsonSchemas = new ConcurrentHashMap<>();
+        schemaFactories = Arrays.stream(SchemaVersion.values())
+                .collect(Collectors.toConcurrentMap(Function.identity(),
+                        schemaDraftVersion -> 
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())));
+        schemaVersion = 
SchemaVersion.valueOf(SCHEMA_VERSION.getDefaultValue());
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (SCHEMA_VERSION.getName().equals(descriptor.getName()) && 
!newValue.equals(oldValue)) {
+            schemaVersion = SchemaVersion.valueOf(newValue);
+        } else if(descriptor.isDynamic() && isBlank(newValue)) {
+            jsonSchemas.remove(descriptor.getName());
+        } else if (descriptor.isDynamic() && isNotBlank(newValue)) {
+            try {
+                final String schemaName = descriptor.getName();
+                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                jsonSchemaFactory.getSchema(newValue);
+                jsonSchemas.put(schemaName, new JsonSchema(schemaVersion, 
newValue));
+            } catch (final Exception e) {
+                getLogger().debug("Exception thrown when changing value of schema name '{}' from '{}' to '{}'",
+                        descriptor.getName(), oldValue, newValue, e);
+            }
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final Set<ValidationResult> results = new HashSet<>();
+
+        final boolean noSchemasConfigured = 
validationContext.getProperties().keySet().stream()
+                .noneMatch(PropertyDescriptor::isDynamic);
+        if (noSchemasConfigured) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Supported Dynamic Property Descriptor")
+                    .valid(false)
+                    .explanation("There must be at least one JSON schema specified")
+                    .build());
+        } else {
+            // Iterate over dynamic properties, validating only newly added 
schemas, and adding results
+            schemaVersion = 
SchemaVersion.valueOf(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION).getValue());
+            validationContext.getProperties().entrySet().stream()
+                    .filter(entry -> entry.getKey().isDynamic() && 
!jsonSchemas.containsKey(entry.getKey().getName()))
+                    .forEach(entry -> {
+                        String subject = entry.getKey().getName();
+                        String input = entry.getValue();
+                        if (isNotBlank(input)) {
+                            try {
+                                final JsonSchemaFactory jsonSchemaFactory = 
schemaFactories.get(schemaVersion);
+                                jsonSchemaFactory.getSchema(input);
+                            } catch (Exception e) {
+                                results.add(new ValidationResult.Builder()
+                                        .input(input)
+                                        .subject(subject)
+                                        .valid(false)
+                                        .explanation("Not a valid JSON Schema: " + e.getMessage())
+                                        .build());
+                            }
+                        }
+                    });
+        }
+
+        return results;
+    }
+
+    @Override
+    public JsonSchema retrieveSchema(final String schemaName) throws 
SchemaNotFoundException {
+        JsonSchema jsonSchema = jsonSchemas.get(schemaName);
+        if (jsonSchema == null) {
+            throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
+        }
+        return jsonSchema;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .dynamic(true)
+                .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+                .build();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index a000cd743d..1cb051bdac 100644
--- 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.schemaregistry.services.AvroSchemaRegistry
\ No newline at end of file
+org.apache.nifi.schemaregistry.services.AvroSchemaRegistry
+org.apache.nifi.schemaregistry.services.StandardJsonSchemaRegistry
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestStandardJsonSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestStandardJsonSchemaRegistry.java
new file mode 100644
index 0000000000..87560ee01e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestStandardJsonSchemaRegistry.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.nifi.util.TestRunners;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestStandardJsonSchemaRegistry {
+    private static final String SCHEMA_NAME = "fooSchema";
+    private static final PropertyDescriptor 
SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR =  new PropertyDescriptor.Builder()
+            .name(SCHEMA_NAME)
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dynamic(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    @Mock
+    private ValidationContext validationContext;
+    @Mock
+    private PropertyValue propertyValue;
+    private Map<PropertyDescriptor, String> properties;
+    private StandardJsonSchemaRegistry delegate;
+
+    @BeforeEach
+    void setUp() throws InitializationException {
+        properties = new HashMap<>();
+        delegate = new StandardJsonSchemaRegistry();
+        TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService("jsonSchemaRegistry", delegate);
+    }
+
+    @Test
+    void testCustomValidateWithoutAnySchemaSpecified() {
+        when(validationContext.getProperties()).thenReturn(properties);
+
+        final Collection<ValidationResult> results = 
delegate.customValidate(validationContext);
+        assertEquals(1, results.size());
+        final ValidationResult result = results.iterator().next();
+        assertFalse(result.isValid());
+        assertTrue(result.getExplanation().contains("at least one JSON schema specified"));
+    }
+
+    @ParameterizedTest(name = "{3}")
+    @MethodSource("dynamicProperties")
+    void 
testCustomValidateWithSchemaRegistrationFromDynamicProperties(PropertyDescriptor
 propertyDescriptor, String schema, int numValidationErrors) {
+        
when(validationContext.getProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION)).thenReturn(propertyValue);
+        
when(propertyValue.getValue()).thenReturn(JsonSchemaRegistryComponent.SCHEMA_VERSION.getDefaultValue());
+        when(validationContext.getProperties()).thenReturn(properties);
+        properties.put(propertyDescriptor, schema);
+        delegate.getSupportedPropertyDescriptors().forEach(prop -> 
properties.put(prop, prop.getDisplayName()));
+
+        assertEquals(numValidationErrors, 
delegate.customValidate(validationContext).size());
+    }
+
+    @ParameterizedTest(name = "{3}")
+    @MethodSource("dynamicProperties")
+    void testSchemaRetrieval(PropertyDescriptor propertyDescriptor, String 
schema, int numValidationErrors) throws SchemaNotFoundException {
+        delegate.onPropertyModified(propertyDescriptor, null, schema);
+        boolean validSchema = numValidationErrors == 0;
+
+        if(validSchema) {
+            assertDoesNotThrow(() -> delegate.retrieveSchema(SCHEMA_NAME));
+            assertNotNull(delegate.retrieveSchema(SCHEMA_NAME));
+        } else {
+            assertThrows(SchemaNotFoundException.class, () -> 
delegate.retrieveSchema(SCHEMA_NAME));
+        }
+    }
+
+    private static Stream<Arguments> dynamicProperties() {
+        return Stream.of(
+                Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "{}", 0, "empty object schema"),
+                Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "[]", 0, "empty array schema"),
+                Arguments.of(SUPPORTED_DYNAMIC_PROPERTY_DESCRIPTOR, "not a schema", 1, "non whitespace")
+        );
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 5becb29639..7bf8071ead 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -499,6 +499,16 @@
             <artifactId>mockwebserver</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-schema-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-schema-shared</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
index 25b1499135..29a10456e9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
@@ -19,9 +19,8 @@ package org.apache.nifi.processors.standard;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.networknt.schema.JsonSchema;
 import com.networknt.schema.JsonSchemaFactory;
-import com.networknt.schema.SpecVersion.VersionFlag;
+import com.networknt.schema.SpecVersion;
 import com.networknt.schema.ValidationMessage;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -39,23 +38,35 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.resource.ResourceCardinality;
 import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.migration.PropertyConfiguration;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.json.schema.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.json.schema.SchemaVersion;
+import org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 @SideEffectFree
 @SupportsBatching
@@ -79,23 +90,22 @@ import java.util.Set;
         }
 )
 public class ValidateJson extends AbstractProcessor {
-    public enum SchemaVersion implements DescribedValue {
-        DRAFT_4("Draft Version 4", "Draft 4", VersionFlag.V4),
-        DRAFT_6("Draft Version 6", "Draft 6", VersionFlag.V6),
-        DRAFT_7("Draft Version 7", "Draft 7", VersionFlag.V7),
-        DRAFT_2019_09("Draft Version 2019-09", "Draft 2019-09", 
VersionFlag.V201909),
-        DRAFT_2020_12("Draft Version 2020-12", "Draft 2020-12", 
VersionFlag.V202012);
+    public enum JsonSchemaStrategy implements DescribedValue {
+        SCHEMA_NAME_PROPERTY(SCHEMA_NAME_PROPERTY_NAME + " Property",
+                "The name of the Schema to use is specified by the '" + 
SCHEMA_NAME_PROPERTY_NAME +
+                        "' Property. The value of this property is used to 
lookup the Schema in the configured JSON Schema Registry Service."),
+        SCHEMA_CONTENT_PROPERTY(SCHEMA_CONTENT_PROPERTY_NAME + " Property",
+                "A URL or file path to the JSON schema or the actual JSON 
schema is specified by the '" + SCHEMA_CONTENT_PROPERTY_NAME + "' Property. " +
+                        "No matter how the JSON schema is specified, it must 
be a valid JSON schema");
 
-        private final String description;
-        private final String displayName;
-        private final VersionFlag versionFlag;
-
-        SchemaVersion(String description, String displayName, VersionFlag 
versionFlag) {
-            this.description = description;
+        JsonSchemaStrategy(String displayName, String description) {
             this.displayName = displayName;
-            this.versionFlag = versionFlag;
+            this.description = description;
         }
 
+        private final String displayName;
+        private final String description;
+
         @Override
         public String getValue() {
             return name();
@@ -110,37 +120,62 @@ public class ValidateJson extends AbstractProcessor {
         public String getDescription() {
             return description;
         }
-
-        public VersionFlag getVersionFlag() {
-            return versionFlag;
-        }
     }
 
-    public static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
+    protected static final String ERROR_ATTRIBUTE_KEY = 
"json.validation.errors";
+    private static final String SCHEMA_NAME_PROPERTY_NAME = "Schema Name";
+    private static final String SCHEMA_CONTENT_PROPERTY_NAME = "JSON Schema";
+
+    public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Schema Access Strategy")
+            .displayName("Schema Access Strategy")
+            .description("Specifies how to obtain the schema that is to be 
used for interpreting the data.")
+            .allowableValues(JsonSchemaStrategy.class)
+            
.defaultValue(JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_NAME = new 
PropertyDescriptor.Builder()
+            .name(SCHEMA_NAME_PROPERTY_NAME)
+            .displayName(SCHEMA_NAME_PROPERTY_NAME)
+            .description("Specifies the name of the schema to lookup in the 
Schema Registry property")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("${schema.name}")
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, 
JsonSchemaStrategy.SCHEMA_NAME_PROPERTY)
+            .build();
 
-    public static final PropertyDescriptor SCHEMA_CONTENT = new 
PropertyDescriptor
-            .Builder().name("JSON Schema")
-            .displayName("JSON Schema")
-            .description("The content of a JSON Schema")
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new 
PropertyDescriptor.Builder()
+            .name("JSON Schema Registry")
+            .displayName("JSON Schema Registry")
+            .description("Specifies the Controller Service to use for the JSON 
Schema Registry")
+            .identifiesControllerService(JsonSchemaRegistry.class)
+            .required(true)
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, 
JsonSchemaStrategy.SCHEMA_NAME_PROPERTY)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_CONTENT = new 
PropertyDescriptor.Builder()
+            .name(SCHEMA_CONTENT_PROPERTY_NAME)
+            .displayName(SCHEMA_CONTENT_PROPERTY_NAME)
+            .description("A URL or file path to the JSON schema or the actual 
JSON schema content")
             .required(true)
             .identifiesExternalResource(ResourceCardinality.SINGLE, 
ResourceType.FILE, ResourceType.URL, ResourceType.TEXT)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, 
JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
             .build();
 
-    public static final PropertyDescriptor SCHEMA_VERSION = new 
PropertyDescriptor
-        .Builder().name("Schema Version")
-        .displayName("Schema Version")
-        .description("The JSON schema specification")
-        .required(true)
-        .allowableValues(SchemaVersion.class)
-        .defaultValue(SchemaVersion.DRAFT_2020_12.getValue())
-        .build();
+    public static final PropertyDescriptor SCHEMA_VERSION = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(JsonSchemaRegistryComponent.SCHEMA_VERSION)
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, 
JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
+            .build();
 
-    public static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(
-            Arrays.asList(
-                    SCHEMA_CONTENT,
-                    SCHEMA_VERSION
-            )
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            SCHEMA_ACCESS_STRATEGY,
+            SCHEMA_NAME,
+            SCHEMA_REGISTRY,
+            SCHEMA_CONTENT,
+            SCHEMA_VERSION
     );
 
     public static final Relationship REL_VALID = new Relationship.Builder()
@@ -158,21 +193,28 @@ public class ValidateJson extends AbstractProcessor {
         .description("FlowFiles that cannot be read as JSON are routed to this 
relationship")
         .build();
 
-    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(
-            Arrays.asList(
-                    REL_VALID,
-                    REL_INVALID,
-                    REL_FAILURE
-            ))
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(
+        REL_VALID,
+        REL_INVALID,
+        REL_FAILURE
     );
 
-    private static final ObjectMapper MAPPER;
-    static {
-        MAPPER = new ObjectMapper();
-        MAPPER.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
-    }
+    private static final ObjectMapper MAPPER = new 
ObjectMapper().configure(JsonParser.Feature.ALLOW_COMMENTS, true);
 
-    private JsonSchema schema;
+    private final ConcurrentMap<SchemaVersion, JsonSchemaFactory> 
schemaFactories =  Arrays.stream(SchemaVersion.values())
+            .collect(
+                    Collectors.toConcurrentMap(
+                            Function.identity(),
+                            schemaDraftVersion -> 
JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.fromId(schemaDraftVersion.getUri()).get())
+                    )
+            );
+    private volatile com.networknt.schema.JsonSchema schema;
+    private volatile JsonSchemaRegistry jsonSchemaRegistry;
+
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        config.renameProperty("Schema Version", SCHEMA_VERSION.getName());
+    }
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -184,12 +226,38 @@ public class ValidateJson extends AbstractProcessor {
         return PROPERTIES;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> validationResults = new ArrayList<>();
+        final String schemaAccessStrategy = 
getSchemaAccessStrategy(validationContext);
+
+        if (isNameStrategy(validationContext) && 
!validationContext.getProperty(SCHEMA_REGISTRY).isSet()) {
+            validationResults.add(new ValidationResult.Builder()
+                    .subject(SCHEMA_REGISTRY.getDisplayName())
+                    
.explanation(getPropertyValidateMessage(schemaAccessStrategy, SCHEMA_REGISTRY))
+                    .valid(false)
+                    .build());
+        } else if (isContentStrategy(validationContext) && 
!validationContext.getProperty(SCHEMA_CONTENT).isSet()) {
+            validationResults.add(new ValidationResult.Builder()
+                    .subject(SCHEMA_CONTENT.getDisplayName())
+                    
.explanation(getPropertyValidateMessage(schemaAccessStrategy, SCHEMA_CONTENT))
+                    .valid(false)
+                    .build());
+        }
+
+        return validationResults;
+    }
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException {
-        try (final InputStream inputStream = 
context.getProperty(SCHEMA_CONTENT).asResource().read()) {
-            final SchemaVersion schemaVersion = 
SchemaVersion.valueOf(context.getProperty(SCHEMA_VERSION).getValue());
-            final JsonSchemaFactory factory = 
JsonSchemaFactory.getInstance(schemaVersion.getVersionFlag());
-            schema = factory.getSchema(inputStream);
+        if (isNameStrategy(context)) {
+            jsonSchemaRegistry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(JsonSchemaRegistry.class);
+        } else if (isContentStrategy(context)) {
+            try (final InputStream inputStream = 
context.getProperty(SCHEMA_CONTENT).asResource().read()) {
+                final SchemaVersion schemaVersion = 
SchemaVersion.valueOf(context.getProperty(SCHEMA_VERSION).getValue());
+                final JsonSchemaFactory factory = 
schemaFactories.get(schemaVersion);
+                schema = factory.getSchema(inputStream);
+            }
         }
     }
 
@@ -200,6 +268,20 @@ public class ValidateJson extends AbstractProcessor {
             return;
         }
 
+        if (isNameStrategy(context)) {
+            try {
+                final String schemaName = 
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
+                final JsonSchema jsonSchema = 
jsonSchemaRegistry.retrieveSchema(schemaName);
+                final JsonSchemaFactory factory = 
schemaFactories.get(jsonSchema.getSchemaVersion());
+                schema = factory.getSchema(jsonSchema.getSchemaText());
+            } catch (Exception e) {
+                getLogger().error("Could not retrieve JSON schema for {}", 
flowFile, e);
+                session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+        }
+
         try (final InputStream in = session.read(flowFile)) {
             final JsonNode node = MAPPER.readTree(in);
             final Set<ValidationMessage> errors = schema.validate(node);
@@ -221,4 +303,22 @@ public class ValidateJson extends AbstractProcessor {
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+
+    private String getPropertyValidateMessage(String schemaAccessStrategy, 
PropertyDescriptor property) {
+        return "The '" + schemaAccessStrategy + "' Schema Access Strategy 
requires that the " + property.getDisplayName() + " property be set.";
+    }
+
+    private boolean isNameStrategy(PropertyContext context) {
+        final String schemaAccessStrategy = getSchemaAccessStrategy(context);
+        return 
JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue().equals(schemaAccessStrategy);
+    }
+
+    private String getSchemaAccessStrategy(PropertyContext context) {
+        return context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+    }
+
+    private boolean isContentStrategy(PropertyContext context) {
+        final String schemaAccessStrategy = getSchemaAccessStrategy(context);
+        return 
JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue().equals(schemaAccessStrategy);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
index 14688ce706..c7b44177ef 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
@@ -17,12 +17,22 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.json.schema.JsonSchema;
+import org.apache.nifi.schema.access.JsonSchemaRegistryComponent;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.json.schema.SchemaVersion;
+import org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.opentest4j.AssertionFailedError;
 
 import java.io.IOException;
@@ -30,18 +40,22 @@ import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class TestValidateJson {
     private static final String JSON = getFileContent("simple-example.json");
+    private static final String SIMPLE_SCHEMA = 
getFileContent("schema-simple-example.json");
     private static final String NON_JSON = "Not JSON";
-    private static final String SCHEMA_VERSION = 
ValidateJson.SchemaVersion.DRAFT_7.getValue();
+    private static final String SCHEMA_VERSION = 
SchemaVersion.DRAFT_7.getValue();
     private TestRunner runner;
 
     @BeforeEach
@@ -49,11 +63,39 @@ class TestValidateJson {
         runner = TestRunners.newTestRunner(ValidateJson.class);
     }
 
+    @ParameterizedTest(name = "{2}")
+    @MethodSource("customValidateArgs")
+    void testCustomValidateMissingProperty(String strategy, String errMsg) {
+        runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, strategy);
+        runner.enqueue(JSON);
+
+        AssertionFailedError e = assertThrows(AssertionFailedError.class, () 
-> runner.run());
+        assertTrue(e.getMessage().contains(errMsg));
+    }
+
     @Test
     void testPassSchema() {
         final String schemaPath = getFilePath("schema-simple-example.json");
         runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
+
+        runner.enqueue(JSON);
+
+        runner.run();
+
+        runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
+        runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
+        runner.assertTransferCount(ValidateJson.REL_VALID, 1);
+
+        assertValidationErrors(ValidateJson.REL_VALID, false);
+        assertEquals(1, runner.getProvenanceEvents().size());
+        assertEquals(ProvenanceEventType.ROUTE, 
runner.getProvenanceEvents().get(0).getEventType());
+    }
+
+    @Test
+    void testNoSchemaVersionSpecified() {
+        final String schemaPath = getFilePath("schema-simple-example.json");
+        runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
 
         runner.enqueue(JSON);
 
@@ -71,7 +113,7 @@ class TestValidateJson {
     @Test
     void testEmptySchema() {
         runner.setProperty(ValidateJson.SCHEMA_CONTENT, "{}");
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(JSON);
         runner.run();
@@ -89,7 +131,7 @@ class TestValidateJson {
     void testAllUnknownKeywordsSchema() {
         runner.setProperty(ValidateJson.SCHEMA_CONTENT,
                 "{\"fruit\": \"Apple\", \"size\": \"Large\", \"color\": 
\"Red\"}");
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(JSON);
         runner.run();
@@ -107,7 +149,7 @@ class TestValidateJson {
     void testPatternSchemaCheck() {
         final String schemaPath = 
getFilePath("schema-simple-example-unmatched-pattern.json");
         runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(JSON);
         runner.run();
@@ -125,7 +167,7 @@ class TestValidateJson {
     void testMissingRequiredValue() {
         final String schema = 
getFileContent("schema-simple-example-missing-required.json");
         runner.setProperty(ValidateJson.SCHEMA_CONTENT, schema);
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(JSON);
         runner.run();
@@ -141,9 +183,8 @@ class TestValidateJson {
 
     @Test
     void testInvalidJson() {
-        final String schema = getFileContent("schema-simple-example.json");
-        runner.setProperty(ValidateJson.SCHEMA_CONTENT, schema);
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(ValidateJson.SCHEMA_CONTENT, SIMPLE_SCHEMA);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(NON_JSON);
         runner.run();
@@ -160,7 +201,7 @@ class TestValidateJson {
     @Test
     void testNonExistingSchema() {
         runner.setProperty(ValidateJson.SCHEMA_CONTENT, "not-found.json");
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(JSON);
         assertThrows(AssertionFailedError.class, () -> runner.run());
@@ -169,7 +210,7 @@ class TestValidateJson {
     @Test
     void testBadSchema() {
         runner.setProperty(ValidateJson.SCHEMA_CONTENT, NON_JSON);
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(JSON);
         assertThrows(AssertionFailedError.class, () -> runner.run());
@@ -179,7 +220,7 @@ class TestValidateJson {
     void testJsonWithComments() {
         final String schemaPath = getFilePath("schema-simple-example.json");
         runner.setProperty(ValidateJson.SCHEMA_CONTENT, schemaPath);
-        runner.setProperty(ValidateJson.SCHEMA_VERSION, SCHEMA_VERSION);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
 
         runner.enqueue(getFileContent("simple-example-with-comments.json"));
 
@@ -191,6 +232,28 @@ class TestValidateJson {
 
         assertValidationErrors(ValidateJson.REL_VALID, false);
     }
+
+    @Test
+    void testSchemaRetrievalFromRegistry() throws InitializationException {
+        final String registryIdentifier = "registry";
+        final String schemaName = "someSchema";
+        final JsonSchemaRegistry validJsonSchemaRegistry = new 
SampleJsonSchemaRegistry(registryIdentifier, schemaName);
+        runner.addControllerService(registryIdentifier, 
validJsonSchemaRegistry);
+        runner.enableControllerService(validJsonSchemaRegistry);
+        runner.assertValid(validJsonSchemaRegistry);
+        runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, 
ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue());
+        runner.setProperty(ValidateJson.SCHEMA_REGISTRY, registryIdentifier);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("schema.name", schemaName);
+        runner.enqueue(JSON, attributes);
+        runner.run();
+
+        runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
+        runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
+        runner.assertTransferCount(ValidateJson.REL_VALID, 1);
+    }
+
     private void assertValidationErrors(Relationship relationship, boolean 
expected) {
         final Map<String, String> attributes = 
runner.getFlowFilesForRelationship(relationship).get(0).getAttributes();
 
@@ -202,6 +265,13 @@ class TestValidateJson {
         }
     }
 
+    private static Stream<Arguments> customValidateArgs() {
+        return Stream.of(
+                
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY.getValue(), 
"requires that the JSON Schema Registry property be set", "no registry set"),
+                
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY.getValue(),
 "requires that the JSON Schema property be set", "no content specified")
+        );
+    }
+
     private static String getFilePath(final String filename) {
         final String path = getRelativeResourcePath(filename);
         final URL url = 
Objects.requireNonNull(TestValidateJson.class.getResource(path), "Resource not 
found");
@@ -221,4 +291,28 @@ class TestValidateJson {
     private static String getRelativeResourcePath(final String filename) {
         return String.format("/%s/%s", TestValidateJson.class.getSimpleName(), 
filename);
     }
+
+    private static class SampleJsonSchemaRegistry extends 
AbstractControllerService implements JsonSchemaRegistry {
+        private final String identifier;
+        private final String schemaName;
+
+        public SampleJsonSchemaRegistry(String identifier, String schemaName) {
+            this.identifier = identifier;
+            this.schemaName = schemaName;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return identifier;
+        }
+
+        @Override
+        public JsonSchema retrieveSchema(String schemaName) throws 
SchemaNotFoundException {
+            if (this.schemaName.equals(schemaName)) {
+                return new JsonSchema(SchemaVersion.DRAFT_2020_12, "{}");
+            } else {
+                throw new SchemaNotFoundException("");
+            }
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
index d5f5032580..8b5b204eaa 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
@@ -31,5 +31,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-schema-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/JsonSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/JsonSchemaRegistry.java
new file mode 100644
index 0000000000..364546fd10
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/JsonSchemaRegistry.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.json.schema.JsonSchema;
+import java.io.IOException;
+
+/**
+ * Represents a {@link ControllerService} strategy to expose an internal and/or
+ * integrate with an external Schema Registry
+ */
+public interface JsonSchemaRegistry extends ControllerService {
+    /**
+     * Retrieves the schema based on the provided schema name
+     * @param schemaName The name of the schema
+     * @return the schema for the given schema name
+     * @throws IOException if unable to communicate with the backing store
+     * @throws SchemaNotFoundException if unable to find the schema based on 
the given schema name
+     */
+    JsonSchema retrieveSchema(String schemaName) throws IOException, 
SchemaNotFoundException;
+}

Reply via email to