This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3097a90f05 NIFI-14878 Added StandardProtobufReader for Protobuf Record
Processing (#10214)
3097a90f05 is described below
commit 3097a90f0519f023cc640053eb89055a30de0d81
Author: lkuchars <[email protected]>
AuthorDate: Thu Aug 28 19:47:53 2025 +0200
NIFI-14878 Added StandardProtobufReader for Protobuf Record Processing
(#10214)
Co-authored-by: ForThoseWhoComeAfter <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../confluent/schema/ProtobufMessageSchema.java | 2 +-
.../nifi/confluent/schemaregistry/VarintUtils.java | 8 +-
.../ConfluentSchemaRegistryTest.java | 2 +-
.../nifi/serialization/SchemaRegistryService.java | 6 +-
.../nifi-protobuf-services-nar/pom.xml | 4 +-
.../nifi-protobuf-services/pom.xml | 9 +-
.../nifi/services/protobuf/ProtobufReader.java | 2 +
.../services/protobuf/ProtobufSchemaCompiler.java | 252 +++++++++++++++++
.../services/protobuf/ProtobufSchemaValidator.java | 63 +++++
.../services/protobuf/StandardProtobufReader.java | 300 +++++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 3 +-
.../nifi/services/protobuf/ProtoTestUtil.java | 177 ++++++++----
.../protobuf/StandardProtobufReaderTestBase.java | 269 ++++++++++++++++++
.../nifi/services/protobuf/TestProtobufReader.java | 225 ++++++++++++++++
.../protobuf/TestStandardProtobufReader.java | 290 ++++++++++++++++++++
...stStandardProtobufReaderPropertyValidation.java | 191 +++++++++++++
.../converter/TestProtobufDataConverter.java | 22 +-
.../apache/nifi/protobuf/test/root_message.desc | Bin 0 -> 313 bytes
.../apache/nifi/protobuf/test/root_message.proto | 34 +++
.../apache/nifi/protobuf/test/user_profile.desc | Bin 0 -> 329 bytes
.../apache/nifi/protobuf/test/user_profile.proto | 35 +++
.../apache/nifi/protobuf/test/user_settings.desc | Bin 0 -> 827 bytes
.../apache/nifi/protobuf/test/user_settings.proto | 38 +++
.../services/MessageNameResolver.java | 3 +-
.../services/StandardMessageNameFactory.java | 46 ++++
25 files changed, 1913 insertions(+), 68 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
index 46769b5763..632217eabe 100644
---
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
+++
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
@@ -21,7 +21,7 @@ import java.util.Optional;
/**
* Basic abstraction over Protobuf schema message entity.
- * It contains only the fields that are needed for crude schema insights. It's
useful when the
+ * It only represents the fields that are needed for crude schema insights.
It's useful when the
* message name resolver needs to pinpoint specific messages by message
indexes encoded on the wire.
* <p>
* <a
href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">See
the Confluent protobuf wire format.</a>
diff --git
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
index b9fab9b179..90c352ecfa 100644
---
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
+++
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
@@ -50,7 +50,7 @@ final class VarintUtils {
* @return the decoded varint value
* @throws IOException if unable to read from stream or invalid varint
format
*/
- public static int readVarintFromStream(final InputStream inputStream)
throws IOException {
+ static int readVarintFromStream(final InputStream inputStream) throws
IOException {
final int firstByte = inputStream.read();
if (firstByte == -1) {
throw new IOException("Unexpected end of stream while reading
varint");
@@ -66,7 +66,7 @@ final class VarintUtils {
* @return the decoded varint value
* @throws IOException if unable to read from stream or invalid varint
format
*/
- public static int readVarintFromStreamAfterFirstByteConsumed(final
InputStream inputStream, final int firstByte) throws IOException {
+ static int readVarintFromStreamAfterFirstByteConsumed(final InputStream
inputStream, final int firstByte) throws IOException {
// accumulated result
int value = 0;
@@ -115,7 +115,7 @@ final class VarintUtils {
* @param encodedValue the zigzag encoded value
* @return the decoded integer value
*/
- public static int decodeZigZag(final int encodedValue) {
+ static int decodeZigZag(final int encodedValue) {
return (encodedValue >>> 1) ^ -(encodedValue & 1);
}
@@ -127,7 +127,7 @@ final class VarintUtils {
* @param value the integer value to encode
* @return byte array containing the zigzag encoded varint
*/
- public static byte[] writeZigZagVarint(final int value) {
+ static byte[] writeZigZagVarint(final int value) {
final ByteArrayOutputStream output = new ByteArrayOutputStream(4);
// Zigzag encode
diff --git
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
index 14d964511a..2abfa63277 100644
---
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
+++
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-public class ConfluentSchemaRegistryTest {
+class ConfluentSchemaRegistryTest {
private static final String SERVICE_ID =
ConfluentSchemaRegistry.class.getSimpleName();
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index 2b90b40bc7..922adc2a96 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -100,7 +100,7 @@ public abstract class SchemaRegistryService extends
AbstractControllerService {
properties.add(SCHEMA_NAME);
properties.add(SCHEMA_VERSION);
properties.add(SCHEMA_BRANCH_NAME);
- properties.add(SCHEMA_TEXT);
+ properties.add(buildSchemaTextProperty());
properties.add(SCHEMA_REFERENCE_READER);
return properties;
@@ -126,6 +126,10 @@ public abstract class SchemaRegistryService extends
AbstractControllerService {
return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
}
+ /**
+ * Supplies the schema text property descriptor that is added to the list of supported properties.
+ * This implementation returns SCHEMA_TEXT unchanged; the method is protected and non-final,
+ * so a subclass can override it to supply a different descriptor.
+ *
+ * @return the property descriptor to register for the schema text property
+ */
+ protected PropertyDescriptor buildSchemaTextProperty() {
+ return SCHEMA_TEXT;
+ }
+
protected PropertyDescriptor buildStrategyProperty(final AllowableValue[]
values) {
return new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
index 6a64165222..2bb0b638d1 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
@@ -30,12 +30,12 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-protobuf-services</artifactId>
- <version>2.6.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-shared-nar</artifactId>
- <version>2.6.0-SNAPSHOT</version>
+ <version>${project.version}</version>
<type>nar</type>
</dependency>
</dependencies>
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
index fa9cb89a78..5056379025 100644
--- a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
+++ b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
@@ -48,7 +48,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
- <version>2.6.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
@@ -75,11 +75,6 @@
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock-record-utils</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ </dependencies>
</project>
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
index 59154bb579..2a551ecc5b 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
@@ -21,6 +21,7 @@ import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.Schema;
import com.squareup.wire.schema.SchemaLoader;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
@@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
@Tags({"protobuf", "record", "reader", "parser"})
@CapabilityDescription("Parses a Protocol Buffers message from binary format.")
+@SeeAlso(StandardProtobufReader.class)
public class ProtobufReader extends SchemaRegistryService implements
RecordReaderFactory {
private static final String ANY_PROTO = "google/protobuf/any.proto";
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
new file mode 100644
index 0000000000..b019c021cc
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.squareup.wire.schema.CoreLoaderKt;
+import com.squareup.wire.schema.Location;
+import com.squareup.wire.schema.Schema;
+import com.squareup.wire.schema.SchemaLoader;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.nifi.services.protobuf.ProtobufSchemaValidator.validateSchemaDefinitionIdentifiers;
+
+/**
+ * Handles Protocol Buffer schema compilation, caching, and temporary directory operations.
+ * This class is responsible for compiling schema definitions into Wire Schema objects,
+ * managing a cache of compiled schemas, and handling temporary directory operations
+ * required during the compilation process.
+ */
+final class ProtobufSchemaCompiler {
+
+    private static final List<Location> STANDARD_PROTOBUF_LOCATIONS = Arrays.asList(
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/any.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/duration.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/empty.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/struct.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/timestamp.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/wrappers.proto")
+    );
+    private static final int CACHE_EXPIRE_HOURS = 1;
+    private static final int COMPILED_SCHEMAS_CACHE_SIZE = 200;
+    private static final String PROTO_EXTENSION = ".proto";
+
+    private final Cache<SchemaIdentifier, Schema> compiledSchemaCache;
+    private final ComponentLog logger;
+    private final String tempDirectorySuffix;
+
+    /**
+     * Creates a new ProtobufSchemaCompiler with default cache settings.
+     *
+     * @param tempDirectorySuffix a label identifying the owner of this compiler. Despite the parameter name, it is
+     *                            placed at the start of the name of every temporary directory created by the compiler
+     *                            (followed by "_protobuf_schema_compiler"). This may help in finding the right
+     *                            temporary directory in case of compilation issues. Must not be null
+     * @param logger              the component logger for logging compilation activities
+     */
+    public ProtobufSchemaCompiler(final String tempDirectorySuffix, final ComponentLog logger) {
+        this.tempDirectorySuffix = Objects.requireNonNull(tempDirectorySuffix, "Temporary directory suffix cannot be null");
+        this.logger = logger;
+        this.compiledSchemaCache = Caffeine.newBuilder()
+            .expireAfterAccess(CACHE_EXPIRE_HOURS, TimeUnit.HOURS)
+            .maximumSize(COMPILED_SCHEMAS_CACHE_SIZE)
+            .build();
+    }
+
+    /**
+     * Compiles a schema definition or retrieves it from cache.
+     * The cache key is the identifier of the given schema definition.
+     *
+     * @param schemaDefinition the schema definition to compile
+     * @return the compiled Schema
+     * @throws UncheckedIOException if the temporary compilation directory cannot be created
+     * @throws RuntimeException     if the schema cannot be written to disk or compiled
+     */
+    public Schema compileOrGetFromCache(final SchemaDefinition schemaDefinition) {
+        return compiledSchemaCache.get(schemaDefinition.getIdentifier(),
+            identifier -> {
+                try {
+                    return compileSchemaDefinition(schemaDefinition);
+                } catch (final IOException e) {
+                    throw new UncheckedIOException("Could not compile schema for identifier: " + identifier, e);
+                }
+            });
+    }
+
+    /**
+     * Compiles a SchemaDefinition structure into a Schema using the wire library.
+     * Creates a temporary directory, writes the main schema and all of its (transitively) referenced schemas
+     * into it, and loads the result together with the standard protobuf definitions.
+     *
+     * @param schemaDefinition the main schema definition to compile
+     * @return the compiled Schema
+     * @throws IOException      if unable to create the temporary directory
+     * @throws RuntimeException if writing the schema files or compiling them fails
+     */
+    private Schema compileSchemaDefinition(final SchemaDefinition schemaDefinition) throws IOException {
+        logger.debug("Starting schema compilation for identifier: {}", schemaDefinition.getIdentifier());
+
+        // The root identifier is exempt from the name check; every referenced schema must be named *.proto
+        validateSchemaDefinitionIdentifiers(schemaDefinition, true);
+
+        return executeWithTemporaryDirectory(tempDir -> {
+            try {
+                // Process main schema definition
+                writeSchemaToTempDirectory(tempDir, schemaDefinition);
+
+                // Process all referenced schemas recursively
+                processSchemaReferences(tempDir, schemaDefinition.getReferences());
+
+                // Create and configure schema loader
+                final Schema compiledSchema = createAndLoadSchema(tempDir);
+                logger.debug("Successfully compiled schema for identifier: {}", schemaDefinition.getIdentifier());
+                return compiledSchema;
+
+            } catch (final Exception e) {
+                throw new RuntimeException("Failed to compile Protobuf schema for identifier: " + schemaDefinition.getIdentifier(), e);
+            }
+        });
+    }
+
+    /**
+     * Executes a function with a temporary directory, ensuring the directory is removed afterwards
+     * regardless of whether the function succeeded.
+     *
+     * @param function the function to execute with the temporary directory
+     * @return the result of the function
+     * @throws IOException      if unable to create the temporary directory, or if the function throws one
+     * @throws RuntimeException if the function fails; unchecked exceptions are propagated unchanged,
+     *                          other checked exceptions are wrapped
+     */
+    private <T> T executeWithTemporaryDirectory(final WithTemporaryDirectory<T> function) throws IOException {
+        final Path tempDir = Files.createTempDirectory(tempDirectorySuffix + "_protobuf_schema_compiler");
+        logger.debug("Created temporary directory for schema compilation: {}", tempDir);
+
+        try {
+            return function.apply(tempDir);
+        } catch (final RuntimeException | IOException e) {
+            // Already a type this method may throw: rethrow as-is so the descriptive message
+            // is not buried under an additional, message-less wrapper
+            throw e;
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            safeDeleteDirectory(tempDir);
+        }
+    }
+
+    /**
+     * Loads every proto file found under the temporary directory, plus the standard protobuf definitions
+     * shipped in the wire runtime jar, into a single Schema.
+     *
+     * @param tempDir the directory containing the schema files written for this compilation
+     * @return the loaded Schema
+     */
+    private Schema createAndLoadSchema(final Path tempDir) {
+        final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
+
+        final List<Location> roots = new ArrayList<>();
+        roots.add(Location.get(tempDir.toString()));
+
+        // Add standard protobuf libraries
+        roots.addAll(STANDARD_PROTOBUF_LOCATIONS);
+
+        schemaLoader.initRoots(roots, Collections.emptyList());
+
+        // Load and return the compiled schema
+        return schemaLoader.loadSchema();
+    }
+
+    /**
+     * Deletes the given directory recursively. Failures are logged as warnings and never propagated,
+     * so that cleanup problems do not mask the outcome of the compilation.
+     *
+     * @param directory the directory to delete
+     */
+    private void safeDeleteDirectory(final Path directory) {
+        if (Files.exists(directory)) {
+            try {
+                FileUtils.deleteDirectory(directory.toFile());
+            } catch (final IOException | IllegalArgumentException e) {
+                logger.warn("Failed to delete temporary directory: {}", directory, e);
+            }
+        }
+    }
+
+
+    /**
+     * Writes a schema definition to the temporary directory structure.
+     * If the schema name contains directory segments (for example {@code common/address.proto}),
+     * the corresponding directories are created below the temporary directory first.
+     *
+     * @param tempDir          the temporary directory root
+     * @param schemaDefinition the schema definition to write
+     * @throws IOException              if unable to create directories or write files
+     * @throws IllegalArgumentException if the schema name resolves to a location outside of the temporary directory
+     */
+    private void writeSchemaToTempDirectory(final Path tempDir, final SchemaDefinition schemaDefinition) throws IOException {
+        logger.debug("Writing schema definition to temporary directory. Identifier: {}", schemaDefinition.getIdentifier());
+
+        final String schemaFileName = generateSchemaFileName(schemaDefinition);
+        final Path schemaFile = tempDir.resolve(schemaFileName).normalize();
+
+        // Schema names originate outside of this class (e.g. a schema registry); never write outside the temp directory
+        if (!schemaFile.startsWith(tempDir.normalize())) {
+            throw new IllegalArgumentException("Schema name must not resolve outside of the compilation directory. Schema identifier: "
+                + schemaDefinition.getIdentifier());
+        }
+
+        // Names such as "common/address.proto" need their parent directories to exist before the file can be written
+        final Path parentDirectory = schemaFile.getParent();
+        if (parentDirectory != null) {
+            Files.createDirectories(parentDirectory);
+        }
+
+        // Write schema text to file; writeString encodes as UTF-8 independent of the platform default charset
+        final String schemaText = schemaDefinition.getText();
+        Files.writeString(schemaFile, schemaText, CREATE, WRITE, TRUNCATE_EXISTING);
+        logger.debug("Successfully wrote schema to file: {} (string length: {})", schemaFile, schemaText.length());
+    }
+
+    /**
+     * Generates a filename for a schema definition, ensuring it has a .proto extension.
+     * The identifier name is used when present; otherwise the schema version id (or 0 when that is absent too).
+     *
+     * @param schemaDefinition the schema definition
+     * @return the generated filename
+     */
+    private String generateSchemaFileName(final SchemaDefinition schemaDefinition) {
+        String schemaFileName = schemaDefinition.getIdentifier().getName().orElseGet(
+            () -> String.valueOf(schemaDefinition.getIdentifier().getSchemaVersionId().orElse(0L))
+        );
+
+        if (!schemaFileName.endsWith(PROTO_EXTENSION)) {
+            schemaFileName += PROTO_EXTENSION; // Ensure the file ends with .proto, otherwise the wire library will not recognize it
+        }
+
+        return schemaFileName;
+    }
+
+    /**
+     * Writes every referenced schema, and recursively the references of those schemas, into the temporary directory.
+     *
+     * @param tempDir    the temporary directory root
+     * @param references the referenced schema definitions, keyed by reference name
+     * @throws IOException if a referenced schema cannot be written
+     */
+    private void processSchemaReferences(final Path tempDir, final Map<String, SchemaDefinition> references) throws IOException {
+        logger.debug("Processing [{}] schema references in [{}]", references.size(), tempDir);
+
+        for (final Map.Entry<String, SchemaDefinition> entry : references.entrySet()) {
+            final String referenceKey = entry.getKey();
+            final SchemaDefinition referencedSchema = entry.getValue();
+
+            logger.debug("Processing schema reference [{}] Identifier [{}]", referenceKey, referencedSchema.getIdentifier());
+
+            // Write referenced schema to appropriate directory
+            writeSchemaToTempDirectory(tempDir, referencedSchema);
+
+            // Process nested references recursively
+            if (!referencedSchema.getReferences().isEmpty()) {
+                logger.debug("Processing {} nested references for schema reference: {}", referencedSchema.getReferences().size(), referenceKey);
+                processSchemaReferences(tempDir, referencedSchema.getReferences());
+            } else {
+                logger.debug("No nested references found for schema reference: {}", referenceKey);
+            }
+        }
+    }
+
+
+    /**
+     * Callback executed against a temporary directory; it may throw any exception.
+     */
+    @FunctionalInterface
+    private interface WithTemporaryDirectory<T> {
+        T apply(Path tempDir) throws Exception;
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaValidator.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaValidator.java
new file mode 100644
index 0000000000..4e8257bad1
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaValidator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+/**
+ * Validation helpers for Protocol Buffer SchemaDefinition objects and their schema identifiers.
+ */
+final class ProtobufSchemaValidator {
+
+    private ProtobufSchemaValidator() {
+    }
+
+    /**
+     * Checks that the identifiers of a schema definition tree carry a name ending with the .proto extension.
+     * The check descends into every referenced schema; referenced schemas are always checked,
+     * whereas the definition passed in is checked only when it is not flagged as a root definition.
+     *
+     * @param schemaDefinition       the schema definition to validate
+     * @param isRootSchemaDefinition set to true if schema definition is a root definition, false otherwise
+     * @throws IllegalArgumentException if any checked identifier does not end with .proto extension
+     */
+    static void validateSchemaDefinitionIdentifiers(final SchemaDefinition schemaDefinition, final boolean isRootSchemaDefinition) {
+        // Root schema definitions are exempt from the name check: they might be coming from sources like
+        // text fields, flow file attributes and other sources that do not support naming.
+        if (!isRootSchemaDefinition) {
+            validateSchemaIdentifier(schemaDefinition.getIdentifier());
+        }
+
+        // Every referenced schema is a non-root definition, so its name has to end with .proto
+        schemaDefinition.getReferences()
+            .values()
+            .forEach(referencedSchema -> validateSchemaDefinitionIdentifiers(referencedSchema, false));
+    }
+
+    /**
+     * Checks that a single SchemaIdentifier has a name and that the name ends with the .proto extension.
+     *
+     * @param schemaIdentifier the schema identifier to validate
+     * @throws IllegalArgumentException if the identifier has no name or the name does not end with .proto extension
+     */
+    private static void validateSchemaIdentifier(final SchemaIdentifier schemaIdentifier) {
+        final boolean hasProtoName = schemaIdentifier.getName()
+            .map(name -> name.endsWith(".proto"))
+            .orElse(false);
+
+        if (!hasProtoName) {
+            throw new IllegalArgumentException("Schema identifier must have a name that ends with .proto extension. Schema identifier: " + schemaIdentifier);
+        }
+    }
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
new file mode 100644
index 0000000000..b08ee5132e
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.MessageName;
+import org.apache.nifi.schemaregistry.services.MessageNameResolver;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.schemaregistry.services.StandardMessageNameFactory;
+import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HexFormat;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_PROPERTY;
+
+@Tags({"protobuf", "record", "reader", "parser"})
+@CapabilityDescription("""
+ Parses Protocol Buffers messages from binary format into NiFi Records. \
+ Supports multiple schema access strategies including inline schema text,
schema registry lookup, \
+ and schema reference readers.
+ Protobuf reader needs to know the Proto schema message name in order to
deserialize the binary payload correctly. \
+ The name of this message can be determined statically using 'Message Name'
property, \
+ or dynamically, using a Message Name Resolver service.""")
+
+public class StandardProtobufReader extends SchemaRegistryService implements
RecordReaderFactory {
+
+ // Required property choosing how the message name is obtained; defaults to the
+ // 'Message Name' property strategy.
+ public static final PropertyDescriptor MESSAGE_NAME_RESOLUTION_STRATEGY =
new PropertyDescriptor.Builder()
+ .name("Message Name Resolution Strategy")
+ .description("Strategy for determining the Protocol Buffers message
name for processing")
+ .required(true)
+ .allowableValues(MESSAGE_NAME_PROPERTY,
MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER)
+ .defaultValue(MESSAGE_NAME_PROPERTY)
+ .build();
+
+ // Depends on the MESSAGE_NAME_PROPERTY strategy; supports Expression Language
+ // evaluated against FlowFile attributes.
+ public static final PropertyDescriptor MESSAGE_NAME = new
PropertyDescriptor.Builder()
+ .name("Message Name")
+ .description("Fully qualified name of the Protocol Buffers message
including its package (eg. mypackage.MyMessage).")
+ .required(true)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .dependsOn(MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Depends on the MESSAGE_NAME_RESOLVER strategy; identifies a MessageNameResolver
+ // controller service.
+ public static final PropertyDescriptor MESSAGE_NAME_RESOLVER = new
PropertyDescriptor.Builder()
+ .name("Message Name Resolver")
+ .description("Service that dynamically resolves Protocol Buffer
message names from FlowFile content or attributes")
+ .required(true)
+ .identifiesControllerService(MessageNameResolver.class)
+ .dependsOn(MESSAGE_NAME_RESOLUTION_STRATEGY,
MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER)
+ .build();
+
+ // Derived from the shared SCHEMA_TEXT descriptor: validators are replaced by a
+ // non-empty check and the default value is the ${proto.schema} expression.
+ // Returned by buildSchemaTextProperty().
+ private static final PropertyDescriptor PROTOBUF_SCHEMA_TEXT = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(SCHEMA_TEXT)
+ .required(true)
+ .clearValidators()
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("${proto.schema}")
+ .description("The text of a Proto 3 formatted Schema")
+ .build();
+
+ // Suffix appended to the SHA-256 hash when naming a schema built from inline schema text.
+ private static final String PROTO_EXTENSION = ".proto";
+
+ // State assigned in onEnabled (messageNameResolver via setupMessageNameResolver)
+ // and read when record readers are created.
+ private volatile ProtobufSchemaCompiler schemaCompiler;
+ private volatile MessageNameResolver messageNameResolver;
+ private volatile SchemaReferenceReader schemaReferenceReader;
+ private volatile SchemaRegistry schemaRegistry;
+ private volatile String schemaAccessStrategyValue;
+ private volatile PropertyValue schemaText;
+ private volatile PropertyValue schemaName;
+ private volatile PropertyValue schemaBranchName;
+ private volatile PropertyValue schemaVersion;
+
+
+ /**
+  * Captures the configured schema access settings, selects the message name resolver
+  * and creates the schema compiler used by subsequently created record readers.
+  *
+  * @param context configuration context supplying the property values
+  */
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+     super.storeSchemaAccessStrategy(context);
+     setupMessageNameResolver(context);
+
+     // Strategy and the services it may rely on
+     schemaAccessStrategyValue = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+     schemaReferenceReader = context.getProperty(SCHEMA_REFERENCE_READER)
+             .asControllerService(SchemaReferenceReader.class);
+     schemaRegistry = context.getProperty(SCHEMA_REGISTRY)
+             .asControllerService(SchemaRegistry.class);
+
+     // Property values are kept unevaluated; they are evaluated per FlowFile when a reader is created
+     schemaName = context.getProperty(SCHEMA_NAME);
+     schemaText = context.getProperty(SCHEMA_TEXT);
+     schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
+     schemaVersion = context.getProperty(SCHEMA_VERSION);
+
+     schemaCompiler = new ProtobufSchemaCompiler(getIdentifier(), getLogger());
+ }
+
+ @Override
+ public RecordReader createRecordReader(final Map<String, String>
variables, final InputStream in, final long inputLength, final ComponentLog
logger) throws IOException, SchemaNotFoundException {
+ if (SCHEMA_TEXT_PROPERTY.getValue().equals(schemaAccessStrategyValue))
{
+ final SchemaDefinition schemaDefinition =
createSchemaDefinitionFromText(variables);
+ return createProtobufRecordReader(variables, in, schemaDefinition);
+ } else if
(SCHEMA_NAME_PROPERTY.getValue().equals(schemaAccessStrategyValue)) {
+ final SchemaDefinition schemaDefinition =
createSchemaDefinitionFromRegistry(variables);
+ return createProtobufRecordReader(variables, in, schemaDefinition);
+ } else if
(SCHEMA_REFERENCE_READER_PROPERTY.getValue().equals(schemaAccessStrategyValue))
{
+ final SchemaIdentifier schemaIdentifier =
schemaReferenceReader.getSchemaIdentifier(variables, in);
+ final SchemaDefinition schemaDefinition =
schemaRegistry.retrieveSchemaDefinition(schemaIdentifier);
+ logger.debug("Using message name for schema identifier: {}",
schemaDefinition.getIdentifier());
+ return createProtobufRecordReader(variables, in, schemaDefinition);
+ }
+
+ throw new SchemaNotFoundException("Unsupported schema access strategy:
" + schemaAccessStrategyValue);
+ }
+
+ /**
+  * Returns the inherited property descriptors followed by the message name properties.
+  *
+  * @return a new mutable list of supported property descriptors
+  */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+     descriptors.addAll(List.of(
+             MESSAGE_NAME_RESOLUTION_STRATEGY,
+             MESSAGE_NAME_RESOLVER,
+             MESSAGE_NAME));
+     return descriptors;
+ }
+
+ /**
+  * Returns the Protobuf-specific schema text descriptor (PROTOBUF_SCHEMA_TEXT).
+  *
+  * @return the schema text property descriptor for this reader
+  */
+ @Override
+ protected PropertyDescriptor buildSchemaTextProperty() {
+ return PROTOBUF_SCHEMA_TEXT;
+ }
+
+ /**
+  * Obtains the compiled schema for the definition, resolves the message name and
+  * builds a record reader for that message type.
+  *
+  * @param variables        attribute values passed to the message name resolver
+  * @param in               stream containing the binary Protobuf payload
+  * @param schemaDefinition schema definition to compile (or fetch from the compiler cache)
+  * @return a reader producing records of the resolved message type
+  * @throws IOException declared for failures while resolving the message name or creating the reader
+  */
+ private RecordReader createProtobufRecordReader(final Map<String, String> variables, final InputStream in, final SchemaDefinition schemaDefinition)
+         throws IOException {
+     final Schema compiledSchema = schemaCompiler.compileOrGetFromCache(schemaDefinition);
+     final ProtoSchemaParser parser = new ProtoSchemaParser(compiledSchema);
+
+     final MessageName resolvedName = messageNameResolver.getMessageName(variables, schemaDefinition, in);
+     final String fullyQualifiedName = resolvedName.getFullyQualifiedName();
+
+     final RecordSchema recordSchema = parser.createSchema(fullyQualifiedName);
+     return new ProtobufRecordReader(compiledSchema, fullyQualifiedName, in, recordSchema);
+ }
+
+
+ /**
+  * Selects the message name resolver matching the configured resolution strategy:
+  * either a property-backed resolver or the configured resolver controller service.
+  *
+  * @param context configuration context supplying the strategy and resolver properties
+  */
+ private void setupMessageNameResolver(final ConfigurationContext context) {
+     final PropertyValue strategyProperty = context.getProperty(MESSAGE_NAME_RESOLUTION_STRATEGY);
+     final MessageNameResolverStrategy strategy = strategyProperty.asAllowableValue(MessageNameResolverStrategy.class);
+
+     messageNameResolver = switch (strategy) {
+         case MESSAGE_NAME_PROPERTY -> new PropertyMessageNameResolver(context);
+         case MESSAGE_NAME_RESOLVER -> context.getProperty(MESSAGE_NAME_RESOLVER)
+                 .asControllerService(MessageNameResolver.class);
+     };
+ }
+
+ /**
+  * Builds a Protobuf schema definition from the evaluated Schema Text property.
+  * The definition is named after the SHA-256 hash of the text plus the .proto extension.
+  *
+  * @param variables attribute values used to evaluate the Schema Text expression
+  * @return a schema definition wrapping the evaluated schema text
+  * @throws SchemaNotFoundException if the evaluated schema text is null or blank
+  */
+ private SchemaDefinition createSchemaDefinitionFromText(final Map<String, String> variables) throws SchemaNotFoundException {
+     final String protoText = schemaText.evaluateAttributeExpressions(variables).getValue();
+     validateSchemaText(protoText);
+
+     // Inline text has no name of its own, so one is derived from its content hash.
+     final String generatedName = sha256Hex(protoText) + PROTO_EXTENSION;
+     final SchemaIdentifier generatedIdentifier = SchemaIdentifier.builder().name(generatedName).build();
+
+     return new StandardSchemaDefinition(generatedIdentifier, protoText, SchemaDefinition.SchemaType.PROTOBUF);
+ }
+
+ /**
+  * Computes the SHA-256 digest of the UTF-8 encoded input and returns it as a hex string.
+  *
+  * @param input text to hash
+  * @return hexadecimal representation of the digest as produced by HexFormat.of()
+  * @throws IllegalStateException if the SHA-256 algorithm is not available
+  */
+ private String sha256Hex(final String input) {
+     final byte[] utf8Bytes = input.getBytes(StandardCharsets.UTF_8);
+     try {
+         final byte[] hashBytes = MessageDigest.getInstance("SHA-256").digest(utf8Bytes);
+         return HexFormat.of().formatHex(hashBytes);
+     } catch (final NoSuchAlgorithmException e) {
+         throw new IllegalStateException(e);
+     }
+ }
+
+ /**
+  * Retrieves a schema definition from the schema registry using the evaluated
+  * schema name, branch and version properties.
+  *
+  * @param variables attribute values used to evaluate the schema name, branch and version expressions
+  * @return the schema definition returned by the registry
+  * @throws SchemaNotFoundException if the schema name is null or blank, or the version is not numeric
+  * @throws IOException             declared for failures while retrieving the definition from the registry
+  */
+ private SchemaDefinition createSchemaDefinitionFromRegistry(final Map<String, String> variables) throws SchemaNotFoundException, IOException {
+     final String name = schemaName.evaluateAttributeExpressions(variables).getValue();
+     validateSchemaName(name);
+
+     final String branch = schemaBranchName.evaluateAttributeExpressions(variables).getValue();
+     final String version = schemaVersion.evaluateAttributeExpressions(variables).getValue();
+     final SchemaIdentifier lookupIdentifier = buildSchemaIdentifier(name, branch, version);
+
+     return schemaRegistry.retrieveSchemaDefinition(lookupIdentifier);
+ }
+
+ private SchemaIdentifier buildSchemaIdentifier(final String
schemaNameValue, final String schemaBranchNameValue, final String
schemaVersionValue) throws SchemaNotFoundException {
+ final SchemaIdentifier.Builder identifierBuilder =
SchemaIdentifier.builder().name(schemaNameValue);
+
+ if (schemaBranchNameValue != null && !schemaBranchNameValue.isBlank())
{
+ identifierBuilder.branch(schemaBranchNameValue);
+ }
+
+ if (schemaVersionValue != null && !schemaVersionValue.isBlank()) {
+ try {
+ identifierBuilder.version(Integer.valueOf(schemaVersionValue));
+ } catch (final NumberFormatException nfe) {
+ throw new SchemaNotFoundException("Could not retrieve schema
with name '%s' because a non-numeric version was supplied '%s'"
+ .formatted(schemaNameValue, schemaVersionValue), nfe);
+ }
+ }
+
+ return identifierBuilder.build();
+ }
+
+ private void validateSchemaText(final String schemaTextString) throws
SchemaNotFoundException {
+ if (schemaTextString == null || schemaTextString.isBlank()) {
+ throw new SchemaNotFoundException("Schema text not found");
+ }
+ }
+
+ private void validateSchemaName(final String schemaNameValue) throws
SchemaNotFoundException {
+ if (schemaNameValue == null || schemaNameValue.isBlank()) {
+ throw new SchemaNotFoundException("Schema name not provided or is
blank");
+ }
+ }
+
+
+ /**
+  * Allowable values of the 'Message Name Resolution Strategy' property.
+  * The property value of each constant is its enum name.
+  */
+ enum MessageNameResolverStrategy implements DescribedValue {
+
+     MESSAGE_NAME_PROPERTY(
+             "Message Name Property",
+             "Use the 'Message Name' property value to determine the message name"),
+     MESSAGE_NAME_RESOLVER(
+             "Message Name Resolver",
+             "Use a 'Message Name Resolver' service to dynamically determine the message name");
+
+     private final String label;
+     private final String explanation;
+
+     MessageNameResolverStrategy(final String label, final String explanation) {
+         this.label = label;
+         this.explanation = explanation;
+     }
+
+     /** Returns the enum constant name, which is used as the property value. */
+     @Override
+     public String getValue() {
+         return name();
+     }
+
+     /** Returns the human-readable name of this strategy. */
+     @Override
+     public String getDisplayName() {
+         return label;
+     }
+
+     /** Returns the description of this strategy. */
+     @Override
+     public String getDescription() {
+         return explanation;
+     }
+ }
+
+ /**
+  * Message name resolver that reads the name from the 'Message Name' property,
+  * evaluating Expression Language against the supplied variables.
+  */
+ static class PropertyMessageNameResolver extends AbstractControllerService implements MessageNameResolver {
+     private final PropertyContext propertyContext;
+
+     PropertyMessageNameResolver(final PropertyContext context) {
+         this.propertyContext = context;
+     }
+
+     /**
+      * Evaluates the 'Message Name' property and converts the result into a MessageName.
+      * The schema definition and input stream are not consulted.
+      */
+     @Override
+     public MessageName getMessageName(final Map<String, String> variables, final SchemaDefinition schemaDefinition, final InputStream in) {
+         final PropertyValue evaluatedProperty = propertyContext.getProperty(MESSAGE_NAME).evaluateAttributeExpressions(variables);
+         final String messageName = evaluatedProperty.getValue();
+         return StandardMessageNameFactory.fromName(messageName);
+     }
+ }
+
+}
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 44ded1008f..8ff7783b66 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.services.protobuf.ProtobufReader
\ No newline at end of file
+org.apache.nifi.services.protobuf.ProtobufReader
+org.apache.nifi.services.protobuf.StandardProtobufReader
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
index 3293513b82..b9b55f2019 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
@@ -16,8 +16,11 @@
*/
package org.apache.nifi.services.protobuf;
-import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.squareup.wire.schema.CoreLoaderKt;
import com.squareup.wire.schema.Location;
@@ -31,23 +34,24 @@ import java.nio.file.FileSystems;
import java.util.Arrays;
import java.util.Collections;
+import static java.lang.System.arraycopy;
import static
org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_KEY_FIELD_NAME;
import static
org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_VALUE_FIELD_NAME;
-public class ProtoTestUtil {
+public final class ProtoTestUtil {
- public static final String BASE_TEST_PATH = "src/test/resources/";
+ static final String BASE_TEST_PATH = "src/test/resources/";
public static Schema loadProto3TestSchema() {
- final SchemaLoader schemaLoader = new
SchemaLoader(FileSystems.getDefault());
-
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH +
"test_proto3.proto")), Collections.emptyList());
- return schemaLoader.loadSchema();
+ return loadTestSchema("test_proto3.proto");
+ }
+
+ public static Schema loadRootMessageSchema() {
+ return
loadTestSchema("org/apache/nifi/protobuf/test/root_message.proto");
}
public static Schema loadRepeatedProto3TestSchema() {
- final SchemaLoader schemaLoader = new
SchemaLoader(FileSystems.getDefault());
-
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH +
"test_repeated_proto3.proto")), Collections.emptyList());
- return schemaLoader.loadSchema();
+ return loadTestSchema("test_repeated_proto3.proto");
}
public static Schema loadProto2TestSchema() {
@@ -59,34 +63,52 @@ public class ProtoTestUtil {
}
public static Schema loadCircularReferenceTestSchema() {
- final SchemaLoader schemaLoader = new
SchemaLoader(FileSystems.getDefault());
-
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH +
"test_circular_reference.proto")), Collections.emptyList());
- return schemaLoader.loadSchema();
+ return loadTestSchema("test_circular_reference.proto");
}
+ /**
+  * Builds a serialized RootMessage from the root_message.desc descriptor: field 1 of the root
+  * holds a NestedMessage whose field 2 is set to the TestEnum value numbered 2.
+  *
+  * @return stream over the serialized message bytes
+  */
+ public static InputStream generateInputDataForRootMessage() throws IOException, Descriptors.DescriptorValidationException {
+     final FileDescriptor rootFile = loadFileDescriptor("org/apache/nifi/protobuf/test/root_message.desc");
+
+     final Descriptor rootType = rootFile.findMessageTypeByName("RootMessage");
+     final Descriptor nestedType = rootFile.findMessageTypeByName("NestedMessage");
+     final Descriptors.EnumDescriptor testEnum = rootFile.findEnumTypeByName("TestEnum");
+
+     final DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(nestedType);
+     nestedBuilder.setField(nestedType.findFieldByNumber(2), testEnum.findValueByNumber(2));
+     final DynamicMessage nested = nestedBuilder.build();
+
+     final DynamicMessage.Builder rootBuilder = DynamicMessage.newBuilder(rootType);
+     rootBuilder.setField(rootType.findFieldByNumber(1), nested);
+     final DynamicMessage root = rootBuilder.build();
+
+     return root.toByteString().newInput();
+ }
+
+
public static InputStream generateInputDataForProto3() throws IOException,
Descriptors.DescriptorValidationException {
- DescriptorProtos.FileDescriptorSet descriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH
+ "test_proto3.desc"));
- Descriptors.FileDescriptor fileDescriptor =
Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new
Descriptors.FileDescriptor[0]);
+ final FileDescriptor fileDescriptor =
loadFileDescriptor("test_proto3.desc");
- Descriptors.Descriptor messageDescriptor =
fileDescriptor.findMessageTypeByName("Proto3Message");
- Descriptors.Descriptor nestedMessageDescriptor =
fileDescriptor.findMessageTypeByName("NestedMessage");
- Descriptors.Descriptor nestedMessageDescriptor2 =
fileDescriptor.findMessageTypeByName("NestedMessage2");
- Descriptors.EnumDescriptor enumValueDescriptor =
fileDescriptor.findEnumTypeByName("TestEnum");
- Descriptors.Descriptor mapDescriptor =
nestedMessageDescriptor2.findNestedTypeByName("TestMapEntry");
+ final Descriptor messageDescriptor =
fileDescriptor.findMessageTypeByName("Proto3Message");
+ final Descriptor nestedMessageDescriptor =
fileDescriptor.findMessageTypeByName("NestedMessage");
+ final Descriptor nestedMessageDescriptor2 =
fileDescriptor.findMessageTypeByName("NestedMessage2");
+ final Descriptors.EnumDescriptor enumValueDescriptor =
fileDescriptor.findEnumTypeByName("TestEnum");
+ final Descriptor mapDescriptor =
nestedMessageDescriptor2.findNestedTypeByName("TestMapEntry");
- DynamicMessage mapEntry1 = DynamicMessage
+ final DynamicMessage mapEntry1 = DynamicMessage
.newBuilder(mapDescriptor)
.setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME),
"test_key_entry1")
.setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME),
101)
.build();
- DynamicMessage mapEntry2 = DynamicMessage
+ final DynamicMessage mapEntry2 = DynamicMessage
.newBuilder(mapDescriptor)
.setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME),
"test_key_entry2")
.setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME),
202)
.build();
- DynamicMessage nestedMessage2 = DynamicMessage
+ final DynamicMessage nestedMessage2 = DynamicMessage
.newBuilder(nestedMessageDescriptor2)
.setField(nestedMessageDescriptor2.findFieldByNumber(30),
Arrays.asList(mapEntry1, mapEntry2))
.setField(nestedMessageDescriptor2.findFieldByNumber(31), "One
Of Option")
@@ -94,13 +116,13 @@ public class ProtoTestUtil {
.setField(nestedMessageDescriptor2.findFieldByNumber(33), 3)
.build();
- DynamicMessage nestedMessage = DynamicMessage
+ final DynamicMessage nestedMessage = DynamicMessage
.newBuilder(nestedMessageDescriptor)
.setField(nestedMessageDescriptor.findFieldByNumber(20),
enumValueDescriptor.findValueByNumber(2))
.addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), nestedMessage2)
.build();
- DynamicMessage message = DynamicMessage
+ final DynamicMessage message = DynamicMessage
.newBuilder(messageDescriptor)
.setField(messageDescriptor.findFieldByNumber(1), true)
.setField(messageDescriptor.findFieldByNumber(2), "Test text")
@@ -124,14 +146,12 @@ public class ProtoTestUtil {
}
public static InputStream generateInputDataForRepeatedProto3() throws
IOException, Descriptors.DescriptorValidationException {
- DescriptorProtos.FileDescriptorSet descriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH
+ "test_repeated_proto3.desc"));
- Descriptors.FileDescriptor fileDescriptor =
Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new
Descriptors.FileDescriptor[0]);
-
- Descriptors.Descriptor messageDescriptor =
fileDescriptor.findMessageTypeByName("RootMessage");
- Descriptors.Descriptor repeatedMessageDescriptor =
fileDescriptor.findMessageTypeByName("RepeatedMessage");
- Descriptors.EnumDescriptor enumValueDescriptor =
fileDescriptor.findEnumTypeByName("TestEnum");
+ final FileDescriptor fileDescriptor =
loadFileDescriptor("test_repeated_proto3.desc");
+ final Descriptor messageDescriptor =
fileDescriptor.findMessageTypeByName("RootMessage");
+ final Descriptor repeatedMessageDescriptor =
fileDescriptor.findMessageTypeByName("RepeatedMessage");
+ final Descriptors.EnumDescriptor enumValueDescriptor =
fileDescriptor.findEnumTypeByName("TestEnum");
- DynamicMessage repeatedMessage1 = DynamicMessage
+ final DynamicMessage repeatedMessage1 = DynamicMessage
.newBuilder(repeatedMessageDescriptor)
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(1), true)
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(1), false)
@@ -167,12 +187,12 @@ public class ProtoTestUtil {
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(16),
enumValueDescriptor.findValueByNumber(2))
.build();
- DynamicMessage repeatedMessage2 = DynamicMessage
+ final DynamicMessage repeatedMessage2 = DynamicMessage
.newBuilder(repeatedMessageDescriptor)
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(1), true)
.build();
- DynamicMessage rootMessage = DynamicMessage
+ final DynamicMessage rootMessage = DynamicMessage
.newBuilder(messageDescriptor)
.addRepeatedField(messageDescriptor.findFieldByNumber(1),
repeatedMessage1)
.addRepeatedField(messageDescriptor.findFieldByNumber(1),
repeatedMessage2)
@@ -182,30 +202,27 @@ public class ProtoTestUtil {
}
public static InputStream generateInputDataForProto2() throws IOException,
Descriptors.DescriptorValidationException {
- DescriptorProtos.FileDescriptorSet anyDescriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH
+ "google/protobuf/any.desc"));
- Descriptors.FileDescriptor anyDesc =
Descriptors.FileDescriptor.buildFrom(anyDescriptorSet.getFile(0), new
Descriptors.FileDescriptor[]{});
+ final FileDescriptor anyDesc =
loadFileDescriptor("google/protobuf/any.desc");
+ final FileDescriptor fileDescriptor =
loadFileDescriptor("test_proto2.desc", new FileDescriptor[]{anyDesc});
- DescriptorProtos.FileDescriptorSet descriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH
+ "test_proto2.desc"));
- Descriptors.FileDescriptor fileDescriptor =
Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new
Descriptors.FileDescriptor[]{anyDesc});
+ final Descriptor messageDescriptor =
fileDescriptor.findMessageTypeByName("Proto2Message");
+ final Descriptor anyTestDescriptor =
fileDescriptor.findMessageTypeByName("AnyValueMessage");
+ final Descriptors.FieldDescriptor fieldDescriptor =
fileDescriptor.findExtensionByName("extensionField");
+ final Descriptor anyDescriptor = anyDesc.findMessageTypeByName("Any");
- Descriptors.Descriptor messageDescriptor =
fileDescriptor.findMessageTypeByName("Proto2Message");
- Descriptors.Descriptor anyTestDescriptor =
fileDescriptor.findMessageTypeByName("AnyValueMessage");
- Descriptors.FieldDescriptor fieldDescriptor =
fileDescriptor.findExtensionByName("extensionField");
- Descriptors.Descriptor anyDescriptor =
anyDesc.findMessageTypeByName("Any");
-
- DynamicMessage anyTestMessage = DynamicMessage
+ final DynamicMessage anyTestMessage = DynamicMessage
.newBuilder(anyTestDescriptor)
.setField(anyTestDescriptor.findFieldByNumber(1), "Test field
1")
.setField(anyTestDescriptor.findFieldByNumber(2), "Test field
2")
.build();
- DynamicMessage anyMessage = DynamicMessage
+ final DynamicMessage anyMessage = DynamicMessage
.newBuilder(anyDescriptor)
.setField(anyDescriptor.findFieldByNumber(1),
"type.googleapis.com/AnyValueMessage")
.setField(anyDescriptor.findFieldByNumber(2),
anyTestMessage.toByteArray())
.build();
- DynamicMessage message = DynamicMessage
+ final DynamicMessage message = DynamicMessage
.newBuilder(messageDescriptor)
.setField(messageDescriptor.findFieldByNumber(1), true)
.setField(messageDescriptor.findFieldByNumber(3), anyMessage)
@@ -214,4 +231,74 @@ public class ProtoTestUtil {
return message.toByteString().newInput();
}
+
+ /**
+  * Loads the first file descriptor from a descriptor set file that has no dependencies.
+  *
+  * @param descriptorFileName path of the .desc file relative to BASE_TEST_PATH
+  * @return the descriptor built from the first file in the set
+  * @throws IOException if the descriptor file cannot be read or parsed
+  * @throws Descriptors.DescriptorValidationException if the descriptor is invalid
+  */
+ static FileDescriptor loadFileDescriptor(final String descriptorFileName) throws IOException, Descriptors.DescriptorValidationException {
+     // try-with-resources: the stream was previously left open after parsing
+     try (InputStream descriptorStream = new FileInputStream(BASE_TEST_PATH + descriptorFileName)) {
+         final FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(descriptorStream);
+         return FileDescriptor.buildFrom(descriptorSet.getFile(0), new FileDescriptor[0]);
+     }
+ }
+
+ /**
+  * Loads the first file descriptor from a descriptor set file, resolving its imports
+  * against the supplied dependencies.
+  *
+  * @param descriptorFileName path of the .desc file relative to BASE_TEST_PATH
+  * @param dependencies       already-built descriptors the file depends on
+  * @return the descriptor built from the first file in the set
+  * @throws IOException if the descriptor file cannot be read or parsed
+  * @throws Descriptors.DescriptorValidationException if the descriptor is invalid
+  */
+ static FileDescriptor loadFileDescriptor(final String descriptorFileName, final FileDescriptor[] dependencies)
+         throws IOException, Descriptors.DescriptorValidationException {
+     // try-with-resources: the stream was previously left open after parsing
+     try (InputStream descriptorStream = new FileInputStream(BASE_TEST_PATH + descriptorFileName)) {
+         final FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(descriptorStream);
+         return FileDescriptor.buildFrom(descriptorSet.getFile(0), dependencies);
+     }
+ }
+
+ /**
+  * Loads every file descriptor contained in a descriptor set file.
+  * Each file is built with all previously built files passed as its dependencies, so the
+  * set is expected to list files in dependency order.
+  *
+  * @param descriptorFileName path of the .desc file relative to BASE_TEST_PATH
+  * @return the built descriptors, in the order they appear in the set
+  * @throws IOException if the descriptor file cannot be read or parsed
+  * @throws Descriptors.DescriptorValidationException if any descriptor is invalid
+  */
+ static FileDescriptor[] loadAllFileDescriptors(final String descriptorFileName) throws IOException, Descriptors.DescriptorValidationException {
+     final FileDescriptorSet descriptorSet;
+     // try-with-resources: the stream was previously left open after parsing
+     try (InputStream descriptorStream = new FileInputStream(BASE_TEST_PATH + descriptorFileName)) {
+         descriptorSet = FileDescriptorSet.parseFrom(descriptorStream);
+     }
+
+     final int fileCount = descriptorSet.getFileCount();
+     final FileDescriptor[] fileDescriptors = new FileDescriptor[fileCount];
+
+     // Build descriptors in dependency order
+     for (int i = 0; i < fileCount; i++) {
+         final FileDescriptorProto fileProto = descriptorSet.getFile(i);
+         // All descriptors built so far are offered as dependencies of the current file
+         final FileDescriptor[] dependencies = Arrays.copyOf(fileDescriptors, i);
+         fileDescriptors[i] = FileDescriptor.buildFrom(fileProto, dependencies);
+     }
+
+     return fileDescriptors;
+ }
+
+ /**
+ * Loads all file descriptors from a .desc file and returns a simplified
container
+ * that can find message types by name without array indexing.
+ */
+ static DescriptorContainer loadDescriptorContainer(final String
descriptorFileName) throws IOException,
Descriptors.DescriptorValidationException {
+ return new
DescriptorContainer(loadAllFileDescriptors(descriptorFileName));
+ }
+
+ /**
+  * Loads a Wire schema from a single .proto file under BASE_TEST_PATH, with no extra proto path entries.
+  *
+  * @param protoFileName path of the .proto file relative to BASE_TEST_PATH
+  * @return the loaded schema
+  */
+ private static Schema loadTestSchema(final String protoFileName) {
+     final Location protoLocation = Location.get(BASE_TEST_PATH + protoFileName);
+
+     final SchemaLoader loader = new SchemaLoader(FileSystems.getDefault());
+     loader.initRoots(Collections.singletonList(protoLocation), Collections.emptyList());
+     return loader.loadSchema();
+ }
+
+ /**
+  * Holder for several file descriptors that resolves message types by name,
+  * sparing callers from knowing which FileDescriptor declares a given message.
+  */
+ static class DescriptorContainer {
+
+     private final FileDescriptor[] fileDescriptors;
+
+     DescriptorContainer(final FileDescriptor[] fileDescriptors) {
+         this.fileDescriptors = fileDescriptors;
+     }
+
+     /**
+      * Looks a message type up by name, searching the file descriptors in order
+      * and returning the first match.
+      *
+      * @param messageName name of the message type to find
+      * @return the first matching descriptor
+      * @throws IllegalArgumentException if no loaded descriptor declares the message
+      */
+     Descriptor findMessageTypeByName(final String messageName) {
+         return Arrays.stream(fileDescriptors)
+                 .map(fileDescriptor -> fileDescriptor.findMessageTypeByName(messageName))
+                 .filter(candidate -> candidate != null)
+                 .findFirst()
+                 .orElseThrow(() -> new IllegalArgumentException("Message type '" + messageName + "' not found in loaded descriptors"));
+     }
+ }
}
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/StandardProtobufReaderTestBase.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/StandardProtobufReaderTestBase.java
new file mode 100644
index 0000000000..fe63280876
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/StandardProtobufReaderTestBase.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.MessageName;
+import org.apache.nifi.schemaregistry.services.MessageNameResolver;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.schemaregistry.services.StandardMessageNameFactory;
+import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Objects.requireNonNullElseGet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Base class for StandardProtobufReader tests containing common functionality
+ * and mock services that can be shared across different schema access
strategy tests.
+ */
+public abstract class StandardProtobufReaderTestBase {
+
+ protected static final String STANDARD_PROTOBUF_READER_SERVICE_ID =
"standard-protobuf-reader";
+ protected static final String MOCK_SCHEMA_REGISTRY_ID =
"mock-schema-registry";
+ protected static final String MOCK_SCHEMA_REFERENCE_READER_ID =
"mock-schema-reference-reader";
+ protected static final String MOCK_MESSAGE_NAME_RESOLVER_ID =
"mock-message-name-resolver";
+
+ protected static final String PROTO_3_MESSAGE = "Proto3Message";
+ protected static final String PROTO_3_SCHEMA = "test_proto3.proto";
+
+ protected TestRunner runner;
+ protected StandardProtobufReader standardProtobufReader;
+ protected MockSchemaRegistry mockSchemaRegistry;
+ protected MockSchemaReferenceReader mockSchemaReferenceReader;
+ protected MockMessageNameResolver mockMessageNameResolver;
+
+
+    /**
+     * Builds a new TestRunner and StandardProtobufReader for every test, registers the reader
+     * under STANDARD_PROTOBUF_READER_SERVICE_ID and then registers the mock services.
+     */
+    @BeforeEach
+    void setUp() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+        final StandardProtobufReader reader = new StandardProtobufReader();
+        runner = testRunner;
+        standardProtobufReader = reader;
+
+        testRunner.addControllerService(STANDARD_PROTOBUF_READER_SERVICE_ID, reader);
+        setupMockServices();
+    }
+
+    /**
+     * Instantiates the three mock controller services and registers each with the runner under
+     * its MOCK_* identifier. The services are registered here but not enabled.
+     */
+    protected void setupMockServices() throws InitializationException {
+        final MockSchemaRegistry registry = new MockSchemaRegistry();
+        final MockSchemaReferenceReader referenceReader = new MockSchemaReferenceReader();
+        final MockMessageNameResolver nameResolver = new MockMessageNameResolver();
+        mockSchemaRegistry = registry;
+        mockSchemaReferenceReader = referenceReader;
+        mockMessageNameResolver = nameResolver;
+
+        runner.addControllerService(MOCK_SCHEMA_REGISTRY_ID, registry);
+        runner.addControllerService(MOCK_SCHEMA_REFERENCE_READER_ID, referenceReader);
+        runner.addControllerService(MOCK_MESSAGE_NAME_RESOLVER_ID, nameResolver);
+    }
+
+ /**
+ * Enables the three mock services and then the StandardProtobufReader under test.
+ * NOTE(review): the reader is enabled last, presumably because it references the mock
+ * services through its properties - confirm before reordering.
+ */
+ protected void enableAllControllerServices() {
+ runner.enableControllerService(mockSchemaRegistry);
+ runner.enableControllerService(mockSchemaReferenceReader);
+ runner.enableControllerService(mockMessageNameResolver);
+ runner.enableControllerService(standardProtobufReader);
+ }
+
+    /**
+     * Creates a RecordReader from the reader under test for the given payload, passing an empty
+     * variables map, the payload length and the runner's logger.
+     *
+     * @param testData bytes exposed to the reader as its input stream
+     */
+    protected RecordReader createRecordReader(final byte[] testData) throws IOException, SchemaNotFoundException {
+        final Map<String, String> noVariables = emptyMap();
+        final InputStream content = new ByteArrayInputStream(testData);
+        return standardProtobufReader.createRecordReader(noVariables, content, testData.length, runner.getLogger());
+    }
+
+    /**
+     * Creates a RecordReader over an empty payload.
+     */
+    protected RecordReader createRecordReader() throws IOException, SchemaNotFoundException {
+        return createRecordReader(new byte[0]);
+    }
+
+    /**
+     * Reads the test_proto3.proto file (PROTO_3_SCHEMA) from the classpath as UTF-8 text.
+     *
+     * @return the full content of the resource
+     * @throws RuntimeException if the resource is missing or cannot be read
+     */
+    protected String getTestProto3File() {
+        // try-with-resources closes the stream, which was previously leaked
+        try (final InputStream resourceStream = getClass().getClassLoader().getResourceAsStream(PROTO_3_SCHEMA)) {
+            if (resourceStream == null) {
+                // Report a missing resource explicitly instead of through a NullPointerException
+                throw new IOException("Resource not found on classpath: " + PROTO_3_SCHEMA);
+            }
+            // Decode explicitly so the result does not depend on the platform default charset
+            return new String(resourceStream.readAllBytes(), UTF_8);
+        } catch (final IOException e) {
+            throw new RuntimeException("Failed to read " + PROTO_3_SCHEMA + " from resources", e);
+        }
+    }
+
+    /**
+     * Validates the reader under test and fails the test unless exactly one result is invalid.
+     *
+     * @return the single invalid validation result
+     */
+    protected ValidationResult verifyExactlyOneValidationError() {
+        final Collection<ValidationResult> validationResults = runner.validate(standardProtobufReader);
+        final List<ValidationResult> failures = validationResults.stream()
+                .filter(validationResult -> !validationResult.isValid())
+                .toList();
+
+        if (failures.size() == 1) {
+            return failures.getFirst();
+        }
+        // fail() always throws; returning its (generic) result keeps the compiler satisfied
+        return fail("Expected exactly one invalid result, but found: " + failures.size());
+    }
+
+    /**
+     * Asserts that the reader yields exactly one record and that it matches the test_proto3.proto
+     * message generated by the ProtoTestUtil.generateInputDataForProto3() method.
+     *
+     * @param recordReader reader positioned before the first record
+     */
+    protected void runAssertionsOnTestProto3Message(final RecordReader recordReader) throws MalformedRecordException, IOException {
+        final RecordSchema schema = recordReader.getSchema();
+        assertNotNull(schema);
+
+        // Iterate over record reader and validate actual data
+        final Record record = recordReader.nextRecord();
+        assertNotNull(record);
+
+        assertTrue(record.getAsBoolean("booleanField"));
+        assertEquals("Test text", record.getAsString("stringField"));
+        assertEquals(Integer.MAX_VALUE, record.getAsInt("int32Field"));
+        assertEquals(Integer.MIN_VALUE, record.getAsInt("sint32Field"));
+
+        // The bytes field is read back as a boxed Byte[]; unbox it before decoding
+        final Byte[] bytesValue = (Byte[]) record.getValue("bytesField");
+        final String bytesString = new String(ArrayUtils.toPrimitive(bytesValue), UTF_8);
+        assertEquals("Test bytes", bytesString);
+
+        assertEquals(Long.MAX_VALUE, record.getAsLong("int64Field"));
+        assertEquals(Long.MIN_VALUE, record.getAsLong("sint64Field"));
+
+        // Validate nested message
+        final Record nestedMessage = (Record) record.getValue("nestedMessage");
+        assertNotNull(nestedMessage, "nestedMessage should not be null");
+        assertEquals("ENUM_VALUE_3", nestedMessage.getAsString("testEnum"));
+
+        // Validate nested message2 array
+        final Object[] nestedMessage2Value = nestedMessage.getAsArray("nestedMessage2");
+        assertNotNull(nestedMessage2Value);
+        assertEquals(1, nestedMessage2Value.length);
+        final Record nestedMessage2 = (Record) nestedMessage2Value[0];
+        assertNotNull(nestedMessage2);
+
+        // Validate map field; a wildcard cast avoids the unchecked-cast warning of Map<String, Object>
+        final Map<?, ?> testMap = (Map<?, ?>) nestedMessage2.getValue("testMap");
+        assertNotNull(testMap);
+
+        assertEquals(2, testMap.size());
+        assertEquals(101, ((Number) testMap.get("test_key_entry1")).intValue());
+        assertEquals(202, ((Number) testMap.get("test_key_entry2")).intValue());
+
+        // The stream must contain exactly one message
+        assertNull(recordReader.nextRecord());
+    }
+
+
+    /**
+     * SchemaRegistry test double whose retrieveSchemaDefinition outcome is configured by the test:
+     * a fixed definition, a definition built from schema text, or a SchemaNotFoundException.
+     */
+    protected static class MockSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
+        private String fallbackSchemaText = "";
+        private SchemaDefinition configuredDefinition = null;
+        private boolean failWithSchemaNotFound = false;
+
+        /** Sets the schema text used to build a definition when no explicit definition is configured. */
+        public void returnSchemaText(final String schemaText) {
+            fallbackSchemaText = schemaText;
+        }
+
+        /** Sets the definition returned as-is by retrieveSchemaDefinition. */
+        public void returnSchemaDefinition(final SchemaDefinition schemaDefinition) {
+            configuredDefinition = schemaDefinition;
+        }
+
+        /** Controls whether retrieveSchemaDefinition throws SchemaNotFoundException. */
+        public void throwSchemaNotFoundWhenCalled(final boolean shouldThrow) {
+            failWithSchemaNotFound = shouldThrow;
+        }
+
+        @Override
+        public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
+            throw new UnsupportedOperationException("retrieveSchema is not implemented in MockSchemaRegistry");
+        }
+
+        @Override
+        public Set<SchemaField> getSuppliedSchemaFields() {
+            return emptySet();
+        }
+
+        @Override
+        public SchemaDefinition retrieveSchemaDefinition(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
+            if (failWithSchemaNotFound) {
+                throw new SchemaNotFoundException("Schema not found: " + schemaIdentifier);
+            }
+            // Prefer the explicitly configured definition; otherwise wrap the configured text
+            return requireNonNullElseGet(
+                    configuredDefinition,
+                    () -> new StandardSchemaDefinition(schemaIdentifier, fallbackSchemaText, SchemaDefinition.SchemaType.PROTOBUF));
+        }
+    }
+
+    /**
+     * SchemaReferenceReader test double that returns a preconfigured SchemaIdentifier or, when
+     * asked to, throws an IOException.
+     */
+    protected static class MockSchemaReferenceReader extends AbstractControllerService implements SchemaReferenceReader {
+        private SchemaIdentifier configuredIdentifier;
+        private boolean failOnRead = false;
+
+        /** Configures the identifier to return, built from the given schema name only. */
+        public void returnSchemaIdentifierWithName(final String schemaName) {
+            final SchemaIdentifier namedIdentifier = SchemaIdentifier.builder().name(schemaName).build();
+            configuredIdentifier = namedIdentifier;
+        }
+
+        /** Controls whether getSchemaIdentifier throws an IOException. */
+        public void throwExceptionWhenCalled(final boolean shouldThrowException) {
+            failOnRead = shouldThrowException;
+        }
+
+        @Override
+        public SchemaIdentifier getSchemaIdentifier(final Map<String, String> variables, final InputStream contentStream) throws IOException {
+            if (!failOnRead) {
+                // Remains null until returnSchemaIdentifierWithName has been called
+                return configuredIdentifier;
+            }
+            throw new IOException("Mock schema reference reader configured to throw exception");
+        }
+
+        @Override
+        public Set<SchemaField> getSuppliedSchemaFields() {
+            return emptySet();
+        }
+    }
+
+    /**
+     * Mock implementation of MessageNameResolver for testing purposes. It ignores its inputs and
+     * resolves to the message name configured through returnMessageName.
+     */
+    protected static class MockMessageNameResolver extends AbstractControllerService implements MessageNameResolver {
+        private String configuredMessageName;
+
+        /** Sets the name handed to StandardMessageNameFactory.fromName on every resolution. */
+        public void returnMessageName(final String messageName) {
+            configuredMessageName = messageName;
+        }
+
+        @Override
+        public MessageName getMessageName(final Map<String, String> variables, final SchemaDefinition schemaDefinition, final InputStream contentStream) {
+            final MessageName resolved = StandardMessageNameFactory.fromName(configuredMessageName);
+            return resolved;
+        }
+    }
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufReader.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufReader.java
new file mode 100644
index 0000000000..d42c6120e0
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufReader.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static org.apache.nifi.services.protobuf.ProtobufReader.MESSAGE_TYPE;
+import static
org.apache.nifi.services.protobuf.ProtobufReader.PROTOBUF_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestProtobufReader {
+
+ private static final String PROTOBUF_READER_SERVICE_ID = "protobuf-reader";
+ private static final String PROTO3_MESSAGE_TYPE = "Proto3Message";
+ private static final String REPEATED_PROTO3_MESSAGE_TYPE = "RootMessage";
+ private static final String PROTO2_MESSAGE_TYPE = "Proto2Message";
+
+ private TestRunner runner;
+ private ProtobufReader protobufReader;
+
+    /**
+     * Creates a fresh TestRunner and ProtobufReader before each test and registers the reader
+     * under PROTOBUF_READER_SERVICE_ID. The reader is registered but not enabled here.
+     */
+    @BeforeEach
+    void setUp() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+        final ProtobufReader reader = new ProtobufReader();
+        runner = testRunner;
+        protobufReader = reader;
+
+        testRunner.addControllerService(PROTOBUF_READER_SERVICE_ID, reader);
+    }
+
+
+    /**
+     * Supplies (proto file name, message type) pairs for testValidConfigurations.
+     */
+    private static Stream<Arguments> validConfigurationProvider() {
+        final Arguments proto3 = Arguments.of("test_proto3.proto", PROTO3_MESSAGE_TYPE);
+        final Arguments repeatedProto3 = Arguments.of("test_repeated_proto3.proto", REPEATED_PROTO3_MESSAGE_TYPE);
+        final Arguments proto2 = Arguments.of("test_proto2.proto", PROTO2_MESSAGE_TYPE);
+        return Stream.of(proto3, repeatedProto3, proto2);
+    }
+
+    /**
+     * For each supplied proto file and message type, the reader can be enabled and validates
+     * successfully.
+     */
+    @ParameterizedTest
+    @MethodSource("validConfigurationProvider")
+    void testValidConfigurations(final String protoFileName, final String messageType) throws IOException {
+        final Path protoDirectory = createTempDirWithProtoFile(protoFileName);
+        final String protoDirectoryValue = protoDirectory.toString();
+
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, protoDirectoryValue);
+        runner.setProperty(protobufReader, MESSAGE_TYPE, messageType);
+        runner.enableControllerService(protobufReader);
+
+        runner.assertValid(protobufReader);
+    }
+
+    /**
+     * With only the message type set, validation fails and the first invalid result explains
+     * that the Proto Directory is required.
+     */
+    @Test
+    void testInvalidConfigurationMissingDirectory() {
+        runner.setProperty(protobufReader, MESSAGE_TYPE, PROTO3_MESSAGE_TYPE);
+
+        final Collection<ValidationResult> validationResults = runner.validate(protobufReader);
+        final boolean everythingValid = validationResults.stream().allMatch(ValidationResult::isValid);
+        assertFalse(everythingValid);
+
+        final ValidationResult firstInvalid = findFirstInvalid(validationResults);
+        assertNotNull(firstInvalid);
+        final String explanation = firstInvalid.getExplanation();
+        assertEquals("Proto Directory is required", explanation);
+    }
+
+    /**
+     * With only the proto directory set, validation fails and the first invalid result explains
+     * that the Message Type is required.
+     */
+    @Test
+    void testInvalidConfigurationMissingMessageType() throws IOException {
+        final String protoDirectory = createTempDirWithProtoFile("test_proto3.proto").toString();
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, protoDirectory);
+
+        final Collection<ValidationResult> validationResults = runner.validate(protobufReader);
+        final boolean everythingValid = validationResults.stream().allMatch(ValidationResult::isValid);
+        assertFalse(everythingValid);
+
+        final ValidationResult firstInvalid = findFirstInvalid(validationResults);
+        assertNotNull(firstInvalid);
+        final String explanation = firstInvalid.getExplanation();
+        assertEquals("Message Type is required", explanation);
+    }
+
+    /**
+     * Pointing the proto directory at a path that was never created fails validation with an
+     * explanation containing "Directory does not exist".
+     */
+    @Test
+    void testInvalidConfigurationNonExistentDirectory() throws IOException {
+        final Path existingDirectory = createTempDirWithProtoFile("test_proto3.proto");
+        final Path missingDirectory = existingDirectory.resolve("non-existent");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, missingDirectory.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, PROTO3_MESSAGE_TYPE);
+
+        final Collection<ValidationResult> validationResults = runner.validate(protobufReader);
+        final boolean everythingValid = validationResults.stream().allMatch(ValidationResult::isValid);
+        assertFalse(everythingValid);
+
+        final ValidationResult firstInvalid = findFirstInvalid(validationResults);
+        assertNotNull(firstInvalid);
+        final String explanation = firstInvalid.getExplanation();
+        assertTrue(explanation.contains("Directory does not exist"));
+    }
+
+    /**
+     * A message type that is not declared in the proto directory fails validation with an
+     * explanation containing "message type cannot be found".
+     */
+    @Test
+    void testCustomValidationWithInvalidMessageType() throws IOException {
+        final String protoDirectory = createTempDirWithProtoFile("test_proto3.proto").toString();
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, protoDirectory);
+        runner.setProperty(protobufReader, MESSAGE_TYPE, "NonExistentMessage");
+
+        final Collection<ValidationResult> validationResults = runner.validate(protobufReader);
+        final boolean everythingValid = validationResults.stream().allMatch(ValidationResult::isValid);
+        assertFalse(everythingValid);
+
+        final ValidationResult firstInvalid = findFirstInvalid(validationResults);
+        assertNotNull(firstInvalid);
+        final String explanation = firstInvalid.getExplanation();
+        assertTrue(explanation.contains("message type cannot be found"));
+    }
+
+    /**
+     * The reader exposes exactly the listed property descriptors, in this order.
+     */
+    @Test
+    void testPropertyDescriptorsContainRequiredProperties() {
+        final List<PropertyDescriptor> expectedDescriptors = List.of(
+                SCHEMA_ACCESS_STRATEGY,
+                SCHEMA_REGISTRY,
+                SCHEMA_NAME,
+                SCHEMA_VERSION,
+                SCHEMA_BRANCH_NAME,
+                SCHEMA_TEXT,
+                SCHEMA_REFERENCE_READER,
+                PROTOBUF_DIRECTORY,
+                MESSAGE_TYPE
+        );
+
+        final List<PropertyDescriptor> actualDescriptors = protobufReader.getSupportedPropertyDescriptors();
+        assertEquals(expectedDescriptors, actualDescriptors);
+    }
+
+    /**
+     * With a valid proto directory and message type, the reader produces a record carrying the
+     * expected boolean and string field values of the generated proto3 test message.
+     */
+    @Test
+    void testCreateRecordReaderWithValidConfiguration() throws Exception {
+        final Path testTempDir = createTempDirWithProtoFile("test_proto3.proto");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, testTempDir.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, PROTO3_MESSAGE_TYPE);
+        runner.enableControllerService(protobufReader);
+
+        // The RecordReader is a resource as well; close it together with the input stream
+        try (final InputStream inputStream = ProtoTestUtil.generateInputDataForProto3();
+             final RecordReader recordReader = protobufReader.createRecordReader(Collections.emptyMap(), inputStream, 100L, runner.getLogger())) {
+
+            assertNotNull(recordReader);
+
+            final Record record = recordReader.nextRecord();
+            assertNotNull(record);
+            assertEquals(true, record.getValue("booleanField"));
+            assertEquals("Test text", record.getValue("stringField"));
+        }
+    }
+
+    /**
+     * With the repeated-field proto3 schema configured, the reader produces a record whose
+     * "repeatedMessage" value is present.
+     */
+    @Test
+    void testCreateRecordReaderWithRepeatedFields() throws Exception {
+        final Path testTempDir = createTempDirWithProtoFile("test_repeated_proto3.proto");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, testTempDir.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, REPEATED_PROTO3_MESSAGE_TYPE);
+        runner.enableControllerService(protobufReader);
+
+        // The RecordReader is a resource as well; close it together with the input stream
+        try (final InputStream inputStream = ProtoTestUtil.generateInputDataForRepeatedProto3();
+             final RecordReader recordReader = protobufReader.createRecordReader(Collections.emptyMap(), inputStream, 100L, runner.getLogger())) {
+
+            assertNotNull(recordReader);
+            final Record record = recordReader.nextRecord();
+            assertNotNull(record);
+            assertNotNull(record.getValue("repeatedMessage"));
+        }
+    }
+
+
+    /**
+     * Validation succeeds for message type "A" from test_circular_reference.proto.
+     */
+    @Test
+    void testValidationWithCircularReferenceProto() throws IOException {
+        final String protoDirectory = createTempDirWithProtoFile("test_circular_reference.proto").toString();
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, protoDirectory);
+        runner.setProperty(protobufReader, MESSAGE_TYPE, "A");
+
+        final Collection<ValidationResult> validationResults = runner.validate(protobufReader);
+        final boolean everythingValid = validationResults.stream().allMatch(ValidationResult::isValid);
+        assertTrue(everythingValid);
+    }
+
+
+    /**
+     * Creates a new temporary directory and copies the named classpath resource into it.
+     *
+     * @param protoFileName classpath resource name, also used as the file name inside the directory
+     * @return path of the created temporary directory
+     * @throws IOException if the resource is missing from the classpath or cannot be copied
+     */
+    private Path createTempDirWithProtoFile(final String protoFileName) throws IOException {
+        final Path testTempDir = Files.createTempDirectory("proto-test-");
+        // Registered before the copied file: deleteOnExit removes entries in reverse registration
+        // order, so the file goes first and the then-empty directory afterwards.
+        testTempDir.toFile().deleteOnExit();
+
+        try (final InputStream resourceStream = getClass().getClassLoader().getResourceAsStream(protoFileName)) {
+            if (resourceStream == null) {
+                // Fail at the cause instead of handing the test an empty directory
+                throw new IOException("Test resource not found on classpath: " + protoFileName);
+            }
+            final Path protoFile = testTempDir.resolve(protoFileName);
+            Files.copy(resourceStream, protoFile, StandardCopyOption.REPLACE_EXISTING);
+            protoFile.toFile().deleteOnExit();
+        }
+        return testTempDir;
+    }
+
+    /**
+     * Returns the first result, in iteration order, that is not valid.
+     *
+     * @return the first invalid result, or null when every result is valid
+     */
+    private ValidationResult findFirstInvalid(final Collection<ValidationResult> results) {
+        for (final ValidationResult candidate : results) {
+            if (!candidate.isValid()) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReader.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReader.java
new file mode 100644
index 0000000000..8cb7bd0689
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReader.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.google.protobuf.DynamicMessage;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.protobuf.Descriptors.Descriptor;
+import static com.google.protobuf.Descriptors.DescriptorValidationException;
+import static java.util.Collections.emptyMap;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForProto3;
+import static
org.apache.nifi.services.protobuf.ProtobufSchemaValidator.validateSchemaDefinitionIdentifiers;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLUTION_STRATEGY;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLVER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestStandardProtobufReader extends StandardProtobufReaderTestBase {
+
+ /**
+ * Default configuration for the tests declared directly in this class: the schema is
+ * accessed through the schema reference reader strategy using the mock schema registry and
+ * mock reference reader, and the message name is resolved by the mock message name resolver.
+ */
+ @BeforeEach
+ void beforeEach() {
+ runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY,
SCHEMA_REFERENCE_READER_PROPERTY);
+ runner.setProperty(standardProtobufReader, SCHEMA_REGISTRY,
MOCK_SCHEMA_REGISTRY_ID);
+ runner.setProperty(standardProtobufReader, SCHEMA_REFERENCE_READER,
MOCK_SCHEMA_REFERENCE_READER_ID);
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY,
StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME_RESOLVER,
MOCK_MESSAGE_NAME_RESOLVER_ID);
+ }
+
+    /**
+     * Reads the generated proto3 test message using the schema text served by the mock registry
+     * and verifies the resulting record. Also invoked by the nested strategy test classes.
+     */
+    @Test
+    void testStandardProtobufReaderWithTestProto3Schema() throws Exception {
+        final String testSchema = getTestProto3File();
+        mockSchemaRegistry.returnSchemaText(testSchema);
+        mockSchemaReferenceReader.returnSchemaIdentifierWithName(PROTO_3_SCHEMA);
+        mockMessageNameResolver.returnMessageName(PROTO_3_MESSAGE);
+        enableAllControllerServices();
+
+        // Close both the input stream and the reader, which were previously left open
+        try (final InputStream testDataStream = generateInputDataForProto3();
+             final RecordReader recordReader = standardProtobufReader.createRecordReader(emptyMap(), testDataStream, 0L, runner.getLogger())) {
+
+            assertNotNull(recordReader);
+            runAssertionsOnTestProto3Message(recordReader);
+        }
+    }
+
+
+    /**
+     * A SchemaNotFoundException raised by the schema registry surfaces from createRecordReader.
+     */
+    @Test
+    void testSchemaNotFoundRethrown() {
+        mockSchemaRegistry.throwSchemaNotFoundWhenCalled(true);
+        enableAllControllerServices();
+
+        assertThrows(SchemaNotFoundException.class, () -> createRecordReader());
+    }
+
+    /**
+     * An IOException raised by the schema reference reader surfaces from createRecordReader.
+     */
+    @Test
+    void testSchemaReferenceReaderException() {
+        final String schemaText = getTestProto3File();
+        mockSchemaRegistry.returnSchemaText(schemaText);
+        mockSchemaReferenceReader.throwExceptionWhenCalled(true);
+        enableAllControllerServices();
+
+        assertThrows(IOException.class, () -> createRecordReader());
+    }
+
+    /**
+     * Reads a UserSettings message whose schema (user_settings.proto) references a second schema
+     * (user_profile.proto) and verifies fields of both the root and the embedded message.
+     * Also invoked by the nested schema-name strategy test class.
+     */
+    @Test
+    void testCreateRecordReaderWithSchemaReferences() throws Exception {
+        // Setup schema registry to return schema with references using existing mock
+        final String userProfileSchema = loadUserProfileSchema();
+        final String userSettingsSchema = loadUserSettingsSchema();
+
+        // Create schema definition with references
+        final SchemaDefinition userProfileDef = createSchemaDefinition("user_profile.proto", userProfileSchema, new HashMap<>());
+        // root message here is user_settings.proto, it has a reference to user_profile.proto
+        final SchemaDefinition userSettingsDef = createSchemaDefinition("user_settings.proto", userSettingsSchema, Map.of("user_profile.proto", userProfileDef));
+
+        mockSchemaRegistry.returnSchemaDefinition(userSettingsDef);
+        mockSchemaReferenceReader.returnSchemaIdentifierWithName("user_settings");
+        mockMessageNameResolver.returnMessageName("org.apache.nifi.protobuf.test.UserSettings");
+        enableAllControllerServices();
+
+        // Create test data using DynamicMessage; close the stream and the reader when done
+        try (final InputStream inputStream = generateInputDataForUserSettings();
+             final RecordReader recordReader = standardProtobufReader.createRecordReader(emptyMap(), inputStream, 0L, runner.getLogger())) {
+
+            assertNotNull(recordReader);
+
+            final Record userSettings = recordReader.nextRecord();
+            assertNotNull(userSettings);
+            assertEquals("en-US", userSettings.getAsString("language"));
+            assertTrue(userSettings.getAsBoolean("two_factor_enabled"));
+
+            final Record profileRecord = (Record) userSettings.getValue("profile");
+            assertNotNull(profileRecord);
+            assertEquals("user123", profileRecord.getAsString("user_id"));
+            assertEquals(30, profileRecord.getAsInt("age"));
+        }
+    }
+
+    /**
+     * Identifier validation completes without throwing when the root schema and its referenced
+     * schema both have names ending in .proto.
+     */
+    @Test
+    void testValidSchemaDefinitionWithProtoExtension() {
+        final SchemaDefinition referencedSchema = createSchemaDefinition("user_profile.proto");
+        final Map<String, SchemaDefinition> references = Map.of("user_profile.proto", referencedSchema);
+        final SchemaDefinition rootSchema = createSchemaDefinition("user_settings.proto", references);
+
+        enableAllControllerServices();
+
+        validateSchemaDefinitionIdentifiers(rootSchema, true);
+    }
+
+    /**
+     * Identifier validation completes without throwing when only the root schema's name lacks
+     * the .proto extension.
+     */
+    @Test
+    void testRootSchemaIsAllowedToHaveInvalidName() {
+        final SchemaDefinition referencedSchema = createSchemaDefinition("user_profile.proto");
+        final Map<String, SchemaDefinition> references = Map.of("user_profile.proto", referencedSchema);
+        final SchemaDefinition rootSchema = createSchemaDefinition("user_settings.noproto.extension", references);
+
+        enableAllControllerServices();
+
+        validateSchemaDefinitionIdentifiers(rootSchema, true);
+    }
+
+    /**
+     * A referenced schema whose name does not end in .proto makes identifier validation throw an
+     * IllegalArgumentException mentioning the required extension.
+     */
+    @Test
+    void testValidMainSchemaWithInvalidReferencedSchema() {
+        final SchemaDefinition invalidReferencedSchema = createSchemaDefinition("user_profile.invalid");
+        final Map<String, SchemaDefinition> references = Map.of("user_profile.invalid", invalidReferencedSchema);
+        final SchemaDefinition mixedSchema = createSchemaDefinition("user_settings.proto", references);
+
+        enableAllControllerServices();
+
+        final IllegalArgumentException referencedException = assertThrows(
+                IllegalArgumentException.class,
+                () -> validateSchemaDefinitionIdentifiers(mixedSchema, true));
+
+        final String failureMessage = referencedException.getMessage();
+        assertTrue(failureMessage.contains("ends with .proto extension"));
+    }
+
+    /**
+     * Identifier validation completes without throwing for a root schema whose identifier
+     * carries an id but no name.
+     */
+    @Test
+    void testSchemaDefinitionWithMissingName() {
+        final SchemaDefinition namelessSchema = createSchemaDefinitionWithoutName();
+
+        enableAllControllerServices();
+
+        validateSchemaDefinitionIdentifiers(namelessSchema, true);
+    }
+
+ /**
+ * Re-runs selected tests of the enclosing class with the schema access strategy switched to
+ * the schema name property and the message name supplied through the MESSAGE_NAME property.
+ * NOTE(review): relies on JUnit running the enclosing class's beforeEach before this nested
+ * one, so the values set here take effect last - confirm.
+ */
+ @Nested
+ class WithSchemaNamePropertyAccessStrategy {
+
+ @BeforeEach
+ void beforeEach() {
+ runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY,
SCHEMA_NAME_PROPERTY);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME,
PROTO_3_MESSAGE);
+
+ }
+
+ @Test
+ void testStandardProtobufReaderWithTestProto3Schema() throws Exception
{
+ // Name the schema explicitly, then delegate to the enclosing class's test body
+ runner.setProperty(standardProtobufReader, SCHEMA_NAME,
PROTO_3_SCHEMA);
+
TestStandardProtobufReader.this.testStandardProtobufReaderWithTestProto3Schema();
+ }
+
+ @Test
+ void testSchemaNotFoundRethrown() {
+ TestStandardProtobufReader.this.testSchemaNotFoundRethrown();
+ }
+
+ @Test
+ void testCreateRecordReaderWithSchemaReferences() throws Exception {
+ // Name the root schema explicitly, then delegate to the enclosing class's test body
+ runner.setProperty(standardProtobufReader, SCHEMA_NAME,
"user_settings");
+
TestStandardProtobufReader.this.testCreateRecordReaderWithSchemaReferences();
+ }
+ }
+
+ /**
+ * Re-runs the proto3 read test of the enclosing class with the schema access strategy
+ * switched to the schema text property, using the content of test_proto3.proto as the text.
+ * NOTE(review): relies on JUnit running the enclosing class's beforeEach before this nested
+ * one, so the values set here take effect last - confirm.
+ */
+ @Nested
+ class WithSchemaTextAccessStrategy {
+
+ @BeforeEach
+ void beforeEach() {
+ runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY,
SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(standardProtobufReader, SCHEMA_TEXT,
getTestProto3File());
+
+ }
+
+ @Test
+ void testStandardProtobufReaderWithTestProto3Schema() throws Exception
{
+
TestStandardProtobufReader.this.testStandardProtobufReaderWithTestProto3Schema();
+ }
+ }
+
+    /**
+     * Creates a PROTOBUF schema definition with placeholder schema text and no references.
+     */
+    private SchemaDefinition createSchemaDefinition(final String name) {
+        final Map<String, SchemaDefinition> noReferences = new HashMap<>();
+        return createSchemaDefinition(name, "test schema content", noReferences);
+    }
+
+    /**
+     * Creates a PROTOBUF schema definition with placeholder schema text and the given references.
+     */
+    private SchemaDefinition createSchemaDefinition(final String name, final Map<String, SchemaDefinition> references) {
+        final String placeholderText = "test schema content";
+        return createSchemaDefinition(name, placeholderText, references);
+    }
+
+    /**
+     * Creates a PROTOBUF schema definition whose identifier carries the given name and the id
+     * Long.MAX_VALUE.
+     *
+     * @param name       schema name stored in the identifier
+     * @param schemaText schema text of the definition
+     * @param references referenced definitions passed through to the definition
+     */
+    private SchemaDefinition createSchemaDefinition(final String name, final String schemaText, final Map<String, SchemaDefinition> references) {
+        final SchemaIdentifier identifier = SchemaIdentifier.builder()
+                .name(name)
+                .id(Long.MAX_VALUE)
+                .build();
+        return new StandardSchemaDefinition(identifier, schemaText, SchemaDefinition.SchemaType.PROTOBUF, references);
+    }
+
+    /**
+     * Creates a PROTOBUF schema definition whose identifier has the id 1 but no name, with
+     * placeholder schema text and no references.
+     */
+    private SchemaDefinition createSchemaDefinitionWithoutName() {
+        final SchemaIdentifier namelessIdentifier = SchemaIdentifier.builder()
+                .id(1L)
+                .build();
+        final Map<String, SchemaDefinition> noReferences = new HashMap<>();
+        return new StandardSchemaDefinition(namelessIdentifier, "test schema content", SchemaDefinition.SchemaType.PROTOBUF, noReferences);
+    }
+
+    /**
+     * Builds a serialized UserSettings message with an embedded UserProfile, using descriptors
+     * loaded from the user_settings.desc test resource.
+     *
+     * @return stream over the serialized UserSettings message
+     */
+    private InputStream generateInputDataForUserSettings() throws IOException, DescriptorValidationException {
+        // Load descriptor container from .desc file containing both UserProfile and UserSettings
+        final ProtoTestUtil.DescriptorContainer descriptors = ProtoTestUtil.loadDescriptorContainer("org/apache/nifi/protobuf/test/user_settings.desc");
+        final Descriptor profileType = descriptors.findMessageTypeByName("UserProfile");
+        final Descriptor settingsType = descriptors.findMessageTypeByName("UserSettings");
+
+        // Create UserProfile message
+        final DynamicMessage.Builder profileBuilder = DynamicMessage.newBuilder(profileType);
+        profileBuilder.setField(profileType.findFieldByName("user_id"), "user123");
+        profileBuilder.setField(profileType.findFieldByName("age"), 30);
+        final DynamicMessage profileMessage = profileBuilder.build();
+
+        // Create UserSettings message with embedded UserProfile
+        final DynamicMessage.Builder settingsBuilder = DynamicMessage.newBuilder(settingsType);
+        settingsBuilder.setField(settingsType.findFieldByName("profile"), profileMessage);
+        settingsBuilder.setField(settingsType.findFieldByName("language"), "en-US");
+        settingsBuilder.setField(settingsType.findFieldByName("two_factor_enabled"), true);
+        final DynamicMessage settingsMessage = settingsBuilder.build();
+
+        return settingsMessage.toByteString().newInput();
+    }
+
+    /**
+     * Reads a classpath resource fully and returns its content as UTF-8 text.
+     *
+     * @param resourcePath classpath-relative path of the resource
+     * @return the full content of the resource
+     * @throws IOException if the resource is not on the classpath or cannot be read
+     */
+    private String loadProtoSchema(final String resourcePath) throws IOException {
+        try (final InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourcePath)) {
+            if (inputStream == null) {
+                throw new IOException("Could not find " + resourcePath + " in classpath");
+            }
+            // Decode explicitly so the result does not depend on the platform default charset
+            return new String(inputStream.readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
+        }
+    }
+
+    /**
+     * Loads the text of the user_profile.proto test resource.
+     */
+    private String loadUserProfileSchema() throws IOException {
+        final String resourcePath = "org/apache/nifi/protobuf/test/user_profile.proto";
+        return loadProtoSchema(resourcePath);
+    }
+
+    /**
+     * Loads the text of the user_settings.proto test resource.
+     */
+    private String loadUserSettingsSchema() throws IOException {
+        final String resourcePath = "org/apache/nifi/protobuf/test/user_settings.proto";
+        return loadProtoSchema(resourcePath);
+    }
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
new file mode 100644
index 0000000000..b58b6504ea
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLUTION_STRATEGY;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLVER;
+import static
org.apache.nifi.services.protobuf.StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Property-validation tests for StandardProtobufReader. Each test changes properties of the
+ * reader through the test runner (neither is declared in this class, so both come from
+ * StandardProtobufReaderTestBase) and then checks whether the configuration validates.
+ */
+class TestStandardProtobufReaderPropertyValidation extends
StandardProtobufReaderTestBase {
+
+ /**
+ * Baseline configuration shared by every test: Schema Reference Reader access strategy with
+ * the registry and reference reader identified by the MOCK_* ids, and message names resolved
+ * through the Message Name Resolver service. The baseline itself is asserted to be valid.
+ */
+ @BeforeEach
+ void beforeEach() {
+ runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY,
SCHEMA_REFERENCE_READER_PROPERTY);
+ runner.setProperty(standardProtobufReader, SCHEMA_REGISTRY,
MOCK_SCHEMA_REGISTRY_ID);
+ runner.setProperty(standardProtobufReader, SCHEMA_REFERENCE_READER,
MOCK_SCHEMA_REFERENCE_READER_ID);
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY,
StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME_RESOLVER,
MOCK_MESSAGE_NAME_RESOLVER_ID);
+ // Ensure configuration is valid before running tests
+ runner.assertValid(standardProtobufReader);
+ }
+
+ /**
+ * Selects the Message Name Property strategy, removes the Message Name property and expects
+ * the single validation result to carry the explanation "Message Name is required".
+ */
+ @Test
+ void testInvalidWithoutMessageNameProperty() {
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+ runner.removeProperty(standardProtobufReader, MESSAGE_NAME);
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+ assertEquals("Message Name is required",
invalidResult.getExplanation());
+ }
+
+ /**
+ * Keeps the Message Name Resolver strategy, removes the resolver service property and expects
+ * the single validation result to mention the Message Name Resolver.
+ */
+ @Test
+ void testInvalidWithoutMessageNameResolver() {
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY,
StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER);
+ runner.removeProperty(standardProtobufReader, MESSAGE_NAME_RESOLVER);
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+ assertTrue(invalidResult.getExplanation().contains("Message Name
Resolver"));
+ }
+
+ /**
+ * Tests run with the Schema Text Property access strategy. JUnit runs the enclosing class's
+ * beforeEach first, so the baseline properties are already set when this setup executes.
+ */
+ @Nested
+ class SchemaTextPropertyAccessStrategy {
+ /** Switches the access strategy to Schema Text Property and asserts the result is valid. */
+ @BeforeEach
+ void beforeEach() {
+ runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY,
SCHEMA_TEXT_PROPERTY);
+ // Ensure configuration is valid before running tests
+ runner.assertValid(standardProtobufReader);
+ }
+
+ /** Removing the Schema Text property must leave the reader valid once all services are enabled. */
+ @Test
+ void testValidWhenSchemaTextPropertyRemoved() { // default value of
${proto.schema}
+ runner.removeProperty(standardProtobufReader, SCHEMA_TEXT);
+ enableAllControllerServices();
+ runner.assertValid(standardProtobufReader);
+ }
+ }
+
+ /**
+ * Tests run with the Schema Name Property access strategy. JUnit runs the enclosing class's
+ * beforeEach first, so the baseline properties are already set when this setup executes.
+ */
+ @Nested
+ class SchemaNamePropertyAccessStrategy {
+ /**
+ * Switches the access strategy to Schema Name Property, sets a schema name and a branch name,
+ * and asserts the result is valid.
+ */
+ @BeforeEach
+ void beforeEach() {
+ runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY,
SCHEMA_NAME_PROPERTY);
+ runner.setProperty(standardProtobufReader, SCHEMA_NAME, "Any
schema name");
+ runner.setProperty(standardProtobufReader, SCHEMA_BRANCH_NAME,
"Any branch");
+ // Ensure configuration is valid before running tests
+ runner.assertValid(standardProtobufReader);
+ }
+
+ /** The configuration built by the two setup methods stays valid after enabling all services. */
+ @Test
+ void testValidWithMessageNameResolver() { // default setting from
beforeEach
+ enableAllControllerServices();
+ runner.assertValid(standardProtobufReader);
+
+ }
+
+ /** Switching to the Message Name Property strategy with a message name set is valid. */
+ @Test
+ void testValidWithMessageNameProperty() {
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME, "Any
message name");
+ enableAllControllerServices();
+ runner.assertValid(standardProtobufReader);
+ }
+
+ /** Removing both the schema version and the branch name leaves the reader valid. */
+ @Test
+ void testValidWithoutVersionAndBranch() {
+ runner.removeProperty(standardProtobufReader, SCHEMA_VERSION);
+ runner.removeProperty(standardProtobufReader, SCHEMA_BRANCH_NAME);
+ enableAllControllerServices();
+ runner.assertValid(standardProtobufReader);
+ }
+
+ /**
+ * Setting a schema version while a branch name is also set must yield a single validation
+ * result whose explanation mentions the Schema Branch.
+ */
+ @Test
+ void testInvalidWhenBranchAndVersionSetTogether() {
+ runner.setProperty(standardProtobufReader, SCHEMA_VERSION, "1");
+ runner.setProperty(standardProtobufReader, SCHEMA_BRANCH_NAME,
"Any branch");
+
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+ assertTrue(invalidResult.getExplanation().contains("Schema
Branch"));
+ }
+
+ /**
+ * Removing the Schema Registry must yield a single validation result.
+ * NOTE(review): the expected explanation mentions "Schema Name" rather than the registry;
+ * presumably the message is phrased from the access strategy's side - confirm against the validator.
+ */
+ @Test
+ void testInvalidWithoutSchemaRegistry() {
+ runner.removeProperty(standardProtobufReader, SCHEMA_REGISTRY);
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+ assertTrue(invalidResult.getExplanation().contains("Schema Name"));
+ }
+
+ /**
+ * Removing the Schema Name property leaves the reader valid. Unlike the other validity tests
+ * in this class, this one asserts without calling enableAllControllerServices().
+ */
+ @Test
+ void testValidWithoutSchemaName() { // default property value gets set
automatically
+ runner.removeProperty(standardProtobufReader, SCHEMA_NAME);
+ runner.assertValid(standardProtobufReader);
+ }
+ }
+
+ /**
+ * Tests run with the Schema Reference Reader access strategy. The setup repeats the access
+ * strategy, registry and reference reader values already applied by the enclosing beforeEach.
+ */
+ @Nested
+ class SchemaReferenceReaderSchemaAccessStrategy {
+
+ /** Sets the access strategy, registry and reference reader, then asserts the result is valid. */
+ @BeforeEach
+ void beforeEach() {
+ runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY,
SCHEMA_REFERENCE_READER_PROPERTY);
+ runner.setProperty(standardProtobufReader, SCHEMA_REGISTRY,
MOCK_SCHEMA_REGISTRY_ID);
+ runner.setProperty(standardProtobufReader,
SCHEMA_REFERENCE_READER, MOCK_SCHEMA_REFERENCE_READER_ID);
+ // Ensure configuration is valid before running tests
+ runner.assertValid(standardProtobufReader);
+ }
+
+ /** The configuration built by the two setup methods stays valid after enabling all services. */
+ @Test
+ void testValidWithMessageNameResolver() { // default setting from
beforeEach
+ enableAllControllerServices();
+ runner.assertValid(standardProtobufReader);
+
+ }
+
+ /** Switching to the Message Name Property strategy with a message name set is valid. */
+ @Test
+ void testValidWithMessageNameProperty() {
+ runner.setProperty(standardProtobufReader,
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+ runner.setProperty(standardProtobufReader, MESSAGE_NAME, "Any
message name");
+ enableAllControllerServices();
+ runner.assertValid(standardProtobufReader);
+ }
+
+ /**
+ * Removing the Schema Registry must yield a single validation result whose explanation
+ * mentions the Schema Reference Reader.
+ */
+ @Test
+ void testInvalidWithoutSchemaRegistry() {
+ runner.removeProperty(standardProtobufReader, SCHEMA_REGISTRY);
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+ assertTrue(invalidResult.getExplanation().contains("Schema
Reference Reader"));
+ }
+
+ /**
+ * Removing the Schema Reference Reader must yield a single validation result whose
+ * explanation mentions the Schema Reference Reader.
+ */
+ @Test
+ void testInvalidWithoutSchemaReferenceReader() {
+ runner.removeProperty(standardProtobufReader,
SCHEMA_REFERENCE_READER);
+ final ValidationResult invalidResult =
verifyExactlyOneValidationError();
+
+ assertTrue(invalidResult.getExplanation().contains("Schema
Reference Reader"));
+ }
+ }
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
index 8690d6b7eb..26e922f272 100644
---
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.services.protobuf.converter;
-import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.squareup.wire.schema.Schema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -28,8 +28,10 @@ import java.io.IOException;
import java.math.BigInteger;
import java.util.Map;
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForRootMessage;
import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema;
import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadRootMessageSchema;
import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadRepeatedProto3TestSchema;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -40,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestProtobufDataConverter {
@Test
- public void testDataConverterForProto3() throws
Descriptors.DescriptorValidationException, IOException {
+ public void testDataConverterForProto3() throws
DescriptorValidationException, IOException {
final Schema schema = loadProto3TestSchema();
final RecordSchema recordSchema = new
ProtoSchemaParser(schema).createSchema("Proto3Message");
@@ -80,7 +82,19 @@ public class TestProtobufDataConverter {
}
@Test
- public void testDataConverterForRepeatedProto3() throws
Descriptors.DescriptorValidationException, IOException {
+ public void testDataConverterCanHandleNestedMessages() throws DescriptorValidationException, IOException {
+     // The same fully-qualified message name drives both record-schema creation and conversion.
+     final String rootMessageName = "org.apache.nifi.protobuf.test.RootMessage";
+     final Schema protoSchema = loadRootMessageSchema();
+     final RecordSchema rootRecordSchema = new ProtoSchemaParser(protoSchema).createSchema(rootMessageName);
+     final ProtobufDataConverter converter =
+             new ProtobufDataConverter(protoSchema, rootMessageName, rootRecordSchema, false, false);
+
+     final MapRecord rootRecord = converter.createRecord(generateInputDataForRootMessage());
+
+     // The nested message must come back as a record whose enum field holds the expected value.
+     final Object nestedValue = rootRecord.getValue("nestedMessage");
+     final MapRecord nestedRecord = (MapRecord) nestedValue;
+     assertEquals("ENUM_VALUE_3", nestedRecord.getValue("testEnum"));
+ }
+
+ @Test
+ public void testDataConverterForRepeatedProto3() throws
DescriptorValidationException, IOException {
final Schema schema = loadRepeatedProto3TestSchema();
final RecordSchema recordSchema = new
ProtoSchemaParser(schema).createSchema("RootMessage");
@@ -118,7 +132,7 @@ public class TestProtobufDataConverter {
}
@Test
- public void testDataConverterForProto2() throws
Descriptors.DescriptorValidationException, IOException {
+ public void testDataConverterForProto2() throws
DescriptorValidationException, IOException {
final Schema schema = loadProto2TestSchema();
final RecordSchema recordSchema = new
ProtoSchemaParser(schema).createSchema("Proto2Message");
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.desc
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.desc
new file mode 100644
index 0000000000..803b5db48d
Binary files /dev/null and
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.desc
differ
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.proto
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.proto
new file mode 100644
index 0000000000..ff7ff5cdf1
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.proto
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.nifi.protobuf.test;
+
+// Top-level test message whose only field is a NestedMessage, referenced by its
+// fully-qualified type name with a leading dot.
+message RootMessage {
+ .org.apache.nifi.protobuf.test.NestedMessage nestedMessage = 1;
+}
+
+// Message embedded in RootMessage; carries a single TestEnum value.
+// The field uses number 2; no field with number 1 is declared in this message.
+message NestedMessage {
+ .org.apache.nifi.protobuf.test.TestEnum testEnum = 2;
+
+}
+
+// Enum referenced by NestedMessage.testEnum; ENUM_VALUE_1 is the zero value.
+enum TestEnum {
+ ENUM_VALUE_1 = 0;
+ ENUM_VALUE_2 = 1;
+ ENUM_VALUE_3 = 2;
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.desc
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.desc
new file mode 100644
index 0000000000..0b6fcd5966
Binary files /dev/null and
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.desc
differ
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.proto
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.proto
new file mode 100644
index 0000000000..c5a01fe0f1
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.proto
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.nifi.protobuf.test;
+
+// Test message describing a user: scalar fields plus an enum (UserStatus) and a nested
+// message (Address), both declared in this file. Field numbers are non-contiguous (1, 4, 5, 7).
+message UserProfile {
+ string user_id = 1;
+ int32 age = 4;
+ UserStatus status = 5;
+ Address address = 7;
+}
+
+// Message type of UserProfile.address; holds a single street string.
+message Address {
+ string street = 1;
+}
+
+// Enum type of UserProfile.status; INACTIVE is the zero value.
+enum UserStatus {
+ INACTIVE = 0;
+ ACTIVE = 1;
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.desc
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.desc
new file mode 100644
index 0000000000..c9c639cc74
Binary files /dev/null and
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.desc
differ
diff --git
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.proto
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.proto
new file mode 100644
index 0000000000..015b2a1df9
--- /dev/null
+++
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.proto
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.nifi.protobuf.test;
+
+import "user_profile.proto";
+
+// Test message that embeds a UserProfile. UserProfile is not declared in this file; it is
+// expected to come from the imported user_profile.proto, which declares the same package.
+// Field numbers are non-contiguous (1, 2, 4, 5, 7).
+message UserSettings {
+ .org.apache.nifi.protobuf.test.UserProfile profile = 1;
+ NotificationSettings notifications = 2;
+ Theme theme = 4;
+ string language = 5;
+ bool two_factor_enabled = 7;
+}
+
+// Message type of UserSettings.notifications; holds a single boolean flag.
+message NotificationSettings {
+ bool email_notifications = 1;
+}
+
+// Enum type of UserSettings.theme; LIGHT is the zero value.
+enum Theme {
+ LIGHT = 0;
+ DARK = 1;
+}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
index 32aafacb2f..3dafe93013 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
@@ -21,9 +21,8 @@ import org.apache.nifi.controller.ControllerService;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
-
/**
- * A service interface for resolving message names from schema definitions and
input streams.
+ * An interface for resolving message names from schema definitions and input
streams.
* This interface is typically used in scenarios where message types need to
be determined
* dynamically from the content of the message and the associated schema
definition.
* <p>
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageNameFactory.java
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageNameFactory.java
new file mode 100644
index 0000000000..cdacc153fb
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageNameFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Factory for creating {@link StandardMessageName} instances from a message name string.
+ * <p>
+ * This class only exposes static methods and cannot be instantiated.
+ */
+public final class StandardMessageNameFactory {
+
+    private StandardMessageNameFactory() {
+        // Static factory holder: prevent instantiation.
+    }
+
+    /**
+     * Parses a message name that may be qualified with a package namespace.
+     * <p>
+     * When the last dot is found at an index greater than zero, everything before it becomes
+     * the namespace and everything after it becomes the simple name. Otherwise (no dot at all,
+     * or the only dot is the first character) the namespace is empty and the input is used
+     * unchanged as the name. A name ending in a dot therefore produces an empty simple name,
+     * which is handed to the {@link StandardMessageName} constructor as is.
+     *
+     * @param fullMessageName the message name (e.g., "mypackage.MyMessage" or "MyMessage")
+     * @return a new StandardMessageName instance with the parsed namespace and name
+     * @throws NullPointerException if fullMessageName is null
+     */
+    public static StandardMessageName fromName(final String fullMessageName) {
+        Objects.requireNonNull(fullMessageName, "fullMessageName must not be null");
+        final int lastDotIndex = fullMessageName.lastIndexOf('.');
+        if (lastDotIndex > 0) {
+            final String namespace = fullMessageName.substring(0, lastDotIndex);
+            final String name = fullMessageName.substring(lastDotIndex + 1);
+            return new StandardMessageName(Optional.of(namespace), name);
+        } else {
+            return new StandardMessageName(Optional.empty(), fullMessageName);
+        }
+    }
+}