What would be a better, less dirty way of expiring a MapState, or more generally of accessing the entries of a keyed state at some interval? The current key has to be set in AbstractRocksDBState, i.e. keyed state can only be accessed while the backend has a current key.
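The key-scoping constraint can be illustrated with a toy model. This is a hypothetical, simplified sketch: the class `ToyKeyedMapState` and its methods are invented for illustration and are not Flink's API. Every read or write goes through the current key. Calling `entries()` before any key has been set, as happens in `open()`, therefore fails. Even with a key set, only that key's entries are visible.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a keyed MapState, NOT Flink's implementation.
// Every access is scoped to the current key, which the runtime would set
// before invoking user code for an element.
class ToyKeyedMapState<K, UK, UV> {
    // one independent map per key
    private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
    // null until a key is assigned (e.g. when an element arrives)
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    private Map<UK, UV> current() {
        // Same message as the heap backend's precondition check in the quoted trace.
        if (currentKey == null) {
            throw new IllegalStateException("No key set.");
        }
        return perKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    void put(UK userKey, UV value) {
        current().put(userKey, value);
    }

    // In this toy model only the current key's entries are visible;
    // there is no view across all keys.
    Iterable<Map.Entry<UK, UV>> entries() {
        return current().entrySet();
    }
}
```

Calling `entries()` right after construction throws `IllegalStateException("No key set.")`. After `setCurrentKey("a")`, it returns only key `"a"`'s map.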
Incrementing a counter in the rich function and doing the cleanup whenever the counter hits some modulo does not really feel right.

2017-03-15 11:44 GMT+01:00 Vladislav Pernin (JIRA) <j...@apache.org>:

> Vladislav Pernin created FLINK-6061:
> ---------------------------------------
>
>              Summary: NPE on TypeSerializer.serialize with a RocksDBStateBackend calling state.entries in the open() function
>                  Key: FLINK-6061
>                  URL: https://issues.apache.org/jira/browse/FLINK-6061
>              Project: Flink
>           Issue Type: Bug
>           Components: DataStream API, State Backends, Checkpointing, Streaming
>     Affects Versions: 1.3.0
>             Reporter: Vladislav Pernin
>
>
> With a default state (heap), the call to state.entries() "nicely fails" with an IllegalStateException:
> {noformat}
> Caused by: java.lang.IllegalStateException: No key set.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>     at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
>     at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>     at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
>
> With a RocksDBStateBackend, it fails with an NPE:
> {noformat}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
>     at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
>     at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>     at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
>
> The reason is that the record is null, because backend.getCurrentKey() is null (not yet set) in AbstractRocksDBState.
> This may also be the case for other RocksDBState implementations.
>
> You can find the reproducer here, based on 1.3-SNAPSHOT (needed for the MapState):
> https://github.com/vpernin/flink-rocksdbstate-npe
>
> The reproducer is a nonsensical application. There is no MapState with TTL or expiration yet, so the goal is to try to shrink or expire the state at some interval.
> This could be done by iterating over the entries of the state and removing some of them.
>
> This probably cannot be done in the open() method of a rich function.
> I also tried to implement CheckpointListener and to access the state content in the notifyCheckpointComplete() method, but that fails too, I guess due to the asynchronous nature of the checkpoint.
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.15#6346)
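One commonly used pattern for this kind of cleanup does the expiry per key, from a timer callback where the runtime has already set the current key. The original post does not propose this. In Flink it corresponds to registering a timer through the `ProcessFunction` context's `timerService()` in `processElement()` and pruning the keyed state in `onTimer()`. The sketch below is only a plain-Java toy model of that idea. It assumes a fixed TTL, and that each map entry stores the timestamp of its last update. `ToyTimerExpiry`, `processElement`, `advanceTime` and `size` are hypothetical names, not Flink classes or methods.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical toy model of per-key, timer-driven expiry; NOT the Flink API.
// Cleanup runs inside a per-key timer callback, where a current key is set,
// instead of in open(), where no key exists yet.
class ToyTimerExpiry {
    // A pending timer: when it fires and which key it belongs to.
    private static final class PendingTimer {
        final long time;
        final String key;

        PendingTimer(long time, String key) {
            this.time = time;
            this.key = key;
        }
    }

    private final long ttl;
    // key -> (map key -> timestamp of last update), i.e. one "MapState" per key
    private final Map<String, Map<String, Long>> state = new HashMap<>();
    // pending timers, earliest first
    private final PriorityQueue<PendingTimer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    private String currentKey;

    ToyTimerExpiry(long ttl) {
        this.ttl = ttl;
    }

    // Analogue of processElement(): the key is set, then user code runs.
    void processElement(String key, String mapKey, long timestamp) {
        currentKey = key;
        currentState().put(mapKey, timestamp);
        // Analogue of registering a cleanup timer for this key.
        timers.add(new PendingTimer(timestamp + ttl, key));
        currentKey = null;
    }

    // Analogue of time advancing: fire every due timer with its key restored.
    void advanceTime(long now) {
        while (!timers.isEmpty() && timers.peek().time <= now) {
            PendingTimer t = timers.poll();
            currentKey = t.key;
            onTimer(t.time);
            currentKey = null;
        }
    }

    // Analogue of onTimer(): drop this key's entries whose TTL has elapsed.
    private void onTimer(long timerTime) {
        Map<String, Long> entries = currentState();
        Iterator<Map.Entry<String, Long>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() + ttl <= timerTime) {
                it.remove();
            }
        }
        if (entries.isEmpty()) {
            state.remove(currentKey); // like clearing the key's state entirely
        }
    }

    private Map<String, Long> currentState() {
        if (currentKey == null) {
            throw new IllegalStateException("No key set.");
        }
        return state.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    // Inspection helper for the example; not part of the modeled API.
    int size(String key) {
        Map<String, Long> m = state.get(key);
        return m == null ? 0 : m.size();
    }
}
```

Every update registers its own timer. A timer that fires after the entry was refreshed therefore sees a newer timestamp and leaves the entry in place, so stale timers are harmless. This removes the need for the counter-modulo trick and never touches state outside a keyed context.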