Hi Experts,

Recently, I have been learning how the temporal table join works in Flink by
reading the source code of TemporalRowTimeJoinOperator, and I found this
comment in the source code:

    /**
     * Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`. We
     * can not use List to accumulate Rows, because we need efficient deletes of the oldest rows.
     *
     * <p>TODO: this could be OrderedMultiMap[Jlong, Row] indexed by row's timestamp, to avoid full
     * map traversals (if we have lots of rows on the state that exceed `currentWatermark`).
     */
    private transient MapState<Long, RowData> leftState;
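
To make the TODO concrete, here is a minimal sketch of the kind of full
traversal it refers to. This is not the operator's actual code: a plain
java.util.Map stands in for MapState, and the names FullScanSketch, Row and
extractUpTo are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Illustration only: a plain Map stands in for Flink's MapState. */
class FullScanSketch {

    /** Hypothetical stand-in for a left row: just a row time and a payload. */
    static final class Row {
        final long rowTime;
        final String payload;

        Row(long rowTime, String payload) {
            this.rowTime = rowTime;
            this.payload = payload;
        }
    }

    /**
     * Removes and returns every row whose row time is <= watermark, sorted by row time.
     * Because the state is keyed by an artificial index, every entry has to be visited,
     * so the cost is O(N) even when only a few rows are ready.
     */
    static List<Row> extractUpTo(Map<Long, Row> leftState, long watermark) {
        List<Row> ready = new ArrayList<>();
        Iterator<Map.Entry<Long, Row>> it = leftState.entrySet().iterator();
        while (it.hasNext()) {
            Row row = it.next().getValue();
            if (row.rowTime <= watermark) {
                ready.add(row);
                it.remove();
            }
        }
        ready.sort(Comparator.comparingLong(r -> r.rowTime));
        return ready;
    }
}
```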

AFAIK, Flink does not currently support a state as complex as
OrderedMultiMap, so I tried to implement a similar one [1] that meets the
requirement on top of the existing map state, at the cost of some space
overhead. I would like your feedback on this implementation.

Before explaining the overhead and the trade-offs between the current
implementation and mine, let me give a brief introduction to my
implementation.

First, I implemented a min-heap state so that I can use it to extract the
earliest row time among the left rows. Based on this heap, I implemented a
data structure similar to an adjacency list, which I use to simulate a
MapState<K, List<V>> state and to add a new value without deserializing the
whole list of values, as an actual MapState<K, List<V>> state would.
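
As a rough illustration of the idea (not the code in [1]), the sketch below
combines a binary min-heap of distinct timestamps with per-timestamp linked
lists. Plain HashMaps stand in for Flink state, and every name here
(OrderedMultiMapSketch, put, pollUpTo, the four maps) is an assumption made
for this example. New rows are prepended to their list, so rows sharing a
timestamp come out in reverse insertion order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: plain HashMaps stand in for Flink MapState / ValueState. */
class OrderedMultiMapSketch<V> {
    private final Map<Long, V> rows = new HashMap<>();       // node index -> row
    private final Map<Long, Long> next = new HashMap<>();    // node index -> next node, -1 at the end
    private final Map<Long, Long> head = new HashMap<>();    // timestamp -> first node of its list
    private final Map<Integer, Long> heap = new HashMap<>(); // heap slot -> timestamp
    private int heapSize = 0;
    private long nextIndex = 0L;

    /** Touches a constant number of entries, plus O(log R) heap work for a new timestamp. */
    void put(long timestamp, V row) {
        long node = nextIndex++;
        rows.put(node, row);
        Long oldHead = head.get(timestamp);
        if (oldHead == null) {
            next.put(node, -1L);
            heapPush(timestamp); // first row for this timestamp
        } else {
            next.put(node, oldHead); // prepend without reading the rest of the list
        }
        head.put(timestamp, node);
    }

    /** Removes and returns all rows with timestamp <= watermark, in timestamp order. */
    List<V> pollUpTo(long watermark) {
        List<V> out = new ArrayList<>();
        while (heapSize > 0 && heap.get(0) <= watermark) {
            long ts = heapPop();
            long node = head.remove(ts);
            while (node != -1L) {
                out.add(rows.remove(node));
                node = next.remove(node);
            }
        }
        return out;
    }

    private void heapPush(long ts) {
        int i = heapSize++;
        heap.put(i, ts);
        while (i > 0) { // sift up
            int parent = (i - 1) / 2;
            if (heap.get(parent) <= heap.get(i)) break;
            swap(i, parent);
            i = parent;
        }
    }

    private long heapPop() {
        long min = heap.get(0);
        heapSize--;
        long last = heap.remove(heapSize);
        if (heapSize > 0) {
            heap.put(0, last);
            int i = 0;
            while (true) { // sift down
                int l = 2 * i + 1, r = l + 1, smallest = i;
                if (l < heapSize && heap.get(l) < heap.get(smallest)) smallest = l;
                if (r < heapSize && heap.get(r) < heap.get(smallest)) smallest = r;
                if (smallest == i) break;
                swap(i, smallest);
                i = smallest;
            }
        }
        return min;
    }

    private void swap(int a, int b) {
        long t = heap.get(a);
        heap.put(a, heap.get(b));
        heap.put(b, t);
    }
}
```

In this sketch, extraction only touches the rows it emits plus one heap pop
per distinct emitted timestamp, instead of scanning everything in state.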

Regarding time and space complexity, let's assume the following:

   1. The total number of all left rows buffered in state is "N".
   2. The distinct number of row times among these buffered rows is "R".
   3. The number of emitted results is approximately "K" each time.
   4. The distinct number of row times among these emitted results is "P".

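                              | current implementation      | my implementation
------------------------------+-----------------------------+-------------------
"processElement1"             | O(1)                        | O(log R)
time complexity               |                             |
------------------------------+-----------------------------+-------------------
"extract left rows to emit"   | O(N)                        | O(K + P log R)
time complexity               |                             |
------------------------------+-----------------------------+-------------------
space complexity              | N + 1                       | 2N + 2R + 2
                              | (1 for nextLeftIndex)       |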

From this table, we can see that the space overhead is more than twice that
of the current implementation, while the benefit in "extract" time
complexity is not always significant, because it depends on many conditions
(how K, P and R compare with N).

Please let me know what you think of such an implementation. Is it
worthwhile or not?
Besides, I'm not sure whether there is already a similar effort, for example
to support an OrderedMultiMap or OrderedMap state for general purposes. It
would be great if you could point me to the relevant JIRA issues.
Any feedback is welcome. Thank you in advance.

[1]
https://github.com/tony810430/flink/commit/0d5d9d70df0e85b0ac161ffa89c11249a0b3db2a

best regards,
