Hi David,

I really like the proposal. It has a lot of potential for various
optimizations, especially for temporal joins. My only concern is that the
interfaces seem unnecessarily complicated.

My feeling is that we only need a single, simple interface that would
cover it all (the same way it is already done in Apache Beam):

@Experimental
public interface TemporalListState<T>
        extends MergingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> {

    /**
     * Read a timestamp-limited subrange of the list. The result is ordered by timestamp.
     *
     * <p>All values with timestamps >= minTimestamp and < limitTimestamp will be in the
     * resulting iterable. This means that only timestamps strictly less than
     * Instant.ofEpochMilli(Long.MAX_VALUE) can be used as timestamps.
     */
    Iterable<TimestampedValue<T>> readRange(long minTimestamp, long limitTimestamp);

    /**
     * Clear a timestamp-limited subrange of the list.
     *
     * <p>All values with timestamps >= minTimestamp and < limitTimestamp will be removed
     * from the list.
     */
    void clearRange(long minTimestamp, long limitTimestamp);
}

Is there anything missing here? Why do we need a temporal value state at
all? In my understanding it's still basically a "temporal list state", just
with a slightly different API. It is indeed necessary with the "temporal
list state" API you've proposed, but would it make sense to try unifying
the two? I really think that the Beam community already did a good job of
designing this API.
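
To make the range semantics concrete, here is a minimal in-memory sketch.
It is an illustration only, not Flink's or Beam's implementation: the names
TemporalListSketch and InMemoryTemporalList are hypothetical, it returns
plain values instead of TimestampedValue<T>, and it is backed by a TreeMap
rather than a real state backend. It shows that reading or clearing the
value(s) at one timestamp is just the half-open range [ts, ts + 1), which is
also why Long.MAX_VALUE itself cannot be used as a timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the interface above; not a real state backend. */
final class TemporalListSketch {

    /** Keeps values sorted by timestamp; several values may share one timestamp. */
    static final class InMemoryTemporalList<T> {
        private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

        void add(long timestamp, T value) {
            byTimestamp.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
        }

        /** Returns values with minTimestamp <= timestamp < limitTimestamp, ordered by timestamp. */
        List<T> readRange(long minTimestamp, long limitTimestamp) {
            List<T> result = new ArrayList<>();
            if (minTimestamp >= limitTimestamp) {
                return result; // empty or inverted range
            }
            for (List<T> values :
                    byTimestamp.subMap(minTimestamp, true, limitTimestamp, false).values()) {
                result.addAll(values);
            }
            return result;
        }

        /** Removes values with minTimestamp <= timestamp < limitTimestamp. */
        void clearRange(long minTimestamp, long limitTimestamp) {
            if (minTimestamp >= limitTimestamp) {
                return;
            }
            // Clearing the sub-map view removes the entries from the backing map.
            byTimestamp.subMap(minTimestamp, true, limitTimestamp, false).clear();
        }
    }

    public static void main(String[] args) {
        InMemoryTemporalList<String> state = new InMemoryTemporalList<>();
        state.add(30L, "c");
        state.add(10L, "a");
        state.add(20L, "b");

        System.out.println(state.readRange(10L, 30L)); // [a, b]
        // "Temporal value" access expressed as a one-timestamp range:
        System.out.println(state.readRange(20L, 21L)); // [b]
        state.clearRange(20L, 21L);
        System.out.println(state.readRange(0L, 100L)); // [a, c]
    }
}
```

With this shape, a separate "temporal value state" would mostly be a
convenience wrapper around readRange(ts, ts + 1) and clearRange(ts, ts + 1).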

Adding one state primitive is already a big change, so it would be great
if we could keep it minimal.

One more point on the proposed API: being able to clear only a single
"timestamped value" at a time might be limiting for some use cases
(performance-wise, because we can't optimize it the way we can with a
range delete).
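
A rough sketch of the difference, under stated assumptions: RangeDeleteSketch
and CountingStore are hypothetical names, and the deleteCalls counter is only
a stand-in for the number of write calls a backend would receive. The real
cost of each call depends on the state backend.

```java
import java.util.TreeMap;

/** Hypothetical illustration: counts delete calls, it does not measure real backend cost. */
final class RangeDeleteSketch {

    static final class CountingStore {
        private final TreeMap<Long, String> entries = new TreeMap<>();
        int deleteCalls = 0;

        void put(long timestamp, String value) {
            entries.put(timestamp, value);
        }

        /** Removes the entry at exactly one timestamp: one call per timestamp. */
        void delete(long timestamp) {
            deleteCalls++;
            entries.remove(timestamp);
        }

        /** Removes all entries with minTimestamp <= timestamp < limitTimestamp in one call. */
        void deleteRange(long minTimestamp, long limitTimestamp) {
            deleteCalls++;
            if (minTimestamp < limitTimestamp) {
                entries.subMap(minTimestamp, true, limitTimestamp, false).clear();
            }
        }

        int size() {
            return entries.size();
        }
    }

    /** Builds a store with one entry for each timestamp in [0, n). */
    static CountingStore filled(int n) {
        CountingStore store = new CountingStore();
        for (long ts = 0; ts < n; ts++) {
            store.put(ts, "event-" + ts);
        }
        return store;
    }

    public static void main(String[] args) {
        CountingStore perElement = filled(1000);
        for (long ts = 0; ts < 1000; ts++) {
            perElement.delete(ts);
        }

        CountingStore ranged = filled(1000);
        ranged.deleteRange(0L, 1000L);

        // Both stores end up empty, but the number of calls differs.
        System.out.println(perElement.deleteCalls + " vs " + ranged.deleteCalls); // 1000 vs 1
    }
}
```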

Best,
D.

On Tue, Apr 12, 2022 at 9:32 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi David,
>
> Thanks for driving.
>
> I understand that the state storage itself supports byte ordering. Have
> we considered exposing Binary**State? That way the upper layers could be
> implemented on demand; Temporal would be just one of them.
>
> Best,
> Jingsong
>
> On Tue, Apr 12, 2022 at 3:01 PM Aitozi <gjying1...@gmail.com> wrote:
> >
> > Hi David,
> >      I have looked through the doc. I think it will be a good
> > improvement for this usage pattern, and I'm interested in it. Do you
> > have some POC work to share for a closer look?
> > Besides, I have one question: can we support exposing the namespace in
> > the different state types, not limited to `TemporalState`? That way, the
> > user can specify the namespace, and the TemporalState is one special
> > case that uses the timestamp as the namespace. I think it would be more
> > extensible.
> >     What do you think about this?
> >
> > Best,
> > Aitozi.
> >
> > David Anderson <dander...@apache.org> wrote on Mon, Apr 11, 2022 at 20:54:
> >
> > > Greetings, Flink developers.
> > >
> > > I would like to open up a discussion of a proposal [1] to add a new
> > > kind of state to Flink.
> > >
> > > The goal here is to optimize a fairly common pattern, which is using
> > >
> > > MapState<Long, List<Event>>
> > >
> > > to store lists of events associated with timestamps. This pattern is
> > > used internally in quite a few operators that implement sorting and
> > > joins, and it also shows up in user code, for example, when
> > > implementing custom windowing in a KeyedProcessFunction.
> > >
> > > Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves a
> > > more than 2x improvement in throughput when performing these
> > > operations on RocksDB by better leveraging the capabilities of the
> > > RocksDB state backend.
> > >
> > > See FLIP-220 [1] for details.
> > >
> > > Best,
> > > David
> > >
> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> > >
>
