Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about
some of your statements (maybe I didn't get something?).

> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time”

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not
fully implemented with caching, but as you said, users go on it
consciously to achieve better performance (no one proposed to enable
caching by default, etc.). Or by users do you mean other developers of
connectors? In this case developers explicitly specify whether their
connector supports caching or not (in the list of supported options),
no one makes them do that if they don't want to. So what exactly is
the difference between implementing caching in modules
flink-table-runtime and in flink-table-common from the considered
point of view? How does it affect on breaking/non-breaking the
semantics of "FOR SYSTEM_TIME AS OF proc_time"?

> confront a situation that allows table options in DDL to control the behavior 
> of the framework, which has never happened previously and should be cautious

If we talk about main differences of semantics of DDL options and
config options("table.exec.xxx"), isn't it about limiting the scope of
the options + importance for the user business logic rather than
specific location of corresponding logic in the framework? I mean that
in my design, for example, putting an option with lookup cache
strategy in configurations would  be the wrong decision, because it
directly affects the user's business logic (not just performance
optimization) + touches just several functions of ONE table (there can
be multiple tables with different caches). Does it really matter for
the user (or someone else) where the logic is located, which is
affected by the applied option?
Also I can remember DDL option 'sink.parallelism', which in some way
"controls the behavior of the framework" and I don't see any problem
here.

> introduce a new interface for this all-caching scenario and the design would 
> become more complex

This is a subject for a separate discussion, but actually in our
internal version we solved this problem quite easily - we reused
InputFormat class (so there is no need for a new API). The point is
that currently all lookup connectors use InputFormat for scanning the
data in batch mode: HBase, JDBC and even Hive - it uses class
PartitionReader, that is actually just a wrapper around InputFormat.
The advantage of this solution is the ability to reload cache data in
parallel (number of threads depends on number of InputSplits, but has
an upper limit). As a result cache reload time significantly reduces
(as well as time of input stream blocking). I know that usually we try
to avoid usage of concurrency in Flink code, but maybe this one can be
an exception. BTW I don't say that it's an ideal solution, maybe there
are better ones.

> Providing the cache in the framework might introduce compatibility issues

