Hi Zakelly,
Thanks for sharing all these resources. I took some time to go through them and
wanted to share a quick summary and a few thoughts.
Here’s a brief recap of what was discussed on the mailing list (more details in
[1]):
They proposed introducing binary-sorted state with two user-facing APIs:
- BinarySortedMultiMapState<UK, UV> for storing multiple values per key (add(K
key, V value) to append a value, etc.)
- BinarySortedMapState<UK, UV> as a simpler option when there’s only one value
per key (a special case of the first one)
The readRange() method would use an eager-batch mode by default (fetching
entries in chunks of 128), which strikes a good balance between lazy access
(one by one) and fully eager reads.
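To illustrate the eager-batch idea, here is a minimal sketch in plain Java. It is not taken from FLIP-220: the class name BatchedIterator, the fetchCount() helper, and the use of a java.util.Iterator as a stand-in for a backend cursor are all assumptions made for illustration. Only the batch size of 128 comes from the proposal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: pulls entries from a lazy source in fixed-size chunks.
public class BatchedIterator<T> implements Iterator<T> {
    private final Iterator<T> source;          // stand-in for a backend cursor
    private final int batchSize;               // 128 in the proposal
    private final ArrayDeque<T> buffer = new ArrayDeque<>(); // null elements not supported
    private int fetches = 0;                   // number of batch loads performed

    BatchedIterator(Iterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    // Load up to batchSize entries in one go.
    private void fill() {
        fetches++;
        while (buffer.size() < batchSize && source.hasNext()) {
            buffer.add(source.next());
        }
    }

    @Override
    public boolean hasNext() {
        if (buffer.isEmpty() && source.hasNext()) {
            fill();
        }
        return !buffer.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return buffer.poll();
    }

    int fetchCount() {
        return fetches;
    }

    public static void main(String[] args) {
        List<Integer> entries = new ArrayList<>();
        for (int i = 0; i < 300; i++) {
            entries.add(i);
        }
        BatchedIterator<Integer> it = new BatchedIterator<>(entries.iterator(), 128);
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        System.out.println(count + " entries read in " + it.fetchCount() + " batches");
    }
}
```

A reader that stops early only pays for the batches it actually touched, which is the trade-off between the lazy and fully eager modes.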
Main challenge: ensuring that the binary representation preserves the same
ordering as the Java Comparable.
So keys would need a custom serializer:
interface LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
    Optional<Comparator<T>> findComparator(); // useful for HeapStateBackend
}
This allows users to define custom key types while preserving sort order.
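As a concrete example of an order-preserving encoding, here is a minimal sketch for long keys. It is an illustration only, not part of the FLIP: the class name OrderedLongKey and its encode/decode methods are hypothetical. The idea is to flip the sign bit and write the value big-endian, so that an unsigned byte-wise comparison (as RocksDB's default comparator does) agrees with Long.compare.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper showing a binary encoding that preserves numeric order.
public class OrderedLongKey {

    // Flip the sign bit so negatives sort before positives, then write
    // big-endian (ByteBuffer's default) so the most significant byte comes first.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // Inverse of encode: read big-endian and flip the sign bit back.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] samples = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 42L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < samples.length; i++) {
            // Arrays.compareUnsigned (Java 9+) mimics a byte-wise comparator.
            int cmp = Arrays.compareUnsigned(encode(samples[i]), encode(samples[i + 1]));
            if (cmp >= 0) {
                throw new AssertionError("order not preserved at index " + i);
            }
        }
        System.out.println("byte order matches Long.compare on all samples");
    }
}
```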
For the HeapStateBackend, a skip list was suggested instead of a TreeMap, as it
offers better memory locality and fewer allocations, while still maintaining
O(log n) performance.
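For reference, a range read over an in-memory skip list can be sketched with the JDK's ConcurrentSkipListMap. This is only an illustration of the data structure. Nothing here says which skip list implementation the heap backend would use, and the readRange signature and the SkipListRangeRead class below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: ordered range read on a JDK skip list.
public class SkipListRangeRead {

    // Returns the values whose keys fall in [from, to), in ascending key order.
    static List<String> readRange(ConcurrentSkipListMap<Double, String> levels,
                                  double from, double to) {
        return new ArrayList<>(levels.subMap(from, true, to, false).values());
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Double, String> levels = new ConcurrentSkipListMap<>();
        levels.put(101.5, "level-c");
        levels.put(99.0, "level-a");
        levels.put(100.25, "level-b");
        levels.put(103.0, "level-d");

        // Prints [level-b, level-c]: 99.0 is below the range, 103.0 is excluded.
        System.out.println(readRange(levels, 100.0, 103.0));
    }
}
```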
Note 1: From what I understand (I haven’t looked into Flink’s internals yet),
the proposed encoding should be compatible with ForSt as well.
Note 2: For the order book use case, sorting doubles (IEEE 754) is fairly
straightforward:
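long toOrderedBits(double value) {
    long bits = Double.doubleToRawLongBits(value);
    // Negative values: flip all bits. Non-negative values: flip only the sign bit.
    // The result sorts correctly when compared as unsigned (or as big-endian bytes).
    return bits ^ ((bits >> 63) | 0x8000000000000000L);
}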
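A quick way to sanity-check such an encoding is to compare encoded values as unsigned longs and to verify the round trip. The sketch below uses the mask (bits >> 63) | 0x8000000000000000L, which flips every bit of a negative value and only the sign bit of a non-negative one. The class name OrderedDoubleBits and the fromOrderedBits inverse are additions for illustration. NaN handling is deliberately left out.

```java
// Hypothetical self-check for an order-preserving double encoding.
public class OrderedDoubleBits {

    // Negative: flip all bits. Non-negative: flip only the sign bit.
    static long toOrderedBits(double value) {
        long bits = Double.doubleToRawLongBits(value);
        return bits ^ ((bits >> 63) | 0x8000000000000000L);
    }

    // Inverse: a set top bit means the original was non-negative.
    static double fromOrderedBits(long ordered) {
        long mask = ((ordered >>> 63) - 1L) | 0x8000000000000000L;
        return Double.longBitsToDouble(ordered ^ mask);
    }

    public static void main(String[] args) {
        // Already in ascending order; note that -0.0 sorts just before 0.0.
        double[] prices = {-1.0e9, -2.5, -0.0, 0.0, 1.0e-3, 99.5, 100.25, Double.MAX_VALUE};
        for (int i = 0; i + 1 < prices.length; i++) {
            long a = toOrderedBits(prices[i]);
            long b = toOrderedBits(prices[i + 1]);
            if (Long.compareUnsigned(a, b) >= 0) {
                throw new AssertionError("order not preserved at index " + i);
            }
            if (Double.compare(fromOrderedBits(a), prices[i]) != 0) {
                throw new AssertionError("round trip failed at index " + i);
            }
        }
        System.out.println("unsigned order matches Double.compare on all samples");
    }
}
```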
[1] https://cwiki.apache.org/confluence/x/Xo_FD
Thanks again,
Charles
From: Zakelly Lan <[email protected]>
Date: Monday, April 14, 2025 at 5:55 AM
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] Missing TreeMapState abstraction for maintaining ordered
state
Hi Charles,
There is FLIP-220 [1] and discussions [2] about introducing a sorted map
state in binary order, but there seems to have been no further progress or
conclusion. It would be nice if anyone could drive this.
I would +1 for introducing this. As RocksDB stores key-value pairs in binary
order, we should also keep that order. The API design still requires more
discussion.
[1] https://cwiki.apache.org/confluence/x/Xo_FD
[2] https://lists.apache.org/thread/lbdpr1cjt9ddrotowmzzk4lv787b3t5z
Best,
Zakelly
On Sat, Apr 12, 2025 at 6:35 PM Charles COLELLA <[email protected]>
wrote:
> Hi all,
>
> I'm building a Flink application that processes real-time order book data
> (~70k events/sec), partitioned by instrument.
>
>
> For each key, I need to maintain an order book where levels are kept
> ordered by price. Since Flink's state API doesn't support ordered
> structures like a NavigableMap or TreeMap, I currently have to:
>
> - Store all levels in a MapState<Double, OrderLevel> for persistence,
>
> - Maintain a parallel TreeMap<Double, OrderLevel> in memory to preserve
> ordering,
>
> - Keep both in sync on every update,
>
> - And rebuild the memory structure from MapState during recovery.
>
>
> This adds significant overhead:
>
> - Extra memory (duplicate data structures),
>
> - Extra logic (synchronization and consistency),
>
> - Extra CPU cost (sorting, filtering, top-K extraction).
>
> The use case (maintaining sorted state per key) is common in financial
> applications, ranking systems, and any stream logic requiring efficient
> ordered access.
>
>
> I’ve tried to search the Jira, dev mailing list, and FLIPs, but couldn’t
> find a discussion on 'TreeMapState', 'SortedMapState', or any abstraction that
> would support this.
>
>
> Has this been discussed before, or would it make sense to open a proposal?
>
>
> Thanks,
>
> Charles
>