1996fanrui opened a new pull request, #26831:
URL: https://github.com/apache/flink/pull/26831

   ## What is the purpose of the change
   
   Currently, the hashmap, RocksDB and ForSt state backends all support the 
case where the user value of the map state is null (including write, read, 
checkpoint and restore), but an asymmetry in the serialization path can cause 
an NPE.
   
   The core issue lies in the current implementation of 
serializeValueNullSensitive. It unconditionally calls 
valueSerializer.serialize(value, out) even when the value is null. This design 
has a significant flaw: the behavior of the state backend becomes dependent on 
the specific implementation of the TypeSerializer. If a serializer is not 
designed to handle null inputs, it will throw a NullPointerException, causing 
unexpected failures.
   The most compelling argument for the change comes from the corresponding 
deserialization logic found in RocksDBMapState 
[https://github.com/apache/flink/blob/bf1cd860617f7b51ac91516814c0e931e5bba241/flin[…]c/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java](https://github.com/apache/flink/blob/bf1cd860617f7b51ac91516814c0e931e5bba241/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java#L413)
   During deserialization, the code first reads a boolean flag to check if the 
stored value is null. If it is, the method immediately returns null and does 
not attempt to deserialize any further data from the input stream.
   
   This reveals a critical asymmetry:
      1. Serialization: Writes an isNull flag and then always attempts to 
serialize the value object, even if it's null.
      2. Deserialization: Reads the isNull flag and, if true, never attempts to 
deserialize the value object.
   The data written by serializer.serialize(null, ...) is therefore completely 
redundant and ignored during deserialization.
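   
   The symmetric behavior described above can be sketched as follows. This is 
a minimal, self-contained illustration and not the actual Flink code: 
`ValueSerializer` is a hypothetical stand-in for `TypeSerializer`, 
`INT_SERIALIZER` is a hypothetical null-unsafe serializer, and plain 
`java.io` streams replace Flink's data views.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class NullSensitiveSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer; not the real interface. */
    interface ValueSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;

        T deserialize(DataInputStream in) throws IOException;
    }

    /** Hypothetical null-unsafe serializer: unboxing a null Integer throws NPE. */
    static final ValueSerializer<Integer> INT_SERIALIZER =
            new ValueSerializer<Integer>() {
                @Override
                public void serialize(Integer value, DataOutputStream out) throws IOException {
                    out.writeInt(value); // auto-unboxing fails for null
                }

                @Override
                public Integer deserialize(DataInputStream in) throws IOException {
                    return in.readInt();
                }
            };

    /** Writes the isNull flag, then the value only when it is non-null. */
    static <T> byte[] serializeValueNullSensitive(T value, ValueSerializer<T> serializer)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBoolean(value == null);
        if (value != null) {
            serializer.serialize(value, out);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Reads the isNull flag and skips value deserialization when it is set. */
    static <T> T deserializeValueNullSensitive(byte[] data, ValueSerializer<T> serializer)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        boolean isNull = in.readBoolean();
        return isNull ? null : serializer.deserialize(in);
    }

    public static void main(String[] args) throws IOException {
        byte[] nullBytes = serializeValueNullSensitive(null, INT_SERIALIZER);
        byte[] intBytes = serializeValueNullSensitive(42, INT_SERIALIZER);
        System.out.println(deserializeValueNullSensitive(nullBytes, INT_SERIALIZER));
        System.out.println(deserializeValueNullSensitive(intBytes, INT_SERIALIZER));
    }
}
```

   In this sketch a null value is encoded as the flag byte alone, so the 
write side never hands null to the serializer and the read side consumes 
exactly what was written.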
   
   
   NPE risk: because the behavior depends on the `TypeSerializer` 
implementation, any serializer that is not designed to handle null inputs 
throws a `NullPointerException`. 
   
   Unfortunately, many core serializers in Flink, such as 
`org.apache.flink.api.common.typeutils.base.IntSerializer` and 
`org.apache.flink.api.common.typeutils.base.BooleanSerializer`, are 
null-unsafe and throw a `NullPointerException` when they encounter a null 
value.
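   
   The failure mode can be reproduced without Flink on the classpath. The 
sketch below is an assumption-laden illustration: `serializeInt` is a 
hypothetical serializer that mimics a null-unsafe design by unboxing its 
input, and `serializeUnconditionally` models the write path described above 
(flag first, then an unconditional serializer call). Neither is the real 
Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class NullUnsafeSerializerDemo {

    /** Hypothetical null-unsafe serializer: unboxing a null Integer throws NPE. */
    static void serializeInt(Integer value, DataOutputStream out) throws IOException {
        out.writeInt(value);
    }

    /** Models the problematic write path: flag, then an unconditional serialize call. */
    static void serializeUnconditionally(Integer value, DataOutputStream out)
            throws IOException {
        out.writeBoolean(value == null);
        serializeInt(value, out); // reached even when value is null
    }

    /** Returns true if the unconditional write path throws NullPointerException. */
    static boolean throwsNpe(Integer value) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        try {
            serializeUnconditionally(value, out);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("non-null value throws NPE: " + throwsNpe(7));
        System.out.println("null value throws NPE: " + throwsNpe(null));
    }
}
```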
   
   ## Brief change log
   
   - [FLINK-38137][state] RocksDB and ForSt state backend are compatible with 
the case where the user value of the map state is null
     - Don't serialize user value when it's null
     - The user value is not used during deserialization, so it doesn't 
introduce any incompatibility problem
   - [FLINK-38137][state] Disable testMapStateWithNullValue for changelog 
state backend temporarily
     - The changelog state backend is not yet compatible with the case where 
the user value of the map state is null
     - Code path: 
https://github.com/apache/flink/blob/31785e076c86d0a44e3f4a17f44a04908a2d3eb4/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java#L224
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added `StateBackendTestBase#testMapStateWithNullValue`, which checks 
putting and reading map state when the user value is null.
     - Added `MapStateNullValueCheckpointingITCase`, which checks whether 
checkpoint and restore work as expected when the user value of MapState is null.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:  no
     - The S3 file system connector:  no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
