[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18014732#comment-18014732
]
Lukas Kucharski edited comment on NIFI-14424 at 8/18/25 9:02 PM:
-----------------------------------------------------------------
This ticket is not resolved. [~exceptionfactory] suggested dividing the
functionality into two smaller PRs, and that is what has been done. The first PR:
[https://github.com/apache/nifi/pull/10105]. The second PR:
[https://github.com/apache/nifi/pull/10214]
was (Author: JIRAUSER308933):
This ticket is not resolved. [~exceptionfactory] suggested dividing the
functionality into two smaller PRs, and that is what has been done. The second PR was
posted for review on 18 Aug 2025.
> Support for Confluent schema registry Protobuf wire format in
> StandardProtobufReader
> ------------------------------------------------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
> Fix For: 2.6.0
>
> Attachments: image-2025-07-16-12-52-20-296.png,
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png,
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png,
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png,
> image-2025-07-17-14-52-18-792.png
>
> Time Spent: 12h 50m
> Remaining Estimate: 0h
>
> h1. Introduction
> This change enables NiFi to deserialize Protobuf messages encoded using the
> Confluent [Protobuf wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
> Confluent Kafka Protobuf serde components prepend a custom header to the
> message payload, and this header must be parsed and interpreted. The existing
> NiFi Confluent integration can decode Confluent headers only for payloads
> containing Avro data. For Protobuf data, the Kafka Protobuf serializer adds
> an additional variable-length byte section (the message indexes) to the header.
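A minimal Java sketch of the header described above, assuming the input is the raw value of one Kafka record. It follows the Confluent wire format: magic byte 0, a 4-byte big-endian schema ID, then the message-index array, where the array length and each index are zigzag-encoded varints and a single 0 byte is shorthand for [0]. The record name ConfluentProtobufHeader and its methods are hypothetical illustrations, not NiFi's actual API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the parsed Confluent Protobuf header of one Kafka record value.
record ConfluentProtobufHeader(int schemaId, List<Integer> messageIndexes, int payloadOffset) {
    static final byte MAGIC_BYTE = 0x0;

    static ConfluentProtobufHeader parse(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();        // 4-byte schema ID
        int count = readZigZagVarint(buffer);  // length of the message-index array
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0); // shorthand: a single 0 byte means [0], the first top-level message
        } else {
            for (int i = 0; i < count; i++) {
                indexes.add(readZigZagVarint(buffer));
            }
        }
        // Everything after the header is the Protobuf-encoded message itself.
        return new ConfluentProtobufHeader(schemaId, List.copyOf(indexes), buffer.position());
    }

    // Reads a base-128 varint (at most 5 bytes for 32 bits) and undoes zigzag encoding.
    static int readZigZagVarint(ByteBuffer buffer) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is longer than 5 bytes");
            }
            b = buffer.get();
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }
}
```

For example, the bytes 00 00 00 00 2A 06 00 04 00 followed by the payload decode to schema ID 42 and message indexes [0, 2, 0], with the payload starting at offset 9.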
>
> In addition, a new *ProtobufReader* implementation, *StandardProtobufReader*,
> will be added. The existing ProtobufReader component cannot be easily extended
> without breaking backward compatibility because two component properties,
> *Proto Directory* and *Message Type*, are always required. So, even when the
> component is configured to use a schema registry and a schema reference reader,
> the user must provide those two parameters. It appears the component was never
> intended to be used with any *Schema Access Strategy* other than
> {*}Generate From Proto File{*}. Additionally, the current implementation cannot
> work in a cluster without manually copying Protobuf schemas onto every node.
> *StandardProtobufReader* will not support the *Generate From Proto File*
> schema access strategy. It will support the remaining strategies from the list
> below:
> !image-2025-07-17-10-32-37-291.png!
> h1. History
> Initially, the changes were pushed for review in [this
> PR|https://github.com/apache/nifi/pull/10094]. However, after an initial look
> by [~exceptionfactory], we decided that it is reasonable to create a new
> implementation of ProtobufReader instead of maintaining the old one. In
> addition, the PR will be divided into two smaller ones. The first will cover
> changes to the confluent-bundle and common APIs, and the second will cover
> the new ProtobufReader implementation.
> In this context, the use case below uses the following markers:
> {color:#de350b}*NEW*{color} - to indicate a feature or a component that will be
> added as a result of this ticket.
> {color:#de350b}*EXISTING*{color} - to indicate a feature or a component that
> already exists in the current NiFi codebase.
> h1. The use case
> # The user has a schema registry and a Kafka cluster with a topic
> containing messages encoded according to the Confluent [Protobuf wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> (magic byte + 4-byte schema ID + varint message indexes + payload).
> # The user configures the *StandardProtobufReader* service.
> # In *StandardProtobufReader*, the user sets the *Schema Access Strategy* property to
> *Schema Reference Reader (EXISTING)*.
> # In *StandardProtobufReader*, the user sets the *Schema Registry* property to
> *ConfluentSchemaRegistry (EXISTING)*.
> ## *ConfluentSchemaRegistry* was extended to support fetching
> referenced/included schemas. If a Protobuf schema imports other schemas, they
> are fetched from the registry recursively. *(NEW)*
> ## The *SchemaRegistry (EXISTING)* interface was extended to support returning
> the raw schema form ({*}NEW{*}) in addition to the currently returned
> RecordSchema. With this change, a new abstraction, *SchemaDefinition (NEW)*,
> was introduced to nifi-schema-registry-service-api. It is intended to
> represent the raw form of various schema types.
> # In *StandardProtobufReader*, the user sets the *Schema Reference Reader* property to
> *ConfluentEncodedSchemaReferenceReader (EXISTING)*.
> # In *StandardProtobufReader*, the user sets the *Message Name Resolver Strategy*
> property {*}(NEW){*}. Two options are available: *Message Name
> Property* *(NEW)* and *Message Name Resolver Service (NEW)*.
> ## Option one exists for simple cases where the user knows the name of
> the Protobuf message upfront and the message does not change dynamically
> during normal operation.
> ## Option two allows the user to choose the controller service responsible
> for resolving the message name. In this case, the controller service reads
> the header of the message payload to get the information needed to locate the
> message within the Protobuf schema (a message index array; see the Protobuf
> wire format). This is done by the
> *ConfluentProtobufMessageNameResolver (NEW)* service.
> ### *Explanation:* Why do we need message name resolution? Protobuf schemas
> can contain many message definitions at the root level (see the example below).
> We need to explicitly give the message name to the deserializer for it to be
> able to do its job. A schema file or schema text alone may not be enough
> because it may be ambiguous. The ProtobufReader/2 always needs the name of
> the proto schema message to be able to decode the binary payload. The new
> strategy defines a new interface, *MessageNameResolver* *(NEW)*, in
> nifi-schema-registry-service-api. Currently, one implementation exists:
> *ConfluentProtobufMessageNameResolver* *(NEW)* in
> nifi-confluent-platform-bundle.
> ### The ConfluentProtobufMessageNameResolver reads the bytes that follow the
> magic byte and the 4-byte schema ID in the input stream to decode the Confluent
> standardized *message indexes*. See the [Protobuf wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> for details. The index array has variable length, for example [0, 2, 0] or [1].
> Indexes are zigzag-encoded varints. *(NEW)*
> ### *Parsing of the Protobuf message schemas:* After
> *ConfluentProtobufMessageNameResolver* decodes the message indexes, it looks up
> the Protobuf message name in the *SchemaDefinition (NEW).* Messages can
> be nested, so the message indexes decoded from the content can point to a
> nested message such as {{MessageA.MessageE.MessageG}}.
> !image-2025-07-16-12-52-20-296.png!
> ### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in order to
> know its structure and locate the right message name *(NEW).*
> For this, an *ANTLR* parser is used {*}(NEW){*}. The Proto3 grammar is
> downloaded from the ANTLR repository at build time, and a Maven plugin then
> generates the parser for the proto3 schema format.
> #### *Why not use the Wire library to parse it?* To get the schema
> message, we need to parse the schema, not compile it. The difference is that
> an incomplete schema can be parsed, whereas compilation would require the
> schema plus all of its imported schemas. In its schema-registry implementation,
> Confluent uses a parser from an internal Wire package:
> com.squareup.wire.schema.internal.parser.*. The decision was made not to use
> internal classes from a third-party library; I think they are no longer public
> in newer versions. Additionally, I did not want to add the Wire library and its
> dependencies to the confluent-bundle because, as I recall, the maintainers favor
> keeping the memory footprint small. The ANTLR parser only needs a very small
> standalone ANTLR runtime JAR.
> # The user configures the *ConsumeKafka* processor with the processing strategy
> set to *RECORD (EXISTING)* and sets the *Record Reader* property to the
> *StandardProtobufReader* instance configured in the earlier steps.
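The message-index lookup in the use case above can be sketched as a walk over the parsed message declarations: the first index selects a top-level message, and each following index selects a message nested inside the previous one. This is a sketch under assumptions: MessageNode is a hypothetical stand-in for whatever structure the ANTLR parse produces, it tracks only message declarations in declaration order, and MessageIndexResolver is not the actual ConfluentProtobufMessageNameResolver code.

```java
import java.util.List;

// Hypothetical view of parsed message declarations, kept in declaration order.
record MessageNode(String name, List<MessageNode> nested) {
    static MessageNode of(String name, MessageNode... nested) {
        return new MessageNode(name, List.of(nested));
    }
}

final class MessageIndexResolver {
    private MessageIndexResolver() {}

    // Builds a name such as "com.example.MessageA.MessageE.MessageG" from an index path.
    static String resolve(String protoPackage, List<MessageNode> topLevel, List<Integer> indexes) {
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("Empty message index array");
        }
        StringBuilder name = new StringBuilder();
        if (protoPackage != null && !protoPackage.isEmpty()) {
            name.append(protoPackage).append('.');
        }
        List<MessageNode> candidates = topLevel; // messages selectable at the current depth
        for (int depth = 0; depth < indexes.size(); depth++) {
            int index = indexes.get(depth);
            if (index < 0 || index >= candidates.size()) {
                throw new IllegalArgumentException("Message index " + index + " out of range at depth " + depth);
            }
            MessageNode node = candidates.get(index);
            if (depth > 0) {
                name.append('.');
            }
            name.append(node.name());
            candidates = node.nested(); // descend into the selected message
        }
        return name.toString();
    }
}
```

With top-level messages MessageA (containing MessageD and MessageE, where MessageE contains MessageF and MessageG) and MessageB, the index path [0, 1, 1] resolves to MessageA.MessageE.MessageG, and [1] resolves to MessageB.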
> h1. New components introduced to API in nifi-schema-registry-service-api
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text, and
> a collection of referenced {_}SchemaDefinitions{_}.
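A simplified sketch of a SchemaDefinition tree and of the recursive fetching of imported schemas mentioned in the use case. Everything here is hypothetical: the record uses a plain string identifier instead of NiFi's SchemaIdentifier, and RegistryEntry plus ReferenceFetcher stand in for the real Confluent registry client. The cache lets a schema imported by several others be fetched once, and the in-progress set rejects circular imports.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified SchemaDefinition: identifier, raw schema text and
// referenced definitions keyed by the import name used in the schema text.
record SchemaDefinition(String identifier, String text, Map<String, SchemaDefinition> references) {}

// Hypothetical raw registry entry: schema text plus the names it imports.
record RegistryEntry(String text, List<String> imports) {}

final class ReferenceFetcher {
    private ReferenceFetcher() {}

    // Fetches a schema and, recursively, every schema it imports.
    static SchemaDefinition fetch(String name, Map<String, RegistryEntry> registry) {
        return fetch(name, registry, new HashMap<>(), new HashSet<>());
    }

    private static SchemaDefinition fetch(String name, Map<String, RegistryEntry> registry,
                                          Map<String, SchemaDefinition> cache, Set<String> inProgress) {
        SchemaDefinition cached = cache.get(name);
        if (cached != null) {
            return cached; // import shared by several schemas, already resolved
        }
        if (!inProgress.add(name)) {
            throw new IllegalStateException("Circular import detected at " + name);
        }
        RegistryEntry entry = registry.get(name);
        if (entry == null) {
            throw new IllegalArgumentException("Schema not found in registry: " + name);
        }
        Map<String, SchemaDefinition> references = new LinkedHashMap<>();
        for (String imported : entry.imports()) {
            references.put(imported, fetch(imported, registry, cache, inProgress));
        }
        SchemaDefinition definition =
                new SchemaDefinition(name, entry.text(), Collections.unmodifiableMap(references));
        inProgress.remove(name);
        cache.put(name, definition);
        return definition;
    }
}
```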
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message
> definitions. To determine which message to use for decoding, a message index is
> usually encoded in the payload. As with a schema reference, different
> systems encode this index differently.
> _MessageNameResolver_ is a new _ControllerService_ that translates
> an encoded Protobuf message index into a Protobuf message name, which is then
> used to find the correct Protobuf message for deserialization.
> Schema-registry-specific implementations will live in their respective
> modules, e.g., _nifi-confluent-schema-registry-service_ for Confluent.
> h2. MessageName
> Abstracts the value returned by the MessageNameResolver.
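To make the split between MessageNameResolver and MessageName concrete, here is a minimal sketch of the contract together with the "Message Name Property" option, where the name is configured upfront. The method name resolve, its parameters, and the shape of MessageName are hypothetical; the actual nifi-schema-registry-service-api signatures may differ.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical value type returned by a resolver: the fully qualified
// Protobuf message name, e.g. "com.example.MessageA.MessageE".
record MessageName(String fullyQualifiedName) {
    MessageName {
        if (fullyQualifiedName == null || fullyQualifiedName.isBlank()) {
            throw new IllegalArgumentException("Message name must not be blank");
        }
    }

    // Last segment of the name, i.e. the innermost message.
    String simpleName() {
        int lastDot = fullyQualifiedName.lastIndexOf('.');
        return lastDot < 0 ? fullyQualifiedName : fullyQualifiedName.substring(lastDot + 1);
    }
}

// Hypothetical resolver contract. Index-decoding implementations read header
// bytes from the content stream; others may ignore the stream entirely.
interface MessageNameResolver {
    MessageName resolve(String schemaText, InputStream content) throws IOException;
}

// "Message Name Property" option: the configured name is returned as-is
// and the content stream is never read.
record FixedMessageNameResolver(String configuredName) implements MessageNameResolver {
    @Override
    public MessageName resolve(String schemaText, InputStream content) {
        return new MessageName(configuredName);
    }
}
```

A Confluent-specific implementation would instead consume the message-index bytes from the stream, so the reader can hand the remaining bytes directly to the Protobuf decoder.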
> h1. New components in nifi-protobuf-bundle
> h2. *StandardProtobufReader*
> Working name. Like all Readers, the component will extend the abstract
> *SchemaRegistryService*.
> It will support the following Schema Access Strategies:
> !image-2025-07-17-14-42-31-326.png|width=616,height=356!
> !image-2025-07-17-14-43-58-023.png!
> !image-2025-07-17-14-45-47-577.png!
>
> The *Message Name Resolver Strategy* property will have two allowable
> values:
> !image-2025-07-17-14-52-18-792.png!
> When *Message Name Property* is selected, the component will show a
> new property (not visible in the screenshots above) called *Message Name*, where the
> user can provide the fully qualified name of the message.
> When *Message Name Resolver Service* is selected, the component will show a
> new property (not visible in the screenshots above) called *Message Name Resolver
> Service*, where the user can provide a reference to the service.
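The two allowable values and their dependent properties can be sketched in plain Java as follows. In NiFi itself, this kind of conditional visibility is typically declared with PropertyDescriptor.Builder's dependsOn(...); the enum, the validator, and the error message format below are hypothetical illustrations, not the component's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the Message Name Resolver Strategy allowable values.
enum MessageNameResolverStrategy {
    MESSAGE_NAME_PROPERTY("Message Name Property"),
    MESSAGE_NAME_RESOLVER_SERVICE("Message Name Resolver Service");

    private final String displayName;

    MessageNameResolverStrategy(String displayName) {
        this.displayName = displayName;
    }

    String displayName() {
        return displayName;
    }

    // Name of the dependent property that becomes visible (and required) for this value.
    String dependentProperty() {
        return switch (this) {
            case MESSAGE_NAME_PROPERTY -> "Message Name";
            case MESSAGE_NAME_RESOLVER_SERVICE -> "Message Name Resolver Service";
        };
    }
}

final class StrategyValidator {
    private StrategyValidator() {}

    // Returns validation problems; an empty list means the configuration is valid.
    static List<String> validate(MessageNameResolverStrategy strategy, String messageName, String resolverServiceId) {
        List<String> problems = new ArrayList<>();
        String value = strategy == MessageNameResolverStrategy.MESSAGE_NAME_PROPERTY ? messageName : resolverServiceId;
        if (value == null || value.isBlank()) {
            problems.add("'" + strategy.dependentProperty() + "' is required when the strategy is '"
                    + strategy.displayName() + "'");
        }
        return problems;
    }
}
```

Only the property that belongs to the selected strategy is checked, mirroring the way the other dependent property stays hidden in the UI.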