khaledh opened a new pull request, #28108:
URL: https://github.com/apache/flink/pull/28108

   ## What is the purpose of the change
   
   Protobuf allows recursive message definitions (e.g., 
`message Node { Node child = 1; }`). When Flink's protobuf format encounters 
such a schema, `PbToRowTypeUtil.generateRowType` recurses without terminating 
and fails with a `StackOverflowError`, so the protobuf format cannot be used 
with any proto schema that contains recursive references.
   
   This change adds cycle detection to `PbToRowTypeUtil`. When a field's 
message type is already being resolved in the current ancestor chain, the 
field is emitted as `VARBINARY` (raw protobuf bytes) instead of being expanded 
further. The serialized protobuf data is preserved, so consumers can 
deserialize it on demand.
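
   For illustration only, the ancestor-chain check can be sketched as follows. 
This is not the actual `PbToRowTypeUtil` code: `MsgType`, 
`CycleDetectionSketch`, and the `INT` placeholder used for every scalar field 
are hypothetical stand-ins for the protobuf descriptor and Flink type APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a protobuf message descriptor.
final class MsgType {
    final String name;
    // Field name -> nested message type, or null for a scalar field.
    final Map<String, MsgType> fields = new LinkedHashMap<>();

    MsgType(String name) {
        this.name = name;
    }
}

final class CycleDetectionSketch {
    // Renders a row-type string for the given message type.
    static String rowType(MsgType type) {
        return rowType(type, new ArrayDeque<>());
    }

    private static String rowType(MsgType type, Deque<String> ancestors) {
        // Track only the types on the current path from the root.
        ancestors.push(type.name);
        StringBuilder sb = new StringBuilder("ROW<");
        boolean first = true;
        for (Map.Entry<String, MsgType> field : type.fields.entrySet()) {
            if (!first) {
                sb.append(", ");
            }
            first = false;
            sb.append(field.getKey()).append(' ');
            MsgType nested = field.getValue();
            if (nested == null) {
                // Assumption: every scalar is rendered as INT to keep the sketch short.
                sb.append("INT");
            } else if (ancestors.contains(nested.name)) {
                // Recursive boundary: stop expanding and keep the raw bytes.
                sb.append("VARBINARY");
            } else {
                sb.append(rowType(nested, ancestors));
            }
        }
        // Leaving this type: siblings may expand it again without being cut off.
        ancestors.pop();
        return sb.append('>').toString();
    }
}
```

   Because a type is removed from the chain once its fields are resolved, a 
message type that is merely reused by two sibling fields is still expanded in 
both places; only a reference back to an ancestor is replaced by `VARBINARY`.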
   
   ## Brief change log
   
   - `PbToRowTypeUtil` tracks ancestor message types during schema generation 
and emits `VARBINARY` for fields that would cause infinite recursion
   - `PbCodegenBytesDeserializer` generates code to serialize recursive message 
fields to their raw protobuf bytes via `.toByteArray()`
   - `PbCodegenDeserializeFactory` routes MESSAGE fields mapped to `VARBINARY` 
to the new bytes deserializer
   - `PbSchemaValidationUtils` accepts `VARBINARY` as a valid mapping for 
MESSAGE fields at recursive boundaries
   
   ## Verifying this change
   
   **Note: This change has been reviewed and deployed internally at Shopify on 
production Flink jobs with no regressions since March 13, 2026.**
   
   This change added tests and can be verified as follows:
   
   - Added `RecursiveMessageProtoToRowTest` with 6 tests covering:
     - Schema generation produces `VARBINARY` for recursive fields instead of 
crashing
     - Nested data is preserved as parseable protobuf bytes through the full 
deserialization pipeline
     - Proto3 unset recursive fields produce empty bytes (default instance) 
regardless of the `readDefaultValues` setting
     - Proto2 unset recursive fields are null with `readDefaultValues=false` 
(explicit field presence)
     - Proto2 unset recursive fields produce empty bytes with 
`readDefaultValues=true`
     - Proto2 set recursive fields are preserved as bytes
   - Added `test_recursive_message.proto` (proto3) and 
`test_recursive_message_proto2.proto` (proto2) test protos with 
self-referencing message definitions
   - All 70 `flink-protobuf` tests pass with no regressions
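
   The null versus empty-bytes behavior covered by the tests above can be 
summarized as a small decision function. This is a sketch of the expected 
outcomes only, not the generated deserializer code: 
`RecursiveFieldBytesSketch`, `Syntax`, and `toVarbinary` are hypothetical 
names, and the "set" case is modelled by passing the already serialized bytes.

```java
final class RecursiveFieldBytesSketch {
    enum Syntax { PROTO2, PROTO3 }

    /**
     * Returns the VARBINARY value for a recursive message field.
     *
     * @param syntax            proto syntax of the containing message
     * @param readDefaultValues value of the format's readDefaultValues option
     * @param serializedIfSet   serialized nested message if the field is set,
     *                          or null if the field is unset
     */
    static byte[] toVarbinary(
            Syntax syntax, boolean readDefaultValues, byte[] serializedIfSet) {
        if (serializedIfSet != null) {
            // A set field is always preserved as its protobuf bytes.
            return serializedIfSet;
        }
        if (syntax == Syntax.PROTO3 || readDefaultValues) {
            // An unset field reads as the default instance, which serializes
            // to zero bytes.
            return new byte[0];
        }
        // Proto2 with readDefaultValues=false: explicit presence, so null.
        return null;
    }
}
```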
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **yes** - adds handling for a new case in protobuf 
deserialization (recursive messages)
     - The runtime per-record code paths (performance sensitive): **no** - 
cycle detection runs at planning time only, not per-record
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [x] Yes (please specify the tool below)
   
   Generated-by: Claude Code
   

