This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d426f58278 NIFI-14953 Added Schema Text Validation to
StandardProtobufReader (#10291)
d426f58278 is described below
commit d426f58278c4fd1408bf3d825a26634f90f10d65
Author: lkuchars <[email protected]>
AuthorDate: Fri Oct 10 20:29:23 2025 +0200
NIFI-14953 Added Schema Text Validation to StandardProtobufReader (#10291)
Signed-off-by: David Handermann <[email protected]>
---
.../services/protobuf/ProtobufSchemaCompiler.java | 8 +-
.../protobuf/SchemaCompilationException.java | 23 ++++++
.../services/protobuf/StandardProtobufReader.java | 88 +++++++++++++++++++++-
...stStandardProtobufReaderPropertyValidation.java | 56 ++++++++++++++
4 files changed, 168 insertions(+), 7 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
index b019c021cc..5bae30ac88 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
@@ -129,7 +129,9 @@ final class ProtobufSchemaCompiler {
final Schema compiledSchema = createAndLoadSchema(tempDir);
logger.debug("Successfully compiled schema for identifier:
{}", schemaDefinition.getIdentifier());
return compiledSchema;
-
+ } catch (final IllegalStateException e) {
+ // Illegal state exception is thrown by the wire library for
schema issues
+ throw new SchemaCompilationException("Could not compile
schema: %s".formatted(schemaDefinition.toString()), e);
} catch (final Exception e) {
throw new RuntimeException("Failed to compile Protobuf schema
for identifier: " + schemaDefinition.getIdentifier(), e);
}
@@ -149,8 +151,6 @@ final class ProtobufSchemaCompiler {
try {
return function.apply(tempDir);
- } catch (final Exception e) {
- throw new RuntimeException(e);
} finally {
safeDeleteDirectory(tempDir);
}
@@ -247,6 +247,6 @@ final class ProtobufSchemaCompiler {
@FunctionalInterface
private interface WithTemporaryDirectory<T> {
- T apply(Path tempDir) throws Exception;
+ T apply(Path tempDir);
}
}
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/SchemaCompilationException.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/SchemaCompilationException.java
new file mode 100644
index 0000000000..da7f4bfc06
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/SchemaCompilationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+public class SchemaCompilationException extends RuntimeException {
+ public SchemaCompilationException(final String message, final Throwable
cause) {
+ super(message, cause);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
index b08ee5132e..c8ee72fcdc 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
@@ -23,11 +23,15 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.MessageName;
import org.apache.nifi.schemaregistry.services.MessageNameResolver;
@@ -49,6 +53,7 @@ import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
@@ -135,7 +140,6 @@ public class StandardProtobufReader extends
SchemaRegistryService implements Rec
schemaText = context.getProperty(SCHEMA_TEXT);
schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
schemaVersion = context.getProperty(SCHEMA_VERSION);
- schemaCompiler = new ProtobufSchemaCompiler(getIdentifier(),
getLogger());
}
@Override
@@ -156,6 +160,12 @@ public class StandardProtobufReader extends
SchemaRegistryService implements Rec
throw new SchemaNotFoundException("Unsupported schema access strategy:
" + schemaAccessStrategyValue);
}
+ @Override
+ protected void init(final ControllerServiceInitializationContext config)
throws InitializationException {
+ super.init(config);
+ schemaCompiler = new ProtobufSchemaCompiler(getIdentifier(),
getLogger());
+ }
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
@@ -170,6 +180,80 @@ public class StandardProtobufReader extends
SchemaRegistryService implements Rec
return PROTOBUF_SCHEMA_TEXT;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+ final String schemaAccessStrategyValue =
validationContext.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+
+ // Only validate when using Schema Text property strategy
+ if (SCHEMA_TEXT_PROPERTY.getValue().equals(schemaAccessStrategyValue))
{
+ final PropertyValue schemaTextProperty =
validationContext.getProperty(SCHEMA_TEXT);
+ final String schemaTextValue = schemaTextProperty.getValue();
+
+ if
(validationContext.isExpressionLanguagePresent(schemaTextValue)) {
+ return results;
+ }
+
+ if (schemaTextValue == null || schemaTextValue.isBlank()) {
+ results.add(new ValidationResult.Builder()
+ .subject(SCHEMA_TEXT.getDisplayName())
+ .input(schemaTextValue)
+ .valid(false)
+ .explanation("Schema Text value is missing")
+ .build());
+ return results;
+ }
+
+ try {
+ // Try to compile the schema to validate protobuf format
+ final String hash = sha256Hex(schemaTextValue);
+ final SchemaIdentifier schemaIdentifier =
SchemaIdentifier.builder()
+ .name(hash + PROTO_EXTENSION)
+ .build();
+ final Schema compiledSchema =
validateSchemaCompiles(schemaIdentifier, schemaTextValue);
+
+ // Validate Message Name if using MESSAGE_NAME_PROPERTY
strategy
+ final String messageNameStrategy =
validationContext.getProperty(MESSAGE_NAME_RESOLUTION_STRATEGY).getValue();
+ if
(MESSAGE_NAME_PROPERTY.getValue().equals(messageNameStrategy)) {
+ final PropertyValue messageNameProperty =
validationContext.getProperty(MESSAGE_NAME);
+ final String messageNameValue =
messageNameProperty.getValue();
+
+ if
(validationContext.isExpressionLanguageSupported(MESSAGE_NAME.getName())
+ &&
validationContext.isExpressionLanguagePresent(messageNameValue)) {
+ return results;
+ }
+
+ if (messageNameValue != null &&
!messageNameValue.isBlank()) {
+ // Check if the message name exists in the compiled
schema
+ if (compiledSchema.getType(messageNameValue) == null) {
+ results.add(new ValidationResult.Builder()
+ .subject(MESSAGE_NAME.getDisplayName())
+ .input(messageNameValue)
+ .valid(false)
+ .explanation(String.format("Message name '%s'
cannot be found in the provided protobuf schema", messageNameValue))
+ .build());
+ }
+ }
+ }
+
+ } catch (final SchemaCompilationException e) {
+ results.add(new ValidationResult.Builder()
+ .subject(SCHEMA_TEXT.getDisplayName())
+ .input(schemaTextValue)
+ .valid(false)
+ .explanation("Invalid protobuf schema format: " +
e.getMessage())
+ .build());
+ }
+ }
+
+ return results;
+ }
+
+ private Schema validateSchemaCompiles(final SchemaIdentifier
schemaIdentifier, final String schemaTextValue) {
+ final SchemaDefinition schemaDefinition = new
StandardSchemaDefinition(schemaIdentifier, schemaTextValue,
SchemaDefinition.SchemaType.PROTOBUF);
+ return schemaCompiler.compileOrGetFromCache(schemaDefinition);
+ }
+
private RecordReader createProtobufRecordReader(final Map<String, String>
variables, final InputStream in, final SchemaDefinition schemaDefinition)
throws IOException {
final Schema schema =
schemaCompiler.compileOrGetFromCache(schemaDefinition);
final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schema);
@@ -178,7 +262,6 @@ public class StandardProtobufReader extends
SchemaRegistryService implements Rec
return new ProtobufRecordReader(schema,
messageName.getFullyQualifiedName(), in, recordSchema);
}
-
private void setupMessageNameResolver(final ConfigurationContext context) {
final MessageNameResolverStrategy messageNameResolverStrategy =
context.getProperty(MESSAGE_NAME_RESOLUTION_STRATEGY).asAllowableValue(MessageNameResolverStrategy.class);
messageNameResolver = switch (messageNameResolverStrategy) {
@@ -253,7 +336,6 @@ public class StandardProtobufReader extends
SchemaRegistryService implements Rec
}
}
-
enum MessageNameResolverStrategy implements DescribedValue {
MESSAGE_NAME_PROPERTY("Message Name Property", "Use the 'Message Name'
property value to determine the message name"),
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
index b58b6504ea..b6287dd081 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
@@ -84,6 +84,62 @@ class TestStandardProtobufReaderPropertyValidation extends
StandardProtobufReade
enableAllControllerServices();
runner.assertValid(standardProtobufReader);
}
+
+ @Test
+ void testInvalidSchemaTextFormat() {
+ runner.setProperty(standardProtobufReader, SCHEMA_TEXT, "invalid
protobuf schema");
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+ assertTrue(invalidResult.getExplanation().contains("Invalid
protobuf schema format"));
+ }
+
+ @Test
+ void testInvalidMessageNameInSchema() {
+ final String validProtobufSchema = """
+ syntax = "proto3";
+ package test;
+ message Person {
+ string name = 1;
+ int32 age = 2;
+ }
+ """;
+
+ runner.setProperty(standardProtobufReader, SCHEMA_TEXT,
validProtobufSchema);
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME,
"test.NonExistentMessage");
+
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+
assertTrue(invalidResult.getExplanation().contains("test.NonExistentMessage"));
+ }
+
+ @Test
+ void testValidMessageNameInSchema() {
+ final String validProtobufSchema = """
+ syntax = "proto3";
+ package test;
+ message Person {
+ string name = 1;
+ int32 age = 2;
+ }
+ """;
+
+ runner.setProperty(standardProtobufReader, SCHEMA_TEXT,
validProtobufSchema);
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME,
"test.Person");
+
+ runner.assertValid(standardProtobufReader);
+ }
+
+ @Test
+ void testSkipValidationWhenExpressionLanguagePresent() {
+ runner.setProperty(standardProtobufReader, SCHEMA_TEXT,
"${invalid.protobuf.schema}");
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME,
"${message.name}");
+
+ // Should be valid because expression language is present
+ runner.assertValid(standardProtobufReader);
+ }
}
@Nested