This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d426f58278 NIFI-14953 Added Schema Text Validation to StandardProtobufReader (#10291)
d426f58278 is described below

commit d426f58278c4fd1408bf3d825a26634f90f10d65
Author: lkuchars <[email protected]>
AuthorDate: Fri Oct 10 20:29:23 2025 +0200

    NIFI-14953 Added Schema Text Validation to StandardProtobufReader (#10291)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../services/protobuf/ProtobufSchemaCompiler.java  |  8 +-
 .../protobuf/SchemaCompilationException.java       | 23 ++++++
 .../services/protobuf/StandardProtobufReader.java  | 88 +++++++++++++++++++++-
 ...stStandardProtobufReaderPropertyValidation.java | 56 ++++++++++++++
 4 files changed, 168 insertions(+), 7 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
index b019c021cc..5bae30ac88 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
@@ -129,7 +129,9 @@ final class ProtobufSchemaCompiler {
                 final Schema compiledSchema = createAndLoadSchema(tempDir);
                 logger.debug("Successfully compiled schema for identifier: 
{}", schemaDefinition.getIdentifier());
                 return compiledSchema;
-
+            } catch (final IllegalStateException e) {
+                // Illegal state exception is thrown by the wire library for 
schema issues
+                throw new SchemaCompilationException("Could not compile 
schema: %s".formatted(schemaDefinition.toString()), e);
             } catch (final Exception e) {
                 throw new RuntimeException("Failed to compile Protobuf schema 
for identifier: " + schemaDefinition.getIdentifier(), e);
             }
@@ -149,8 +151,6 @@ final class ProtobufSchemaCompiler {
 
         try {
             return function.apply(tempDir);
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
         } finally {
             safeDeleteDirectory(tempDir);
         }
@@ -247,6 +247,6 @@ final class ProtobufSchemaCompiler {
 
+    /**
+     * Operation executed against a temporary directory; the invoking helper deletes the directory
+     * in a finally block after apply returns or throws. Since apply declares no checked exceptions,
+     * implementations report failures with unchecked exceptions.
+     */
     @FunctionalInterface
     private interface WithTemporaryDirectory<T> {
-        T apply(Path tempDir) throws Exception;
+        T apply(Path tempDir);
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/SchemaCompilationException.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/SchemaCompilationException.java
new file mode 100644
index 0000000000..da7f4bfc06
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/SchemaCompilationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+/**
+ * Unchecked exception indicating that a Protobuf schema definition could not be compiled.
+ */
+public class SchemaCompilationException extends RuntimeException {
+
+    /**
+     * Creates an exception describing a schema compilation failure.
+     *
+     * @param message description of the failed compilation
+     * @param cause underlying exception raised while compiling the schema
+     */
+    public SchemaCompilationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
index b08ee5132e..c8ee72fcdc 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
@@ -23,11 +23,15 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.MessageName;
 import org.apache.nifi.schemaregistry.services.MessageNameResolver;
@@ -49,6 +53,7 @@ import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HexFormat;
 import java.util.List;
 import java.util.Map;
@@ -135,7 +140,6 @@ public class StandardProtobufReader extends 
SchemaRegistryService implements Rec
         schemaText = context.getProperty(SCHEMA_TEXT);
         schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
         schemaVersion = context.getProperty(SCHEMA_VERSION);
-        schemaCompiler = new ProtobufSchemaCompiler(getIdentifier(), 
getLogger());
     }
 
     @Override
@@ -156,6 +160,12 @@ public class StandardProtobufReader extends 
SchemaRegistryService implements Rec
         throw new SchemaNotFoundException("Unsupported schema access strategy: 
" + schemaAccessStrategyValue);
     }
 
+    /**
+     * Initializes the service and creates the Protobuf schema compiler.
+     * <p>
+     * The compiler is created here, not in the enabled callback, so that it is already
+     * available to custom validation, which may run before the service is enabled.
+     *
+     * @param config initialization context supplied by the framework
+     * @throws InitializationException if parent initialization fails
+     */
+    @Override
+    protected void init(final ControllerServiceInitializationContext config) throws InitializationException {
+        super.init(config);
+        final String serviceIdentifier = getIdentifier();
+        final ComponentLog componentLog = getLogger();
+        schemaCompiler = new ProtobufSchemaCompiler(serviceIdentifier, componentLog);
+    }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
@@ -170,6 +180,80 @@ public class StandardProtobufReader extends 
SchemaRegistryService implements Rec
         return PROTOBUF_SCHEMA_TEXT;
     }
 
+    /**
+     * Validates the Schema Text property when the Schema Text access strategy is selected.
+     * <p>
+     * The schema text is compiled to confirm it is a valid Protobuf definition. When the Message
+     * Name Property resolution strategy is configured, the configured message name is also checked
+     * against the compiled schema. Values containing Expression Language are not checked, since
+     * they cannot be evaluated at configuration time.
+     *
+     * @param validationContext context providing the configured property values
+     * @return results from the parent validation plus any Schema Text or Message Name problems
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        final String schemaAccessStrategyValue = validationContext.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+
+        // Only validate when using Schema Text property strategy
+        if (SCHEMA_TEXT_PROPERTY.getValue().equals(schemaAccessStrategyValue)) {
+            validateSchemaText(validationContext, results);
+        }
+
+        return results;
+    }
+
+    /** Adds a result when Schema Text is missing or does not compile, then checks the Message Name against it. */
+    private void validateSchemaText(final ValidationContext validationContext, final List<ValidationResult> results) {
+        final PropertyValue schemaTextProperty = validationContext.getProperty(SCHEMA_TEXT);
+        final String schemaTextValue = schemaTextProperty.getValue();
+
+        // Expression Language cannot be evaluated during validation, so checking is deferred to runtime
+        if (validationContext.isExpressionLanguagePresent(schemaTextValue)) {
+            return;
+        }
+
+        if (schemaTextValue == null || schemaTextValue.isBlank()) {
+            results.add(new ValidationResult.Builder()
+                .subject(SCHEMA_TEXT.getDisplayName())
+                .input(schemaTextValue)
+                .valid(false)
+                .explanation("Schema Text value is missing")
+                .build());
+            return;
+        }
+
+        final Schema compiledSchema;
+        try {
+            // Name the schema after a hash of its text; compileOrGetFromCache may return an earlier compilation
+            final String hash = sha256Hex(schemaTextValue);
+            final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
+                .name(hash + PROTO_EXTENSION)
+                .build();
+            compiledSchema = validateSchemaCompiles(schemaIdentifier, schemaTextValue);
+        } catch (final SchemaCompilationException e) {
+            // NOTE(review): other failures from compileOrGetFromCache are wrapped in a plain RuntimeException
+            // and are not caught here; confirm whether Wire linker errors can arrive that way
+            results.add(new ValidationResult.Builder()
+                .subject(SCHEMA_TEXT.getDisplayName())
+                .input(schemaTextValue)
+                .valid(false)
+                .explanation("Invalid protobuf schema format: " + describeCompilationFailure(e))
+                .build());
+            return;
+        }
+
+        validateMessageName(validationContext, compiledSchema, results);
+    }
+
+    /** Adds a result when the Message Name Property strategy names a type absent from the compiled schema. */
+    private void validateMessageName(final ValidationContext validationContext, final Schema compiledSchema,
+                                     final List<ValidationResult> results) {
+        final String messageNameStrategy = validationContext.getProperty(MESSAGE_NAME_RESOLUTION_STRATEGY).getValue();
+        if (!MESSAGE_NAME_PROPERTY.getValue().equals(messageNameStrategy)) {
+            return;
+        }
+
+        final PropertyValue messageNameProperty = validationContext.getProperty(MESSAGE_NAME);
+        final String messageNameValue = messageNameProperty.getValue();
+
+        if (validationContext.isExpressionLanguageSupported(MESSAGE_NAME.getName())
+            && validationContext.isExpressionLanguagePresent(messageNameValue)) {
+            return;
+        }
+
+        // Blank values are not checked against the schema here
+        if (messageNameValue == null || messageNameValue.isBlank()) {
+            return;
+        }
+
+        if (compiledSchema.getType(messageNameValue) == null) {
+            results.add(new ValidationResult.Builder()
+                .subject(MESSAGE_NAME.getDisplayName())
+                .input(messageNameValue)
+                .valid(false)
+                .explanation(String.format("Message name '%s' cannot be found in the provided protobuf schema", messageNameValue))
+                .build());
+        }
+    }
+
+    /**
+     * Describes a compilation failure for users. SchemaCompilationException's own message only names the
+     * schema definition, so the cause's message (the schema library's diagnostic) is preferred when present.
+     */
+    private static String describeCompilationFailure(final SchemaCompilationException e) {
+        final Throwable cause = e.getCause();
+        if (cause == null || cause.getMessage() == null || cause.getMessage().isBlank()) {
+            return e.getMessage();
+        }
+        return cause.getMessage();
+    }
+
+    /**
+     * Compiles the given Protobuf schema text, or returns a cached compilation, using the service's schema compiler.
+     *
+     * @param schemaIdentifier identifier assigned to the schema definition
+     * @param schemaTextValue Protobuf schema source text
+     * @return the compiled schema
+     * @throws SchemaCompilationException when the schema text cannot be compiled
+     */
+    private Schema validateSchemaCompiles(final SchemaIdentifier schemaIdentifier, final String schemaTextValue) {
+        return schemaCompiler.compileOrGetFromCache(
+            new StandardSchemaDefinition(schemaIdentifier, schemaTextValue, SchemaDefinition.SchemaType.PROTOBUF));
+    }
+
     private RecordReader createProtobufRecordReader(final Map<String, String> 
variables, final InputStream in, final SchemaDefinition schemaDefinition) 
throws IOException {
         final Schema schema = 
schemaCompiler.compileOrGetFromCache(schemaDefinition);
         final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schema);
@@ -178,7 +262,6 @@ public class StandardProtobufReader extends 
SchemaRegistryService implements Rec
         return new ProtobufRecordReader(schema, 
messageName.getFullyQualifiedName(), in, recordSchema);
     }
 
-
     private void setupMessageNameResolver(final ConfigurationContext context) {
         final MessageNameResolverStrategy messageNameResolverStrategy = 
context.getProperty(MESSAGE_NAME_RESOLUTION_STRATEGY).asAllowableValue(MessageNameResolverStrategy.class);
         messageNameResolver = switch (messageNameResolverStrategy) {
@@ -253,7 +336,6 @@ public class StandardProtobufReader extends 
SchemaRegistryService implements Rec
         }
     }
 
-
     enum MessageNameResolverStrategy implements DescribedValue {
 
         MESSAGE_NAME_PROPERTY("Message Name Property", "Use the 'Message Name' 
property value to determine the message name"),
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
index b58b6504ea..b6287dd081 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
@@ -84,6 +84,62 @@ class TestStandardProtobufReaderPropertyValidation extends 
StandardProtobufReade
             enableAllControllerServices();
             runner.assertValid(standardProtobufReader);
         }
+
+        /** Schema Text that is not Protobuf syntax must yield exactly one validation error. */
+        @Test
+        void testInvalidSchemaTextFormat() {
+            final String malformedSchema = "invalid protobuf schema";
+            runner.setProperty(standardProtobufReader, SCHEMA_TEXT, malformedSchema);
+
+            final ValidationResult invalidResult = verifyExactlyOneValidationError();
+            final String explanation = invalidResult.getExplanation();
+            assertTrue(explanation.contains("Invalid protobuf schema format"));
+        }
+
+        /** Proto3 schema declaring the single message type {@code test.Person}, shared by the message name tests. */
+        private static final String PERSON_MESSAGE_SCHEMA = """
+            syntax = "proto3";
+            package test;
+            message Person {
+              string name = 1;
+              int32 age = 2;
+            }
+            """;
+
+        /** A configured message name the schema does not declare must yield one validation error naming it. */
+        @Test
+        void testInvalidMessageNameInSchema() {
+            configurePersonSchemaWithMessageName("test.NonExistentMessage");
+
+            final ValidationResult invalidResult = verifyExactlyOneValidationError();
+
+            assertTrue(invalidResult.getExplanation().contains("test.NonExistentMessage"));
+        }
+
+        /** A configured message name declared by the schema must leave the reader valid. */
+        @Test
+        void testValidMessageNameInSchema() {
+            configurePersonSchemaWithMessageName("test.Person");
+
+            runner.assertValid(standardProtobufReader);
+        }
+
+        /** Sets the shared Person schema as Schema Text and selects Message Name Property resolution with the given name. */
+        private void configurePersonSchemaWithMessageName(final String messageName) {
+            runner.setProperty(standardProtobufReader, SCHEMA_TEXT, PERSON_MESSAGE_SCHEMA);
+            runner.setProperty(standardProtobufReader, MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+            runner.setProperty(standardProtobufReader, MESSAGE_NAME, messageName);
+        }
+
+        /** Validation must be skipped when both Schema Text and Message Name contain Expression Language. */
+        @Test
+        void testSkipValidationWhenExpressionLanguagePresent() {
+            final String schemaTextExpression = "${invalid.protobuf.schema}";
+            final String messageNameExpression = "${message.name}";
+
+            runner.setProperty(standardProtobufReader, SCHEMA_TEXT, schemaTextExpression);
+            runner.setProperty(standardProtobufReader, MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+            runner.setProperty(standardProtobufReader, MESSAGE_NAME, messageNameExpression);
+
+            // Expression Language cannot be evaluated during validation, so neither value is checked
+            runner.assertValid(standardProtobufReader);
+        }
     }
 
     @Nested

Reply via email to