This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3097a90f05 NIFI-14878 Added StandardProtobufReader for Protobuf Record 
Processing (#10214)
3097a90f05 is described below

commit 3097a90f0519f023cc640053eb89055a30de0d81
Author: lkuchars <[email protected]>
AuthorDate: Thu Aug 28 19:47:53 2025 +0200

    NIFI-14878 Added StandardProtobufReader for Protobuf Record Processing 
(#10214)
    
    Co-authored-by: ForThoseWhoComeAfter <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../confluent/schema/ProtobufMessageSchema.java    |   2 +-
 .../nifi/confluent/schemaregistry/VarintUtils.java |   8 +-
 .../ConfluentSchemaRegistryTest.java               |   2 +-
 .../nifi/serialization/SchemaRegistryService.java  |   6 +-
 .../nifi-protobuf-services-nar/pom.xml             |   4 +-
 .../nifi-protobuf-services/pom.xml                 |   9 +-
 .../nifi/services/protobuf/ProtobufReader.java     |   2 +
 .../services/protobuf/ProtobufSchemaCompiler.java  | 252 +++++++++++++++++
 .../services/protobuf/ProtobufSchemaValidator.java |  63 +++++
 .../services/protobuf/StandardProtobufReader.java  | 300 +++++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   3 +-
 .../nifi/services/protobuf/ProtoTestUtil.java      | 177 ++++++++----
 .../protobuf/StandardProtobufReaderTestBase.java   | 269 ++++++++++++++++++
 .../nifi/services/protobuf/TestProtobufReader.java | 225 ++++++++++++++++
 .../protobuf/TestStandardProtobufReader.java       | 290 ++++++++++++++++++++
 ...stStandardProtobufReaderPropertyValidation.java | 191 +++++++++++++
 .../converter/TestProtobufDataConverter.java       |  22 +-
 .../apache/nifi/protobuf/test/root_message.desc    | Bin 0 -> 313 bytes
 .../apache/nifi/protobuf/test/root_message.proto   |  34 +++
 .../apache/nifi/protobuf/test/user_profile.desc    | Bin 0 -> 329 bytes
 .../apache/nifi/protobuf/test/user_profile.proto   |  35 +++
 .../apache/nifi/protobuf/test/user_settings.desc   | Bin 0 -> 827 bytes
 .../apache/nifi/protobuf/test/user_settings.proto  |  38 +++
 .../services/MessageNameResolver.java              |   3 +-
 .../services/StandardMessageNameFactory.java       |  46 ++++
 25 files changed, 1913 insertions(+), 68 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
 
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
index 46769b5763..632217eabe 100644
--- 
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
+++ 
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-schema-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java
@@ -21,7 +21,7 @@ import java.util.Optional;
 
 /**
  * Basic abstraction over Protobuf schema message entity.
- * It contains only the fields that are needed for crude schema insights. It's 
useful when the
+ * It only represents the fields that are needed for crude schema insights. 
It's useful when the
  * message name resolver needs to pinpoint specific messages by message 
indexes encoded on the wire.
  * <p>
  * <a 
href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format";>See
 the Confluent protobuf wire format.</a>
diff --git 
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
 
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
index b9fab9b179..90c352ecfa 100644
--- 
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
+++ 
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java
@@ -50,7 +50,7 @@ final class VarintUtils {
      * @return the decoded varint value
      * @throws IOException if unable to read from stream or invalid varint 
format
      */
-    public static int readVarintFromStream(final InputStream inputStream) 
throws IOException {
+    static int readVarintFromStream(final InputStream inputStream) throws 
IOException {
         final int firstByte = inputStream.read();
         if (firstByte == -1) {
             throw new IOException("Unexpected end of stream while reading 
varint");
@@ -66,7 +66,7 @@ final class VarintUtils {
      * @return the decoded varint value
      * @throws IOException if unable to read from stream or invalid varint 
format
      */
-    public static int readVarintFromStreamAfterFirstByteConsumed(final 
InputStream inputStream, final int firstByte) throws IOException {
+    static int readVarintFromStreamAfterFirstByteConsumed(final InputStream 
inputStream, final int firstByte) throws IOException {
         // accumulated result
         int value = 0;
 
@@ -115,7 +115,7 @@ final class VarintUtils {
      * @param encodedValue the zigzag encoded value
      * @return the decoded integer value
      */
-    public static int decodeZigZag(final int encodedValue) {
+    static int decodeZigZag(final int encodedValue) {
         return (encodedValue >>> 1) ^ -(encodedValue & 1);
     }
 
@@ -127,7 +127,7 @@ final class VarintUtils {
      * @param value the integer value to encode
      * @return byte array containing the zigzag encoded varint
      */
-    public static byte[] writeZigZagVarint(final int value) {
+    static byte[] writeZigZagVarint(final int value) {
         final ByteArrayOutputStream output = new ByteArrayOutputStream(4);
 
         // Zigzag encode
diff --git 
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
 
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
index 14d964511a..2abfa63277 100644
--- 
a/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
+++ 
b/nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class ConfluentSchemaRegistryTest {
+class ConfluentSchemaRegistryTest {
 
     private static final String SERVICE_ID = 
ConfluentSchemaRegistry.class.getSimpleName();
 
diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index 2b90b40bc7..922adc2a96 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -100,7 +100,7 @@ public abstract class SchemaRegistryService extends 
AbstractControllerService {
         properties.add(SCHEMA_NAME);
         properties.add(SCHEMA_VERSION);
         properties.add(SCHEMA_BRANCH_NAME);
-        properties.add(SCHEMA_TEXT);
+        properties.add(buildSchemaTextProperty());
         properties.add(SCHEMA_REFERENCE_READER);
 
         return properties;
@@ -126,6 +126,10 @@ public abstract class SchemaRegistryService extends 
AbstractControllerService {
         return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
     }
 
+    protected PropertyDescriptor buildSchemaTextProperty() {
+        return SCHEMA_TEXT;
+    }
+
     protected PropertyDescriptor buildStrategyProperty(final AllowableValue[] 
values) {
         return new PropertyDescriptor.Builder()
                 .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
index 6a64165222..2bb0b638d1 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
@@ -30,12 +30,12 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-protobuf-services</artifactId>
-            <version>2.6.0-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-standard-shared-nar</artifactId>
-            <version>2.6.0-SNAPSHOT</version>
+            <version>${project.version}</version>
             <type>nar</type>
         </dependency>
     </dependencies>
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
index fa9cb89a78..5056379025 100644
--- a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
+++ b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
@@ -48,7 +48,7 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-avro-record-utils</artifactId>
-            <version>2.6.0-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
 
 
@@ -75,11 +75,6 @@
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock-record-utils</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
 
+    </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
index 59154bb579..2a551ecc5b 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
@@ -21,6 +21,7 @@ import com.squareup.wire.schema.Location;
 import com.squareup.wire.schema.Schema;
 import com.squareup.wire.schema.SchemaLoader;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
@@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 @Tags({"protobuf", "record", "reader", "parser"})
 @CapabilityDescription("Parses a Protocol Buffers message from binary format.")
+@SeeAlso(StandardProtobufReader.class)
 public class ProtobufReader extends SchemaRegistryService implements 
RecordReaderFactory {
 
     private static final String ANY_PROTO = "google/protobuf/any.proto";
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
new file mode 100644
index 0000000000..b019c021cc
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaCompiler.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.squareup.wire.schema.CoreLoaderKt;
+import com.squareup.wire.schema.Location;
+import com.squareup.wire.schema.Schema;
+import com.squareup.wire.schema.SchemaLoader;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.nifi.services.protobuf.ProtobufSchemaValidator.validateSchemaDefinitionIdentifiers;
+
+/**
+ * Handles Protocol Buffer schema compilation, caching, and temporary 
directory operations.
+ * This class is responsible for compiling schema definitions into Wire Schema 
objects,
+ * managing a cache of compiled schemas, and handling temporary directory 
operations
+ * required during the compilation process.
+ */
+final class ProtobufSchemaCompiler {
+
+    private static final List<Location> STANDARD_PROTOBUF_LOCATIONS = 
Arrays.asList(
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, 
"google/protobuf/any.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, 
"google/protobuf/duration.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, 
"google/protobuf/empty.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, 
"google/protobuf/struct.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, 
"google/protobuf/timestamp.proto"),
+        Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, 
"google/protobuf/wrappers.proto")
+    );
+    private static final int CACHE_EXPIRE_HOURS = 1;
+    private static final int COMPILED_SCHEMAS_CACHE_SIZE = 200;
+    private static final String PROTO_EXTENSION = ".proto";
+
+    private final Cache<SchemaIdentifier, Schema> compiledSchemaCache;
+    private final ComponentLog logger;
+    private final String tempDirectorySuffix;
+
+    /**
+     * Creates a new ProtobufSchemaCompiler with default cache settings.
+     *
+     * @param tempDirectorySuffix the suffix for temporary directory names 
created by the compiler.
+     *                            This may help in finding the right temporary 
directory in case of compilation issues
+     * @param logger              the component logger for logging compilation 
activities
+     */
+    public ProtobufSchemaCompiler(final String tempDirectorySuffix, final 
ComponentLog logger) {
+        this.tempDirectorySuffix = Objects.requireNonNull(tempDirectorySuffix, 
"Temporary directory suffix cannot be null");
+        this.logger = logger;
+        this.compiledSchemaCache = Caffeine.newBuilder()
+            .expireAfterAccess(CACHE_EXPIRE_HOURS, TimeUnit.HOURS)
+            .maximumSize(COMPILED_SCHEMAS_CACHE_SIZE)
+            .build();
+    }
+
+    /**
+     * Compiles a schema definition or retrieves it from cache.
+     *
+     * @param schemaDefinition the schema definition to compile
+     * @return the compiled Schema
+     */
+    public Schema compileOrGetFromCache(final SchemaDefinition 
schemaDefinition) {
+        return compiledSchemaCache.get(schemaDefinition.getIdentifier(),
+            identifier -> {
+                try {
+                    return compileSchemaDefinition(schemaDefinition);
+                } catch (final IOException e) {
+                    throw new UncheckedIOException("Could not compile schema 
for identifier: " + identifier, e);
+                }
+            });
+    }
+
+    /**
+     * Compiles a SchemaDefinition structure into a Schema using the wire 
library.
+     * Creates a temporary directory structure that mirrors the package 
structure and
+     * places all schemas in their appropriate directories.
+     *
+     * @param schemaDefinition the main schema definition to compile
+     * @return the compiled Schema
+     * @throws IOException if unable to create temporary files or compile 
schema
+     */
+    private Schema compileSchemaDefinition(final SchemaDefinition 
schemaDefinition) throws IOException {
+        logger.debug("Starting schema compilation for identifier: {}", 
schemaDefinition.getIdentifier());
+
+        // Validate that all schema identifiers end with .proto extension
+        validateSchemaDefinitionIdentifiers(schemaDefinition, true);
+
+        return executeWithTemporaryDirectory(tempDir -> {
+            try {
+                // Process main schema definition
+                writeSchemaToTempDirectory(tempDir, schemaDefinition);
+
+                // Process all referenced schemas recursively
+                processSchemaReferences(tempDir, 
schemaDefinition.getReferences());
+
+                // Create and configure schema loader
+                final Schema compiledSchema = createAndLoadSchema(tempDir);
+                logger.debug("Successfully compiled schema for identifier: 
{}", schemaDefinition.getIdentifier());
+                return compiledSchema;
+
+            } catch (final Exception e) {
+                throw new RuntimeException("Failed to compile Protobuf schema 
for identifier: " + schemaDefinition.getIdentifier(), e);
+            }
+        });
+    }
+
+    /**
+     * Executes a function with a temporary directory, ensuring proper cleanup.
+     *
+     * @param function the function to execute with the temporary directory
+     * @return the result of the function
+     * @throws IOException if unable to create or manage temporary directory
+     */
+    private <T> T executeWithTemporaryDirectory(final 
WithTemporaryDirectory<T> function) throws IOException {
+        final Path tempDir = Files.createTempDirectory(tempDirectorySuffix + 
"_protobuf_schema_compiler");
+        logger.debug("Created temporary directory for schema compilation: {}", 
tempDir);
+
+        try {
+            return function.apply(tempDir);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            safeDeleteDirectory(tempDir);
+        }
+    }
+
+    private Schema createAndLoadSchema(final Path tempDir) {
+        final SchemaLoader schemaLoader = new 
SchemaLoader(FileSystems.getDefault());
+
+        final List<Location> roots = new ArrayList<>();
+        roots.add(Location.get(tempDir.toString()));
+
+        // Add standard protobuf libraries
+        roots.addAll(STANDARD_PROTOBUF_LOCATIONS);
+
+        schemaLoader.initRoots(roots, Collections.emptyList());
+
+        // Load and return the compiled schema
+        return schemaLoader.loadSchema();
+    }
+
+    private void safeDeleteDirectory(final Path directory) {
+        if (Files.exists(directory)) {
+            try {
+                FileUtils.deleteDirectory(directory.toFile());
+            } catch (final IOException | IllegalArgumentException e) {
+                logger.warn("Failed to delete temporary directory: {}", 
directory, e);
+            }
+        }
+    }
+
+
+    /**
+     * Writes a schema definition to the temporary directory structure.
+     * If package name is present, creates the appropriate directory structure.
+     *
+     * @param tempDir          the temporary directory root
+     * @param schemaDefinition the schema definition to write
+     * @throws IOException if unable to create directories or write files
+     */
+    private void writeSchemaToTempDirectory(final Path tempDir, final 
SchemaDefinition schemaDefinition) throws IOException {
+        logger.debug("Writing schema definition to temporary directory. 
Identifier: {}", schemaDefinition.getIdentifier());
+
+        final String schemaFileName = generateSchemaFileName(schemaDefinition);
+        final Path schemaFile = tempDir.resolve(schemaFileName);
+
+        // Write schema text to file
+        Files.write(schemaFile, schemaDefinition.getText().getBytes(), CREATE, 
WRITE, TRUNCATE_EXISTING);
+        logger.debug("Successfully wrote schema to file: {} (string length: 
{})",
+            schemaFile, schemaDefinition.getText().length());
+    }
+
+    /**
+     * Generates a filename for a schema definition, ensuring it has a .proto 
extension.
+     *
+     * @param schemaDefinition the schema definition
+     * @return the generated filename
+     */
+    private String generateSchemaFileName(final SchemaDefinition 
schemaDefinition) {
+        String schemaFileName = 
schemaDefinition.getIdentifier().getName().orElseGet(
+            () -> 
String.valueOf(schemaDefinition.getIdentifier().getSchemaVersionId().orElse(0L))
+        );
+
+        if (!schemaFileName.endsWith(PROTO_EXTENSION)) {
+            schemaFileName += PROTO_EXTENSION; // Ensure the file ends with 
.proto, otherwise the wire library will not recognize it
+        }
+
+        return schemaFileName;
+    }
+
+    private void processSchemaReferences(final Path tempDir, final Map<String, 
SchemaDefinition> references) throws IOException {
+        logger.debug("Processing [{}] schema references in [{}]",
+            references.size(), tempDir);
+
+        for (final Map.Entry<String, SchemaDefinition> entry : 
references.entrySet()) {
+            final String referenceKey = entry.getKey();
+            final SchemaDefinition referencedSchema = entry.getValue();
+
+            logger.debug("Processing schema reference [{}] Identifier [{}]",
+                referenceKey, referencedSchema.getIdentifier());
+
+            // Write referenced schema to appropriate directory
+            writeSchemaToTempDirectory(tempDir, referencedSchema);
+
+            // Process nested references recursively
+            if (!referencedSchema.getReferences().isEmpty()) {
+                logger.debug("Processing {} nested references for schema 
reference: {}", referencedSchema.getReferences().size(), referenceKey);
+                processSchemaReferences(tempDir, 
referencedSchema.getReferences());
+            } else {
+                logger.debug("No nested references found for schema reference: 
{}", referenceKey);
+            }
+        }
+    }
+
+
+    @FunctionalInterface
+    private interface WithTemporaryDirectory<T> {
+        T apply(Path tempDir) throws Exception;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaValidator.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaValidator.java
new file mode 100644
index 0000000000..4e8257bad1
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufSchemaValidator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+/**
+ * Validates Protocol Buffer SchemaDefinition objects and schema identifiers.
+ */
+final class ProtobufSchemaValidator {
+
+    private ProtobufSchemaValidator() {
+    }
+
+    /**
+     * Validates that all SchemaDefinition identifiers end with .proto 
extension.
+     * Performs recursive validation on all referenced schemas.
+     *
+     * @param schemaDefinition       the schema definition to validate
+     * @param isRootSchemaDefinition set to true if schema definition is a 
root definition, false otherwise
+     * @throws IllegalArgumentException if any identifier does not end with 
.proto extension
+     */
+    static void validateSchemaDefinitionIdentifiers(final SchemaDefinition 
schemaDefinition, final boolean isRootSchemaDefinition) {
+        // do not validate schema identifier names for root schema 
definitions. They might be coming from sources like text fields,
+        // flow file attributes and other sources that do not support naming.
+        if (!isRootSchemaDefinition) {
+            validateSchemaIdentifier(schemaDefinition.getIdentifier());
+        }
+
+        // Recursively validate all referenced schemas
+        // schema references have to end with .proto extension.
+        for (final SchemaDefinition referencedSchema : 
schemaDefinition.getReferences().values()) {
+            validateSchemaDefinitionIdentifiers(referencedSchema, false);
+        }
+    }
+
+    /**
+     * Validates that a single SchemaIdentifier has a name ending with .proto 
extension.
+     *
+     * @param schemaIdentifier the schema identifier to validate
+     * @throws IllegalArgumentException if the identifier name does not end 
with .proto extension
+     */
+    private static void validateSchemaIdentifier(final SchemaIdentifier 
schemaIdentifier) {
+        schemaIdentifier.getName()
+            .filter(name -> name.endsWith(".proto"))
+            .orElseThrow(() -> new IllegalArgumentException("Schema identifier 
must have a name that ends with .proto extension. Schema identifier: " + 
schemaIdentifier));
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
new file mode 100644
index 0000000000..b08ee5132e
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/StandardProtobufReader.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.MessageName;
+import org.apache.nifi.schemaregistry.services.MessageNameResolver;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.schemaregistry.services.StandardMessageNameFactory;
+import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HexFormat;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_PROPERTY;
+
+@Tags({"protobuf", "record", "reader", "parser"})
+@CapabilityDescription("""
+    Parses Protocol Buffers messages from binary format into NiFi Records. \
+    Supports multiple schema access strategies including inline schema text, 
schema registry lookup, \
+    and schema reference readers.
+    Protobuf reader needs to know the Proto schema message name in order to 
deserialize the binary payload correctly. \
+    The name of this message can be determined statically using 'Message Name' 
property, \
+    or dynamically, using a Message Name Resolver service.""")
+
+public class StandardProtobufReader extends SchemaRegistryService implements 
RecordReaderFactory {
+
+    public static final PropertyDescriptor MESSAGE_NAME_RESOLUTION_STRATEGY = 
new PropertyDescriptor.Builder()
+        .name("Message Name Resolution Strategy")
+        .description("Strategy for determining the Protocol Buffers message 
name for processing")
+        .required(true)
+        .allowableValues(MESSAGE_NAME_PROPERTY, 
MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER)
+        .defaultValue(MESSAGE_NAME_PROPERTY)
+        .build();
+
+    public static final PropertyDescriptor MESSAGE_NAME = new 
PropertyDescriptor.Builder()
+        .name("Message Name")
+        .description("Fully qualified name of the Protocol Buffers message 
including its package (eg. mypackage.MyMessage).")
+        .required(true)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .dependsOn(MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor MESSAGE_NAME_RESOLVER = new 
PropertyDescriptor.Builder()
+        .name("Message Name Resolver")
+        .description("Service that dynamically resolves Protocol Buffer 
message names from FlowFile content or attributes")
+        .required(true)
+        .identifiesControllerService(MessageNameResolver.class)
+        .dependsOn(MESSAGE_NAME_RESOLUTION_STRATEGY, 
MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER)
+        .build();
+
+    private static final PropertyDescriptor PROTOBUF_SCHEMA_TEXT = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(SCHEMA_TEXT)
+        .required(true)
+        .clearValidators()
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("${proto.schema}")
+        .description("The text of a Proto 3 formatted Schema")
+        .build();
+
+    private static final String PROTO_EXTENSION = ".proto";
+
+    private volatile ProtobufSchemaCompiler schemaCompiler;
+    private volatile MessageNameResolver messageNameResolver;
+    private volatile SchemaReferenceReader schemaReferenceReader;
+    private volatile SchemaRegistry schemaRegistry;
+    private volatile String schemaAccessStrategyValue;
+    private volatile PropertyValue schemaText;
+    private volatile PropertyValue schemaName;
+    private volatile PropertyValue schemaBranchName;
+    private volatile PropertyValue schemaVersion;
+
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        super.storeSchemaAccessStrategy(context);
+        setupMessageNameResolver(context);
+        schemaAccessStrategyValue = 
context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+        schemaReferenceReader = 
context.getProperty(SCHEMA_REFERENCE_READER).asControllerService(SchemaReferenceReader.class);
+        schemaRegistry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+        schemaName = context.getProperty(SCHEMA_NAME);
+        schemaText = context.getProperty(SCHEMA_TEXT);
+        schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
+        schemaVersion = context.getProperty(SCHEMA_VERSION);
+        schemaCompiler = new ProtobufSchemaCompiler(getIdentifier(), 
getLogger());
+    }
+
+    @Override
+    public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final long inputLength, final ComponentLog 
logger) throws IOException, SchemaNotFoundException {
+        if (SCHEMA_TEXT_PROPERTY.getValue().equals(schemaAccessStrategyValue)) 
{
+            final SchemaDefinition schemaDefinition = 
createSchemaDefinitionFromText(variables);
+            return createProtobufRecordReader(variables, in, schemaDefinition);
+        } else if 
(SCHEMA_NAME_PROPERTY.getValue().equals(schemaAccessStrategyValue)) {
+            final SchemaDefinition schemaDefinition = 
createSchemaDefinitionFromRegistry(variables);
+            return createProtobufRecordReader(variables, in, schemaDefinition);
+        } else if 
(SCHEMA_REFERENCE_READER_PROPERTY.getValue().equals(schemaAccessStrategyValue)) 
{
+            final SchemaIdentifier schemaIdentifier = 
schemaReferenceReader.getSchemaIdentifier(variables, in);
+            final SchemaDefinition schemaDefinition = 
schemaRegistry.retrieveSchemaDefinition(schemaIdentifier);
+            logger.debug("Using message name for schema identifier: {}", 
schemaDefinition.getIdentifier());
+            return createProtobufRecordReader(variables, in, schemaDefinition);
+        }
+
+        throw new SchemaNotFoundException("Unsupported schema access strategy: 
" + schemaAccessStrategyValue);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(MESSAGE_NAME_RESOLUTION_STRATEGY);
+        properties.add(MESSAGE_NAME_RESOLVER);
+        properties.add(MESSAGE_NAME);
+        return properties;
+    }
+
+    @Override
+    protected PropertyDescriptor buildSchemaTextProperty() {
+        return PROTOBUF_SCHEMA_TEXT;
+    }
+
+    private RecordReader createProtobufRecordReader(final Map<String, String> 
variables, final InputStream in, final SchemaDefinition schemaDefinition) 
throws IOException {
+        final Schema schema = 
schemaCompiler.compileOrGetFromCache(schemaDefinition);
+        final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schema);
+        final MessageName messageName = 
messageNameResolver.getMessageName(variables, schemaDefinition, in);
+        final RecordSchema recordSchema = 
schemaParser.createSchema(messageName.getFullyQualifiedName());
+        return new ProtobufRecordReader(schema, 
messageName.getFullyQualifiedName(), in, recordSchema);
+    }
+
+
+    private void setupMessageNameResolver(final ConfigurationContext context) {
+        final MessageNameResolverStrategy messageNameResolverStrategy = 
context.getProperty(MESSAGE_NAME_RESOLUTION_STRATEGY).asAllowableValue(MessageNameResolverStrategy.class);
+        messageNameResolver = switch (messageNameResolverStrategy) {
+            case MESSAGE_NAME_PROPERTY -> new 
PropertyMessageNameResolver(context);
+            case MESSAGE_NAME_RESOLVER -> 
context.getProperty(MESSAGE_NAME_RESOLVER).asControllerService(MessageNameResolver.class);
+        };
+    }
+
+    private SchemaDefinition createSchemaDefinitionFromText(final Map<String, 
String> variables) throws SchemaNotFoundException {
+        final String schemaTextString = 
schemaText.evaluateAttributeExpressions(variables).getValue();
+        validateSchemaText(schemaTextString);
+
+        final String hash = sha256Hex(schemaTextString);
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
+            .name(hash + PROTO_EXTENSION)
+            .build();
+
+        return new StandardSchemaDefinition(schemaIdentifier, 
schemaTextString, SchemaDefinition.SchemaType.PROTOBUF);
+    }
+
+    private String sha256Hex(final String input) {
+        MessageDigest digest;
+        try {
+            digest = MessageDigest.getInstance("SHA-256");
+        } catch (NoSuchAlgorithmException e) {
+            throw new IllegalStateException(e);
+        }
+        final byte[] hash = digest.digest(
+                input.getBytes(StandardCharsets.UTF_8));
+        return HexFormat.of().formatHex(hash);
+    }
+
+    private SchemaDefinition createSchemaDefinitionFromRegistry(final 
Map<String, String> variables) throws SchemaNotFoundException, IOException {
+        final String schemaNameValue = 
schemaName.evaluateAttributeExpressions(variables).getValue();
+        validateSchemaName(schemaNameValue);
+
+        final String schemaBranchNameValue = 
schemaBranchName.evaluateAttributeExpressions(variables).getValue();
+        final String schemaVersionValue = 
schemaVersion.evaluateAttributeExpressions(variables).getValue();
+
+        final SchemaIdentifier schemaIdentifier = 
buildSchemaIdentifier(schemaNameValue, schemaBranchNameValue, 
schemaVersionValue);
+        return schemaRegistry.retrieveSchemaDefinition(schemaIdentifier);
+    }
+
+    private SchemaIdentifier buildSchemaIdentifier(final String 
schemaNameValue, final String schemaBranchNameValue, final String 
schemaVersionValue) throws SchemaNotFoundException {
+        final SchemaIdentifier.Builder identifierBuilder = 
SchemaIdentifier.builder().name(schemaNameValue);
+
+        if (schemaBranchNameValue != null && !schemaBranchNameValue.isBlank()) 
{
+            identifierBuilder.branch(schemaBranchNameValue);
+        }
+
+        if (schemaVersionValue != null && !schemaVersionValue.isBlank()) {
+            try {
+                identifierBuilder.version(Integer.valueOf(schemaVersionValue));
+            } catch (final NumberFormatException nfe) {
+                throw new SchemaNotFoundException("Could not retrieve schema 
with name '%s' because a non-numeric version was supplied '%s'"
+                    .formatted(schemaNameValue, schemaVersionValue), nfe);
+            }
+        }
+
+        return identifierBuilder.build();
+    }
+
+    private void validateSchemaText(final String schemaTextString) throws 
SchemaNotFoundException {
+        if (schemaTextString == null || schemaTextString.isBlank()) {
+            throw new SchemaNotFoundException("Schema text not found");
+        }
+    }
+
+    private void validateSchemaName(final String schemaNameValue) throws 
SchemaNotFoundException {
+        if (schemaNameValue == null || schemaNameValue.isBlank()) {
+            throw new SchemaNotFoundException("Schema name not provided or is 
blank");
+        }
+    }
+
+
+    enum MessageNameResolverStrategy implements DescribedValue {
+
+        MESSAGE_NAME_PROPERTY("Message Name Property", "Use the 'Message Name' 
property value to determine the message name"),
+        MESSAGE_NAME_RESOLVER("Message Name Resolver", "Use a 'Message Name 
Resolver' service to dynamically determine the message name");
+
+        private final String displayName;
+        private final String description;
+
+        MessageNameResolverStrategy(final String displayName, final String 
description) {
+            this.displayName = displayName;
+            this.description = description;
+        }
+
+        @Override
+        public String getValue() {
+            return name();
+        }
+
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    static class PropertyMessageNameResolver extends AbstractControllerService 
implements MessageNameResolver {
+        private final PropertyContext context;
+
+        PropertyMessageNameResolver(PropertyContext context) {
+            this.context = context;
+        }
+
+        @Override
+        public MessageName getMessageName(final Map<String, String> variables, 
final SchemaDefinition schemaDefinition, final InputStream in) {
+            final String messageName = 
context.getProperty(MESSAGE_NAME).evaluateAttributeExpressions(variables).getValue();
+            return StandardMessageNameFactory.fromName(messageName);
+        }
+    }
+
+}
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 44ded1008f..8ff7783b66 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.nifi.services.protobuf.ProtobufReader
\ No newline at end of file
+org.apache.nifi.services.protobuf.ProtobufReader
+org.apache.nifi.services.protobuf.StandardProtobufReader
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
index 3293513b82..b9b55f2019 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
@@ -16,8 +16,11 @@
  */
 package org.apache.nifi.services.protobuf;
 
-import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
 import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
 import com.google.protobuf.DynamicMessage;
 import com.squareup.wire.schema.CoreLoaderKt;
 import com.squareup.wire.schema.Location;
@@ -31,23 +34,24 @@ import java.nio.file.FileSystems;
 import java.util.Arrays;
 import java.util.Collections;
 
+import static java.lang.System.arraycopy;
 import static 
org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_KEY_FIELD_NAME;
 import static 
org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_VALUE_FIELD_NAME;
 
-public class ProtoTestUtil {
+public final class ProtoTestUtil {
 
-    public static final String BASE_TEST_PATH = "src/test/resources/";
+    static final String BASE_TEST_PATH = "src/test/resources/";
 
     public static Schema loadProto3TestSchema() {
-        final SchemaLoader schemaLoader = new 
SchemaLoader(FileSystems.getDefault());
-        
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + 
"test_proto3.proto")), Collections.emptyList());
-        return schemaLoader.loadSchema();
+        return loadTestSchema("test_proto3.proto");
+    }
+
+    public static Schema loadRootMessageSchema() {
+        return 
loadTestSchema("org/apache/nifi/protobuf/test/root_message.proto");
     }
 
     public static Schema loadRepeatedProto3TestSchema() {
-        final SchemaLoader schemaLoader = new 
SchemaLoader(FileSystems.getDefault());
-        
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + 
"test_repeated_proto3.proto")), Collections.emptyList());
-        return schemaLoader.loadSchema();
+        return loadTestSchema("test_repeated_proto3.proto");
     }
 
     public static Schema loadProto2TestSchema() {
@@ -59,34 +63,52 @@ public class ProtoTestUtil {
     }
 
     public static Schema loadCircularReferenceTestSchema() {
-        final SchemaLoader schemaLoader = new 
SchemaLoader(FileSystems.getDefault());
-        
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + 
"test_circular_reference.proto")), Collections.emptyList());
-        return schemaLoader.loadSchema();
+        return loadTestSchema("test_circular_reference.proto");
     }
 
+    public static InputStream generateInputDataForRootMessage() throws 
IOException, Descriptors.DescriptorValidationException {
+        final FileDescriptor fileDescriptor = 
loadFileDescriptor("org/apache/nifi/protobuf/test/root_message.desc");
+
+        final Descriptor messageDescriptor = 
fileDescriptor.findMessageTypeByName("RootMessage");
+        final Descriptor nestedMessageDescriptor = 
fileDescriptor.findMessageTypeByName("NestedMessage");
+        final Descriptors.EnumDescriptor enumValueDescriptor = 
fileDescriptor.findEnumTypeByName("TestEnum");
+
+        final DynamicMessage nestedMessage = DynamicMessage
+            .newBuilder(nestedMessageDescriptor)
+            .setField(nestedMessageDescriptor.findFieldByNumber(2), 
enumValueDescriptor.findValueByNumber(2))
+            .build();
+
+        final DynamicMessage message = DynamicMessage
+            .newBuilder(messageDescriptor)
+            .setField(messageDescriptor.findFieldByNumber(1), nestedMessage)
+            .build();
+
+        return message.toByteString().newInput();
+    }
+
+
     public static InputStream generateInputDataForProto3() throws IOException, 
Descriptors.DescriptorValidationException {
-        DescriptorProtos.FileDescriptorSet descriptorSet = 
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH 
+ "test_proto3.desc"));
-        Descriptors.FileDescriptor fileDescriptor = 
Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new 
Descriptors.FileDescriptor[0]);
+        final FileDescriptor fileDescriptor = 
loadFileDescriptor("test_proto3.desc");
 
-        Descriptors.Descriptor messageDescriptor = 
fileDescriptor.findMessageTypeByName("Proto3Message");
-        Descriptors.Descriptor nestedMessageDescriptor = 
fileDescriptor.findMessageTypeByName("NestedMessage");
-        Descriptors.Descriptor nestedMessageDescriptor2 = 
fileDescriptor.findMessageTypeByName("NestedMessage2");
-        Descriptors.EnumDescriptor enumValueDescriptor = 
fileDescriptor.findEnumTypeByName("TestEnum");
-        Descriptors.Descriptor mapDescriptor = 
nestedMessageDescriptor2.findNestedTypeByName("TestMapEntry");
+        final Descriptor messageDescriptor = 
fileDescriptor.findMessageTypeByName("Proto3Message");
+        final Descriptor nestedMessageDescriptor = 
fileDescriptor.findMessageTypeByName("NestedMessage");
+        final Descriptor nestedMessageDescriptor2 = 
fileDescriptor.findMessageTypeByName("NestedMessage2");
+        final Descriptors.EnumDescriptor enumValueDescriptor = 
fileDescriptor.findEnumTypeByName("TestEnum");
+        final Descriptor mapDescriptor = 
nestedMessageDescriptor2.findNestedTypeByName("TestMapEntry");
 
-        DynamicMessage mapEntry1 = DynamicMessage
+        final DynamicMessage mapEntry1 = DynamicMessage
                 .newBuilder(mapDescriptor)
                 .setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), 
"test_key_entry1")
                 .setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 
101)
                 .build();
 
-        DynamicMessage mapEntry2 = DynamicMessage
+        final DynamicMessage mapEntry2 = DynamicMessage
                 .newBuilder(mapDescriptor)
                 .setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), 
"test_key_entry2")
                 .setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 
202)
                 .build();
 
-        DynamicMessage nestedMessage2 = DynamicMessage
+        final DynamicMessage nestedMessage2 = DynamicMessage
                 .newBuilder(nestedMessageDescriptor2)
                 .setField(nestedMessageDescriptor2.findFieldByNumber(30), 
Arrays.asList(mapEntry1, mapEntry2))
                 .setField(nestedMessageDescriptor2.findFieldByNumber(31), "One 
Of Option")
@@ -94,13 +116,13 @@ public class ProtoTestUtil {
                 .setField(nestedMessageDescriptor2.findFieldByNumber(33), 3)
                 .build();
 
-        DynamicMessage nestedMessage = DynamicMessage
+        final DynamicMessage nestedMessage = DynamicMessage
                 .newBuilder(nestedMessageDescriptor)
                 .setField(nestedMessageDescriptor.findFieldByNumber(20), 
enumValueDescriptor.findValueByNumber(2))
                 
.addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), nestedMessage2)
                 .build();
 
-        DynamicMessage message = DynamicMessage
+        final DynamicMessage message = DynamicMessage
                 .newBuilder(messageDescriptor)
                 .setField(messageDescriptor.findFieldByNumber(1), true)
                 .setField(messageDescriptor.findFieldByNumber(2), "Test text")
@@ -124,14 +146,12 @@ public class ProtoTestUtil {
     }
 
     public static InputStream generateInputDataForRepeatedProto3() throws 
IOException, Descriptors.DescriptorValidationException {
-        DescriptorProtos.FileDescriptorSet descriptorSet = 
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH 
+ "test_repeated_proto3.desc"));
-        Descriptors.FileDescriptor fileDescriptor = 
Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new 
Descriptors.FileDescriptor[0]);
-
-        Descriptors.Descriptor messageDescriptor = 
fileDescriptor.findMessageTypeByName("RootMessage");
-        Descriptors.Descriptor repeatedMessageDescriptor = 
fileDescriptor.findMessageTypeByName("RepeatedMessage");
-        Descriptors.EnumDescriptor enumValueDescriptor = 
fileDescriptor.findEnumTypeByName("TestEnum");
+        final FileDescriptor fileDescriptor = 
loadFileDescriptor("test_repeated_proto3.desc");
+        final Descriptor messageDescriptor = 
fileDescriptor.findMessageTypeByName("RootMessage");
+        final Descriptor repeatedMessageDescriptor = 
fileDescriptor.findMessageTypeByName("RepeatedMessage");
+        final Descriptors.EnumDescriptor enumValueDescriptor = 
fileDescriptor.findEnumTypeByName("TestEnum");
 
-        DynamicMessage repeatedMessage1 = DynamicMessage
+        final DynamicMessage repeatedMessage1 = DynamicMessage
                 .newBuilder(repeatedMessageDescriptor)
                 
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(1), true)
                 
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(1), false)
@@ -167,12 +187,12 @@ public class ProtoTestUtil {
                 
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(16), 
enumValueDescriptor.findValueByNumber(2))
                 .build();
 
-        DynamicMessage repeatedMessage2 = DynamicMessage
+        final DynamicMessage repeatedMessage2 = DynamicMessage
                 .newBuilder(repeatedMessageDescriptor)
                 
.addRepeatedField(repeatedMessageDescriptor.findFieldByNumber(1), true)
                 .build();
 
-        DynamicMessage rootMessage = DynamicMessage
+        final DynamicMessage rootMessage = DynamicMessage
                 .newBuilder(messageDescriptor)
                 .addRepeatedField(messageDescriptor.findFieldByNumber(1), 
repeatedMessage1)
                 .addRepeatedField(messageDescriptor.findFieldByNumber(1), 
repeatedMessage2)
@@ -182,30 +202,27 @@ public class ProtoTestUtil {
     }
 
     public static InputStream generateInputDataForProto2() throws IOException, 
Descriptors.DescriptorValidationException {
-        DescriptorProtos.FileDescriptorSet anyDescriptorSet = 
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH 
+ "google/protobuf/any.desc"));
-        Descriptors.FileDescriptor anyDesc = 
Descriptors.FileDescriptor.buildFrom(anyDescriptorSet.getFile(0), new 
Descriptors.FileDescriptor[]{});
+        final FileDescriptor anyDesc = 
loadFileDescriptor("google/protobuf/any.desc");
+        final FileDescriptor fileDescriptor = 
loadFileDescriptor("test_proto2.desc", new FileDescriptor[]{anyDesc});
 
-        DescriptorProtos.FileDescriptorSet descriptorSet = 
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH 
+ "test_proto2.desc"));
-        Descriptors.FileDescriptor fileDescriptor = 
Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new 
Descriptors.FileDescriptor[]{anyDesc});
+        final Descriptor messageDescriptor = 
fileDescriptor.findMessageTypeByName("Proto2Message");
+        final Descriptor anyTestDescriptor = 
fileDescriptor.findMessageTypeByName("AnyValueMessage");
+        final Descriptors.FieldDescriptor fieldDescriptor = 
fileDescriptor.findExtensionByName("extensionField");
+        final Descriptor anyDescriptor = anyDesc.findMessageTypeByName("Any");
 
-        Descriptors.Descriptor messageDescriptor = 
fileDescriptor.findMessageTypeByName("Proto2Message");
-        Descriptors.Descriptor anyTestDescriptor = 
fileDescriptor.findMessageTypeByName("AnyValueMessage");
-        Descriptors.FieldDescriptor fieldDescriptor = 
fileDescriptor.findExtensionByName("extensionField");
-        Descriptors.Descriptor anyDescriptor = 
anyDesc.findMessageTypeByName("Any");
-
-        DynamicMessage anyTestMessage = DynamicMessage
+        final DynamicMessage anyTestMessage = DynamicMessage
                 .newBuilder(anyTestDescriptor)
                 .setField(anyTestDescriptor.findFieldByNumber(1), "Test field 
1")
                 .setField(anyTestDescriptor.findFieldByNumber(2), "Test field 
2")
                 .build();
 
-        DynamicMessage anyMessage = DynamicMessage
+        final DynamicMessage anyMessage = DynamicMessage
                 .newBuilder(anyDescriptor)
                 .setField(anyDescriptor.findFieldByNumber(1), 
"type.googleapis.com/AnyValueMessage")
                 .setField(anyDescriptor.findFieldByNumber(2), 
anyTestMessage.toByteArray())
                 .build();
 
-        DynamicMessage message = DynamicMessage
+        final DynamicMessage message = DynamicMessage
                 .newBuilder(messageDescriptor)
                 .setField(messageDescriptor.findFieldByNumber(1), true)
                 .setField(messageDescriptor.findFieldByNumber(3), anyMessage)
@@ -214,4 +231,74 @@ public class ProtoTestUtil {
 
         return message.toByteString().newInput();
     }
+
+    static FileDescriptor loadFileDescriptor(final String descriptorFileName) 
throws IOException, Descriptors.DescriptorValidationException {
+        final FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(
+            new FileInputStream(BASE_TEST_PATH + descriptorFileName));
+        return FileDescriptor.buildFrom(descriptorSet.getFile(0), new 
FileDescriptor[0]);
+    }
+
+    static FileDescriptor loadFileDescriptor(final String descriptorFileName, 
final FileDescriptor[] dependencies)
+        throws IOException, Descriptors.DescriptorValidationException {
+        final FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(
+            new FileInputStream(BASE_TEST_PATH + descriptorFileName));
+        return FileDescriptor.buildFrom(descriptorSet.getFile(0), 
dependencies);
+    }
+
+    static FileDescriptor[] loadAllFileDescriptors(final String 
descriptorFileName) throws IOException, 
Descriptors.DescriptorValidationException {
+        final FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(
+            new FileInputStream(BASE_TEST_PATH + descriptorFileName));
+
+        final FileDescriptor[] fileDescriptors = new 
FileDescriptor[descriptorSet.getFileCount()];
+
+        // Build descriptors in dependency order
+        for (int i = 0; i < descriptorSet.getFileCount(); i++) {
+            final FileDescriptorProto fileProto = descriptorSet.getFile(i);
+            final FileDescriptor[] dependencies = new FileDescriptor[i];
+            arraycopy(fileDescriptors, 0, dependencies, 0, i);
+            fileDescriptors[i] = FileDescriptor.buildFrom(fileProto, 
dependencies);
+        }
+
+        return fileDescriptors;
+    }
+
+    /**
+     * Loads all file descriptors from a .desc file and returns a simplified 
container
+     * that can find message types by name without array indexing.
+     */
+    static DescriptorContainer loadDescriptorContainer(final String 
descriptorFileName) throws IOException, 
Descriptors.DescriptorValidationException {
+        return new 
DescriptorContainer(loadAllFileDescriptors(descriptorFileName));
+    }
+
+    private static Schema loadTestSchema(final String protoFileName) {
+        final SchemaLoader schemaLoader = new 
SchemaLoader(FileSystems.getDefault());
+        
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + 
protoFileName)), Collections.emptyList());
+        return schemaLoader.loadSchema();
+    }
+
+    /**
+     * A simplified descriptor container that allows finding message types by 
name
+     * without needing to know which specific FileDescriptor contains the 
message.
+     */
+    static class DescriptorContainer {
+
+        private final FileDescriptor[] fileDescriptors;
+
+        DescriptorContainer(final FileDescriptor[] fileDescriptors) {
+            this.fileDescriptors = fileDescriptors;
+        }
+
+        /**
+         * Finds a message type by name across all loaded file descriptors.
+         */
+        Descriptor findMessageTypeByName(final String messageName) {
+            for (final FileDescriptor fileDescriptor : fileDescriptors) {
+                final Descriptor descriptor = 
fileDescriptor.findMessageTypeByName(messageName);
+                if (descriptor != null) {
+                    return descriptor;
+                }
+            }
+            throw new IllegalArgumentException("Message type '" + messageName 
+ "' not found in loaded descriptors");
+        }
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/StandardProtobufReaderTestBase.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/StandardProtobufReaderTestBase.java
new file mode 100644
index 0000000000..fe63280876
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/StandardProtobufReaderTestBase.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.MessageName;
+import org.apache.nifi.schemaregistry.services.MessageNameResolver;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.schemaregistry.services.StandardMessageNameFactory;
+import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Objects.requireNonNullElseGet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Base class for StandardProtobufReader tests containing common functionality
+ * and mock services that can be shared across different schema access 
strategy tests.
+ */
+public abstract class StandardProtobufReaderTestBase {
+
+    protected static final String STANDARD_PROTOBUF_READER_SERVICE_ID = 
"standard-protobuf-reader";
+    protected static final String MOCK_SCHEMA_REGISTRY_ID = 
"mock-schema-registry";
+    protected static final String MOCK_SCHEMA_REFERENCE_READER_ID = 
"mock-schema-reference-reader";
+    protected static final String MOCK_MESSAGE_NAME_RESOLVER_ID = 
"mock-message-name-resolver";
+
+    protected static final String PROTO_3_MESSAGE = "Proto3Message";
+    protected static final String PROTO_3_SCHEMA = "test_proto3.proto";
+
+    protected TestRunner runner;
+    protected StandardProtobufReader standardProtobufReader;
+    protected MockSchemaRegistry mockSchemaRegistry;
+    protected MockSchemaReferenceReader mockSchemaReferenceReader;
+    protected MockMessageNameResolver mockMessageNameResolver;
+
+
+    @BeforeEach
+    void setUp() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        standardProtobufReader = new StandardProtobufReader();
+        runner.addControllerService(STANDARD_PROTOBUF_READER_SERVICE_ID, 
standardProtobufReader);
+        setupMockServices();
+    }
+
+    protected void setupMockServices() throws InitializationException {
+        mockSchemaRegistry = new MockSchemaRegistry();
+        mockSchemaReferenceReader = new MockSchemaReferenceReader();
+        mockMessageNameResolver = new MockMessageNameResolver();
+        runner.addControllerService(MOCK_SCHEMA_REGISTRY_ID, 
mockSchemaRegistry);
+        runner.addControllerService(MOCK_SCHEMA_REFERENCE_READER_ID, 
mockSchemaReferenceReader);
+        runner.addControllerService(MOCK_MESSAGE_NAME_RESOLVER_ID, 
mockMessageNameResolver);
+    }
+
+    protected void enableAllControllerServices() {
+        runner.enableControllerService(mockSchemaRegistry);
+        runner.enableControllerService(mockSchemaReferenceReader);
+        runner.enableControllerService(mockMessageNameResolver);
+        runner.enableControllerService(standardProtobufReader);
+    }
+
+    protected RecordReader createRecordReader(final byte[] testData) throws 
IOException, SchemaNotFoundException {
+        return standardProtobufReader.createRecordReader(
+            emptyMap(),
+            new ByteArrayInputStream(testData),
+            testData.length,
+            runner.getLogger()
+        );
+    }
+
+    protected RecordReader createRecordReader() throws IOException, 
SchemaNotFoundException {
+        final byte[] testData = new byte[0];
+        return createRecordReader(testData);
+    }
+
+    /**
+     * Reads the test_proto3.proto file from resources.
+     */
+    protected String getTestProto3File() {
+        try {
+            return new String(
+                
getClass().getClassLoader().getResourceAsStream("test_proto3.proto").readAllBytes()
+            );
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to read test_proto3.proto from 
resources", e);
+        }
+    }
+
+    protected ValidationResult verifyExactlyOneValidationError() {
+        final Collection<ValidationResult> results = 
runner.validate(standardProtobufReader);
+        final List<ValidationResult> invalids = results.stream().filter(result 
-> !result.isValid()).toList();
+        if (invalids.size() != 1) {
+            fail("Expected exactly one invalid result, but found: " + 
invalids.size());
+        }
+        return invalids.getFirst();
+    }
+
+    // assertions made on the test_proto3.proto message generated by the 
ProtoTestUtil.generateInputDataForProto3() method
+    protected void runAssertionsOnTestProto3Message(final RecordReader 
recordReader) throws MalformedRecordException, IOException {
+        final RecordSchema schema = recordReader.getSchema();
+        assertNotNull(schema);
+
+        // Iterate over record reader and validate actual data
+        final Record record = recordReader.nextRecord();
+        assertNotNull(record);
+
+        assertTrue(record.getAsBoolean("booleanField"));
+        assertEquals("Test text", record.getAsString("stringField"));
+        assertEquals(Integer.MAX_VALUE, record.getAsInt("int32Field"));
+        assertEquals(Integer.MIN_VALUE, record.getAsInt("sint32Field"));
+        final Byte[] bytesValue = (Byte[]) record.getValue("bytesField");
+        final String bytesString = new 
String(ArrayUtils.toPrimitive(bytesValue), UTF_8);
+
+        assertEquals("Test bytes", bytesString);
+        assertEquals(Long.MAX_VALUE, record.getAsLong("int64Field"));
+        assertEquals(Long.MIN_VALUE, record.getAsLong("sint64Field"));
+
+        // Validate nested message
+        final Record nestedMessage = (Record) record.getValue("nestedMessage");
+        assertNotNull(nestedMessage, "nestedMessage should not be null");
+        assertEquals("ENUM_VALUE_3", nestedMessage.getAsString("testEnum"));
+
+        // Validate nested message2 array
+        final Object[] nestedMessage2Value = 
nestedMessage.getAsArray("nestedMessage2");
+        assertNotNull(nestedMessage2Value);
+
+
+        assertEquals(1, nestedMessage2Value.length);
+        final Record nestedMessage2 = (Record) nestedMessage2Value[0];
+        assertNotNull(nestedMessage2);
+
+        // Validate map field
+        final Map<String, Object> testMap = (Map<String, Object>) 
nestedMessage2.getValue("testMap");
+        assertNotNull(testMap);
+
+        assertEquals(2, testMap.size());
+        assertEquals(101, ((Number) 
testMap.get("test_key_entry1")).intValue());
+        assertEquals(202, ((Number) 
testMap.get("test_key_entry2")).intValue());
+
+        assertNull(recordReader.nextRecord());
+    }
+
+
+    /**
+     * Mock implementation of SchemaRegistry for testing purposes.
+     */
+    protected static class MockSchemaRegistry extends 
AbstractControllerService implements SchemaRegistry {
+        private String schemaText = "";
+        private SchemaDefinition schemaDefinition = null;
+        private boolean shouldThrowSchemaNotFound = false;
+
+        public void returnSchemaText(final String schemaText) {
+            this.schemaText = schemaText;
+        }
+
+        public void returnSchemaDefinition(final SchemaDefinition 
schemaDefinition) {
+            this.schemaDefinition = schemaDefinition;
+        }
+
+        public void throwSchemaNotFoundWhenCalled(final boolean shouldThrow) {
+            this.shouldThrowSchemaNotFound = shouldThrow;
+        }
+
+        @Override
+        public RecordSchema retrieveSchema(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+            throw new UnsupportedOperationException("retrieveSchema is not 
implemented in MockSchemaRegistry");
+        }
+
+        @Override
+        public Set<SchemaField> getSuppliedSchemaFields() {
+            return emptySet();
+        }
+
+        @Override
+        public SchemaDefinition retrieveSchemaDefinition(final 
SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
+            if (shouldThrowSchemaNotFound) {
+                throw new SchemaNotFoundException("Schema not found: " + 
schemaIdentifier);
+            }
+            return requireNonNullElseGet(schemaDefinition, () -> new 
StandardSchemaDefinition(schemaIdentifier, schemaText, 
SchemaDefinition.SchemaType.PROTOBUF));
+        }
+    }
+
+    /**
+     * Mock implementation of SchemaReferenceReader for testing purposes.
+     */
+    protected static class MockSchemaReferenceReader extends 
AbstractControllerService implements SchemaReferenceReader {
+        private SchemaIdentifier schemaIdentifier;
+        private boolean shouldThrowException = false;
+
+        public void returnSchemaIdentifierWithName(final String schemaName) {
+            schemaIdentifier = SchemaIdentifier.builder()
+                .name(schemaName)
+                .build();
+        }
+
+        public void throwExceptionWhenCalled(final boolean 
shouldThrowException) {
+            this.shouldThrowException = shouldThrowException;
+        }
+
+        @Override
+        public SchemaIdentifier getSchemaIdentifier(final Map<String, String> 
variables, final InputStream contentStream) throws IOException {
+            if (shouldThrowException) {
+                throw new IOException("Mock schema reference reader configured 
to throw exception");
+            }
+            return schemaIdentifier;
+        }
+
+        @Override
+        public Set<SchemaField> getSuppliedSchemaFields() {
+            return emptySet();
+        }
+    }
+
+    protected static class MockMessageNameResolver extends 
AbstractControllerService implements MessageNameResolver {
+        private String messageName;
+
+        public void returnMessageName(final String messageName) {
+            this.messageName = messageName;
+        }
+
+        @Override
+        public MessageName getMessageName(final Map<String, String> variables, 
final SchemaDefinition schemaDefinition, final InputStream contentStream) {
+            return StandardMessageNameFactory.fromName(messageName);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufReader.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufReader.java
new file mode 100644
index 0000000000..d42c6120e0
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufReader.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static org.apache.nifi.services.protobuf.ProtobufReader.MESSAGE_TYPE;
+import static 
org.apache.nifi.services.protobuf.ProtobufReader.PROTOBUF_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestProtobufReader {
+
+    private static final String PROTOBUF_READER_SERVICE_ID = "protobuf-reader";
+    private static final String PROTO3_MESSAGE_TYPE = "Proto3Message";
+    private static final String REPEATED_PROTO3_MESSAGE_TYPE = "RootMessage";
+    private static final String PROTO2_MESSAGE_TYPE = "Proto2Message";
+
+    private TestRunner runner;
+    private ProtobufReader protobufReader;
+
+    @BeforeEach
+    void setUp() throws InitializationException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        protobufReader = new ProtobufReader();
+        runner.addControllerService(PROTOBUF_READER_SERVICE_ID, 
protobufReader);
+    }
+
+
+    private static Stream<Arguments> validConfigurationProvider() {
+        return Stream.of(
+            Arguments.of("test_proto3.proto", PROTO3_MESSAGE_TYPE),
+            Arguments.of("test_repeated_proto3.proto", 
REPEATED_PROTO3_MESSAGE_TYPE),
+            Arguments.of("test_proto2.proto", PROTO2_MESSAGE_TYPE)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("validConfigurationProvider")
+    void testValidConfigurations(final String protoFileName, final String 
messageType) throws IOException {
+        final Path testTempDir = createTempDirWithProtoFile(protoFileName);
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, 
testTempDir.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, messageType);
+        runner.enableControllerService(protobufReader);
+
+        runner.assertValid(protobufReader);
+    }
+
+    @Test
+    void testInvalidConfigurationMissingDirectory() {
+        runner.setProperty(protobufReader, MESSAGE_TYPE, PROTO3_MESSAGE_TYPE);
+
+        final Collection<ValidationResult> results = 
runner.validate(protobufReader);
+        assertFalse(results.stream().allMatch(ValidationResult::isValid));
+
+        final ValidationResult invalidResult = findFirstInvalid(results);
+        assertNotNull(invalidResult);
+        assertEquals("Proto Directory is required", 
invalidResult.getExplanation());
+    }
+
+    @Test
+    void testInvalidConfigurationMissingMessageType() throws IOException {
+        final Path testTempDir = 
createTempDirWithProtoFile("test_proto3.proto");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, 
testTempDir.toString());
+
+        final Collection<ValidationResult> results = 
runner.validate(protobufReader);
+        assertFalse(results.stream().allMatch(ValidationResult::isValid));
+
+        final ValidationResult invalidResult = findFirstInvalid(results);
+        assertNotNull(invalidResult);
+        assertEquals("Message Type is required", 
invalidResult.getExplanation());
+    }
+
+    @Test
+    void testInvalidConfigurationNonExistentDirectory() throws IOException {
+        final Path testTempDir = 
createTempDirWithProtoFile("test_proto3.proto");
+        final String nonExistentDir = 
testTempDir.resolve("non-existent").toString();
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, nonExistentDir);
+        runner.setProperty(protobufReader, MESSAGE_TYPE, PROTO3_MESSAGE_TYPE);
+
+        final Collection<ValidationResult> results = 
runner.validate(protobufReader);
+        assertFalse(results.stream().allMatch(ValidationResult::isValid));
+
+        final ValidationResult invalidResult = findFirstInvalid(results);
+        assertNotNull(invalidResult);
+        assertTrue(invalidResult.getExplanation().contains("Directory does not 
exist"));
+    }
+
+    @Test
+    void testCustomValidationWithInvalidMessageType() throws IOException {
+        final Path testTempDir = 
createTempDirWithProtoFile("test_proto3.proto");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, 
testTempDir.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, "NonExistentMessage");
+
+        final Collection<ValidationResult> results = 
runner.validate(protobufReader);
+        assertFalse(results.stream().allMatch(ValidationResult::isValid));
+
+        final ValidationResult invalidResult = findFirstInvalid(results);
+
+        assertNotNull(invalidResult);
+        assertTrue(invalidResult.getExplanation().contains("message type 
cannot be found"));
+    }
+
+    @Test
+    void testPropertyDescriptorsContainRequiredProperties() {
+        final List<PropertyDescriptor> descriptors = List.of(
+            SCHEMA_ACCESS_STRATEGY,
+            SCHEMA_REGISTRY,
+            SCHEMA_NAME,
+            SCHEMA_VERSION,
+            SCHEMA_BRANCH_NAME,
+            SCHEMA_TEXT,
+            SCHEMA_REFERENCE_READER,
+            PROTOBUF_DIRECTORY,
+            MESSAGE_TYPE
+        );
+        assertEquals(descriptors, 
protobufReader.getSupportedPropertyDescriptors());
+    }
+
+    @Test
+    void testCreateRecordReaderWithValidConfiguration() throws Exception {
+        final Path testTempDir = 
createTempDirWithProtoFile("test_proto3.proto");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, 
testTempDir.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, PROTO3_MESSAGE_TYPE);
+        runner.enableControllerService(protobufReader);
+
+        try (final InputStream inputStream = 
ProtoTestUtil.generateInputDataForProto3()) {
+            final RecordReader recordReader = 
protobufReader.createRecordReader(Collections.emptyMap(), inputStream, 100L, 
runner.getLogger());
+
+            assertNotNull(recordReader);
+
+            final Record record = recordReader.nextRecord();
+            assertNotNull(record);
+            assertEquals(true, record.getValue("booleanField"));
+            assertEquals("Test text", record.getValue("stringField"));
+        }
+    }
+
+    @Test
+    void testCreateRecordReaderWithRepeatedFields() throws Exception {
+        final Path testTempDir = 
createTempDirWithProtoFile("test_repeated_proto3.proto");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, 
testTempDir.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, 
REPEATED_PROTO3_MESSAGE_TYPE);
+        runner.enableControllerService(protobufReader);
+
+        try (final InputStream inputStream = 
ProtoTestUtil.generateInputDataForRepeatedProto3()) {
+            final RecordReader recordReader = 
protobufReader.createRecordReader(Collections.emptyMap(), inputStream, 100L, 
runner.getLogger());
+            assertNotNull(recordReader);
+            final Record record = recordReader.nextRecord();
+            assertNotNull(record);
+            assertNotNull(record.getValue("repeatedMessage"));
+        }
+    }
+
+
+    @Test
+    void testValidationWithCircularReferenceProto() throws IOException {
+        final Path testTempDir = 
createTempDirWithProtoFile("test_circular_reference.proto");
+        runner.setProperty(protobufReader, PROTOBUF_DIRECTORY, 
testTempDir.toString());
+        runner.setProperty(protobufReader, MESSAGE_TYPE, "A");
+
+        final Collection<ValidationResult> results = 
runner.validate(protobufReader);
+        assertTrue(results.stream().allMatch(ValidationResult::isValid));
+    }
+
+
+    private Path createTempDirWithProtoFile(final String protoFileName) throws 
IOException {
+        final Path testTempDir = Files.createTempDirectory("proto-test-");
+        try (final InputStream resourceStream = 
getClass().getClassLoader().getResourceAsStream(protoFileName)) {
+            if (resourceStream != null) {
+                Files.copy(resourceStream, testTempDir.resolve(protoFileName), 
StandardCopyOption.REPLACE_EXISTING);
+            }
+        }
+        return testTempDir;
+    }
+
+    private ValidationResult findFirstInvalid(final 
Collection<ValidationResult> results) {
+        return results.stream().filter(result -> 
!result.isValid()).findFirst().orElse(null);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReader.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReader.java
new file mode 100644
index 0000000000..8cb7bd0689
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReader.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.google.protobuf.DynamicMessage;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaDefinition;
+import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.protobuf.Descriptors.Descriptor;
+import static com.google.protobuf.Descriptors.DescriptorValidationException;
+import static java.util.Collections.emptyMap;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static 
org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForProto3;
+import static 
org.apache.nifi.services.protobuf.ProtobufSchemaValidator.validateSchemaDefinitionIdentifiers;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLUTION_STRATEGY;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLVER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestStandardProtobufReader extends StandardProtobufReaderTestBase {
+
+    @BeforeEach
+    void beforeEach() {
+        runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY, 
SCHEMA_REFERENCE_READER_PROPERTY);
+        runner.setProperty(standardProtobufReader, SCHEMA_REGISTRY, 
MOCK_SCHEMA_REGISTRY_ID);
+        runner.setProperty(standardProtobufReader, SCHEMA_REFERENCE_READER, 
MOCK_SCHEMA_REFERENCE_READER_ID);
+        runner.setProperty(standardProtobufReader, 
MESSAGE_NAME_RESOLUTION_STRATEGY, 
StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER);
+        runner.setProperty(standardProtobufReader, MESSAGE_NAME_RESOLVER, 
MOCK_MESSAGE_NAME_RESOLVER_ID);
+    }
+
+    @Test
+    void testStandardProtobufReaderWithTestProto3Schema() throws Exception {
+        final String testSchema = getTestProto3File();
+        mockSchemaRegistry.returnSchemaText(testSchema);
+        
mockSchemaReferenceReader.returnSchemaIdentifierWithName(PROTO_3_SCHEMA);
+        mockMessageNameResolver.returnMessageName(PROTO_3_MESSAGE);
+        enableAllControllerServices();
+
+        final InputStream testDataStream = generateInputDataForProto3();
+        final RecordReader recordReader = 
standardProtobufReader.createRecordReader(emptyMap(), testDataStream, 0L, 
runner.getLogger());
+
+        assertNotNull(recordReader);
+        runAssertionsOnTestProto3Message(recordReader);
+    }
+
+
+    @Test
+    void testSchemaNotFoundRethrown() {
+        mockSchemaRegistry.throwSchemaNotFoundWhenCalled(true);
+        enableAllControllerServices();
+
+        assertThrows(SchemaNotFoundException.class, this::createRecordReader);
+    }
+
+    @Test
+    void testSchemaReferenceReaderException() {
+        mockSchemaRegistry.returnSchemaText(getTestProto3File());
+        mockSchemaReferenceReader.throwExceptionWhenCalled(true);
+
+        enableAllControllerServices();
+
+        assertThrows(IOException.class, this::createRecordReader);
+    }
+
+    @Test
+    void testCreateRecordReaderWithSchemaReferences() throws Exception {
+        // Setup schema registry to return schema with references using 
existing mock
+        final String userProfileSchema = loadUserProfileSchema();
+        final String userSettingsSchema = loadUserSettingsSchema();
+
+        // Create schema definition with references
+        final SchemaDefinition userProfileDef = 
createSchemaDefinition("user_profile.proto", userProfileSchema, new 
HashMap<>());
+        // root message here is user_settings.proto, it has a reference to 
user_profile.proto
+        final SchemaDefinition userSettingsDef = 
createSchemaDefinition("user_settings.proto", userSettingsSchema, 
Map.of("user_profile.proto", userProfileDef));
+
+        mockSchemaRegistry.returnSchemaDefinition(userSettingsDef);
+        
mockSchemaReferenceReader.returnSchemaIdentifierWithName("user_settings");
+        
mockMessageNameResolver.returnMessageName("org.apache.nifi.protobuf.test.UserSettings");
+        enableAllControllerServices();
+
+        // Create test data using DynamicMessage
+        final InputStream inputStream = generateInputDataForUserSettings();
+        final RecordReader recordReader = 
standardProtobufReader.createRecordReader(emptyMap(), inputStream, 0L, 
runner.getLogger());
+
+        assertNotNull(recordReader);
+
+        final Record userSettings = recordReader.nextRecord();
+        assertNotNull(userSettings);
+        assertEquals("en-US", userSettings.getAsString("language"));
+        assertTrue(userSettings.getAsBoolean("two_factor_enabled"));
+
+        final Record profileRecord = (Record) userSettings.getValue("profile");
+        assertNotNull(profileRecord);
+        assertEquals("user123", profileRecord.getAsString("user_id"));
+        assertEquals(30, profileRecord.getAsInt("age"));
+    }
+
+    @Test
+    void testValidSchemaDefinitionWithProtoExtension() {
+        final SchemaDefinition validReferencedSchema = 
createSchemaDefinition("user_profile.proto");
+        final SchemaDefinition validMainSchema = 
createSchemaDefinition("user_settings.proto", Map.of("user_profile.proto", 
validReferencedSchema));
+
+        enableAllControllerServices();
+
+        validateSchemaDefinitionIdentifiers(validMainSchema, true);
+    }
+
+    @Test
+    void testRootSchemaIsAllowedToHaveInvalidName() {
+        final SchemaDefinition validReferencedSchema = 
createSchemaDefinition("user_profile.proto");
+        final SchemaDefinition validSchema = 
createSchemaDefinition("user_settings.noproto.extension", 
Map.of("user_profile.proto", validReferencedSchema));
+
+        enableAllControllerServices();
+
+        validateSchemaDefinitionIdentifiers(validSchema, true);
+    }
+
+    @Test
+    void testValidMainSchemaWithInvalidReferencedSchema() {
+        final SchemaDefinition invalidReferencedSchema = 
createSchemaDefinition("user_profile.invalid");
+        final SchemaDefinition mixedSchema = 
createSchemaDefinition("user_settings.proto",
+            Map.of("user_profile.invalid", invalidReferencedSchema));
+
+        enableAllControllerServices();
+
+        final IllegalArgumentException referencedException = 
assertThrows(IllegalArgumentException.class, () -> {
+            validateSchemaDefinitionIdentifiers(mixedSchema, true);
+        });
+
+        assertTrue(referencedException.getMessage().contains("ends with .proto 
extension"));
+    }
+
+    @Test
+    void testSchemaDefinitionWithMissingName() {
+        final SchemaDefinition schemaWithoutName = 
createSchemaDefinitionWithoutName();
+        enableAllControllerServices();
+        validateSchemaDefinitionIdentifiers(schemaWithoutName, true);
+    }
+
+    @Nested
+    class WithSchemaNamePropertyAccessStrategy {
+
+        @BeforeEach
+        void beforeEach() {
+            runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY, 
SCHEMA_NAME_PROPERTY);
+            runner.setProperty(standardProtobufReader, MESSAGE_NAME, 
PROTO_3_MESSAGE);
+
+        }
+
+        @Test
+        void testStandardProtobufReaderWithTestProto3Schema() throws Exception 
{
+            runner.setProperty(standardProtobufReader, SCHEMA_NAME, 
PROTO_3_SCHEMA);
+            
TestStandardProtobufReader.this.testStandardProtobufReaderWithTestProto3Schema();
+        }
+
+        @Test
+        void testSchemaNotFoundRethrown() {
+            TestStandardProtobufReader.this.testSchemaNotFoundRethrown();
+        }
+
+        @Test
+        void testCreateRecordReaderWithSchemaReferences() throws Exception {
+            runner.setProperty(standardProtobufReader, SCHEMA_NAME, 
"user_settings");
+            
TestStandardProtobufReader.this.testCreateRecordReaderWithSchemaReferences();
+        }
+    }
+
+    @Nested
+    class WithSchemaTextAccessStrategy {
+
+        @BeforeEach
+        void beforeEach() {
+            runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY, 
SCHEMA_TEXT_PROPERTY);
+            runner.setProperty(standardProtobufReader, SCHEMA_TEXT, 
getTestProto3File());
+
+        }
+
+        @Test
+        void testStandardProtobufReaderWithTestProto3Schema() throws Exception 
{
+            
TestStandardProtobufReader.this.testStandardProtobufReaderWithTestProto3Schema();
+        }
+    }
+
+    private SchemaDefinition createSchemaDefinition(final String name) {
+        return createSchemaDefinition(name, "test schema content", new 
HashMap<>());
+    }
+
+    private SchemaDefinition createSchemaDefinition(final String name, final 
Map<String, SchemaDefinition> references) {
+        return createSchemaDefinition(name, "test schema content", references);
+    }
+
+    private SchemaDefinition createSchemaDefinition(final String name, final 
String schemaText, final Map<String, SchemaDefinition> references) {
+        return new StandardSchemaDefinition(
+            SchemaIdentifier.builder()
+                .name(name)
+                .id(Long.MAX_VALUE)
+                .build(),
+            schemaText,
+            SchemaDefinition.SchemaType.PROTOBUF,
+            references
+        );
+    }
+
+    private SchemaDefinition createSchemaDefinitionWithoutName() {
+        return new StandardSchemaDefinition(
+            SchemaIdentifier.builder()
+                .id(1L)
+                .build(),
+            "test schema content",
+            SchemaDefinition.SchemaType.PROTOBUF,
+            new HashMap<>()
+        );
+    }
+
+    private InputStream generateInputDataForUserSettings() throws IOException, 
DescriptorValidationException {
+        // Load descriptor container from .desc file containing both 
UserProfile and UserSettings
+        final ProtoTestUtil.DescriptorContainer descriptorContainer = 
ProtoTestUtil.loadDescriptorContainer("org/apache/nifi/protobuf/test/user_settings.desc");
+
+        final Descriptor userProfileDescriptor = 
descriptorContainer.findMessageTypeByName("UserProfile");
+        final Descriptor userSettingsDescriptor = 
descriptorContainer.findMessageTypeByName("UserSettings");
+
+        // Create UserProfile message
+        final DynamicMessage userProfile = 
DynamicMessage.newBuilder(userProfileDescriptor)
+            .setField(userProfileDescriptor.findFieldByName("user_id"), 
"user123")
+            .setField(userProfileDescriptor.findFieldByName("age"), 30)
+            .build();
+
+        // Create UserSettings message with embedded UserProfile
+        final DynamicMessage userSettings = 
DynamicMessage.newBuilder(userSettingsDescriptor)
+            .setField(userSettingsDescriptor.findFieldByName("profile"), 
userProfile)
+            .setField(userSettingsDescriptor.findFieldByName("language"), 
"en-US")
+            
.setField(userSettingsDescriptor.findFieldByName("two_factor_enabled"), true)
+            .build();
+
+        return userSettings.toByteString().newInput();
+    }
+
+    private String loadProtoSchema(final String resourcePath) throws 
IOException {
+        try (final InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(resourcePath)) {
+            if (inputStream == null) {
+                throw new IOException("Could not find " + resourcePath + " in 
classpath");
+            }
+            return new String(inputStream.readAllBytes());
+        }
+    }
+
+    private String loadUserProfileSchema() throws IOException {
+        return 
loadProtoSchema("org/apache/nifi/protobuf/test/user_profile.proto");
+    }
+
+    private String loadUserSettingsSchema() throws IOException {
+        return 
loadProtoSchema("org/apache/nifi/protobuf/test/user_settings.proto");
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
new file mode 100644
index 0000000000..b58b6504ea
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestStandardProtobufReaderPropertyValidation.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLUTION_STRATEGY;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MESSAGE_NAME_RESOLVER;
+import static 
org.apache.nifi.services.protobuf.StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestStandardProtobufReaderPropertyValidation extends 
StandardProtobufReaderTestBase {
+
+    @BeforeEach
+    void beforeEach() {
+        runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY, 
SCHEMA_REFERENCE_READER_PROPERTY);
+        runner.setProperty(standardProtobufReader, SCHEMA_REGISTRY, 
MOCK_SCHEMA_REGISTRY_ID);
+        runner.setProperty(standardProtobufReader, SCHEMA_REFERENCE_READER, 
MOCK_SCHEMA_REFERENCE_READER_ID);
+        runner.setProperty(standardProtobufReader, 
MESSAGE_NAME_RESOLUTION_STRATEGY, 
StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER);
+        runner.setProperty(standardProtobufReader, MESSAGE_NAME_RESOLVER, 
MOCK_MESSAGE_NAME_RESOLVER_ID);
+        // Ensure configuration is valid before running tests
+        runner.assertValid(standardProtobufReader);
+    }
+
+    @Test
+    void testInvalidWithoutMessageNameProperty() {
+        runner.setProperty(standardProtobufReader, 
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+        runner.removeProperty(standardProtobufReader, MESSAGE_NAME);
+        final ValidationResult invalidResult = 
verifyExactlyOneValidationError();
+
+        assertEquals("Message Name is required", 
invalidResult.getExplanation());
+    }
+
+    @Test
+    void testInvalidWithoutMessageNameResolver() {
+        runner.setProperty(standardProtobufReader, 
MESSAGE_NAME_RESOLUTION_STRATEGY, 
StandardProtobufReader.MessageNameResolverStrategy.MESSAGE_NAME_RESOLVER);
+        runner.removeProperty(standardProtobufReader, MESSAGE_NAME_RESOLVER);
+        final ValidationResult invalidResult = 
verifyExactlyOneValidationError();
+
+        assertTrue(invalidResult.getExplanation().contains("Message Name 
Resolver"));
+    }
+
+    @Nested
+    class SchemaTextPropertyAccessStrategy {
+        @BeforeEach
+        void beforeEach() {
+            runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY, 
SCHEMA_TEXT_PROPERTY);
+            // Ensure configuration is valid before running tests
+            runner.assertValid(standardProtobufReader);
+        }
+
+        @Test
+        void testValidWhenSchemaTextPropertyRemoved() { // default value of 
${proto.schema}
+            runner.removeProperty(standardProtobufReader, SCHEMA_TEXT);
+            enableAllControllerServices();
+            runner.assertValid(standardProtobufReader);
+        }
+    }
+
+    @Nested
+    class SchemaNamePropertyAccessStrategy {
+        @BeforeEach
+        void beforeEach() {
+            runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY, 
SCHEMA_NAME_PROPERTY);
+            runner.setProperty(standardProtobufReader, SCHEMA_NAME, "Any 
schema name");
+            runner.setProperty(standardProtobufReader, SCHEMA_BRANCH_NAME, 
"Any branch");
+            // Ensure configuration is valid before running tests
+            runner.assertValid(standardProtobufReader);
+        }
+
+        @Test
+        void testValidWithMessageNameResolver() { // default setting from 
beforeEach
+            enableAllControllerServices();
+            runner.assertValid(standardProtobufReader);
+
+        }
+
+        @Test
+        void testValidWithMessageNameProperty() {
+            runner.setProperty(standardProtobufReader, 
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+            runner.setProperty(standardProtobufReader, MESSAGE_NAME, "Any 
message name");
+            enableAllControllerServices();
+            runner.assertValid(standardProtobufReader);
+        }
+
+        @Test
+        void testValidWithoutVersionAndBranch() {
+            runner.removeProperty(standardProtobufReader, SCHEMA_VERSION);
+            runner.removeProperty(standardProtobufReader, SCHEMA_BRANCH_NAME);
+            enableAllControllerServices();
+            runner.assertValid(standardProtobufReader);
+        }
+
+        @Test
+        void testInvalidWhenBranchAndVersionSetTogether() {
+            runner.setProperty(standardProtobufReader, SCHEMA_VERSION, "1");
+            runner.setProperty(standardProtobufReader, SCHEMA_BRANCH_NAME, 
"Any branch");
+
+            final ValidationResult invalidResult = 
verifyExactlyOneValidationError();
+
+            assertTrue(invalidResult.getExplanation().contains("Schema 
Branch"));
+        }
+
+        @Test
+        void testInvalidWithoutSchemaRegistry() {
+            runner.removeProperty(standardProtobufReader, SCHEMA_REGISTRY);
+            final ValidationResult invalidResult = 
verifyExactlyOneValidationError();
+
+            assertTrue(invalidResult.getExplanation().contains("Schema Name"));
+        }
+
+        @Test
+        void testValidWithoutSchemaName() { // default property value gets set 
automatically
+            runner.removeProperty(standardProtobufReader, SCHEMA_NAME);
+            runner.assertValid(standardProtobufReader);
+        }
+    }
+
+    @Nested
+    class SchemaReferenceReaderSchemaAccessStrategy {
+
+        @BeforeEach
+        void beforeEach() {
+            runner.setProperty(standardProtobufReader, SCHEMA_ACCESS_STRATEGY, 
SCHEMA_REFERENCE_READER_PROPERTY);
+            runner.setProperty(standardProtobufReader, SCHEMA_REGISTRY, 
MOCK_SCHEMA_REGISTRY_ID);
+            runner.setProperty(standardProtobufReader, 
SCHEMA_REFERENCE_READER, MOCK_SCHEMA_REFERENCE_READER_ID);
+            // Ensure configuration is valid before running tests
+            runner.assertValid(standardProtobufReader);
+        }
+
+        @Test
+        void testValidWithMessageNameResolver() { // default setting from 
beforeEach
+            enableAllControllerServices();
+            runner.assertValid(standardProtobufReader);
+
+        }
+
+        @Test
+        void testValidWithMessageNameProperty() {
+            runner.setProperty(standardProtobufReader, 
MESSAGE_NAME_RESOLUTION_STRATEGY, MESSAGE_NAME_PROPERTY);
+            runner.setProperty(standardProtobufReader, MESSAGE_NAME, "Any 
message name");
+            enableAllControllerServices();
+            runner.assertValid(standardProtobufReader);
+        }
+
+        @Test
+        void testInvalidWithoutSchemaRegistry() {
+            runner.removeProperty(standardProtobufReader, SCHEMA_REGISTRY);
+            final ValidationResult invalidResult = 
verifyExactlyOneValidationError();
+
+            assertTrue(invalidResult.getExplanation().contains("Schema 
Reference Reader"));
+        }
+
+        @Test
+        void testInvalidWithoutSchemaReferenceReader() {
+            runner.removeProperty(standardProtobufReader, 
SCHEMA_REFERENCE_READER);
+            final ValidationResult invalidResult = 
verifyExactlyOneValidationError();
+
+            assertTrue(invalidResult.getExplanation().contains("Schema 
Reference Reader"));
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
index 8690d6b7eb..26e922f272 100644
--- 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.services.protobuf.converter;
 
-import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.squareup.wire.schema.Schema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -28,8 +28,10 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.util.Map;
 
+import static 
org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForRootMessage;
 import static 
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema;
 import static 
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
+import static 
org.apache.nifi.services.protobuf.ProtoTestUtil.loadRootMessageSchema;
 import static 
org.apache.nifi.services.protobuf.ProtoTestUtil.loadRepeatedProto3TestSchema;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -40,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestProtobufDataConverter {
 
     @Test
-    public void testDataConverterForProto3() throws 
Descriptors.DescriptorValidationException, IOException {
+    public void testDataConverterForProto3() throws 
DescriptorValidationException, IOException {
         final Schema schema = loadProto3TestSchema();
         final RecordSchema recordSchema = new 
ProtoSchemaParser(schema).createSchema("Proto3Message");
 
@@ -80,7 +82,19 @@ public class TestProtobufDataConverter {
     }
 
     @Test
-    public void testDataConverterForRepeatedProto3() throws 
Descriptors.DescriptorValidationException, IOException {
+    public void testDataConverterCanHandleNestedMessages() throws 
DescriptorValidationException, IOException {
+        final Schema schema = loadRootMessageSchema();
+        final RecordSchema recordSchema = new 
ProtoSchemaParser(schema).createSchema("org.apache.nifi.protobuf.test.RootMessage");
+
+        final ProtobufDataConverter dataConverter = new 
ProtobufDataConverter(schema, "org.apache.nifi.protobuf.test.RootMessage", 
recordSchema, false, false);
+        final MapRecord record = 
dataConverter.createRecord(generateInputDataForRootMessage());
+
+        final MapRecord nestedRecord = (MapRecord) 
record.getValue("nestedMessage");
+        assertEquals("ENUM_VALUE_3", nestedRecord.getValue("testEnum"));
+    }
+
+    @Test
+    public void testDataConverterForRepeatedProto3() throws 
DescriptorValidationException, IOException {
         final Schema schema = loadRepeatedProto3TestSchema();
         final RecordSchema recordSchema = new 
ProtoSchemaParser(schema).createSchema("RootMessage");
 
@@ -118,7 +132,7 @@ public class TestProtobufDataConverter {
     }
 
     @Test
-    public void testDataConverterForProto2() throws 
Descriptors.DescriptorValidationException, IOException {
+    public void testDataConverterForProto2() throws 
DescriptorValidationException, IOException {
         final Schema schema = loadProto2TestSchema();
         final RecordSchema recordSchema = new 
ProtoSchemaParser(schema).createSchema("Proto2Message");
 
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.desc
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.desc
new file mode 100644
index 0000000000..803b5db48d
Binary files /dev/null and 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.desc
 differ
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.proto
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.proto
new file mode 100644
index 0000000000..ff7ff5cdf1
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/root_message.proto
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.nifi.protobuf.test;
+
+message RootMessage {
+  .org.apache.nifi.protobuf.test.NestedMessage nestedMessage = 1;
+}
+
+message NestedMessage {
+  .org.apache.nifi.protobuf.test.TestEnum testEnum = 2;
+
+}
+
+enum TestEnum {
+  ENUM_VALUE_1 = 0;
+  ENUM_VALUE_2 = 1;
+  ENUM_VALUE_3 = 2;
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.desc
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.desc
new file mode 100644
index 0000000000..0b6fcd5966
Binary files /dev/null and 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.desc
 differ
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.proto
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.proto
new file mode 100644
index 0000000000..c5a01fe0f1
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_profile.proto
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.nifi.protobuf.test;
+
+message UserProfile {
+  string user_id = 1;
+  int32 age = 4;
+  UserStatus status = 5;
+  Address address = 7;
+}
+
+message Address {
+  string street = 1;
+}
+
+enum UserStatus {
+  INACTIVE = 0;
+  ACTIVE = 1;
+} 
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.desc
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.desc
new file mode 100644
index 0000000000..c9c639cc74
Binary files /dev/null and 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.desc
 differ
diff --git 
a/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.proto
 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.proto
new file mode 100644
index 0000000000..015b2a1df9
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/org/apache/nifi/protobuf/test/user_settings.proto
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.nifi.protobuf.test;
+
+import "user_profile.proto";
+
+message UserSettings {
+  .org.apache.nifi.protobuf.test.UserProfile profile = 1;
+  NotificationSettings notifications = 2;
+  Theme theme = 4;
+  string language = 5;
+  bool two_factor_enabled = 7;
+}
+
+message NotificationSettings {
+  bool email_notifications = 1;
+}
+
+enum Theme {
+  LIGHT = 0;
+  DARK = 1;
+} 
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
 
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
index 32aafacb2f..3dafe93013 100644
--- 
a/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java
@@ -21,9 +21,8 @@ import org.apache.nifi.controller.ControllerService;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
-
 /**
- * A service interface for resolving message names from schema definitions and 
input streams.
+ * An interface for resolving message names from schema definitions and input 
streams.
  * This interface is typically used in scenarios where message types need to 
be determined
  * dynamically from the content of the message and the associated schema 
definition.
  * <p>
diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageNameFactory.java
 
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageNameFactory.java
new file mode 100644
index 0000000000..cdacc153fb
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageNameFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Factory class for creating StandardMessageName instances from various input 
formats.
+ */
+public final class StandardMessageNameFactory {
+    /**
+     * Parses a fully qualified message name that may include a package 
namespace.
+     * If the name contains a dot (.), everything before the last dot is 
treated as the namespace,
+     * and everything after the last dot is treated as the message name.
+     *
+     * @param fullMessageName the fully qualified message name (e.g., 
"mypackage.MyMessage" or "MyMessage")
+     * @return a new StandardMessageName instance with parsed namespace and 
name
+     * @throws NullPointerException if fullMessageName is null
+     */
+    public static StandardMessageName fromName(final String fullMessageName) {
+        Objects.requireNonNull(fullMessageName, "fullMessageName must not be 
null");
+        final int lastDotIndex = fullMessageName.lastIndexOf('.');
+        if (lastDotIndex > 0) {
+            final String namespace = fullMessageName.substring(0, 
lastDotIndex);
+            final String name = fullMessageName.substring(lastDotIndex + 1);
+            return new StandardMessageName(Optional.of(namespace), name);
+        } else {
+            return new StandardMessageName(Optional.empty(), fullMessageName);
+        }
+    }
+}

Reply via email to