What would be a better, less dirty way of expiring a MapState or, more
generally, of accessing the entries of a keyed state at some interval?
The current key has to be set in AbstractRocksDBState.

Incrementing a counter in the rich function and doing the job at some
modulo does not really feel good.
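
A minimal sketch of a per-key timer pattern, as an alternative to the counter-and-modulo approach. In Flink this would presumably be a `ProcessFunction` on a keyed stream that registers a cleanup timer through its `TimerService` in `processElement()` and prunes the MapState in `onTimer()`. The runtime sets the current key before calling `onTimer()`, which is exactly what `open()` lacks. The code below does not use Flink: `KeyedMapStateModel`, the timer queue and `TTL` are hypothetical, stdlib-only stand-ins that only mimic the key scoping.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;

class TimerExpiryModel {

    /** Hypothetical stand-in for a keyed MapState: every access is scoped to the current key. */
    static final class KeyedMapStateModel<K, UK, UV> {
        private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
        private K currentKey; // null until set, as it still is during open()

        void setCurrentKey(K key) { this.currentKey = key; }

        Map<UK, UV> entriesOfCurrentKey() {
            if (currentKey == null) {
                // Mirrors the heap backend's "No key set." precondition failure
                throw new IllegalStateException("No key set.");
            }
            return perKey.computeIfAbsent(currentKey, k -> new HashMap<>());
        }
    }

    /** A registered timer: fires for one key at one timestamp. */
    static final class Timer<K> {
        final K key;
        final long time;
        Timer(K key, long time) { this.key = key; this.time = time; }
    }

    static final long TTL = 10; // hypothetical time-to-live, in arbitrary time units

    final KeyedMapStateModel<String, String, Long> state = new KeyedMapStateModel<>();
    final PriorityQueue<Timer<String>> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));

    /** Like processElement(): the key is set, so record the entry and schedule its cleanup. */
    void processElement(String key, String entry, long now) {
        state.setCurrentKey(key);
        state.entriesOfCurrentKey().put(entry, now);
        timers.add(new Timer<>(key, now + TTL));
    }

    /** Like the timer service advancing time: set the timer's key first, then call onTimer(). */
    void advanceTimeTo(long now) {
        while (!timers.isEmpty() && timers.peek().time <= now) {
            Timer<String> t = timers.poll();
            state.setCurrentKey(t.key);
            onTimer(t.time);
        }
    }

    /** Like onTimer(): the current key is set, so iterating and removing entries is safe. */
    void onTimer(long timestamp) {
        Iterator<Map.Entry<String, Long>> it = state.entriesOfCurrentKey().entrySet().iterator();
        while (it.hasNext()) {
            if (timestamp - it.next().getValue() >= TTL) {
                it.remove(); // expire entries that are at least TTL old
            }
        }
    }

    public static void main(String[] args) {
        TimerExpiryModel m = new TimerExpiryModel();
        m.processElement("a", "x", 0);
        m.processElement("a", "y", 5);
        m.advanceTimeTo(10); // fires the timer registered for "x"
        m.state.setCurrentKey("a");
        System.out.println(m.state.entriesOfCurrentKey()); // {y=5}
    }
}
```

Because each timer carries its key, the cleanup only ever touches one key's entries at a time, and no counter or modulo is needed.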

2017-03-15 11:44 GMT+01:00 Vladislav Pernin (JIRA) <j...@apache.org>:

> Vladislav Pernin created FLINK-6061:
> ---------------------------------------
>
>              Summary: NPE on TypeSerializer.serialize with a RocksDBStateBackend calling state.entries in the open() function
>                  Key: FLINK-6061
>                  URL: https://issues.apache.org/jira/browse/FLINK-6061
>              Project: Flink
>           Issue Type: Bug
>           Components: DataStream API, State Backends, Checkpointing, Streaming
>     Affects Versions: 1.3.0
>             Reporter: Vladislav Pernin
>
>
> With a default state (heap), the call to state.entries() "nicely fails"
> with an IllegalStateException:
> {noformat}
> Caused by: java.lang.IllegalStateException: No key set.
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>         at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
>         at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>         at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
>
> With a RocksDBStateBackend, it fails with an NPE:
> {noformat}
> Caused by: java.lang.NullPointerException
>         at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
>         at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
>         at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
>         at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>         at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
>         at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
>         at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
>         at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
>         at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
>         at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
>
> The reason is that the record is null because backend.getCurrentKey() is
> null (not yet set) in AbstractRocksDBState.
> This may also be the case for other RocksDBState implementations.
>
> You can find the reproducer, based on 1.3-SNAPSHOT (needed for the
> MapState), here:
> https://github.com/vpernin/flink-rocksdbstate-npe
>
> The reproducer is a nonsensical application. There is no MapState with TTL
> or expiration yet, so the goal is to try to shrink or expire the state at
> some interval.
> This could be done by iterating over the entries of the state and removing
> some of them.
>
> This probably cannot be done in the open() method of a rich function.
> I also tried to implement CheckpointListener and to access the state
> content in the notifyCheckpointComplete() method, but that fails too, I
> guess due to the asynchronous nature of the checkpoint.
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.15#6346)
>
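
The quoted analysis can be reproduced outside Flink. This stdlib-only model assumes, as the stack trace suggests, that the key type is `Short`: serializing a key that was never set unboxes a `null` `Short` and throws a `NullPointerException`, while an explicit guard (roughly what the heap backend's `Preconditions.checkState` call does) turns it into the clearer `IllegalStateException: No key set.`. The class and method names are hypothetical and are not Flink's `ShortSerializer` or `AbstractRocksDBState`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class NullKeySerializationModel {

    /** Hypothetical stand-in for a Short serializer: writes the boxed value as two bytes. */
    static void serializeShort(Short record, DataOutputStream out) throws IOException {
        out.writeShort(record); // unboxing a null Short throws NullPointerException here
    }

    /** Hypothetical stand-in for the heap backend's "No key set." precondition. */
    static void checkKeySet(Object currentKey) {
        if (currentKey == null) {
            throw new IllegalStateException("No key set.");
        }
    }

    /** Serializes the current key, as a RocksDB-backed state does before reading its entries. */
    static byte[] writeCurrentKey(Short currentKey, boolean guarded) {
        if (guarded) {
            checkKeySet(currentKey); // fail fast with a clear message instead of an NPE
        }
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializeShort(currentKey, new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeCurrentKey((short) 7, true).length); // 2
        try {
            writeCurrentKey(null, false);
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        try {
            writeCurrentKey(null, true);
        } catch (IllegalStateException e) {
            System.out.println("guarded: " + e.getMessage()); // guarded: No key set.
        }
    }
}
```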
