[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18014732#comment-18014732
 ] 

Lukas Kucharski edited comment on NIFI-14424 at 8/18/25 9:02 PM:
-----------------------------------------------------------------

This ticket is not resolved. [~exceptionfactory] suggested to divide the 
functionality into to smaller PRs. That is what's been done. The fist PR 
[https://github.com/apache/nifi/pull/10105] The second PR 
https://github.com/apache/nifi/pull/10214


was (Author: JIRAUSER308933):
This ticket is not resolved. [~exceptionfactory] suggested to divide the 
functionality into to smaller PRs. That is what's been done. The second PR was 
posted for the review 18 aug 2025

> Support for Confluent schema registry Protobuf wire format in 
> StandardProtobufReader
> ------------------------------------------------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>             Fix For: 2.6.0
>
>         Attachments: image-2025-07-16-12-52-20-296.png, 
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png, 
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png, 
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png, 
> image-2025-07-17-14-52-18-792.png
>
>          Time Spent: 12h 50m
>  Remaining Estimate: 0h
>
> h1. Introduction
> This change enables NIFI to deserialize protobuf messages encoded using the 
> Confluent [protobuf wire 
> format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  Confluent kafka protobuf serde components use a custom header for the 
> message payload. The header needs to be parsed and interpreted. The existing 
> NIFI Confluent integration can decode Confluent type headers only for the 
> payload containing Avro data. For protobuf data, an additional 
> variable-length byte section gets added to the header by the Kafka protobuf 
> serializer. 
>  
> In addition, a new implementation for *ProtobufReader - 
> StandardProtobufReader* will be added. The existing ProtobufReader component 
> can't be easily extended without breaking backward compatibility because two 
> component properties *Proto Directory* and *Message Type* are always 
> required. So, even when the component is configured to use a schema registry 
> and schema reference reader, the user needs to provide those two parameters. 
> It looks like the component was never intended to be used with any other 
> *Schema Access Strategy* than {*}Generate From Proto File{*}.  Additionally, 
> the current implementation can't work in a cluster without copying protobuf 
> schemas manually onto every node.  
> *StandardProtobufReader* will not support the *Generate From Proto File* 
> schema access strategy. It will support the remaining ones from the list 
> below:
> !image-2025-07-17-10-32-37-291.png!
> h1. History
> Initially, the changes have been pushed for review in [this 
> pr.|https://github.com/apache/nifi/pull/10094] However, after an initial look 
> by [~exceptionfactory], we decided that it's reasonable to create a new 
> implementation of ProtobufReader instead of maintaining the old one. In 
> addition, the PR will be divided into two smaller ones. First will cover 
> changes to the confluent-bundle and common apis, and the second will cover 
> the new ProtobufReader implementation
> With this context, the usecase below uses:
> {color:#de350b}*NEW* - {color}to indicate a feature or a component will be 
> added as a result of this ticket.
> {color:#de350b}*EXISTING*{color} - to indicate a feature or a component 
> existing in the current NIFI codebase
> h2.  
> h1. The use case
>  # The user has a schema registry and a Kafka cluster with the topic 
> containing messages encoded according to the Confluent [protobuf wire 
> format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  (Magic byte + 4byte schema_id + varint message indexes + payload)
>  # The user configures *StandardProtobufReader* **  service
>  # In *StandardProtobufReader* user sets Schema Access Strategy property to 
> *Schema Reference Reader (EXISTING)*
>  # In *StandardProtobufReader* sets Schema Registry property *(EXISTING) 
> ConfluentSchemaRegistry*
>  ## *ConfluentSchemaRegistry* was extended to support the fetching of 
> referenced/included schemas. If a protobuf schema imports other schemas, they 
> are fetched from the registry recursively. *(NEW)*
>  ## *SchemaRegistry (EXISTING)* interface was extended to support returning a 
> raw schema form ({*}NEW{*}), in addition to the currently returned 
> RecordSchema. With this change, a new abstraction was introduced to 
> nifi-schema-registry-service-api - *SchemaDefinition (NEW).* The intention of 
> this is to represent a raw form of various schemas.
>  # In *StandardProtobufReader* user sets Schema Reference reader ({*}existing 
> ConfluentEncodedSchemaReferenceReader){*}
>  # In *StandardProtobufReader* user sets *Message Name Resolver Strategy* 
> property {*}(NEW){*}. Two options are available. One - *Message Name 
> Property* *(NEW),* Two - *Message Name Resolver Service (NEW)*
>  ## Option one will exist for simple cases where the user knows the name of 
> the protobuf message upfront, and the message will not change dynamically 
> during normal work.
>  ## Option two allows the user to choose the controller service responsible 
> for resolving the message name. In this case, the controller service looks at 
> the header of the message payload to get the information needed to locate the 
> Message within the protobuf schema (the information is a message index array, 
> see protobuf wire format). This is done by 
> *ConfluentProtobufMessageNameResolver(NEW)* service.
>  ### *Explanation:* Why do we need message name resolving? Protobuf schemas 
> can contain many message definitions at the root level (see example below). 
> We need to explicitly give the message name to the deserializer for it to be 
> able to do its job. A Schema file or schema text alone may not be enough 
> because it may be ambiguous. The ProtobufReader/2 always needs the name of 
> the proto schema message to be able to decode the binary payload.  The new 
> strategy defines a new interface *MessageNameResolver* *(NEW)* in 
> nifi-schema-registry-service-api. Currently, one implementation exists: 
> *ConfluentProtobufMessageNameResolver* *(NEW)* in 
> nifi-confluent-platform-bundle.
>  ### The ConfluentProtobufMessageNameResolver looks at the second byte in the 
> input stream to decode Confluent standardized *message name indexes.* More 
> about it here: [protobuf wire 
> format.|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
>  The indexes are variable length and can be [0][2][0] or [1] etc. Indexes are 
> zigzag encoded varints. *(NEW)*
>  ### *Parsing of the protobuf message schemas:* After 
> *ConfluentProtobufMessageNameResolver* decodes the message indexes it looks 
> for the protobuf message name in the *SchemaDefinition (NEW).* Messages can 
> be nested, so the message index decoded from the content can point to the 
> nested message like `MessageA.MessageE.MessageG` {{  
> !image-2025-07-16-12-52-20-296.png!   }}
>  ### The *ConfluentMessageNameResolver* needs to parse the schema in order to 
> know the structure of the schema and locate the right message name *(NEW).* 
> For this *ANTLR* parser is used {*}(NEW){*}. Proto3 grammar is being 
> downloaded from the antlr repository at build time. Then the maven plugin 
> generates the Parser for proto3 schema format.
>  #### *Why not use the wire library to parse it?* In order to get the schema 
> message, we need to parse the schema, not compile it. The difference is that 
> we can parse an incomplete schema as opposed to compilation, where we would 
> need a schema + imported schemas. Confluent in the implementation of its 
> schema-registry uses a parser from an internal wire package: 
> com.squareup.wire.schema.internal.parser.*. The decision was made not to use 
> internal classes from 3rd party library. I think they are not public anymore 
> in the newer versions. Additionally, I did not want to add a wire library 
> dependency and its dependencies to the confluent-bundle because I remembered 
> the maintainers favor keeping the memory footprint small. ANTLr parser uses a 
> very small standalone antl-runtime.jar
>  # The user configures *ConsumeKafka* processor with the processing strategy 
> set to *RECORD (existing)* and sets the *Record Reader* property to the 
> instance configured in earlier steps
> h1. New components introduced to API in nifi-schema-registry-service-api
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text, and 
> a collection of referenced {_}SchemaDefinitions{_}.
> h2. MessageNameResolver
> In the case of Protobuf a single proto file may contain multiple message 
> entries. To understand what message to use for decoding, a message index is 
> usually encoded in the payload. And as with a schema reference, different 
> systems encode this index differently.
> _MessageNameResolver_ is a new _ControllerService_ that is going to translate 
> an encoded Protobuf message index into a Protobuf Message name that’s later 
> going to be used to find a correct Protobuf message for deserialization.
> Schema Registry dependent implementations will land in their respective 
> modules. E.g., _nifi-confluent-schema-registry-service_ for Confluent.
> h2. MessageName
> Abstracts value returned by the MessageNameResolver
> h1. New components in nifi-protobuf-bundle
> h2. *StandardProtobufReader*
> Working name. The component will extend an abstract *SchemaRegistryService* 
> like all Readers do. 
> It will support the following Schema Access Strategies:
> !image-2025-07-17-14-42-31-326.png|width=616,height=356!!image-2025-07-17-14-43-58-023.png!!image-2025-07-17-14-45-47-577.png!
>  
> *Message Name Resolver Strategy* property will have two available allowable 
> values:
> !image-2025-07-17-14-52-18-792.png!
> When *Message name property* value gets selected, the component will show a 
> new property (not visible on screens above) called *Message Name,* where the 
> user can provide FQN name of the message 
> When *Message Name Resolver Service* gets selected, the component will show a 
> new property (not visible on screens above) called *Message Name Resolver 
> Service,* where a user can provide a reference to the service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to