It's possible only in cases when the developer of the connector won't
properly refactor his code and will use new cache options incorrectly
(i.e. explicitly provide the same options into 2 different code
places). For correct behavior all he will need to do is to redirect
existing options to the framework's LookupConfig (+ maybe add an alias
for options, if there was different naming), everything will be
transparent for users. If the developer won't do refactoring at all,
nothing will be changed for the connector because of backward
compatibility. Also if a developer wants to use his own cache logic,
he just can refuse to pass some of the configs into the framework, and
instead make his own implementation with already existing configs and
metrics (but actually I think that it's a rare case).

> filters and projections should be pushed all the way down to the table 
> function, like what we do in the scan source

It's the great purpose. But the truth is that the ONLY connector that
supports filter pushdown is FileSystemTableSource
(no database connector supports it currently). Also for some databases
it's simply impossible to pushdown such complex filters that we have
in Flink.

>  only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the
dimension table. For a simple example, suppose in dimension table
'users'
we have column 'age' with values from 20 to 40, and input stream
'clicks' that is ~uniformly distributed by age of users. If we have
filter 'age > 30',
there will be twice less data in cache. This means the user can
increase 'lookup.cache.max-rows' by almost 2 times. It will gain a
huge
performance boost. Moreover, this optimization starts to really shine
in 'ALL' cache, where tables without filters and projections can't fit
in memory, but with them - can. This opens up additional possibilities
for users. And this doesn't sound as 'not quite useful'.

It would be great to hear other voices regarding this topic! Because
we have quite a lot of controversial points, and I think with the help
of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander


пт, 29 апр. 2022 г. в 22:33, Qingsheng Ren <renqs...@gmail.com>:
>
> Hi Alexander and Arvid,
>
> Thanks for the discussion and sorry for my late response! We had an internal 
> discussion together with Jark and Leonard and I’d like to summarize our 
> ideas. Instead of implementing the cache logic in the table runtime layer or 
> wrapping around the user-provided table function, we prefer to introduce some 
> new APIs extending TableFunction with these concerns:
>
> 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time”, 
> because it couldn’t truly reflect the content of the lookup table at the 
> moment of querying. If users choose to enable caching on the lookup table, 
> they implicitly indicate that this breakage is acceptable in exchange for the 
> performance. So we prefer not to provide caching on the table runtime level.
>
> 2. If we make the cache implementation in the framework (whether in a runner 
> or a wrapper around TableFunction), we have to confront a situation that 
> allows table options in DDL to control the behavior of the framework, which 
> has never happened previously and should be cautious. Under the current 
> design the behavior of the framework should only be specified by 
> configurations (“table.exec.xxx”), and it’s hard to apply these general 
> configs to a specific table.
>
> 3. We have use cases that lookup source loads and refresh all records 
> periodically into the memory to achieve high lookup performance (like Hive 
> connector in the community, and also widely used by our internal connectors). 
> Wrapping the cache around the user’s TableFunction works fine for LRU caches, 
> but I think we have to introduce a new interface for this all-caching 
> scenario and the design would become more complex.
>
> 4. Providing the cache in the framework might introduce compatibility issues 
> to existing lookup sources like there might exist two caches with totally 
> different strategies if the user incorrectly configures the table (one in the 
> framework and another implemented by the lookup source).
>
> As for the optimization mentioned by Alexander, I think filters and 
> projections should be pushed all the way down to the table function, like 
> what we do in the scan source, instead of the runner with the cache. The goal 
> of using cache is to reduce the network I/O and pressure on the external 
> system, and only applying these optimizations to the cache seems not quite 
> useful.
>
> I made some updates to the FLIP[1] to reflect our ideas. We prefer to keep 
> the cache implementation as a part of TableFunction, and we could provide 
> some helper classes (CachingTableFunction, AllCachingTableFunction, 
> CachingAsyncTableFunction) to developers and regulate metrics of the cache. 
> Also, I made a POC[2] for your reference.
>
> Looking forward to your ideas!
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>
> Best regards,
>
> Qingsheng
>
> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> 
> wrote:
>>
>> Thanks for the response, Arvid!
>>
>> I have few comments on your message.
>>
>> > but could also live with an easier solution as the first step:
>>
>> I think that these 2 ways are mutually exclusive (originally proposed
>> by Qingsheng and mine), because conceptually they follow the same
>> goal, but implementation details are different. If we will go one way,
>> moving to another way in the future will mean deleting existing code
>> and once again changing the API for connectors. So I think we should
>> reach a consensus with the community about that and then work together
>> on this FLIP, i.e. divide the work on tasks for different parts of the
>> flip (for example, LRU cache unification / introducing proposed set of
>> metrics / further work…). WDYT, Qingsheng?
>>
>> > as the source will only receive the requests after filter
>>
>> Actually if filters are applied to fields of the lookup table, we
>> firstly must do requests, and only after that we can filter responses,
>> because lookup connectors don't have filter pushdown. So if filtering
>> is done before caching, there will be much less rows in cache.
>>
>> > @Alexander unfortunately, your architecture is not shared. I don't know the
>>
>> > solution to share images to be honest.
>>
>> Sorry for that, I’m a bit new to such kinds of conversations :)
>> I have no write access to the confluence, so I made a Jira issue,
>> where described the proposed changes in more details -
>> https://issues.apache.org/jira/browse/FLINK-27411.
>>
>> Will happy to get more feedback!
>>
>> Best,
>> Smirnov Alexander
>>
>> пн, 25 апр. 2022 г. в 19:49, Arvid Heise <ar...@apache.org>:
>> >
>> > Hi Qingsheng,
>> >
>> > Thanks for driving this; the inconsistency was not satisfying for me.
>> >
>> > I second Alexander's idea though but could also live with an easier
>> > solution as the first step: Instead of making caching an implementation
>> > detail of TableFunction X, rather devise a caching layer around X. So the
>> > proposal would be a CachingTableFunction that delegates to X in case of
>> > misses and else manages the cache. Lifting it into the operator model as
>> > proposed would be even better but is probably unnecessary in the first step
>> > for a lookup source (as the source will only receive the requests after
>> > filter; applying projection may be more interesting to save memory).
>> >
>> > Another advantage is that all the changes of this FLIP would be limited to
>> > options, no need for new public interfaces. Everything else remains an
>> > implementation of Table runtime. That means we can easily incorporate the
>> > optimization potential that Alexander pointed out later.
>> >
>> > @Alexander unfortunately, your architecture is not shared. I don't know the
>> > solution to share images to be honest.
>> >
>> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com>
>> > wrote:
>> >
>> > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd
>> > > really like to become one. And this FLIP really interested me.
>> > > Actually I have worked on a similar feature in my company’s Flink
>> > > fork, and we would like to share our thoughts on this and make code
>> > > open source.
>> > >
>> > > I think there is a better alternative than introducing an abstract
>> > > class for TableFunction (CachingTableFunction). As you know,
>> > > TableFunction exists in the flink-table-common module, which provides
>> > > only an API for working with tables – it’s very convenient for importing
>> > > in connectors. In turn, CachingTableFunction contains logic for
>> > > runtime execution,  so this class and everything connected with it
>> > > should be located in another module, probably in flink-table-runtime.
>> > > But this will require connectors to depend on another module, which
>> > > contains a lot of runtime logic, which doesn’t sound good.
>> > >
>> > > I suggest adding a new method ‘getLookupConfig’ to LookupTableSource
>> > > or LookupRuntimeProvider to allow connectors to only pass
>> > > configurations to the planner, therefore they won’t depend on runtime
>> > > realization. Based on these configs planner will construct a lookup
>> > > join operator with corresponding runtime logic (ProcessFunctions in
>> > > module flink-table-runtime). Architecture looks like in the pinned
>> > > image (LookupConfig class there is actually yours CacheConfig).
>> > >
>> > > Classes in flink-table-planner, that will be responsible for this –
>> > > CommonPhysicalLookupJoin and his inheritors.
>> > > Current classes for lookup join in  flink-table-runtime  -
>> > > LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc,
>> > > AsyncLookupJoinRunnerWithCalc.
>> > >
>> > > I suggest adding classes LookupJoinCachingRunner,
>> > > LookupJoinCachingRunnerWithCalc, etc.
>> > >
>> > > And here comes another more powerful advantage of such a solution. If
>> > > we have caching logic on a lower level, we can apply some
>> > > optimizations to it. LookupJoinRunnerWithCalc was named like this
>> > > because it uses the ‘calc’ function, which actually mostly consists of
>> > > filters and projections.
>> > >
>> > > For example, in join table A with lookup table B condition ‘JOIN … ON
>> > > A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’  ‘calc’
>> > > function will contain filters A.age = B.age + 10 and B.salary > 1000.
>> > >
>> > > If we apply this function before storing records in cache, size of
>> > > cache will be significantly reduced: filters = avoid storing useless
>> > > records in cache, projections = reduce records’ size. So the initial
>> > > max number of records in cache can be increased by the user.
>> > >
>> > > What do you think about it?
>> > >
>> > >
>> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>> > > > Hi devs,
>> > > >
>> > > > Yuan and I would like to start a discussion about FLIP-221[1], which
>> > > introduces an abstraction of lookup table cache and its standard metrics.
>> > > >
>> > > > Currently each lookup table source should implement their own cache to
>> > > store lookup results, and there isn’t a standard of metrics for users and
>> > > developers to tuning their jobs with lookup joins, which is a quite 
>> > > common
>> > > use case in Flink table / SQL.
>> > > >
>> > > > Therefore we propose some new APIs including cache, metrics, wrapper
>> > > classes of TableFunction and new table options. Please take a look at the
>> > > FLIP page [1] to get more details. Any suggestions and comments would be
>> > > appreciated!
>> > > >
>> > > > [1]
>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> > > >
>> > > > Best regards,
>> > > >
>> > > > Qingsheng
>> > > >
>> > > >
>> > >
>
>
>
> --
> Best Regards,
>
> Qingsheng Ren
>
> Real-time Computing Team
> Alibaba Cloud
>
> Email: renqs...@gmail.com

Reply via email to