[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18014857#comment-18014857
]
Lukas Kucharski commented on NIFI-14424:
----------------------------------------
{quote}I may not have been clear in the previous recommendation, but both the
pull request and the Jira issue should be divided. So this issue should be
closed, given the pull request 10105 has been merged, and a new linked Jira
issue should be created. This enables clearer tracking with a one-to-one
association between Jira issue and pull request.
{quote}
Got it. Will update this ticket and create another one.
> Support for Confluent schema registry Protobuf wire format in
> StandardProtobufReader
> ------------------------------------------------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
> Attachments: image-2025-07-16-12-52-20-296.png,
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png,
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png,
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png,
> image-2025-07-17-14-52-18-792.png
>
> Time Spent: 12h 50m
> Remaining Estimate: 0h
>
> h1. Introduction
> The ticket is split into 2 PRs.
> The first PR: [https://github.com/apache/nifi/pull/10105]
> The second PR: [https://github.com/apache/nifi/pull/10214]
>
> This change enables NiFi to deserialize Protobuf messages encoded using the
> Confluent [protobuf wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
> Confluent Kafka Protobuf serde components use a custom header for the
> message payload. The header needs to be parsed and interpreted. The existing
> NiFi Confluent integration can decode Confluent headers only for
> payloads containing Avro data. For Protobuf data, the Kafka Protobuf
> serializer adds an additional variable-length byte section to the header.
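
The fixed part of the header described above can be sketched as follows. This is only an illustration of the Confluent wire format (magic byte 0 followed by a 4-byte big-endian schema ID), not the NiFi implementation; the function name `read_confluent_prefix` is hypothetical.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed payload


def read_confluent_prefix(data: bytes) -> tuple[int, bytes]:
    """Return (schema_id, rest); for Protobuf, rest starts at the message indexes."""
    if len(data) < 5:
        raise ValueError("payload is shorter than the 5-byte Confluent prefix")
    # ">" = big-endian without padding, "B" = unsigned byte, "I" = unsigned 32-bit int
    magic, schema_id = struct.unpack(">BI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[5:]
```

For Avro the remaining bytes are the payload itself; for Protobuf they begin with the variable-length message index section.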
>
> In addition, a new *ProtobufReader* implementation,
> *StandardProtobufReader*, will be added. The existing ProtobufReader component
> can't be easily extended without breaking backward compatibility because two
> component properties *Proto Directory* and *Message Type* are always
> required. So, even when the component is configured to use a schema registry
> and schema reference reader, the user needs to provide those two parameters.
> It looks like the component was never intended to be used with any other
> *Schema Access Strategy* than {*}Generate From Proto File{*}. Additionally,
> the current implementation can't work in a cluster without copying protobuf
> schemas manually onto every node.
> *StandardProtobufReader* will not support the *Generate From Proto File*
> schema access strategy. It will support the remaining ones from the list
> below:
> !image-2025-07-17-10-32-37-291.png!
> h1. History
> Initially, the changes were pushed for review in [this
> PR|https://github.com/apache/nifi/pull/10094]. However, after an initial look
> by [~exceptionfactory], we decided that it is reasonable to create a new
> implementation of ProtobufReader instead of maintaining the old one. In
> addition, the PR will be divided into two smaller ones. The first will cover
> changes to the confluent-bundle and common APIs, and the second will cover
> the new ProtobufReader implementation.
> With this context, the use case below uses:
> {color:#de350b}*NEW*{color} - to indicate a feature or a component that will be
> added as a result of this ticket.
> {color:#de350b}*EXISTING*{color} - to indicate a feature or a component that
> already exists in the current NiFi codebase.
> h1. The use case
> # The user has a schema registry and a Kafka cluster with the topic
> containing messages encoded according to the Confluent [protobuf wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
> (Magic byte + 4byte schema_id + varint message indexes + payload)
> # The user configures the *StandardProtobufReader* service.
> # In *StandardProtobufReader*, the user sets the Schema Access Strategy property to
> *Schema Reference Reader (EXISTING)*.
> # In *StandardProtobufReader*, the user sets the Schema Registry property to
> *ConfluentSchemaRegistry (EXISTING)*.
> ## *ConfluentSchemaRegistry* was extended to support the fetching of
> referenced/included schemas. If a protobuf schema imports other schemas, they
> are fetched from the registry recursively. *(NEW)*
> ## *SchemaRegistry (EXISTING)* interface was extended to support returning a
> raw schema form ({*}NEW{*}), in addition to the currently returned
> RecordSchema. With this change, a new abstraction was introduced to
> nifi-schema-registry-service-api - *SchemaDefinition (NEW).* The intention of
> this is to represent a raw form of various schemas.
> # In *StandardProtobufReader*, the user sets the Schema Reference Reader property to
> *ConfluentEncodedSchemaReferenceReader (EXISTING)*.
> # In *StandardProtobufReader*, the user sets the *Message Name Resolver Strategy*
> property *(NEW)*. Two options are available: *Message Name
> Property* *(NEW)* and *Message Name Resolver Service (NEW)*.
> ## Option one exists for simple cases where the user knows the name of
> the protobuf message upfront, and the message will not change dynamically
> during normal operation.
> ## Option two allows the user to choose the controller service responsible
> for resolving the message name. In this case, the controller service looks at
> the header of the message payload to get the information needed to locate the
> message within the protobuf schema (the information is a message index array,
> see protobuf wire format). This is done by the
> *ConfluentProtobufMessageNameResolver (NEW)* service.
> ### *Explanation:* Why do we need message name resolving? Protobuf schemas
> can contain many message definitions at the root level (see example below).
> We need to explicitly give the message name to the deserializer for it to be
> able to do its job. A Schema file or schema text alone may not be enough
> because it may be ambiguous. The ProtobufReader/2 always needs the name of
> the proto schema message to be able to decode the binary payload. The new
> strategy defines a new interface *MessageNameResolver* *(NEW)* in
> nifi-schema-registry-service-api. Currently, one implementation exists:
> *ConfluentProtobufMessageNameResolver* *(NEW)* in
> nifi-confluent-platform-bundle.
> ### The ConfluentProtobufMessageNameResolver reads the bytes that follow the
> magic byte and the 4-byte schema ID in the input stream to decode the Confluent
> standardized *message name indexes*. More
> about it here: [protobuf wire
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
> The indexes are variable length and can be [0][2][0] or [1] etc. Indexes are
> zigzag encoded varints. *(NEW)*
> ### *Parsing of the protobuf message schemas:* After
> *ConfluentProtobufMessageNameResolver* decodes the message indexes, it looks
> for the protobuf message name in the *SchemaDefinition (NEW)*. Messages can
> be nested, so the message index decoded from the content can point to the
> nested message like `MessageA.MessageE.MessageG` {{
> !image-2025-07-16-12-52-20-296.png! }}
> ### The *ConfluentProtobufMessageNameResolver* needs to parse the schema in order to
> know its structure and locate the right message name *(NEW)*.
> An *ANTLR* parser is used for this *(NEW)*. The proto3 grammar is
> downloaded from the ANTLR repository at build time, and a Maven plugin then
> generates the parser for the proto3 schema format.
> #### *Why not use the wire library to parse it?* In order to get the schema
> message, we need to parse the schema, not compile it. The difference is that
> we can parse an incomplete schema, whereas compilation would
> require the schema plus its imported schemas. Confluent's
> schema-registry implementation uses a parser from an internal wire package:
> com.squareup.wire.schema.internal.parser.*. The decision was made not to use
> internal classes from a third-party library. I think they are not public anymore
> in the newer versions. Additionally, I did not want to add a wire library
> dependency and its dependencies to the confluent-bundle because I remembered
> the maintainers favor keeping the memory footprint small. The ANTLR parser uses a
> very small standalone antlr-runtime.jar.
> # The user configures the *ConsumeKafka* processor with the processing strategy
> set to *RECORD (EXISTING)* and sets the *Record Reader* property to the
> instance configured in earlier steps.
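
The message index decoding and name lookup from the steps above can be sketched roughly as follows. It assumes the Confluent encoding (a zigzag varint count followed by that many zigzag varint indexes, with a single 0 byte standing for the index list [0]). The function names and the `(name, nested)` tuple tree are hypothetical stand-ins for a parsed schema, not the ANTLR-based implementation, and the package prefix of the message name is left out.

```python
def read_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode one zigzag-encoded varint at pos; return (value, next position)."""
    raw, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:            # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos  # undo zigzag


def read_message_indexes(buf: bytes, pos: int = 0) -> tuple[list[int], int]:
    """Decode the message index array; return (indexes, position of the payload)."""
    count, pos = read_varint(buf, pos)
    if count < 0:
        raise ValueError("negative message index count")
    if count == 0:
        return [0], pos  # shorthand for the first top-level message
    indexes = []
    for _ in range(count):
        value, pos = read_varint(buf, pos)
        indexes.append(value)
    return indexes, pos


def resolve_message_name(messages: list, indexes: list[int]) -> str:
    """Walk a tree of (name, nested_messages) tuples along the index path."""
    parts, level = [], messages
    for index in indexes:
        name, nested = level[index]
        parts.append(name)
        level = nested
    return ".".join(parts)


# Hypothetical schema layout: MessageA nests three messages, MessageE nests one.
schema = [
    ("MessageA", [("MessageC", []), ("MessageD", []),
                  ("MessageE", [("MessageG", [])])]),
    ("MessageB", []),
]
indexes, payload_start = read_message_indexes(bytes([0x06, 0x00, 0x04, 0x00]))
print(resolve_message_name(schema, indexes))
```

In this made-up layout the index path [0][2][0] selects the first top-level message, its third nested message, and that message's first nested message.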
> h1. New components introduced to API in nifi-schema-registry-service-api
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text, and
> a collection of referenced {_}SchemaDefinitions{_}.
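
As a rough illustration of the shape described above, a raw schema with nested references could be modelled like this. The field names, the simplified `SchemaIdentifier`, and the choice of a dict keyed by import path are assumptions made for the sketch; the actual API in nifi-schema-registry-service-api may differ.

```python
from dataclasses import dataclass, field


@dataclass
class SchemaIdentifier:
    # Simplified, hypothetical identifier: just a name and a version.
    name: str
    version: int


@dataclass
class SchemaDefinition:
    identifier: SchemaIdentifier
    text: str  # raw schema text, e.g. the content of a .proto file
    # Referenced schemas keyed by import path (assumed representation).
    references: dict[str, "SchemaDefinition"] = field(default_factory=dict)


def collect_texts(root: SchemaDefinition) -> dict[str, str]:
    """Walk the reference graph and return {schema name: schema text}."""
    texts: dict[str, str] = {}
    stack = [root]
    while stack:
        current = stack.pop()
        if current.identifier.name in texts:
            continue  # already visited, also guards against cycles
        texts[current.identifier.name] = current.text
        stack.extend(current.references.values())
    return texts
```

A reader that needs the complete set of proto sources for a message can flatten the graph with `collect_texts` before handing the texts to a compiler.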
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message
> entries. To know which message to use for decoding, a message index is
> usually encoded in the payload. As with a schema reference, different
> systems encode this index differently.
> _MessageNameResolver_ is a new _ControllerService_ that is going to translate
> an encoded Protobuf message index into a Protobuf Message name that’s later
> going to be used to find a correct Protobuf message for deserialization.
> Schema Registry dependent implementations will land in their respective
> modules. E.g., _nifi-confluent-schema-registry-service_ for Confluent.
> h2. MessageName
> Abstracts the value returned by the MessageNameResolver.
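
The relationship between the resolver and the value it returns might look roughly like the sketch below. Everything here is hypothetical (method names, parameters, and the `FixedMessageNameResolver` class); it only illustrates the idea of a pluggable resolver returning a small value object, not the actual NiFi interfaces.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MessageName:
    namespace: Optional[str]  # Protobuf package, if any
    name: str

    def fully_qualified(self) -> str:
        return f"{self.namespace}.{self.name}" if self.namespace else self.name


class MessageNameResolver(ABC):
    @abstractmethod
    def get_message_name(self, schema_text: str, payload: bytes) -> MessageName:
        """Return the name of the message that the payload encodes."""


class FixedMessageNameResolver(MessageNameResolver):
    """Ignores the payload and always returns a preconfigured name."""

    def __init__(self, fqn: str):
        # Treats everything before the last dot as the namespace; a real
        # resolver would have to distinguish packages from nested messages.
        namespace, _, name = fqn.rpartition(".")
        self._name = MessageName(namespace or None, name)

    def get_message_name(self, schema_text: str, payload: bytes) -> MessageName:
        return self._name
```

A registry-specific resolver would implement the same method but derive the name from the encoded message index in the payload instead of a fixed value.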
> h1. New components in nifi-protobuf-bundle
> h2. *StandardProtobufReader*
> Working name. The component will extend an abstract *SchemaRegistryService*
> like all Readers do.
> It will support the following Schema Access Strategies:
> !image-2025-07-17-14-42-31-326.png|width=616,height=356!!image-2025-07-17-14-43-58-023.png!!image-2025-07-17-14-45-47-577.png!
>
> The *Message Name Resolver Strategy* property will have two allowable
> values:
> !image-2025-07-17-14-52-18-792.png!
> When the *Message Name Property* value is selected, the component will show a
> new property (not visible on the screens above) called *Message Name*, where the
> user can provide the fully qualified name (FQN) of the message.
> When *Message Name Resolver Service* is selected, the component will show a
> new property (not visible on the screens above) called *Message Name Resolver
> Service*, where the user can provide a reference to the service.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)