1996fanrui opened a new pull request, #26831: URL: https://github.com/apache/flink/pull/26831
## What is the purpose of the change

Currently, the hashmap, RocksDB and ForSt state backends are compatible with the case where the user value of the map state is null (including write, read, checkpoint and restore), but an asymmetry between serialization and deserialization can cause an NPE.

The core issue lies in the current implementation of `serializeValueNullSensitive`. It unconditionally calls `valueSerializer.serialize(value, out)` even when the value is null. This design has a significant flaw: the behavior of the state backend becomes dependent on the specific implementation of the `TypeSerializer`. If a serializer is not designed to handle null inputs, it will throw a `NullPointerException`, causing unexpected failures.

The most compelling argument for the change comes from the corresponding deserialization logic in [RocksDBMapState.java](https://github.com/apache/flink/blob/bf1cd860617f7b51ac91516814c0e931e5bba241/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java#L413).

During deserialization, the code first reads a boolean flag to check whether the stored value is null. If it is, the method immediately returns null and does not attempt to deserialize any further data from the input stream. This reveals a critical asymmetry:

1. Serialization: writes an `isNull` flag and then always attempts to serialize the value object, even if it is null.
2. Deserialization: reads the `isNull` flag and, if it is true, never attempts to deserialize the value object.

The data written by `serializer.serialize(null, ...)` is therefore completely redundant and is ignored during deserialization.

NPE risk: the behavior depends on the `TypeSerializer` implementation, and serializers not designed to handle null inputs will throw a `NullPointerException`.
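The asymmetry described above can be illustrated with a small, self-contained sketch. This is not the Flink code: `ValueSerializer`, `NULL_UNSAFE_INT`, `serializeAlways` and the class name are hypothetical stand-ins built on `java.io` only, and the two `...NullSensitive` helpers are simplified local versions of the behavior described in this PR. Only the "flag first, then value" layout is taken from the description.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Illustrative sketch only; all names here are hypothetical, not Flink APIs. */
final class NullSensitiveValueSketch {

    /** Hypothetical, reduced stand-in for a value serializer. */
    interface ValueSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;

        T deserialize(DataInputStream in) throws IOException;
    }

    /** A null-unsafe serializer: unboxing a null Integer throws NullPointerException. */
    static final ValueSerializer<Integer> NULL_UNSAFE_INT =
            new ValueSerializer<Integer>() {
                @Override
                public void serialize(Integer value, DataOutputStream out) throws IOException {
                    out.writeInt(value); // auto-unboxing fails when value is null
                }

                @Override
                public Integer deserialize(DataInputStream in) throws IOException {
                    return in.readInt();
                }
            };

    /** Problematic shape: writes the flag, then always calls the serializer. */
    static <T> byte[] serializeAlways(T value, ValueSerializer<T> ser) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBoolean(value == null);
        ser.serialize(value, out); // reached even for null
        return bytes.toByteArray();
    }

    /** Proposed shape: writes the flag, and the value only when it is non-null. */
    static <T> byte[] serializeNullSensitive(T value, ValueSerializer<T> ser)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBoolean(value == null);
        if (value != null) {
            ser.serialize(value, out);
        }
        return bytes.toByteArray();
    }

    /** Reads the flag first; anything after a "null" flag is never read. */
    static <T> T deserializeNullSensitive(byte[] data, ValueSerializer<T> ser)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        boolean isNull = in.readBoolean();
        return isNull ? null : ser.deserialize(in);
    }

    public static void main(String[] args) throws IOException {
        // With the null check, a null value is written as the flag byte alone.
        byte[] nullBytes = serializeNullSensitive(null, NULL_UNSAFE_INT);
        System.out.println("bytes for null: " + nullBytes.length);
        System.out.println("read back: " + deserializeNullSensitive(nullBytes, NULL_UNSAFE_INT));

        // Without the null check, the null-unsafe serializer fails.
        try {
            serializeAlways(null, NULL_UNSAFE_INT);
        } catch (NullPointerException e) {
            System.out.println("unconditional serialize threw NullPointerException");
        }
    }
}
```

Because the reader stops right after a "null" flag, skipping the write for null values does not change what a reader observes; it only removes the dependency on how a given serializer reacts to null.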
Unfortunately, many core serializers in Flink, such as `org.apache.flink.api.common.typeutils.base.IntSerializer` and `org.apache.flink.api.common.typeutils.base.BooleanSerializer`, are designed to be null-unsafe and will throw a `NullPointerException` if they encounter a null value.

## Brief change log

- [FLINK-38137][state] RocksDB and ForSt state backend are compatible with the case where the user value of the map state is null
  - Don't serialize the user value when it is null
  - The bytes written for a null user value are never read during deserialization, so this doesn't introduce any incompatibility problem
- [FLINK-38137][state] Disable testMapStateWithNullValue for change log state backend temporarily
  - The change log state backend is not compatible with the case where the user value of the map state is null
  - Code path: https://github.com/apache/flink/blob/31785e076c86d0a44e3f4a17f44a04908a2d3eb4/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java#L224

## Verifying this change

This change added tests and can be verified as follows:

- Added `StateBackendTestBase#testMapStateWithNullValue`, which checks putting and reading map state when the user value is null
- Added `MapStateNullValueCheckpointingITCase`, which checks whether checkpoint and restore work as expected when the user value of MapState is null

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented?
(not applicable / docs / JavaDocs / not documented)
