Vladislav Pernin created FLINK-6061:
---------------------------------------
Summary: NPE on TypeSerializer.serialize with a
RocksDBStateBackend calling state.entries in the open() function
Key: FLINK-6061
URL: https://issues.apache.org/jira/browse/FLINK-6061
Project: Flink
Issue Type: Bug
Components: DataStream API, State Backends, Checkpointing, Streaming
Affects Versions: 1.3.0
Reporter: Vladislav Pernin
With the default (heap) state backend, the call to state.entries() "nicely
fails" with an IllegalStateException:
{noformat}
Caused by: java.lang.IllegalStateException: No key set.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
at java.lang.Thread.run(Thread.java:745)
{noformat}
With a RocksDBStateBackend, it fails with an NPE:
{noformat}
Caused by: java.lang.NullPointerException
at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
at java.lang.Thread.run(Thread.java:745)
{noformat}
The reason is that the record passed to the serializer is null, because
backend.getCurrentKey() is null (not yet set) in AbstractRocksDBState.
This may also be the case for other RocksDB state implementations.
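The difference between the two backends comes down to a guard on the current key. As a minimal sketch of that idea in plain Java (KeyedMapStateSketch and its methods are hypothetical stand-ins, not Flink classes), checking the key before any access would turn the NPE into the same "No key set." failure the heap backend gives:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed map state; none of these names are Flink classes.
final class KeyedMapStateSketch {
    // Key of the record being processed; stays null until a key has been set.
    private Short currentKey;
    private final Map<Short, Map<String, Long>> perKey = new HashMap<>();

    void setCurrentKey(Short key) {
        this.currentKey = key;
    }

    void put(String mapKey, Long value) {
        requireKey();
        perKey.computeIfAbsent(currentKey, k -> new HashMap<>()).put(mapKey, value);
    }

    Map<String, Long> entries() {
        requireKey();
        return perKey.getOrDefault(currentKey, new HashMap<>());
    }

    // Fail fast with a descriptive message instead of letting a null key
    // reach the code that serializes it.
    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("No key set.");
        }
    }
}
```

Calling entries() before setCurrentKey(...) throws the IllegalStateException; after a key is set, each key sees only its own entries.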
A reproducer based on 1.3-SNAPSHOT (needed for MapState) is available
here:
https://github.com/vpernin/flink-rocksdbstate-npe
The reproducer is a nonsensical application. There is no MapState with TTL or
expiration yet, so the goal is to try to shrink or expire the state at some
interval.
This could be done by iterating over the entries of the state and removing some
of them.
This probably cannot be done in the open() method of a rich function.
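The iterate-and-remove idea can be sketched in plain Java as follows. This is only an illustration under assumptions: a java.util.Map stands in for the contents of the MapState for one key, the values are assumed to be last-update timestamps in milliseconds, and ExpirySketch and removeOlderThan are hypothetical names. In a real job the same loop would have to run somewhere a current key is set.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical helper; the map stands in for the entries of a MapState.
final class ExpirySketch {
    // Removes every entry whose timestamp is older than cutoffMillis
    // and returns how many entries were removed.
    static int removeOlderThan(Map<String, Long> lastUpdated, long cutoffMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdated.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoffMillis) {
                // Iterator.remove() avoids a ConcurrentModificationException.
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

Removing through the iterator rather than through the map itself is what makes it safe to delete entries while walking over them.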
I also tried to implement CheckpointListener and to access the state content in
the notifyCheckpointComplete() method, but it fails too, I guess due to the
asynchronous nature of the checkpoint.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)