arvindKandpal-ksolves opened a new pull request, #124:
URL: https://github.com/apache/flink-connector-pulsar/pull/124

   ## Purpose of the change
   
   This PR fixes 
[FLINK-36235](https://issues.apache.org/jira/browse/FLINK-36235). 
   Currently, when the inner deserializer returns `null` for a message it 
cannot deserialize, `PulsarDeserializationSchemaWrapper` emits that `null` 
downstream. This violates Flink's `DeserializationSchema` contract (where a 
`null` return value means "drop this record") and can cause 
`NullPointerException`s in downstream operators. This PR adds a null-check 
guard so that such records are dropped instead of being emitted.
   
   ## Brief change log
   
   - Added an `if (instance != null)` check before emitting the record in 
`PulsarDeserializationSchemaWrapper#deserialize`.
   - Added a regression test `wrapperDropsNullDeserializedRecord` and a 
`CountingCollector` helper class in `PulsarDeserializationSchemaTest` to verify 
that null records are correctly dropped without errors.
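
   The guard described above can be sketched roughly as follows. This is a 
dependency-free illustration, not the connector's actual code: 
`NullGuardSketch`, its nested `Collector` interface, and the 
`Function`-based inner deserializer are hypothetical stand-ins for the 
Flink and Pulsar types, and `CountingCollector` only mirrors the idea of the 
test helper named in this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins only; none of these are the real Flink or connector classes.
final class NullGuardSketch {

    /** Minimal stand-in for a Flink-style collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Records everything it receives so a test can count emitted records. */
    static final class CountingCollector<T> implements Collector<T> {
        final List<T> records = new ArrayList<>();

        @Override
        public void collect(T record) {
            records.add(record);
        }

        int count() {
            return records.size();
        }
    }

    /** Runs the inner deserializer and emits the result only when it is non-null. */
    static <T> void deserialize(byte[] raw, Function<byte[], T> inner, Collector<T> out) {
        T instance = inner.apply(raw);
        if (instance != null) { // a null result means "drop this record"
            out.collect(instance);
        }
    }

    public static void main(String[] args) {
        // Assumed inner deserializer: treats an empty payload as undecodable.
        Function<byte[], String> inner =
                bytes -> bytes.length == 0 ? null : new String(bytes, StandardCharsets.UTF_8);

        CountingCollector<String> out = new CountingCollector<>();
        deserialize("a".getBytes(StandardCharsets.UTF_8), inner, out);
        deserialize(new byte[0], inner, out); // dropped by the guard
        deserialize("b".getBytes(StandardCharsets.UTF_8), inner, out);

        System.out.println("emitted=" + out.count() + " records=" + out.records);
        // prints: emitted=2 records=[a, b]
    }
}
```

   Without the `if (instance != null)` check, the collector in this sketch 
would receive three records, one of them `null`.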
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added a unit test in `PulsarDeserializationSchemaTest.java` that verifies 
records are filtered out when the inner deserializer returns `null`.
   - Verified locally using `mvn -pl flink-connector-pulsar test`.
   
   ## Significant changes
   
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
       - If yes, how is this documented? (not applicable / docs / JavaDocs / 
not documented)


