Sean Z created FLINK-19709:
------------------------------
Summary: Support KeyedSortedMapState in DataStream API
Key: FLINK-19709
URL: https://issues.apache.org/jira/browse/FLINK-19709
Project: Flink
Issue Type: New Feature
Components: API / DataStream, Runtime / State Backends
Reporter: Sean Z
Current DataStream API doesn't have SortedMapState supported. There are lots of
use cases based on sorted time-series data like range-query or higher/lower key
fetch, and ordered data seems like a nature of time-series stream processing.
Therefore, we propose to support the KeyedSortedMapState feature.
There were some previous discussions
[FLINK-6219|https://issues.apache.org/jira/browse/FLINK-6219] about
SortedMapState, and the thread was closed because blink code might cover this
feature. However, the [blink
code|[https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java]]
wasn't merged into the master branch since then. The major concern is the
inconsistent comparison between heap/off-heap state backends. In RocksDB, the
comparison should be based on bytes, which makes generic key types support
challenging, and in heap state backend, the comparison is more about Comparable
interface.
There are two possible solutions to this issue as discussed in dev email list,
1. There is a prototype feature called TemporalState which could limit the key
type to Long type. It makes sense for most of the use cases are about timestamp
as a key but it brings limitations to support generic key types. We also need
to gracefully handle negative long.
2. We keep the different sorting behavior of different state backends and set
it to bytes comparison for given serialization by default in off-heap state
backends. Let users provide their own specific serializer if they want to sort
some customized type on RocksDB. This solution could start with blink
implementation, which has already supported serializing various key types
(almost all the atomic types) into an ordered bytes.
[code|[https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered]]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)