[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lukas Kucharski updated NIFI-14424:
-----------------------------------
    Description: 
h2. Summary

Add Confluent Protobuf Wire Format Support - Foundation Services and APIs
h2. Description

This enhancement introduces foundational support for deserializing Protobuf 
messages encoded using the Confluent protobuf wire format in NiFi. The existing 
Confluent Schema Registry integration only supported Avro data decoding. 
Confluent's Protobuf serialization adds a variable-length byte section to the 
message header containing message indexes that must be parsed and interpreted.

This ticket represents Phase 1 of the implementation, providing the core 
services and API extensions. Phase 2 will introduce the new 
StandardProtobufReader component.
h2. Key Components Implemented
h3. API Extensions (nifi-schema-registry-service-api)

- {*}SchemaDefinition{*}: New abstraction representing raw schema forms with 
schema text and referenced schema collections
- {*}MessageNameResolver{*}: Controller service interface for translating 
encoded Protobuf message indexes into message names
- {*}MessageName{*}: Abstraction for values returned by MessageNameResolver
h3. Confluent Platform Bundle Enhancements

- {*}ConfluentProtobufMessageNameResolver{*}: Service that decodes 
Confluent-standardized message name indexes from the wire format using 
zigzag-encoded varints
- Enhanced {*}RestSchemaRegistryClient{*}: Extended to support recursive 
fetching of referenced/imported Protobuf schemas
- {*}ProtobufMessageSchema{*}: Schema representation supporting nested 
message parsing
- ANTLR Parser Integration: Added a Proto3 grammar parser for schema structure 
analysis without requiring full compilation
h3. Wire Format Processing

- Message Index Decoding: Parses variable-length message indexes (e.g., 
[0][2][0] or [1]) from Confluent wire format headers
- Nested Message Support: Resolves nested message paths like 
MessageA.MessageE.MessageG
- Schema Structure Analysis: Parses Protobuf schemas to locate the correct 
message definitions for deserialization
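A minimal Java sketch of the header decoding listed above, assuming the layout given in the Confluent wire-format documentation: magic byte {{0}}, a 4-byte big-endian schema ID, then a message-index array written as zigzag varints (array length first, then each index), where a single {{0}} byte is the shortcut for {{[0]}}. {{ConfluentProtobufHeaderSketch}}, {{readHeader}} and {{readZigZagVarint}} are hypothetical names; this is not the NiFi implementation.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of Confluent Protobuf header decoding (not the NiFi code). */
final class ConfluentProtobufHeaderSketch {

    /** Schema ID plus the message index path, e.g. [0, 2, 0]. */
    record Header(int schemaId, List<Integer> messageIndexes) { }

    static Header readHeader(InputStream in) throws IOException {
        int magic = readByte(in);
        if (magic != 0) {
            throw new IOException("Unexpected magic byte: " + magic);
        }
        // Schema ID: 4 bytes, big-endian
        int schemaId = 0;
        for (int i = 0; i < 4; i++) {
            schemaId = (schemaId << 8) | readByte(in);
        }
        return new Header(schemaId, readMessageIndexes(in));
    }

    static List<Integer> readMessageIndexes(InputStream in) throws IOException {
        int count = readZigZagVarint(in);
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            // Shortcut for the most common case: a lone 0 means "first message", i.e. [0]
            indexes.add(0);
            return indexes;
        }
        for (int i = 0; i < count; i++) {
            indexes.add(readZigZagVarint(in));
        }
        return indexes;
    }

    /** Reads a base-128 varint (max 5 bytes for 32 bits) and undoes the zigzag mapping. */
    static int readZigZagVarint(InputStream in) throws IOException {
        int raw = 0;
        int shift = 0;
        int b;
        do {
            if (shift > 28) {
                throw new IOException("Varint is longer than 5 bytes");
            }
            b = readByte(in);
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        // ZigZag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
        return (raw >>> 1) ^ -(raw & 1);
    }

    private static int readByte(InputStream in) throws IOException {
        int b = in.read();
        if (b < 0) {
            throw new EOFException("Truncated Confluent header");
        }
        return b;
    }
}
```

For example, the bytes {{00 00 00 00 2A 06 00 04 00}} decode to schema ID 42 and the index path [0][2][0]: {{06}} is the zigzag-encoded length 3, and {{04}} is the zigzag-encoded index 2.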
h2. Next Phase

Phase 2 (linked issue) will implement the *StandardProtobufReader* component, 
which uses these foundational services to provide full Protobuf record 
processing with Schema Registry integration.

 

  was:
h1. Introduction

The ticket is split into two PRs.
The first PR: [https://github.com/apache/nifi/pull/10105]
The second PR: [https://github.com/apache/nifi/pull/10214]

 

This change enables NiFi to deserialize Protobuf messages encoded using the 
Confluent [protobuf wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
Confluent Kafka Protobuf serde components use a custom header for the message 
payload, and that header needs to be parsed and interpreted. The existing NiFi 
Confluent integration can decode Confluent headers only for payloads 
containing Avro data. For Protobuf data, the Kafka Protobuf serializer adds an 
additional variable-length byte section to the header.

 

In addition, a new *ProtobufReader* implementation, *StandardProtobufReader*, 
will be added. The existing ProtobufReader component can't easily be extended 
without breaking backward compatibility, because two of its properties, *Proto 
Directory* and *Message Type*, are always required. So, even when the component 
is configured to use a schema registry and a schema reference reader, the user 
still needs to provide those two properties. It appears the component was never 
intended to be used with any *Schema Access Strategy* other than {*}Generate 
From Proto File{*}. Additionally, the current implementation can't work in a 
cluster without manually copying Protobuf schemas onto every node.

*StandardProtobufReader* will not support the *Generate From Proto File* schema 
access strategy. It will support the remaining ones from the list below:
!image-2025-07-17-10-32-37-291.png!
h1. History

Initially, the changes were pushed for review in [this 
PR|https://github.com/apache/nifi/pull/10094]. However, after an initial look 
by [~exceptionfactory], we decided that it was reasonable to create a new 
implementation of ProtobufReader instead of maintaining the old one. In 
addition, the PR will be divided into two smaller ones: the first will cover 
changes to the confluent-bundle and common APIs, and the second will cover the 
new ProtobufReader implementation.

With this context, the use case below uses the following markers:

{color:#de350b}*NEW* - {color}to indicate a feature or a component will be 
added as a result of this ticket.

{color:#de350b}*EXISTING*{color} - to indicate a feature or a component 
existing in the current NIFI codebase
h1. The use case
 # The user has a schema registry and a Kafka cluster with a topic containing 
messages encoded according to the Confluent [protobuf wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format]
 (magic byte + 4-byte schema ID + varint message indexes + payload).
 # The user configures a *StandardProtobufReader* service.
 # In *StandardProtobufReader*, the user sets the Schema Access Strategy property 
to *Schema Reference Reader (EXISTING)*.
 # In *StandardProtobufReader*, the user sets the Schema Registry property to 
*ConfluentSchemaRegistry (EXISTING)*.
 ## *ConfluentSchemaRegistry* was extended to support fetching 
referenced (imported) schemas. If a Protobuf schema imports other schemas, they 
are fetched from the registry recursively. *(NEW)*
 ## The *SchemaRegistry (EXISTING)* interface was extended to support returning a 
raw schema form ({*}NEW{*}) in addition to the currently returned 
RecordSchema. With this change, a new abstraction, *SchemaDefinition (NEW)*, was 
introduced to nifi-schema-registry-service-api. It is intended to represent 
the raw form of various schema types.
 # In *StandardProtobufReader*, the user sets the Schema Reference Reader property 
to *ConfluentEncodedSchemaReferenceReader (EXISTING)*.
 # In *StandardProtobufReader*, the user sets the *Message Name Resolver Strategy* 
property {*}(NEW){*}. Two options are available: *Message Name Property (NEW)* 
and *Message Name Resolver Service (NEW)*.
 ## Option one exists for simple cases where the user knows the name of the 
Protobuf message upfront and the message does not change dynamically during 
normal operation.
 ## Option two lets the user choose the controller service responsible for 
resolving the message name. In this case, the controller service reads the 
header of the message payload to get the information needed to locate the 
message within the Protobuf schema (a message index array; see the Protobuf 
wire format). This is done by the 
*ConfluentProtobufMessageNameResolver (NEW)* service.
 ### *Explanation:* Why do we need message name resolution? Protobuf schemas can 
contain many message definitions at the root level (see example below). We need 
to explicitly give the message name to the deserializer for it to do its job. 
A schema file or schema text alone may not be enough because it may be 
ambiguous. Either ProtobufReader implementation always needs the name of the 
proto schema message to decode the binary payload. The new strategy defines a 
new interface, *MessageNameResolver* *(NEW)*, in 
nifi-schema-registry-service-api. Currently, one implementation exists: 
*ConfluentProtobufMessageNameResolver* *(NEW)* in 
nifi-confluent-platform-bundle.
 ### The ConfluentProtobufMessageNameResolver reads the message-index section of 
the header, which follows the magic byte and the 4-byte schema ID, to decode 
Confluent-standardized *message name indexes.* More about it here: [protobuf wire 
format|https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format].
 The indexes are variable-length and can be [0][2][0] or [1], etc. Indexes are 
zigzag-encoded varints. *(NEW)*
 ### *Parsing of the Protobuf message schemas:* After 
*ConfluentProtobufMessageNameResolver* decodes the message indexes, it looks up 
the Protobuf message name in the *SchemaDefinition (NEW).* Messages can be 
nested, so the message index decoded from the content can point to a nested 
message like {{MessageA.MessageE.MessageG}}:
!image-2025-07-16-12-52-20-296.png!
 ### *ConfluentProtobufMessageNameResolver* needs to parse the schema to learn 
its structure and locate the right message name *(NEW).* An *ANTLR* parser is 
used for this {*}(NEW){*}. The Proto3 grammar is downloaded from the ANTLR 
repository at build time, and a Maven plugin then generates the parser for the 
proto3 schema format.
 #### *Why not use the Wire library to parse it?* To get the schema 
message, we need to parse the schema, not compile it. The difference is that an 
incomplete schema can be parsed, whereas compilation would require the 
schema plus all imported schemas. In its schema-registry implementation, 
Confluent uses a parser from an internal Wire package: 
com.squareup.wire.schema.internal.parser.*. The decision was made not to use 
internal classes from a third-party library; I think they are no longer public 
in newer versions. Additionally, I did not want to add the Wire library and its 
dependencies to the confluent-bundle, because I remembered that the maintainers 
favor keeping the memory footprint small. The ANTLR parser needs only the very 
small standalone ANTLR runtime jar.
 # The user configures the *ConsumeKafka* processor with the processing strategy 
set to *RECORD (EXISTING)* and sets the *Record Reader* property to the 
instance configured in the earlier steps.
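The index-to-name lookup from the steps above can be illustrated with the Java sketch below. It does *not* use the ANTLR Proto3 grammar: a deliberately naive regex tokenizer stands in for it, tracking only {{message}} declarations and brace nesting. Like the parse-not-compile approach described above, it never resolves imports, so an incomplete schema is fine. It assumes each index is the position of a message among its sibling messages in declaration order (enums, oneofs and services are not counted). {{ProtoMessageIndexSketch}}, {{parse}} and {{resolve}} are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: index path [0, 2, 0] -> "pkg.MessageA.MessageE.MessageG". */
final class ProtoMessageIndexSketch {

    record MessageNode(String name, List<MessageNode> nested) { }

    record ParsedSchema(String packageName, List<MessageNode> messages) { }

    // Comments and string literals are matched only so they can be skipped.
    private static final Pattern TOKEN = Pattern.compile(
            "//[^\\n]*|/\\*.*?\\*/|\"(?:[^\"\\\\]|\\\\.)*\"|'(?:[^'\\\\]|\\\\.)*'|[A-Za-z_][\\w.]*|[{}]",
            Pattern.DOTALL);

    /** Naive structural scan: collects message declarations and their nesting. */
    static ParsedSchema parse(String schemaText) {
        List<String> tokens = new ArrayList<>();
        Matcher m = TOKEN.matcher(schemaText);
        while (m.find()) {
            char c = m.group().charAt(0);
            if (c != '/' && c != '"' && c != '\'') {
                tokens.add(m.group());
            }
        }
        String packageName = "";
        List<MessageNode> roots = new ArrayList<>();
        List<MessageNode> current = roots;                    // where new messages are added
        Deque<List<MessageNode>> outer = new ArrayDeque<>();  // one entry per open brace
        for (int i = 0; i < tokens.size(); i++) {
            String t = tokens.get(i);
            if (t.equals("package") && outer.isEmpty() && i + 1 < tokens.size()) {
                packageName = tokens.get(++i);
            } else if (t.equals("message") && i + 2 < tokens.size() && tokens.get(i + 2).equals("{")) {
                MessageNode node = new MessageNode(tokens.get(i + 1), new ArrayList<>());
                current.add(node);
                outer.push(current);
                current = node.nested();
                i += 2; // skip the message name and its opening brace
            } else if (t.equals("{")) {
                // enum, oneof, service or option block: holds no indexed messages
                outer.push(current);
                current = new ArrayList<>();
            } else if (t.equals("}")) {
                if (outer.isEmpty()) {
                    throw new IllegalArgumentException("Unbalanced '}'");
                }
                current = outer.pop();
            }
        }
        return new ParsedSchema(packageName, roots);
    }

    /** Walks the index path level by level, joining names with dots. */
    static String resolve(ParsedSchema schema, List<Integer> indexes) {
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("Empty message index path");
        }
        StringBuilder name = new StringBuilder(schema.packageName());
        List<MessageNode> level = schema.messages();
        for (int index : indexes) {
            if (index < 0 || index >= level.size()) {
                throw new IllegalArgumentException("Message index out of range: " + index);
            }
            MessageNode node = level.get(index);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(node.name());
            level = node.nested();
        }
        return name.toString();
    }
}
```

A real grammar-based parser is still preferable: this scanner ignores edge cases such as {{extend}} blocks or unusual option syntax, which is exactly the kind of detail a full Proto3 grammar handles.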

h1. New components introduced to API in nifi-schema-registry-service-api
h2. SchemaDefinition

A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text, and a 
collection of referenced {_}SchemaDefinitions{_}.
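A hypothetical Java shape for such a raw definition, together with the recursive fetching of imported schemas described for *ConfluentSchemaRegistry* in the use case. The {{SchemaDef}} and {{RawSchema}} records, and the {{Function}} used as a registry lookup, are assumptions for illustration; they do not mirror the NiFi or Confluent client APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of a raw schema definition and recursive reference loading. */
final class SchemaDefinitionSketch {

    /** Raw form: identifier, schema text, and the definitions it references (imports). */
    record SchemaDef(String id, String text, List<SchemaDef> references) { }

    /** What a single registry lookup returns before its references are resolved. */
    record RawSchema(String text, List<String> referenceIds) { }

    /** Loads a schema and, recursively, every schema it references. */
    static SchemaDef load(String id, Function<String, RawSchema> registry) {
        return load(id, registry, new HashMap<>(), new HashSet<>());
    }

    private static SchemaDef load(String id, Function<String, RawSchema> registry,
                                  Map<String, SchemaDef> cache, Set<String> inProgress) {
        SchemaDef cached = cache.get(id);
        if (cached != null) {
            return cached; // already loaded via another import path
        }
        if (!inProgress.add(id)) {
            throw new IllegalStateException("Circular schema reference involving " + id);
        }
        RawSchema raw = registry.apply(id);
        if (raw == null) {
            throw new IllegalArgumentException("Schema not found in registry: " + id);
        }
        List<SchemaDef> references = new ArrayList<>();
        for (String referenceId : raw.referenceIds()) {
            references.add(load(referenceId, registry, cache, inProgress));
        }
        inProgress.remove(id);
        SchemaDef definition = new SchemaDef(id, raw.text(), List.copyOf(references));
        cache.put(id, definition);
        return definition;
    }
}
```

The cache means a schema imported by several files is fetched once and shared, while the in-progress set turns a circular import into an error instead of infinite recursion.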
h2. MessageNameResolver

In the case of Protobuf, a single .proto file may contain multiple message 
definitions. To determine which message to use for decoding, a message index is 
usually encoded in the payload. As with a schema reference, different 
systems encode this index differently.

_MessageNameResolver_ is a new _ControllerService_ that translates 
an encoded Protobuf message index into a Protobuf message name, which is later 
used to find the correct Protobuf message for deserialization.

Implementations specific to a schema registry will land in their respective 
modules, e.g., _nifi-confluent-schema-registry-service_ for Confluent.
h2. MessageName

Abstracts the value returned by the MessageNameResolver.
h1. New components in nifi-protobuf-bundle
h2. *StandardProtobufReader*

*StandardProtobufReader* is a working name. The component will extend the 
abstract *SchemaRegistryService* class, like all record readers do.

It will support the following Schema Access Strategies:

!image-2025-07-17-14-42-31-326.png|width=616,height=356!!image-2025-07-17-14-43-58-023.png!!image-2025-07-17-14-45-47-577.png!

 

The *Message Name Resolver Strategy* property will have two allowable 
values:

!image-2025-07-17-14-52-18-792.png!

When *Message Name Property* is selected, the component will show a new 
property (not visible in the screenshots above) called *Message Name,* where the 
user can provide the fully qualified name (FQN) of the message.
When *Message Name Resolver Service* is selected, the component will show a 
new property (not visible in the screenshots above) called *Message Name Resolver 
Service,* where the user can provide a reference to the service.
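A hypothetical Java sketch of how the two *Message Name Resolver Strategy* values could map to a resolver at runtime: the property option yields a constant name that never inspects the content, while the service option delegates to the configured controller service. {{MessageName}}, {{MessageNameResolver}}, {{Strategy}} and {{select}} below are illustrative stand-ins, not the signatures of the actual NiFi interfaces.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/** Hypothetical sketch of the Message Name Resolver Strategy options (not the NiFi API). */
final class MessageNameStrategySketch {

    /** Illustrative stand-in for MessageName: a namespace (package) plus a message name. */
    record MessageName(String namespace, String name) {
        String fullyQualifiedName() {
            return namespace.isEmpty() ? name : namespace + "." + name;
        }
    }

    /** Illustrative stand-in for the MessageNameResolver contract. */
    interface MessageNameResolver {
        MessageName resolve(InputStream content) throws IOException;
    }

    /** The two allowable values of the strategy property. */
    enum Strategy { MESSAGE_NAME_PROPERTY, MESSAGE_NAME_RESOLVER_SERVICE }

    static MessageNameResolver select(Strategy strategy, String messageName, MessageNameResolver service) {
        return switch (strategy) {
            case MESSAGE_NAME_PROPERTY -> {
                MessageName fixed = split(Objects.requireNonNull(messageName, "Message Name is required"));
                MessageNameResolver constant = content -> fixed; // the content is never inspected
                yield constant;
            }
            case MESSAGE_NAME_RESOLVER_SERVICE ->
                    Objects.requireNonNull(service, "Message Name Resolver Service is required");
        };
    }

    /** Splits an FQN at the last dot; everything before it is treated as the namespace. */
    private static MessageName split(String fqn) {
        int dot = fqn.lastIndexOf('.');
        return dot < 0
                ? new MessageName("", fqn)
                : new MessageName(fqn.substring(0, dot), fqn.substring(dot + 1));
    }
}
```

Splitting at the last dot is a simplification: for a nested message such as {{com.example.MessageA.MessageE}}, the namespace part would also contain the outer message name, and telling package and nesting apart requires the schema itself.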


> Support for Confluent schema registry Protobuf wire format in 
> StandardProtobufReader
> ------------------------------------------------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>         Attachments: image-2025-07-16-12-52-20-296.png, 
> image-2025-07-17-10-32-37-291.png, image-2025-07-17-14-40-58-260.png, 
> image-2025-07-17-14-42-31-326.png, image-2025-07-17-14-43-58-023.png, 
> image-2025-07-17-14-45-47-577.png, image-2025-07-17-14-51-24-943.png, 
> image-2025-07-17-14-52-18-792.png
>
>          Time Spent: 13h 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
