Thanks for the expedited response!

   - The cache in the IntervalJoin operator may not be necessary for a
   state backend which has the ability to cache itself.

I guess it's yes and no. If we have something like FsStateBackend, all
state is kept in memory, so we don't need this cache. But we need
incremental checkpointing support for large windows (several TB).
Ideally, we'd have a better way to tell the state backend what to cache
(aligned with event-time expiration) versus what should go straight to
disk (~200x read-speed difference). What do you think?
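
For reference, the kind of job we are talking about looks roughly like
the sketch below, using the standard DataStream interval join API
(updates and views stand for the two input DataStreams; PostUpdate,
PostView, and Joined are just illustrative placeholder types). Today both
buffers live entirely in the keyed state backend, so every buffered
element and every match pays the backend's read/write cost:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// PostUpdate / PostView / Joined are illustrative placeholder types.
DataStream<Joined> joined = updates
    .keyBy(u -> u.postId)
    .intervalJoin(views.keyBy(v -> v.postId))
    // match views that occur up to 28 days after a given post update
    .between(Time.days(0), Time.days(28))
    .process(new ProcessJoinFunction<PostUpdate, PostView, Joined>() {
        @Override
        public void processElement(PostUpdate update, PostView view,
                                    Context ctx, Collector<Joined> out) throws Exception {
            // both the left (update) and right (view) buffers that produced
            // this pair are stored in the keyed state backend (e.g. RocksDB)
            out.collect(new Joined(update, view));
        }
    });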

   - We need a tradeoff when introducing a cache in the operator, because
   if the two input sides of the IntervalJoin arrive evenly interleaved
   with each other, e.g. (left, right, left, right), the cache would be
   invalidated and reloaded repeatedly.

It's a valid concern; this one-side sorted cache may not be that useful
for some use cases, so there should be a flag and it should be off by
default. I wish there were a way to tell the state backend what to cache
and for how long (in event time) without adding a cache to the operator.
For small stateful jobs, we can hand out memory freely; for large
stateful jobs, Flink probably needs some support/control from the state
backend.
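
In case it helps the discussion, here is a minimal sketch of what such an
opt-in flag could look like on the user-facing API. enableOneSideSortedCache,
CachedSide, and JoinUpdatesWithViews are hypothetical names made up for
illustration; nothing like this exists in Flink today, and the cache would
default to off:

// Hypothetical sketch only: enableOneSideSortedCache / CachedSide do not
// exist in Flink. The idea is an opt-in knob that defaults to off, so jobs
// whose two inputs interleave evenly are not hurt by cache thrashing.
updates
    .keyBy(u -> u.postId)
    .intervalJoin(views.keyBy(v -> v.postId))
    .between(Time.days(0), Time.days(28))
    // cache the slowly changing (left/update) buffer in memory, sorted by
    // event time, with an explicit bound so memory usage stays predictable
    .enableOneSideSortedCache(CachedSide.LEFT, 10_000 /* max cached entries */)
    .process(new JoinUpdatesWithViews());  // a regular ProcessJoinFunction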


   - I'm a little confused. If the user can tolerate clearing up state
   earlier, why not just shorten [lowerBound, upperBound] directly?

It's asymmetric cleanup: we don't clean up the two streams' state in the
same way as the default does. The rationale is that one stream may carry
metadata (e.g. all social media post updates) while the other carries all
impressions and likes over certain post updates. A post can be updated
10-100 times over a couple of days and receive millions of impressions.
We want to optimize lookups from the impression stream, while covering
the fact that the upstream might be delayed by a couple of hours before
recent updates become available. In the end, we still want to find all
impressions over a given post update in the last 28 days, for example.
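
To make the asymmetry concrete, here is a rough sketch of the knob I have
in mind; rightStreamCleanupDelay is a hypothetical name, not an existing
API, and JoinUpdatesWithViews is the same placeholder join function as
above. The join interval itself stays at 28 days, which is why simply
shrinking [lowerBound, upperBound] would not give the same behavior:

// Hypothetical sketch only: rightStreamCleanupDelay does not exist in
// Flink. The interval stays 28 days so an update still matches all views
// in the following 28 days, but, as described in the proposal, view-side
// buffer entries only need to be kept long enough to cover the update
// stream's lateness (a couple of hours), since a view's matching update
// is always earlier in event time.
updates
    .keyBy(u -> u.postId)
    .intervalJoin(views.keyBy(v -> v.postId))
    .between(Time.days(0), Time.days(28))
    // hypothetical: drop right (view) buffer entries ~2 hours after their
    // event time instead of keeping them for the full join window
    .rightStreamCleanupDelay(Time.hours(2))
    .process(new JoinUpdatesWithViews());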


Thanks,
Chen


On Fri, Mar 6, 2020 at 7:46 AM 张静(槿瑜) <jinyu...@alibaba-inc.com> wrote:

> Hi, chenqi.
>   I'm interested in your optimizations for IntervalJoin, and I also have
> a few questions about two points:
>   1. Introduce a cache to speed up lookup operations.
>       * The cache in the IntervalJoin operator may not be necessary for a
> state backend which has the ability to cache itself.
>       * We need a tradeoff when introducing a cache in the operator,
> because if the two input sides of the IntervalJoin arrive evenly
> interleaved with each other, e.g. (left, right, left, right), the cache
> would be invalidated and reloaded repeatedly.
>
> 2. Add a config to clean up state earlier.
>    * I'm a little confused. If the user can tolerate clearing up state
> earlier, why not just shorten [lowerBound, upperBound] directly?
> Looking forward to your response, thanks.
>
>
> ------------------------------------------------------------------
> From: Chen Qin <qinnc...@gmail.com>
> Sent: Friday, March 6, 2020, 10:54
> To: dev <dev@flink.apache.org>
> Subject: [Discuss] IntervalJoin one side sorted cache
>
> Hi there,
>
> I would like to kick off a discussion on
> https://issues.apache.org/jira/browse/FLINK-16392 and discuss the best way
> to move forward. Here are the problem statement and the proposal we have
> in mind. Please kindly provide feedback.
>
> The native interval join relies on the state backend (e.g. RocksDB) to
> insert/fetch the left and right buffers. This design choice minimizes heap
> memory footprint, but it bounds the processing throughput of a single
> taskmanager to RocksDB access speed. Here at Pinterest, we have some large
> use cases where developers join a large, slowly evolving data stream (e.g.
> post updates in the last 28 days) with a web-traffic data stream (e.g.
> post views up to 28 days after a given update).
>
> This poses some challenges to the current implementation of interval join:
>
>    - partitioned RocksDB needs to keep both updates and views for 28
>    days; a large buffer (especially on the view stream side) causes
>    RocksDB to slow down and leads to overall interval join performance
>    degrading quickly as state builds up.
>
>
>    - the view stream is web scale; even with large parallelism it can put
>    a lot of pressure on each subtask and backpressure the entire job
>
> In the proposed implementation, we plan to introduce two changes:
>
>    - support ProcessJoinFunction settings to opt in to an earlier cleanup
>    time for the right stream (e.g. the view stream doesn't have to stay in
>    the buffer for 28 days waiting for the update stream to join, since
>    related post views happen after the update in event-time semantics).
>    This optimization can reduce state size to improve RocksDB throughput.
>    In the extreme case, the user can opt in to an in-flight join and skip
>    writing into the right (view) stream buffer to save IOPS budget on each
>    subtask.
>
>
>    - support ProcessJoinFunction settings to expedite keyed lookups of the
>    slowly changing stream. Instead of every post view pulling post updates
>    from RocksDB, the user can opt in to having a one-side buffer cache
>    available in memory. On a given post update, the cache loads recent
>    views from the right buffer and uses a sorted map to find buckets. On a
>    given post view, the cache loads recent updates from the left buffer
>    into memory. When another view for that post arrives, Flink saves the
>    cost of a RocksDB access.
>
> Thanks,
> Chen Qin
>
