Hi devs,

Thanks for the in-depth discussion! We recently updated some designs in FLIP-221 
[1]:

1. Introduced a new interface “FullCachingReloadTrigger” for developers to 
customize the reload strategy. The previous design was time-based only and not 
flexible enough. Developers can implement any logic behind this interface to 
trigger a full cache reload.

2. Renamed LookupFunctionProvider / AsyncLookupFunctionProvider to 
PartialCachingLookupProvider / AsyncPartialCachingLookupProvider, in order to 
be symmetric with “FullCachingLookupProvider”.

3. Removed the lookup option “lookup.async”, because FLIP-234 is planning to move 
the decision of whether to use async mode to the planner.

4. Renamed LookupCacheMetricGroup to CacheMetricGroup, because it’s under 
flink-metrics-core and can be reused by all caching implementations. 

A POC has been pushed to my GitHub repo [2] to reflect the updates. Some 
implementations of FullCachingReloadTrigger may be quite naive; they exist just 
to show how the interface works. 
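To make the FullCachingReloadTrigger idea above concrete, here is a minimal, self-contained sketch of what a custom trigger could look like. Only the name FullCachingReloadTrigger comes from the FLIP; the interface shape and the Context methods below are illustrative assumptions for this example, not the proposed API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ReloadTriggerSketch {

    // Assumed shape of the trigger interface; only the name comes from the
    // FLIP text, the methods are illustrative guesses.
    interface FullCachingReloadTrigger extends AutoCloseable {
        void open(Context context) throws Exception;

        interface Context {
            ScheduledExecutorService getExecutor();
            Runnable getReloadTask(); // runs a full cache reload when invoked
        }
    }

    // A naive time-based trigger: kick off a full reload at a fixed interval.
    static class PeriodicReloadTrigger implements FullCachingReloadTrigger {
        private final long intervalMillis;
        private ScheduledFuture<?> future;

        PeriodicReloadTrigger(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        @Override
        public void open(Context context) {
            future = context.getExecutor().scheduleWithFixedDelay(
                    context.getReloadTask(), 0, intervalMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public void close() {
            if (future != null) {
                future.cancel(false);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeReloads = new CountDownLatch(3);
        PeriodicReloadTrigger trigger = new PeriodicReloadTrigger(5);
        trigger.open(new FullCachingReloadTrigger.Context() {
            @Override
            public ScheduledExecutorService getExecutor() {
                return executor;
            }

            @Override
            public Runnable getReloadTask() {
                return threeReloads::countDown;
            }
        });
        boolean fired = threeReloads.await(5, TimeUnit.SECONDS);
        trigger.close();
        executor.shutdownNow();
        if (!fired) {
            throw new AssertionError("reload was not triggered");
        }
        System.out.println("full reload triggered at least 3 times");
    }
}
```

Any other strategy (a ZooKeeper watch, a partition-commit notification, etc.) would plug in the same way by driving the reload task from its own event source.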

Looking forward to your comments!

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221-framework

> On Jun 1, 2022, at 04:58, Jing Ge <j...@ververica.com> wrote:
> 
> Hi Jark,
> 
> Thanks for clarifying it. It would be fine as long as we could provide the
> no-cache solution. I was just wondering if the client-side cache could
> really help when HBase is used, since the data to look up should be huge.
> Depending on how much data is cached on the client side, the data that
> should be LRU in e.g. LruBlockCache will not be LRU anymore. In the worst
> case, once the cached data on the client side expires, the request will hit
> disk, which will cause extra latency temporarily, if I am not mistaken.
> 
> Best regards,
> Jing
> 
> On Mon, May 30, 2022 at 9:59 AM Jark Wu <imj...@gmail.com> wrote:
> 
>> Hi Jing Ge,
>> 
>> What do you mean about the "impact on the block cache used by HBase"?
>> In my understanding, the connector cache and HBase cache are totally two
>> things.
>> The connector cache is a local/client cache, and the HBase cache is a
>> server cache.
>> 
>>> does it make sense to have a no-cache solution as one of the
>> default solutions so that customers will have no effort for the migration
>> if they want to stick with Hbase cache
>> 
>> The implementation migration should be transparent to users. Take the HBase
>> connector as
>> an example,  it already supports lookup cache but is disabled by default.
>> After migration, the
>> connector still disables cache by default (i.e. no-cache solution). No
>> migration effort for users.
>> 
>> The HBase cache and the connector cache are two different things. The HBase
>> cache can't simply replace the connector cache, because one of the most
>> important usages of the connector cache is reducing I/O requests/responses
>> and improving throughput, which cannot be achieved by just using a
>> server-side cache.
>> 
>> Best,
>> Jark
>> 
>> 
>> 
>> 
>> On Fri, 27 May 2022 at 22:42, Jing Ge <j...@ververica.com> wrote:
>> 
>>> Thanks all for the valuable discussion. The new feature looks very
>>> interesting.
>>> 
>>> According to the FLIP description: "*Currently we have JDBC, Hive and
>> HBase
>>> connector implemented lookup table source. All existing implementations
>>> will be migrated to the current design and the migration will be
>>> transparent to end users*." I was only wondering if we should pay
>> attention
>>> to HBase and similar DBs. Since, commonly, the lookup data will be huge
>>> while using HBase, partial caching will be used in this case, if I am not
>>> mistaken, which might have an impact on the block cache used by HBase,
>> e.g.
>>> LruBlockCache.
>>> Another question is that, since HBase provides a sophisticated cache
>>> solution, does it make sense to have a no-cache solution as one of the
>>> default solutions so that customers will have no effort for the migration
>>> if they want to stick with the HBase cache?
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> On Fri, May 27, 2022 at 11:19 AM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> I think the problems now are as below:
>>>> 1. The AllCache and PartialCache interfaces are not uniform: one needs to
>>>> provide a LookupProvider, the other needs to provide a CacheBuilder.
>>>> 2. The AllCache definition is not flexible. For example, PartialCache can
>>>> use any custom storage while AllCache cannot; AllCache could also store to
>>>> memory or disk, so it needs a flexible strategy too.
>>>> 3. AllCache cannot customize its ReloadStrategy; currently there is only
>>>> ScheduledReloadStrategy.
>>>> 
>>>> In order to solve the above problems, the following are my ideas.
>>>> 
>>>> ## Top level cache interfaces:
>>>> 
>>>> ```
>>>> 
>>>> public interface CacheLookupProvider extends
>>>> LookupTableSource.LookupRuntimeProvider {
>>>> 
>>>>    CacheBuilder createCacheBuilder();
>>>> }
>>>> 
>>>> 
>>>> public interface CacheBuilder {
>>>>    Cache create();
>>>> }
>>>> 
>>>> 
>>>> public interface Cache {
>>>> 
>>>>    /**
>>>>     * Returns the value associated with key in this cache, or null if
>>>> there is no cached value for
>>>>     * key.
>>>>     */
>>>>    @Nullable
>>>>    Collection<RowData> getIfPresent(RowData key);
>>>> 
>>>>    /** Returns the number of key-value mappings in the cache. */
>>>>    long size();
>>>> }
>>>> 
>>>> ```
>>>> 
>>>> ## Partial cache
>>>> 
>>>> ```
>>>> 
>>>> public interface PartialCacheLookupFunction extends CacheLookupProvider {
>>>> 
>>>>    @Override
>>>>    PartialCacheBuilder createCacheBuilder();
>>>> 
>>>>    /** Creates a {@link LookupFunction} instance. */
>>>>    LookupFunction createLookupFunction();
>>>> }
>>>> 
>>>> 
>>>> public interface PartialCacheBuilder extends CacheBuilder {
>>>> 
>>>>    PartialCache create();
>>>> }
>>>> 
>>>> 
>>>> public interface PartialCache extends Cache {
>>>> 
>>>>    /**
>>>>     * Associates the specified value rows with the specified key row in
>>>>     * the cache. If the cache previously contained values associated with
>>>>     * the key, the old values are replaced by the specified values.
>>>>     *
>>>>     * @param key key row with which the specified values are to be associated
>>>>     * @param value value rows to be associated with the specified key
>>>>     * @return the previous value rows associated with the key, or null if
>>>>     *     there was no mapping for the key
>>>>     */
>>>>    Collection<RowData> put(RowData key, Collection<RowData> value);
>>>> 
>>>>    /** Discards any cached value for the specified key. */
>>>>    void invalidate(RowData key);
>>>> }
>>>> 
>>>> ```
>>>> 
>>>> ## All cache
>>>> ```
>>>> 
>>>> public interface AllCacheLookupProvider extends CacheLookupProvider {
>>>> 
>>>>    void registerReloadStrategy(ScheduledExecutorService
>>>> executorService, Reloader reloader);
>>>> 
>>>>    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
>>>> 
>>>>    @Override
>>>>    AllCacheBuilder createCacheBuilder();
>>>> }
>>>> 
>>>> 
>>>> public interface AllCacheBuilder extends CacheBuilder {
>>>> 
>>>>    AllCache create();
>>>> }
>>>> 
>>>> 
>>>> public interface AllCache extends Cache {
>>>> 
>>>>    void putAll(Iterator<Map<RowData, RowData>> allEntries);
>>>> 
>>>>    void clearAll();
>>>> }
>>>> 
>>>> 
>>>> public interface Reloader {
>>>> 
>>>>    void reload();
>>>> }
>>>> 
>>>> ```
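To make the proposed contract above concrete, here is a minimal in-memory sketch of the PartialCache operations (getIfPresent / put / invalidate / size). String stands in for Flink's RowData so the snippet stays self-contained; the class name is made up for this example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryPartialCacheSketch {

    // String stands in for RowData here; the real interfaces are keyed by rows.
    static class InMemoryPartialCache {
        private final Map<String, Collection<String>> store = new ConcurrentHashMap<>();

        // Returns the cached rows for the key, or null on a cache miss.
        Collection<String> getIfPresent(String key) {
            return store.get(key);
        }

        // Stores rows for the key, returning the previously cached rows if any.
        Collection<String> put(String key, Collection<String> value) {
            return store.put(key, value);
        }

        // Discards any cached rows for the key.
        void invalidate(String key) {
            store.remove(key);
        }

        long size() {
            return store.size();
        }
    }

    public static void main(String[] args) {
        InMemoryPartialCache cache = new InMemoryPartialCache();
        check(cache.getIfPresent("k1") == null, "miss before put");
        cache.put("k1", Arrays.asList("row1", "row2"));
        check(cache.getIfPresent("k1").size() == 2, "hit after put");
        check(cache.size() == 1, "one entry cached");
        cache.invalidate("k1");
        check(cache.getIfPresent("k1") == null, "miss after invalidate");
        System.out.println("partial cache contract holds");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```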
>>>> 
>>>> Best,
>>>> Jingsong
>>>> 
>>>> On Fri, May 27, 2022 at 11:10 AM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Thanks Qingsheng and all for your discussion.
>>>>> 
>>>>> Very sorry to jump in so late.
>>>>> 
>>>>> Maybe I missed something?
>>>>> My first impression when I saw the cache interface was: why don't we
>>>>> provide an interface similar to the Guava cache [1]? On top of the Guava
>>>>> cache, Caffeine also makes extensions for asynchronous calls [2].
>>>>> There is also bulk loading in Caffeine.
>>>>> 
>>>>> I am also confused about why we go first through LookupCacheFactory.Builder
>>>>> and then through the Factory to create the Cache.
>>>>> 
>>>>> [1] https://github.com/google/guava
>>>>> [2] https://github.com/ben-manes/caffeine/wiki/Population
>>>>> 
>>>>> Best,
>>>>> Jingsong
>>>>> 
>>>>> On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote:
>>>>> 
>>>>>> After looking at the newly introduced ReloadTime and Becket's comment,
>>>>>> I agree with Becket that we should have a pluggable reloading strategy.
>>>>>> We can provide some common implementations, e.g., periodic reloading and
>>>>>> daily reloading.
>>>>>> But there will definitely be some connector- or business-specific
>>>>>> reloading strategies, e.g. being notified by a ZooKeeper watcher, or
>>>>>> reloading once a new Hive partition is complete.
>>>>>> 
>>>>>> Best,
>>>>>> Jark
>>>>>> 
>>>>>> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com>
>>> wrote:
>>>>>> 
>>>>>>> Hi Qingsheng,
>>>>>>> 
>>>>>>> Thanks for updating the FLIP. A few comments / questions below:
>>>>>>> 
>>>>>>> 1. Is there a reason that we have both "XXXFactory" and
>>> "XXXProvider".
>>>>>>> What is the difference between them? If they are the same, can we
>>> just
>>>>>> use
>>>>>>> XXXFactory everywhere?
>>>>>>> 
>>>>>>> 2. Regarding the FullCachingLookupProvider, should the reloading policy
>>>>>>> also be pluggable? Periodic reloading can sometimes be tricky in
>>>>>>> practice. For example, if a user uses 24 hours as the cache refresh
>>>>>>> interval and some nightly batch job is delayed, the cache update may
>>>>>>> still see the stale data.
>>>>>>> 
>>>>>>> 3. In DefaultLookupCacheFactory, it looks like InitialCapacity
>>> should
>>>> be
>>>>>>> removed.
>>>>>>> 
>>>>>>> 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a
>>>>>>> little confusing to me. If Optional<LookupCacheFactory> getCacheFactory()
>>>>>>> returns a non-empty factory, doesn't that already indicate to the
>>>>>>> framework to cache the missing keys? Also, why is this method returning
>>>>>>> an Optional<Boolean> instead of a boolean?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com
>>> 
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Lincoln and Jark,
>>>>>>>> 
>>>>>>>> Thanks for the comments! If the community reaches a consensus
>> that
>>> we
>>>>>> use
>>>>>>>> SQL hint instead of table options to decide whether to use sync
>> or
>>>>>> async
>>>>>>>> mode, it’s indeed not necessary to introduce the “lookup.async”
>>>> option.
>>>>>>>> 
>>>>>>>> I think it’s a good idea to let the decision about async be made at
>>>>>>>> the query level, which could enable better optimization with more
>>>>>>>> information gathered by the planner. Is there any FLIP describing the
>>>>>>>> issue in FLINK-27625? I thought FLIP-234 only proposes adding a SQL
>>>>>>>> hint for retry on missing keys, rather than having the entire async
>>>>>>>> mode controlled by a hint.
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> 
>>>>>>>> Qingsheng
>>>>>>>> 
>>>>>>>>> On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com
>>> 
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Jark,
>>>>>>>>> 
>>>>>>>>> Thanks for your reply!
>>>>>>>>> 
>>>>>>>>> Currently 'lookup.async' only exists in the HBase connector. I have
>>>>>>>>> no idea whether or when to remove it (we can discuss that in another
>>>>>>>>> issue for the HBase connector after FLINK-27625 is done); let's just
>>>>>>>>> not add it as a common option now.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Lincoln Lee
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, May 24, 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Lincoln,
>>>>>>>>>> 
>>>>>>>>>> I have taken a look at FLIP-234, and I agree with you that the
>>>>>>>> connectors
>>>>>>>>>> can
>>>>>>>>>> provide both async and sync runtime providers simultaneously
>>>> instead
>>>>>>>> of one
>>>>>>>>>> of them.
>>>>>>>>>> At that point, "lookup.async" looks redundant. If this option
>> is
>>>>>>>> planned to
>>>>>>>>>> be removed
>>>>>>>>>> in the long term, I think it makes sense not to introduce it
>> in
>>>> this
>>>>>>>> FLIP.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>> 
>>>>>>>>>> On Tue, 24 May 2022 at 11:08, Lincoln Lee <
>>> lincoln.8...@gmail.com
>>>>> 
>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Qingsheng,
>>>>>>>>>>> 
>>>>>>>>>>> Sorry for jumping into the discussion so late. It's a good
>> idea
>>>>>> that
>>>>>>>> we
>>>>>>>>>> can
>>>>>>>>>>> have a common table option. I have a minor comment on
>>>>>>>>>>> 'lookup.async': do not make it a common option.
>>>>>>>>>>> 
>>>>>>>>>>> The table layer abstracts both sync and async lookup capabilities,
>>>>>>>>>>> and connector implementers can choose one or both. In the case of
>>>>>>>>>>> implementing only one capability (the status of most existing
>>>>>>>>>>> built-in connectors), 'lookup.async' will not be used. And when a
>>>>>>>>>>> connector has both capabilities, I think this choice is more
>>>>>>>>>>> suitably made at the query level; for example, the table planner can
>>>>>>>>>>> choose the physical implementation of async or sync lookup based on
>>>>>>>>>>> its cost model, or users can give a query hint based on their own
>>>>>>>>>>> better understanding. If there is another common table option
>>>>>>>>>>> 'lookup.async', it may confuse the users in the long run.
>>>>>>>>>>> 
>>>>>>>>>>> So, I prefer to leave the 'lookup.async' option in a private place
>>>>>>>>>>> (for the current HBase connector) and not turn it into a common
>>>>>>>>>>> option.
>>>>>>>>>>> 
>>>>>>>>>>> WDYT?
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> Lincoln Lee
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, May 23, 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Alexander,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the review! We recently updated the FLIP and you
>>> can
>>>>>> find
>>>>>>>>>>> those
>>>>>>>>>>>> changes from my latest email. Since some terminology has changed,
>>>>>>>>>>>> I’ll use the new concepts when replying to your comments.
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. Builder vs ‘of’
>>>>>>>>>>>> I’m OK to use builder pattern if we have additional optional
>>>>>>>> parameters
>>>>>>>>>>>> for full caching mode (“rescan” previously). The
>>>>>> schedule-with-delay
>>>>>>>>>> idea
>>>>>>>>>>>> looks reasonable to me, but I think we need to redesign the
>>>>>> builder
>>>>>>>> API
>>>>>>>>>>> of
>>>>>>>>>>>> full caching to make it more descriptive for developers.
>> Would
>>>> you
>>>>>>>> mind
>>>>>>>>>>>> sharing your ideas about the API? For accessing the FLIP
>>>> workspace
>>>>>>>> you
>>>>>>>>>>> can
>>>>>>>>>>>> just provide your account ID and ping any PMC member
>> including
>>>>>> Jark.
>>>>>>>>>>>> 
>>>>>>>>>>>> 2. Common table options
>>>>>>>>>>>> We have had some discussions these days and propose to introduce 8
>>>>>>>>>>>> common table options about caching. The FLIP has been updated
>>>>>>>>>>>> accordingly.
>>>>>>>>>>>> 
>>>>>>>>>>>> 3. Retries
>>>>>>>>>>>> I think we are on the same page :-)
>>>>>>>>>>>> 
>>>>>>>>>>>> For your additional concerns:
>>>>>>>>>>>> 1) The table option has been updated.
>>>>>>>>>>>> 2) We got “lookup.cache” back for configuring whether to use
>>>>>> partial
>>>>>>>> or
>>>>>>>>>>>> full caching mode.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>> 
>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>>> On May 19, 2022, at 17:25, Александр Смирнов <
>>>>>> smirale...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Also I have a few additions:
>>>>>>>>>>>>> 1) maybe rename 'lookup.cache.maximum-size' to
>>>>>>>>>>>>> 'lookup.cache.max-rows'? I think it will be clearer that we are
>>>>>>>>>>>>> talking not about bytes but about the number of rows. Plus it fits
>>>>>>>>>>>>> better, considering my optimization with filters.
>>>>>>>>>>>>> 2) How will users enable rescanning? Are we going to
>> separate
>>>>>>>> caching
>>>>>>>>>>>>> and rescanning from the options point of view? Like
>> initially
>>>> we
>>>>>> had
>>>>>>>>>>>>> one option 'lookup.cache' with values LRU / ALL. I think
>> now
>>> we
>>>>>> can
>>>>>>>>>>>>> make a boolean option 'lookup.rescan'. RescanInterval can
>> be
>>>>>>>>>>>>> 'lookup.rescan.interval', etc.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Alexander
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, May 19, 2022 at 14:50, Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Qingsheng and Jark,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. Builders vs 'of'
>>>>>>>>>>>>>> I understand that builders are used when we have multiple
>>>>>>>>>> parameters.
>>>>>>>>>>>>>> I suggested them because we could add parameters later. To
>>>>>> prevent
>>>>>>>>>>>>>> Builder for ScanRuntimeProvider from looking redundant I
>> can
>>>>>>>> suggest
>>>>>>>>>>>>>> one more config now - "rescanStartTime".
>>>>>>>>>>>>>> It's a time in UTC (LocalTime class) at which the first reload of
>>>>>>>>>>>>>> the cache starts. This parameter can be thought of as the
>>>>>>>>>>>>>> 'initialDelay' (the difference between the current time and
>>>>>>>>>>>>>> rescanStartTime) in the method
>>>>>>>>>>>>>> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be
>>>>>>>>>>>>>> very useful when the dimension table is updated by some other
>>>>>>>>>>>>>> scheduled job at a certain time, or when the user simply wants the
>>>>>>>>>>>>>> second scan (first cache reload) to be delayed. This option can be
>>>>>>>>>>>>>> used even without 'rescanInterval' - in this case 'rescanInterval'
>>>>>>>>>>>>>> will be one day.
>>>>>>>>>>>>>> If you are fine with this option, I would be very glad if you
>>>>>>>>>>>>>> would give me access to edit the FLIP page, so I could add it
>>>>>>>>>>>>>> myself.
>>>>>>>>>>>>>> give me access to edit FLIP page, so I could add it myself
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 2. Common table options
>>>>>>>>>>>>>> I also think that FactoryUtil would be overloaded by all
>>> cache
>>>>>>>>>>>>>> options. But maybe unify all suggested options, not only
>> for
>>>>>>>> default
>>>>>>>>>>>>>> cache? I.e. class 'LookupOptions', that unifies default
>>> cache
>>>>>>>>>> options,
>>>>>>>>>>>>>> rescan options, 'async', 'maxRetries'. WDYT?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 3. Retries
>>>>>>>>>>>>>> I'm fine with suggestion close to
>> RetryUtils#tryTimes(times,
>>>>>> call)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> [1]
>> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
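The initial-delay computation described above can be sketched as follows. The class and method names are made up for this example; only the hand-off to scheduleWithFixedDelay mirrors the linked API.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class RescanDelaySketch {

    // Millis from 'now' until the next occurrence of startTime in UTC.
    static long initialDelayMillis(LocalTime startTime, Instant now) {
        ZonedDateTime nowUtc = now.atZone(ZoneOffset.UTC);
        ZonedDateTime firstReload =
                nowUtc.toLocalDate().atTime(startTime).atZone(ZoneOffset.UTC);
        if (!firstReload.isAfter(nowUtc)) {
            // The start time already passed today, so the first reload is tomorrow.
            firstReload = firstReload.plusDays(1);
        }
        return Duration.between(nowUtc, firstReload).toMillis();
    }

    public static void main(String[] args) {
        // At 10:00 UTC, a rescanStartTime of 02:00 means the first reload is
        // 16 hours away (tomorrow at 02:00).
        Instant now = LocalDate.of(2022, 5, 19).atTime(10, 0).toInstant(ZoneOffset.UTC);
        long delay = initialDelayMillis(LocalTime.of(2, 0), now);
        if (delay != Duration.ofHours(16).toMillis()) {
            throw new AssertionError("unexpected delay: " + delay);
        }
        System.out.println("initial delay ms = " + delay);
        // The value would then feed into something like:
        // executor.scheduleWithFixedDelay(reloadTask, delay, dayMillis, TimeUnit.MILLISECONDS);
    }
}
```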
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>> Alexander
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Wed, May 18, 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Jark and Alexander,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for your comments! I’m also OK to introduce common
>>>> table
>>>>>>>>>>>> options. I prefer to introduce a new
>> DefaultLookupCacheOptions
>>>>>> class
>>>>>>>>>> for
>>>>>>>>>>>> holding these option definitions, because putting all options into
>>>>>>>>>>>> FactoryUtil would make it a bit "crowded" and not well categorized.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> FLIP has been updated according to suggestions above:
>>>>>>>>>>>>>>> 1. Use static “of” method for constructing
>>>>>> RescanRuntimeProvider
>>>>>>>>>>>> considering both arguments are required.
>>>>>>>>>>>>>>> 2. Introduce new table options matching
>>>>>> DefaultLookupCacheFactory
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <
>> imj...@gmail.com>
>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Alex,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1) retry logic
>>>>>>>>>>>>>>>> I think we can extract some common retry logic into
>>>> utilities,
>>>>>>>>>> e.g.
>>>>>>>>>>>> RetryUtils#tryTimes(times, call).
>>>>>>>>>>>>>>>> This seems independent of this FLIP and can be reused by
>>>>>>>>>> DataStream
>>>>>>>>>>>> users.
>>>>>>>>>>>>>>>> Maybe we can open an issue to discuss this and where to
>>> put
>>>>>> it.
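A sketch of what such a utility could look like. The name RetryUtils#tryTimes comes from the message above; the body is an illustrative guess, not an existing Flink class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryUtilsSketch {

    // Invokes 'call' up to 'times' times, returning the first successful
    // result and rethrowing the last failure if all attempts fail.
    static <T> T tryTimes(int times, Callable<T> call) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < times; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Fails twice, then succeeds on the third attempt.
        String result = tryTimes(3, () -> {
            if (attempts.incrementAndGet() < 3) {
                throw new RuntimeException("transient failure");
            }
            return "ok";
        });
        if (!"ok".equals(result) || attempts.get() != 3) {
            throw new AssertionError("retry behaved unexpectedly");
        }
        System.out.println("succeeded after " + attempts.get() + " attempts");
    }
}
```

Connector-specific recovery (e.g. re-establishing a connection, as discussed later in the thread) would go inside the Callable or in a hook before each retry.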
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 2) cache ConfigOptions
>>>>>>>>>>>>>>>> I'm fine with defining cache config options in the
>>>> framework.
>>>>>>>>>>>>>>>> A candidate place to put is FactoryUtil which also
>>> includes
>>>>>>>>>>>> "sink.parallelism", "format" options.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <
>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi Qingsheng,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thank you for considering my comments.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> there might be custom logic before making retry, such
>> as
>>>>>>>>>>>> re-establish the connection
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Yes, I understand that. I meant that such logic can be placed
>>>>>>>>>>>>>>>>> in a separate function that can be implemented by connectors.
>>>>>>>>>>>>>>>>> Just moving the retry logic would make the connector's
>>>>>>>>>>>>>>>>> LookupFunction more concise and avoid duplicate code. However,
>>>>>>>>>>>>>>>>> it's a minor change. The decision is up to you.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> We decided not to provide common DDL options and to let
>>>>>>>>>>>>>>>>>> developers define their own options per connector, as we do now.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> What is the reason for that? One of the main goals of this
>>>>>>>>>>>>>>>>> FLIP was to unify the configs, wasn't it? I understand that the
>>>>>>>>>>>>>>>>> current cache design doesn't depend on ConfigOptions, like it
>>>>>>>>>>>>>>>>> did before. But still we can put these options into the
>>>>>>>>>>>>>>>>> framework, so connectors can reuse them and avoid code
>>>>>>>>>>>>>>>>> duplication and, what is more significant, avoid possibly
>>>>>>>>>>>>>>>>> divergent option naming. This point can be called out in the
>>>>>>>>>>>>>>>>> documentation for connector developers.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>> Alexander
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Alexander,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the review and glad to see we are on the
>> same
>>>>>> page!
>>>>>>>> I
>>>>>>>>>>>> think you forgot to cc the dev mailing list so I’m also
>>> quoting
>>>>>> your
>>>>>>>>>>> reply
>>>>>>>>>>>> under this email.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> We can add 'maxRetryTimes' option into this class
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> In my opinion the retry logic should be implemented in
>>>>>>>>>>>>>>>>>> lookup() instead of in LookupFunction#eval(). Retrying is only
>>>>>>>>>>>>>>>>>> meaningful under some specific retriable failures, and there
>>>>>>>>>>>>>>>>>> might be custom logic before retrying, such as re-establishing
>>>>>>>>>>>>>>>>>> the connection (JdbcRowDataLookupFunction is an example), so
>>>>>>>>>>>>>>>>>> it's handier to leave it to the connector.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I don't see DDL options, that were in previous
>> version
>>> of
>>>>>>>> FLIP.
>>>>>>>>>>> Do
>>>>>>>>>>>> you have any special plans for them?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> We decided not to provide common DDL options and to let
>>>>>>>>>>>>>>>>>> developers define their own options per connector, as we do now.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> The rest of comments sound great and I’ll update the
>>> FLIP.
>>>>>> Hope
>>>>>>>>>> we
>>>>>>>>>>>> can finalize our proposal soon!
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <
>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi Qingsheng and devs!
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I like the overall design of updated FLIP, however I
>>> have
>>>>>>>>>> several
>>>>>>>>>>>>>>>>>>> suggestions and questions.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 1) Introducing LookupFunction as a subclass of
>>>>>> TableFunction
>>>>>>>>>> is a
>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>> idea. We can add 'maxRetryTimes' option into this
>>> class.
>>>>>>>> 'eval'
>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>>> of new LookupFunction is great for this purpose. The
>>> same
>>>>>> is
>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> 'async' case.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 2) There might be other configs in future, such as
>>>>>>>>>>>> 'cacheMissingKey'
>>>>>>>>>>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in
>>>>>>>>>>>> ScanRuntimeProvider.
>>>>>>>>>>>>>>>>>>> Maybe use Builder pattern in LookupFunctionProvider
>> and
>>>>>>>>>>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one
>>>> 'build'
>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>>> instead of many 'of' methods in future)?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 3) What are the plans for existing
>>> TableFunctionProvider
>>>>>> and
>>>>>>>>>>>>>>>>>>> AsyncTableFunctionProvider? I think they should be
>>>>>> deprecated.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 4) Am I right that the current design does not assume usage
>>>>>>>>>>>>>>>>>>> of a user-provided LookupCache in re-scanning? In this case,
>>>>>>>>>>>>>>>>>>> it is not very clear why we need methods such as 'invalidate'
>>>>>>>>>>>>>>>>>>> or 'putAll' in LookupCache.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 5) I don't see the DDL options that were in the previous
>>>>>>>>>>>>>>>>>>> version of the FLIP. Do you have any special plans for them?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> If you don't mind, I would be glad to be able to make small
>>>>>>>>>>>>>>>>>>> adjustments to the FLIP document too. I think it's worth
>>>>>>>>>>>>>>>>>>> mentioning what optimizations exactly are planned for the
>>>>>>>>>>>>>>>>>>> future.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Fri, May 13, 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Alexander and devs,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thank you very much for the in-depth discussion! As
>>> Jark
>>>>>>>>>>>> mentioned we were inspired by Alexander's idea and made a
>>>>>> refactor on
>>>>>>>>>> our
>>>>>>>>>>>> design. FLIP-221 [1] has been updated to reflect our design
>>> now
>>>>>> and
>>>>>>>> we
>>>>>>>>>>> are
>>>>>>>>>>>> happy to hear more suggestions from you!
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Compared to the previous design:
>>>>>>>>>>>>>>>>>>>> 1. The lookup cache serves at table runtime level
>> and
>>> is
>>>>>>>>>>>> integrated as a component of LookupJoinRunner as discussed
>>>>>>>> previously.
>>>>>>>>>>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect
>>> the
>>>>>> new
>>>>>>>>>>>> design.
>>>>>>>>>>>>>>>>>>>> 3. We handle the all-caching case separately and introduce a
>>>>>>>>>>>>>>>>>>>> new RescanRuntimeProvider to reuse the scanning ability. We
>>>>>>>>>>>>>>>>>>>> are planning to support SourceFunction / InputFormat for
>>>>>>>>>>>>>>>>>>>> now, considering the complexity of the FLIP-27 Source API.
>>>>>>>>>>>>>>>>>>>> 4. A new interface LookupFunction is introduced to
>>> make
>>>>>> the
>>>>>>>>>>>> semantic of lookup more straightforward for developers.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> For replying to Alexander:
>>>>>>>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat
>> is
>>>>>>>>>> deprecated
>>>>>>>>>>>> or not. Am I right that it will be so in the future, but
>>>> currently
>>>>>>>> it's
>>>>>>>>>>> not?
>>>>>>>>>>>>>>>>>>>> Yes you are right. InputFormat is not deprecated for
>>>> now.
>>>>>> I
>>>>>>>>>>> think
>>>>>>>>>>>> it will be deprecated in the future but we don't have a
>> clear
>>>> plan
>>>>>>>> for
>>>>>>>>>>> that.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks again for the discussion on this FLIP and
>>> looking
>>>>>>>>>> forward
>>>>>>>>>>>> to cooperating with you after we finalize the design and
>>>>>> interfaces!
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <
>>>>>>>>>>>> smirale...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard!
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Glad to see that we came to a consensus on almost
>> all
>>>>>>>> points!
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat
>> is
>>>>>>>>>> deprecated
>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>> not. Am I right that it will be so in the future,
>> but
>>>>>>>>>> currently
>>>>>>>>>>>> it's
>>>>>>>>>>>>>>>>>>>>> not? Actually I also think that for the first
>> version
>>>>>> it's
>>>>>>>> OK
>>>>>>>>>>> to
>>>>>>>>>>>> use
>>>>>>>>>>>>>>>>>>>>> InputFormat in ALL cache realization, because
>>>> supporting
>>>>>>>>>> rescan
>>>>>>>>>>>>>>>>>>>>> ability seems like a very distant prospect. But for
>>>> this
>>>>>>>>>>>> decision we
>>>>>>>>>>>>>>>>>>>>> need a consensus among all discussion participants.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> In general, I don't have anything to argue about in your
>>>>>>>>>>>>>>>>>>>>> statements. All of them correspond to my ideas. Looking
>>>>>>>>>>>>>>>>>>>>> ahead, it would be nice to work on this FLIP cooperatively.
>>>>>>>>>>>>>>>>>>>>> I've already done a lot of work on lookup join caching,
>>>>>>>>>>>>>>>>>>>>> with an implementation very close to the one we are
>>>>>>>>>>>>>>>>>>>>> discussing, and want to share the results of this work.
>>>>>>>>>>>>>>>>>>>>> Anyway, looking forward to the FLIP update!
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Thu, May 12, 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hi Alex,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Thanks for summarizing your points.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have
>>>>>> discussed
>>>>>>>>>> it
>>>>>>>>>>>> several times
>>>>>>>>>>>>>>>>>>>>>> and we have totally refactored the design.
>>>>>>>>>>>>>>>>>>>>>> I'm glad to say we have reached a consensus on
>> many
>>> of
>>>>>> your
>>>>>>>>>>>> points!
>>>>>>>>>>>>>>>>>>>>>> Qingsheng is still working on updating the design
>>> docs
>>>>>> and
>>>>>>>>>>>> maybe can be
>>>>>>>>>>>>>>>>>>>>>> available in the next few days.
>>>>>>>>>>>>>>>>>>>>>> I will share some conclusions from our
>> discussions:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 1) we have refactored the design towards the "cache in
>>>>>>>>>>>>>>>>>>>>>> framework" way.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 2) a "LookupCache" interface for users to customize, and a
>>>>>>>>>>>>>>>>>>>>>> default implementation with a builder for ease of use.
>>>>>>>>>>>>>>>>>>>>>> This makes it possible to have both flexibility and
>>>>>>>>>>>>>>>>>>>>>> conciseness.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 3) Filter pushdown is important for both ALL and LRU
>>>>>>>>>>>>>>>>>>>>>> lookup caches, especially for reducing IO.
>>>>>>>>>>>>>>>>>>>>>> Filter pushdown should be the final state and the unified
>>>>>>>>>>>>>>>>>>>>>> way to support pruning both the ALL cache and the LRU
>>>>>>>>>>>>>>>>>>>>>> cache, so I think we should make an effort in this
>>>>>>>>>>>>>>>>>>>>>> direction.
>>> If
>>>>>> we
>>>>>>>>>> need
>>>>>>>>>>>> to support
>>>>>>>>>>>>>>>>>>>>>> filter pushdown for ALL cache anyway, why not use
>>>>>>>>>>>>>>>>>>>>>> it for LRU cache as well? Either way, as we decide
>>> to
>>>>>>>>>>> implement
>>>>>>>>>>>> the cache
>>>>>>>>>>>>>>>>>>>>>> in the framework, we have the chance to support
>>>>>>>>>>>>>>>>>>>>>> filter on cache anytime. This is an optimization
>> and
>>>> it
>>>>>>>>>>> doesn't
>>>>>>>>>>>> affect the
>>>>>>>>>>>>>>>>>>>>>> public API. I think we can create a JIRA issue to
>>>>>>>>>>>>>>>>>>>>>> discuss it when the FLIP is accepted.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 4) The idea to support ALL cache is similar to
>> your
>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>>>>>> In the first version, we will only support
>>>> InputFormat,
>>>>>>>>>>>> SourceFunction for
>>>>>>>>>>>>>>>>>>>>>> cache all (invoke InputFormat in join operator).
>>>>>>>>>>>>>>>>>>>>>> For FLIP-27 source, we need to join a true source
>>>>>> operator
>>>>>>>>>>>> instead of
>>>>>>>>>>>>>>>>>>>>>> calling it embedded in the join operator.
>>>>>>>>>>>>>>>>>>>>>> However, this needs another FLIP to support the
>>>> re-scan
>>>>>>>>>>> ability
>>>>>>>>>>>> for FLIP-27
>>>>>>>>>>>>>>>>>>>>>> Source, and this can be a large work.
>>>>>>>>>>>>>>>>>>>>>> In order to not block this issue, we can put the
>>>> effort
>>>>>> of
>>>>>>>>>>>> FLIP-27 source
>>>>>>>>>>>>>>>>>>>>>> integration into future work and integrate
>>>>>>>>>>>>>>>>>>>>>> InputFormat&SourceFunction for now.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I think it's fine to use InputFormat & SourceFunction, as
>>>>>>>>>>>>>>>>>>>>>> they are not deprecated; otherwise, we would have to
>>>>>>>>>>>>>>>>>>>>>> introduce another abstraction similar to them, which is
>>>>>>>>>>>>>>>>>>>>>> meaningless. We need to plan FLIP-27 source integration
>>>>>>>>>>>>>>>>>>>>>> ASAP, before InputFormat & SourceFunction are deprecated.
>>>>>>>>>>>>>>>>>>>>>> 
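As a concrete illustration of the "LookupCache" interface plus builder-based default implementation mentioned in point 2 above, here is a minimal sketch. All names, signatures and generics below are assumptions for illustration only, not the actual FLIP-221 API (the real cache would key on RowData):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical shape of the LookupCache abstraction; generics stand in for
// the RowData-based key/value types of the real proposal.
interface LookupCache<K, V> {
    V getIfPresent(K key);
    void put(K key, V value);
}

// A default LRU implementation configured through a builder for conciseness.
final class DefaultLookupCache<K, V> implements LookupCache<K, V> {
    private final LinkedHashMap<K, V> map;

    private DefaultLookupCache(int maxRows) {
        // access-order LinkedHashMap gives a simple LRU eviction policy
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxRows;
            }
        };
    }

    public static <K, V> Builder<K, V> newBuilder() {
        return new Builder<>();
    }

    @Override
    public V getIfPresent(K key) {
        return map.get(key);
    }

    @Override
    public void put(K key, V value) {
        map.put(key, value);
    }

    static final class Builder<K, V> {
        private int maxRows = 10_000;

        Builder<K, V> maximumRows(int maxRows) {
            this.maxRows = maxRows;
            return this;
        }

        DefaultLookupCache<K, V> build() {
            return new DefaultLookupCache<>(maxRows);
        }
    }
}
```

A connector that wants custom behavior would implement `LookupCache` directly, while most users would just configure the default via the builder.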
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <
>>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hi Martijn!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Got it. Therefore, the InputFormat-based implementation
>>>>>>>>>>>>>>>>>>>>>>> is not considered. Thanks for clearing that up!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> чт, 12 мая 2022 г. в 14:23, Martijn Visser <
>>>>>>>>>>>> mart...@ververica.com>:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> With regards to:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> But if there are plans to refactor all
>> connectors
>>>> to
>>>>>>>>>>> FLIP-27
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors.
>> The
>>>> old
>>>>>>>>>>>> interfaces will be
>>>>>>>>>>>>>>>>>>>>>>>> deprecated and connectors will either be
>>> refactored
>>>> to
>>>>>>>> use
>>>>>>>>>>>> the new ones
>>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>> dropped.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> The caching should work for connectors that are
>>>> using
>>>>>>>>>>> FLIP-27
>>>>>>>>>>>> interfaces,
>>>>>>>>>>>>>>>>>>>>>>>> we should not introduce new features for old
>>>>>> interfaces.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Martijn
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов
>> <
>>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jark!
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Sorry for the late response. I would like to
>> make
>>>>>> some
>>>>>>>>>>>> comments and
>>>>>>>>>>>>>>>>>>>>>>>>> clarify my points.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think
>> we
>>>> can
>>>>>>>>>>> achieve
>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>>>> advantages this way: put the Cache interface in
>>>>>>>>>>>> flink-table-common,
>>>>>>>>>>>>>>>>>>>>>>>>> but have implementations of it in
>>>>>> flink-table-runtime.
>>>>>>>>>>>> Therefore if a
>>>>>>>>>>>>>>>>>>>>>>>>> connector developer wants to use existing cache
>>>>>>>>>> strategies
>>>>>>>>>>>> and their
>>>>>>>>>>>>>>>>>>>>>>>>> implementations, he can just pass lookupConfig
>> to
>>>> the
>>>>>>>>>>>> planner, but if
>>>>>>>>>>>>>>>>>>>>>>>>> he wants to have his own cache implementation in his
>>>>>>>>>>>>>>>>>>>>>>>>> TableFunction, it
>>>>>>>>>>>>>>>>>>>>>>>>> will be possible for him to use the existing
>>>>>> interface
>>>>>>>>>> for
>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>> purpose (we can explicitly point this out in
>> the
>>>>>>>>>>>> documentation). In
>>>>>>>>>>>>>>>>>>>>>>>>> this way all configs and metrics will be
>> unified.
>>>>>> WDYT?
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the
>> cache,
>>> we
>>>>>> will
>>>>>>>>>>>> have 90% of
>>>>>>>>>>>>>>>>>>>>>>>>> lookup requests that can never be cached
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 2) Let me clarify the logic filters
>> optimization
>>> in
>>>>>> case
>>>>>>>>>> of
>>>>>>>>>>>> LRU cache.
>>>>>>>>>>>>>>>>>>>>>>>>> It looks like Cache<RowData,
>>> Collection<RowData>>.
>>>>>> Here
>>>>>>>>>> we
>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>>>>> store the response of the dimension table in
>>> cache,
>>>>>> even
>>>>>>>>>>>> after
>>>>>>>>>>>>>>>>>>>>>>>>> applying calc function. I.e. if there are no
>> rows
>>>>>> after
>>>>>>>>>>>> applying
>>>>>>>>>>>>>>>>>>>>>>>>> filters to the result of the 'eval' method of
>>>>>>>>>>> TableFunction,
>>>>>>>>>>>> we store
>>>>>>>>>>>>>>>>>>>>>>>>> the empty list by lookup keys. Therefore the
>>> cache
>>>>>> line
>>>>>>>>>>> will
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>> filled, but will require much less memory (in
>>>> bytes).
>>>>>>>>>> I.e.
>>>>>>>>>>>> we don't
>>>>>>>>>>>>>>>>>>>>>>>>> completely filter keys, by which result was
>>> pruned,
>>>>>> but
>>>>>>>>>>>> significantly
>>>>>>>>>>>>>>>>>>>>>>>>> reduce required memory to store this result. If
>>> the
>>>>>> user
>>>>>>>>>>>> knows about
>>>>>>>>>>>>>>>>>>>>>>>>> this behavior, he can increase the 'max-rows'
>>>> option
>>>>>>>>>> before
>>>>>>>>>>>> the start
>>>>>>>>>>>>>>>>>>>>>>>>> of the job. But actually I came up with the
>> idea
>>>>>> that we
>>>>>>>>>>> can
>>>>>>>>>>>> do this
>>>>>>>>>>>>>>>>>>>>>>>>> automatically by using the 'maximumWeight' and
>>>>>> 'weigher'
>>>>>>>>>>>> methods of
>>>>>>>>>>>>>>>>>>>>>>>>> GuavaCache [1]. Weight can be the size of the
>>>>>> collection
>>>>>>>>>> of
>>>>>>>>>>>> rows
>>>>>>>>>>>>>>>>>>>>>>>>> (value of cache). Therefore cache can
>>> automatically
>>>>>> fit
>>>>>>>>>>> much
>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>>>>> records than before.
>>>>>>>>>>>>>>>>>>>>>>>>> 
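The weigher idea above can be sketched in plain Java (standing in for Guava's `maximumWeight`/`weigher`, so the example stays self-contained): eviction is driven by the total number of cached rows rather than the number of keys, so keys whose filtered result is an empty list cost almost nothing. All names below are illustrative:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row-weighted LRU cache: the "weight" of an entry is the number
// of rows in its cached collection. Keys and rows are strings for simplicity.
final class RowWeightedCache {
    private final long maxWeight; // budget: max total rows kept in the cache
    private long totalWeight = 0;
    private final LinkedHashMap<String, List<String>> map =
            new LinkedHashMap<>(16, 0.75f, true); // access-order => LRU

    RowWeightedCache(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    List<String> get(String key) {
        return map.get(key);
    }

    void put(String key, List<String> rows) {
        List<String> old = map.put(key, rows);
        if (old != null) {
            totalWeight -= weigh(old);
        }
        totalWeight += weigh(rows);
        // evict least-recently-used entries until back under the weight budget
        Iterator<Map.Entry<String, List<String>>> it = map.entrySet().iterator();
        while (totalWeight > maxWeight && it.hasNext()) {
            Map.Entry<String, List<String>> eldest = it.next();
            if (eldest.getKey().equals(key)) {
                continue; // never evict the entry we just inserted
            }
            totalWeight -= weigh(eldest.getValue());
            it.remove();
        }
    }

    int size() {
        return map.size();
    }

    // "weigher": an empty (fully filtered) result still weighs 1,
    // otherwise the row count
    private static long weigh(List<String> rows) {
        return Math.max(1, rows.size());
    }
}
```

With such a weigher, a heavily filtering calc function lets the cache hold far more keys within the same memory budget, which is exactly the effect described above.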
>>>>>>>>>>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do
>>>> filters
>>>>>> and
>>>>>>>>>>>> projects
>>>>>>>>>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
>>>>>>>>>>>> SupportsProjectionPushDown.
>>>>>>>>>>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the
>>>> interfaces,
>>>>>>>>>> don't
>>>>>>>>>>>> mean it's
>>>>>>>>>>>>>>>>>>>>>>> hard
>>>>>>>>>>>>>>>>>>>>>>>>> to implement.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> It's debatable how difficult it will be to
>>>> implement
>>>>>>>>>> filter
>>>>>>>>>>>> pushdown.
>>>>>>>>>>>>>>>>>>>>>>>>> But I think the fact that currently there is no
>>>>>> database
>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>>>>>>>>> with filter pushdown at least means that this
>>>> feature
>>>>>>>>>> won't
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>> supported soon in connectors. Moreover, if we
>>> talk
>>>>>> about
>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>>>>>>> connectors (not in Flink repo), their databases
>>>> might
>>>>>>>> not
>>>>>>>>>>>> support all
>>>>>>>>>>>>>>>>>>>>>>>>> Flink filters (or not support filters at all).
>> I
>>>>>> think
>>>>>>>>>>> users
>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>> interested in supporting cache filters
>>> optimization
>>>>>>>>>>>> independently of
>>>>>>>>>>>>>>>>>>>>>>>>> supporting other features and solving more
>>> complex
>>>>>>>>>> problems
>>>>>>>>>>>> (or
>>>>>>>>>>>>>>>>>>>>>>>>> unsolvable at all).
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 3) I agree with your third statement. Actually
>> in
>>>> our
>>>>>>>>>>>> internal version
>>>>>>>>>>>>>>>>>>>>>>>>> I also tried to unify the logic of scanning and
>>>>>>>> reloading
>>>>>>>>>>>> data from
>>>>>>>>>>>>>>>>>>>>>>>>> connectors. But unfortunately, I didn't find a
>>> way
>>>> to
>>>>>>>>>> unify
>>>>>>>>>>>> the logic
>>>>>>>>>>>>>>>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat,
>>>>>>>> SourceFunction,
>>>>>>>>>>>> Source,...)
>>>>>>>>>>>>>>>>>>>>>>>>> and reuse it in reloading ALL cache. As a
>> result
>>> I
>>>>>>>>>> settled
>>>>>>>>>>>> on using
>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat, because it was used for scanning
>> in
>>>> all
>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>>>>>> connectors. (I didn't know that there are plans
>>> to
>>>>>>>>>>> deprecate
>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat in favor of FLIP-27 Source). IMO
>>> usage
>>>> of
>>>>>>>>>>>> FLIP-27 source
>>>>>>>>>>>>>>>>>>>>>>>>> in ALL caching is not a good idea, because this
>>>> source
>>>>>> was
>>>>>>>>>>>> designed to
>>>>>>>>>>>>>>>>>>>>>>>>> work in distributed environment
>> (SplitEnumerator
>>> on
>>>>>>>>>>>> JobManager and
>>>>>>>>>>>>>>>>>>>>>>>>> SourceReaders on TaskManagers), not in one
>>> operator
>>>>>>>>>> (lookup
>>>>>>>>>>>> join
>>>>>>>>>>>>>>>>>>>>>>>>> operator in our case). There is even no direct
>>> way
>>>> to
>>>>>>>>>> pass
>>>>>>>>>>>> splits from
>>>>>>>>>>>>>>>>>>>>>>>>> SplitEnumerator to SourceReader (this logic
>> works
>>>>>>>> through
>>>>>>>>>>>>>>>>>>>>>>>>> SplitEnumeratorContext, which requires
>>>>>>>>>>>>>>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send
>>>>>>>>>> AddSplitEvents).
>>>>>>>>>>>> Usage of
>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat for ALL cache seems much more
>> clearer
>>>> and
>>>>>>>>>>>> easier. But if
>>>>>>>>>>>>>>>>>>>>>>>>> there are plans to refactor all connectors to
>>>>>> FLIP-27, I
>>>>>>>>>>>> have the
>>>>>>>>>>>>>>>>>>>>>>>>> following ideas: maybe we can abandon the lookup join
>>>>>>>>>>>>>>>>>>>>>>>>> ALL cache in favor of a simple join with multiple scans
>>>>>>>>>>>>>>>>>>>>>>>>> of a batch source?
>>>>>>>>>>>> The point
>>>>>>>>>>>>>>>>>>>>>>>>> is that the only difference between lookup join
>>> ALL
>>>>>>>> cache
>>>>>>>>>>>> and simple
>>>>>>>>>>>>>>>>>>>>>>>>> join with batch source is that in the first
>> case
>>>>>>>> scanning
>>>>>>>>>>> is
>>>>>>>>>>>> performed
>>>>>>>>>>>>>>>>>>>>>>>>> multiple times, in between which state (cache)
>> is
>>>>>>>> cleared
>>>>>>>>>>>> (correct me
>>>>>>>>>>>>>>>>>>>>>>>>> if I'm wrong). So what if we extend the
>>>>>> functionality of
>>>>>>>>>>>> simple join
>>>>>>>>>>>>>>>>>>>>>>>>> to support state reloading + extend the
>>>>>> functionality of
>>>>>>>>>>>> scanning
>>>>>>>>>>>>>>>>>>>>>>>>> batch source multiple times (this one should be
>>>> easy
>>>>>>>> with
>>>>>>>>>>>> new FLIP-27
>>>>>>>>>>>>>>>>>>>>>>>>> source, that unifies streaming/batch reading -
>> we
>>>>>> will
>>>>>>>>>> need
>>>>>>>>>>>> to change
>>>>>>>>>>>>>>>>>>>>>>>>> only SplitEnumerator, which will pass splits
>>> again
>>>>>> after
>>>>>>>>>>>> some TTL).
>>>>>>>>>>>>>>>>>>>>>>>>> WDYT? I must say that this looks like a
>> long-term
>>>>>> goal
>>>>>>>>>> and
>>>>>>>>>>>> will make
>>>>>>>>>>>>>>>>>>>>>>>>> the scope of this FLIP even larger than you
>> said.
>>>>>> Maybe
>>>>>>>>>> we
>>>>>>>>>>>> can limit
>>>>>>>>>>>>>>>>>>>>>>>>> ourselves to a simpler solution now
>>> (InputFormats).
>>>>>>>>>>>>>>>>>>>>>>>>> 
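The "pass splits again after some TTL" idea for a FLIP-27 SplitEnumerator could be modeled roughly as below. This is a deterministic toy sketch with illustrative names only; a real enumerator would assign splits through SplitEnumeratorContext rather than return them:

```java
import java.util.List;

// Toy model of an enumerator that re-passes the same splits once a TTL has
// elapsed; the downstream reader rescans the table, which is the moral
// equivalent of an ALL cache reload.
final class TtlReenumerator {
    private final List<String> splits;
    private final long ttlMillis;
    private Long lastAssignedAt = null; // null until the first assignment

    TtlReenumerator(List<String> splits, long ttlMillis) {
        this.splits = splits;
        this.ttlMillis = ttlMillis;
    }

    /** Returns all splits again when the TTL has elapsed, else nothing. */
    List<String> maybeReassign(long nowMillis) {
        if (lastAssignedAt == null || nowMillis - lastAssignedAt >= ttlMillis) {
            lastAssignedAt = nowMillis;
            return splits; // triggers a full rescan downstream
        }
        return List.of();
    }
}
```

The open question in the thread is precisely where this loop would live, since the enumerator runs on the JobManager while the lookup join operator runs on TaskManagers.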
>>>>>>>>>>>>>>>>>>>>>>>>> So to sum up, my points are as follows:
>>>>>>>>>>>>>>>>>>>>>>>>> 1) There is a way to make both concise and
>>> flexible
>>>>>>>>>>>> interfaces for
>>>>>>>>>>>>>>>>>>>>>>>>> caching in lookup join.
>>>>>>>>>>>>>>>>>>>>>>>>> 2) Cache filters optimization is important both
>>> in
>>>>>> LRU
>>>>>>>>>> and
>>>>>>>>>>>> ALL caches.
>>>>>>>>>>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be
>>>>>> supported
>>>>>>>>>> in
>>>>>>>>>>>> Flink
>>>>>>>>>>>>>>>>>>>>>>>>> connectors, some of the connectors might not
>> have
>>>> the
>>>>>>>>>>>> opportunity to
>>>>>>>>>>>>>>>>>>>>>>>>> support filter pushdown + as I know, currently
>>>> filter
>>>>>>>>>>>> pushdown works
>>>>>>>>>>>>>>>>>>>>>>>>> only for scanning (not lookup). So cache
>> filters
>>> +
>>>>>>>>>>>> projections
>>>>>>>>>>>>>>>>>>>>>>>>> optimization should be independent from other
>>>>>> features.
>>>>>>>>>>>>>>>>>>>>>>>>> 4) ALL cache realization is a complex topic
>> that
>>>>>>>> involves
>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>>>>>>>> aspects of how Flink is developing. Abandoning
>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat in favor of the FLIP-27 Source will make
>>>>>>>>>>>>>>>>>>>>>>>>> the ALL cache implementation really complex and
>>>>>>>>>>>>>>>>>>>>>>>>> unclear, so maybe instead of that we can extend the
>>>>>>>>>>>>>>>>>>>>>>>>> functionality of the simple join, or keep InputFormat
>>>>>>>>>>>>>>>>>>>>>>>>> for the lookup join ALL cache?
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> чт, 5 мая 2022 г. в 20:34, Jark Wu <
>>>> imj...@gmail.com
>>>>>>> :
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> It's great to see the active discussion! I
>> want
>>> to
>>>>>>>> share
>>>>>>>>>>> my
>>>>>>>>>>>> ideas:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 1) implement the cache in framework vs.
>>> connectors
>>>>>> base
>>>>>>>>>>>>>>>>>>>>>>>>>> I don't have a strong opinion on this. Both
>> ways
>>>>>> should
>>>>>>>>>>>> work (e.g.,
>>>>>>>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>>>>> pruning, compatibility).
>>>>>>>>>>>>>>>>>>>>>>>>>> The framework way can provide more concise
>>>>>> interfaces.
>>>>>>>>>>>>>>>>>>>>>>>>>> The connector base way can define more
>> flexible
>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>>>>> strategies/implementations.
>>>>>>>>>>>>>>>>>>>>>>>>>> We are still investigating a way to see if we
>>> can
>>>>>> have
>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>> advantages.
>>>>>>>>>>>>>>>>>>>>>>>>>> We should reach a consensus that the chosen way should
>>>>>>>>>>>>>>>>>>>>>>>>>> be a final state, and that we are on the path to it.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 2) filters and projections pushdown:
>>>>>>>>>>>>>>>>>>>>>>>>>> I agree with Alex that the filter pushdown
>> into
>>>>>> cache
>>>>>>>>>> can
>>>>>>>>>>>> benefit a
>>>>>>>>>>>>>>>>>>>>>>> lot
>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>> ALL cache.
>>>>>>>>>>>>>>>>>>>>>>>>>> However, this is not true for LRU cache.
>>>> Connectors
>>>>>> use
>>>>>>>>>>>> cache to
>>>>>>>>>>>>>>>>>>>>>>> reduce
>>>>>>>>>>>>>>>>>>>>>>>>> IO
>>>>>>>>>>>>>>>>>>>>>>>>>> requests to databases for better throughput.
>>>>>>>>>>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the
>> cache,
>>> we
>>>>>> will
>>>>>>>>>>>> have 90% of
>>>>>>>>>>>>>>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>>>>>>> requests that can never be cached
>>>>>>>>>>>>>>>>>>>>>>>>>> and hit directly to the databases. That means
>>> the
>>>>>> cache
>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> meaningless in
>>>>>>>>>>>>>>>>>>>>>>>>>> this case.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to
>> do
>>>>>>>> filters
>>>>>>>>>>>> and projects
>>>>>>>>>>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
>>>>>>>>>>>>>>>>>>>>>>> SupportsProjectionPushDown.
>>>>>>>>>>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the
>>>> interfaces,
>>>>>>>>>> don't
>>>>>>>>>>>> mean it's
>>>>>>>>>>>>>>>>>>>>>>> hard
>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>> implement.
>>>>>>>>>>>>>>>>>>>>>>>>>> They should implement the pushdown interfaces
>> to
>>>>>> reduce
>>>>>>>>>> IO
>>>>>>>>>>>> and the
>>>>>>>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>>>>> size.
>>>>>>>>>>>>>>>>>>>>>>>>>> The final state should be that the scan source and the
>>>>>>>>>>>>>>>>>>>>>>>>>> lookup source share the exact same pushdown
>>>>>>>>>>>>>>>>>>>>>>>>>> implementation.
>>>>>>>>>>>>>>>>>>>>>>>>>> I don't see why we need to duplicate the pushdown
>>>>>>>>>>>>>>>>>>>>>>>>>> logic in caches, which would complicate the lookup
>>>>>>>>>>>>>>>>>>>>>>>>>> join design.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 3) ALL cache abstraction
>>>>>>>>>>>>>>>>>>>>>>>>>> All cache might be the most challenging part
>> of
>>>> this
>>>>>>>>>> FLIP.
>>>>>>>>>>>> We have
>>>>>>>>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>>>>>>>>>>>>>> provided a reload-lookup public interface.
>>>>>>>>>>>>>>>>>>>>>>>>>> Currently, we put the reload logic in the
>> "eval"
>>>>>> method
>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>> TableFunction.
>>>>>>>>>>>>>>>>>>>>>>>>>> That's hard for some sources (e.g., Hive).
>>>>>>>>>>>>>>>>>>>>>>>>>> Ideally, connector implementation should share
>>> the
>>>>>>>> logic
>>>>>>>>>>> of
>>>>>>>>>>>> reload
>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>> scan, i.e. ScanTableSource with
>>>>>>>>>>>> InputFormat/SourceFunction/FLIP-27
>>>>>>>>>>>>>>>>>>>>>>>>> Source.
>>>>>>>>>>>>>>>>>>>>>>>>>> However, InputFormat/SourceFunction are
>>>> deprecated,
>>>>>> and
>>>>>>>>>>> the
>>>>>>>>>>>> FLIP-27
>>>>>>>>>>>>>>>>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>>>>>>>> is deeply coupled with SourceOperator.
>>>>>>>>>>>>>>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in
>>>>>> LookupJoin,
>>>>>>>>>>> this
>>>>>>>>>>>> may make
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> scope of this FLIP much larger.
>>>>>>>>>>>>>>>>>>>>>>>>>> We are still investigating how to abstract the
>>> ALL
>>>>>>>> cache
>>>>>>>>>>>> logic and
>>>>>>>>>>>>>>>>>>>>>>> reuse
>>>>>>>>>>>>>>>>>>>>>>>>>> the existing source interfaces.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
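A toy model of the trade-off debated in point 2: if the filter runs after the lookup and filtered-out keys are never cached, every request for such a key hits the database, whereas caching the (possibly empty) filtered result bounds the IO. Nothing below is Flink API; the filter and names are illustrative:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy lookup cache that counts simulated database requests, comparing
// "skip caching empty results" against "cache the empty filtered result".
final class FilteredLookup {
    int dbRequests = 0; // counts simulated database IO
    private final Map<Integer, List<String>> cache = new HashMap<>();
    private final Function<Integer, List<String>> db;
    private final boolean cacheEmptyResults;

    FilteredLookup(Function<Integer, List<String>> db, boolean cacheEmptyResults) {
        this.db = db;
        this.cacheEmptyResults = cacheEmptyResults;
    }

    List<String> lookup(int key) {
        List<String> hit = cache.get(key);
        if (hit != null) {
            return hit;
        }
        dbRequests++;
        List<String> rows = db.apply(key); // simulated database request
        // a stand-in calc filter that prunes every row for odd keys
        List<String> filtered = key % 2 == 0 ? rows : List.of();
        if (!filtered.isEmpty() || cacheEmptyResults) {
            cache.put(key, filtered);
        }
        return filtered;
    }
}
```

This is why pruning in the cache is only "meaningless" for the LRU case if the pruned keys are dropped entirely; caching their empty results preserves the IO savings at a small memory cost.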
>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <
>>>>>>>>>>>> ro.v.bo...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> It's a much more complicated activity and lies
>>>>>>>>>>>>>>>>>>>>>>>>>>> outside the scope of this improvement, because such
>>>>>>>>>>>>>>>>>>>>>>>>>>> pushdowns should be done for all ScanTableSource
>>>>>>>>>>>>>>>>>>>>>>>>>>> implementations (not only for Lookup ones).
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <
>>>>>>>>>>>>>>>>>>>>>>> martijnvis...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> One question regarding "And Alexander
>>> correctly
>>>>>>>>>>> mentioned
>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>> filter
>>>>>>>>>>>>>>>>>>>>>>>>>>>> pushdown still is not implemented for
>>>>>>>>>> jdbc/hive/hbase."
>>>>>>>>>>>> -> Would
>>>>>>>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternative solution be to actually
>> implement
>>>>>> these
>>>>>>>>>>> filter
>>>>>>>>>>>>>>>>>>>>>>> pushdowns?
>>>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>>>> imagine that there are many more benefits to
>>>> doing
>>>>>>>>>> that,
>>>>>>>>>>>> outside
>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>>>>>>>>> caching and metrics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Martijn Visser
>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/MartijnVisser
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
>>>>>>>>>>>> ro.v.bo...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving such a valuable
>>> improvement!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I do think that single cache implementation
>>>>>> would be
>>>>>>>>>> a
>>>>>>>>>>>> nice
>>>>>>>>>>>>>>>>>>>>>>>>> opportunity
>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users. And it will break the "FOR
>> SYSTEM_TIME
>>>> AS
>>>>>> OF
>>>>>>>>>>>> proc_time"
>>>>>>>>>>>>>>>>>>>>>>>>> semantics
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> anyway, no matter how it is implemented.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can
>> say
>>>>>> that:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity
>> to
>>>> cut
>>>>>> off
>>>>>>>>>>> the
>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>> size
>>>>>>>>>>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> simply filtering unnecessary data. And the
>>> most
>>>>>>>> handy
>>>>>>>>>>>> way to do
>>>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>> apply
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it inside LookupRunners. It would be a bit
>>>>>> harder to
>>>>>>>>>>>> pass it
>>>>>>>>>>>>>>>>>>>>>>>>> through the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoin node to TableFunction. And
>>> Alexander
>>>>>>>>>>> correctly
>>>>>>>>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filter pushdown still is not implemented
>> for
>>>>>>>>>>>> jdbc/hive/hbase.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) The ability to set the different caching
>>>>>>>>>> parameters
>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>>>>>>>>>>>>> tables
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is quite important. So I would prefer to
>> set
>>> it
>>>>>>>>>> through
>>>>>>>>>>>> DDL
>>>>>>>>>>>>>>>>>>>>>>> rather
>>>>>>>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have similar TTLs, strategies and other options
>> options
>>>> for
>>>>>>>> all
>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>>>> tables.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Providing the cache into the framework
>>>> really
>>>>>>>>>>>> deprives us of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> extensibility (users won't be able to
>>> implement
>>>>>>>> their
>>>>>>>>>>> own
>>>>>>>>>>>>>>>>>>>>>>> cache).
>>>>>>>>>>>>>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> probably it might be solved by creating
>> more
>>>>>>>>>> different
>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>>>> strategies
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a wider set of configurations.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> All these points are much closer to the
>>> schema
>>>>>>>>>> proposed
>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>>>>> Alexander.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng Ren, please correct me if I'm not
>>>> right
>>>>>> and
>>>>>>>>>>> all
>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>>>>>>>>>>> facilities
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> might be simply implemented in your
>>>> architecture?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Roman Boyko
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn
>> Visser <
>>>>>>>>>>>>>>>>>>>>>>>>> martijnvis...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't have much to chip in, but just
>>> wanted
>>>> to
>>>>>>>>>>>> express that
>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> appreciate the in-depth discussion on this
>>>> topic
>>>>>>>>>> and I
>>>>>>>>>>>> hope
>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>> others
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will join the conversation.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Martijn
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр
>>>> Смирнов <
>>>>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your detailed feedback!
>>> However, I
>>>>>> have
>>>>>>>>>>>> questions
>>>>>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't
>> get
>>>>>>>>>>>> something?).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic of
>>> "FOR
>>>>>>>>>>>> SYSTEM_TIME
>>>>>>>>>>>>>>>>>>>>>>> AS OF
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proc_time”
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR
>>>> SYSTEM_TIME
>>>>>> AS
>>>>>>>>>> OF
>>>>>>>>>>>>>>>>>>>>>>> proc_time"
>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fully implemented with caching, but as
>> you
>>>>>> said,
>>>>>>>>>>> users
>>>>>>>>>>>> go
>>>>>>>>>>>>>>>>>>>>>>> on it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consciously to achieve better performance
>>> (no
>>>>>> one
>>>>>>>>>>>> proposed
>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> enable
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> caching by default, etc.). Or by users do
>>> you
>>>>>> mean
>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>>>>>>> developers
>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connectors? In this case developers
>>>> explicitly
>>>>>>>>>>> specify
>>>>>>>>>>>>>>>>>>>>>>> whether
>>>>>>>>>>>>>>>>>>>>>>>>> their
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector supports caching or not (in the
>>>> list
>>>>>> of
>>>>>>>>>>>> supported
>>>>>>>>>>>>>>>>>>>>>>>>>>>> options),
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> no one makes them do that if they don't
>>> want
>>>>>> to.
>>>>>>>> So
>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>>>>>>> exactly is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the difference between implementing
>> caching
>>>> in
>>>>>>>>>>> modules
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime and in
>>> flink-table-common
>>>>>> from
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> considered
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> point of view? How does it affect on
>>>>>>>>>>>> breaking/non-breaking
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF
>>>> proc_time"?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> confront a situation that allows table
>>>>>> options in
>>>>>>>>>>> DDL
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> control
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> behavior of the framework, which has
>> never
>>>>>>>> happened
>>>>>>>>>>>>>>>>>>>>>>> previously
>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be cautious
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If we talk about main differences of
>>>> semantics
>>>>>> of
>>>>>>>>>> DDL
>>>>>>>>>>>>>>>>>>>>>>> options
>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> config options("table.exec.xxx"), isn't
>> it
>>>>>> about
>>>>>>>>>>>> limiting
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> scope
>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the options + importance for the user
>>>> business
>>>>>>>>>> logic
>>>>>>>>>>>> rather
>>>>>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specific location of corresponding logic
>> in
>>>> the
>>>>>>>>>>>> framework? I
>>>>>>>>>>>>>>>>>>>>>>>>> mean
>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in my design, for example, putting an
>>> option
>>>>>> with
>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> strategy in configurations would  be the
>>>> wrong
>>>>>>>>>>>> decision,
>>>>>>>>>>>>>>>>>>>>>>>>> because it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> directly affects the user's business
>> logic
>>>> (not
>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>>>> performance
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimization) + touches just several
>>>> functions
>>>>>> of
>>>>>>>>>> ONE
>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>> (there
>>>>>>>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be multiple tables with different
>> caches).
>>>>>> Does it
>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>>>>>>>>> matter for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the user (or someone else) where the
>> logic
>>> is
>>>>>>>>>>> located,
>>>>>>>>>>>>>>>>>>>>>>> which is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> affected by the applied option?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also I can remember DDL option
>>>>>> 'sink.parallelism',
>>>>>>>>>>>> which in
>>>>>>>>>>>>>>>>>>>>>>>>> some way
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "controls the behavior of the framework"
>>> and
>>>> I
>>>>>>>>>> don't
>>>>>>>>>>>> see any
>>>>>>>>>>>>>>>>>>>>>>>>> problem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here.
>>>
>>>> introduce a new interface for this all-caching scenario and the
>>>> design would become more complex
>>>
>>> This is a subject for a separate discussion, but actually in our
>>> internal version we solved this problem quite easily: we reused the
>>> InputFormat class (so there is no need for a new API). The point is
>>> that currently all lookup connectors use InputFormat for scanning the
>>> data in batch mode: HBase, JDBC and even Hive, which uses the class
>>> PartitionReader, actually just a wrapper around InputFormat. The
>>> advantage of this solution is the ability to reload cache data in
>>> parallel (the number of threads depends on the number of InputSplits,
>>> but has an upper limit). As a result the cache reload time is reduced
>>> significantly (as well as the time the input stream is blocked). I
>>> know that we usually try to avoid concurrency in Flink code, but maybe
>>> this can be an exception. BTW, I don't claim it's an ideal solution;
>>> maybe there are better ones.
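The parallel reload described above could be sketched roughly as follows. This is plain Java with made-up names (Split, reload) standing in for Flink's InputFormat/InputSplit machinery; it is not actual FLIP-221 or Flink code.

```java
import java.util.*;
import java.util.concurrent.*;

/**
 * Rough sketch of a parallel full-cache reload: one task per split loads its
 * rows into a shared map, with the number of threads capped by an upper limit.
 * All names here are hypothetical; the real Flink InputFormat/InputSplit APIs
 * look different.
 */
public class ParallelReloadSketch {

    /** Stand-in for an InputSplit: just a slice of key/value rows to load. */
    public record Split(List<Map.Entry<String, String>> rows) {}

    /** Loads all splits concurrently and returns the freshly built cache. */
    public static Map<String, String> reload(List<Split> splits, int maxThreads) {
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, Math.min(splits.size(), maxThreads)));
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        for (Split split : splits) {
            // Each split is loaded by its own task, mirroring one thread per InputSplit.
            pool.execute(() -> split.rows().forEach(e -> cache.put(e.getKey(), e.getValue())));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return cache;
    }
}
```

The thread cap mirrors the "has an upper limit" remark above; a real implementation would reuse Flink's split assignment rather than a bare executor.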
>>>
>>>> Providing the cache in the framework might introduce compatibility
>>>> issues
>>>
>>> That is possible only if the developer of the connector doesn't
>>> properly refactor his code and uses the new cache options incorrectly
>>> (i.e. explicitly provides the same options in two different places in
>>> the code). For correct behavior, all he needs to do is redirect the
>>> existing options to the framework's LookupConfig (plus maybe add
>>> aliases for options that had different names), and everything will be
>>> transparent for users. If the developer doesn't refactor at all,
>>> nothing changes for the connector, because of backward compatibility.
>>> Also, if a developer wants to use his own cache logic, he can simply
>>> refuse to pass some of the configs to the framework and instead
>>> provide his own implementation with the already existing configs and
>>> metrics (but I think that is a rare case).
>>>
>>>> filters and projections should be pushed all the way down to the
>>>> table function, like what we do in the scan source
>>>
>>> That is a great goal. But the truth is that the ONLY connector that
>>> currently supports filter pushdown is FileSystemTableSource (no
>>> database connector supports it). Also, for some databases it is simply
>>> impossible to push down filters as complex as the ones we have in
>>> Flink.
>>>
>>>> only applying these optimizations to the cache seems not quite
>>>> useful
>>>
>>> Filters can cut off an arbitrarily large amount of data from the
>>> dimension table. For a simple example, suppose the dimension table
>>> 'users' has a column 'age' with values from 20 to 40, and the input
>>> stream 'clicks' is roughly uniformly distributed by user age. With the
>>> filter 'age > 30' there will be half as much data in the cache, so the
>>> user can increase 'lookup.cache.max-rows' by almost 2 times and gain a
>>> huge performance boost. Moreover, this optimization really starts to
>>> shine with the 'ALL' cache, where tables that cannot fit in memory
>>> without filters and projections can fit with them. This opens up
>>> additional possibilities for users, and that doesn't sound like 'not
>>> quite useful'.
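The effect in this example can be checked with a tiny sketch. FilteredCacheSketch is a hypothetical name for illustration, not part of the FLIP: rows that fail the join's filter never occupy a cache slot.

```java
import java.util.*;
import java.util.function.Predicate;

/**
 * Sketch of applying the join's filter before records enter the lookup cache.
 * Hypothetical illustration, not FLIP-221 API.
 */
public class FilteredCacheSketch {
    private final Map<String, Integer> cache = new HashMap<>();
    private final Predicate<Integer> filter;

    public FilteredCacheSketch(Predicate<Integer> filter) {
        this.filter = filter;
    }

    /** Store only rows that pass the filter; filtered-out rows never use cache slots. */
    public void put(String key, int age) {
        if (filter.test(age)) {
            cache.put(key, age);
        }
    }

    public int size() {
        return cache.size();
    }
}
```

With ages 20..39 and the filter 'age > 30', only 9 of 20 looked-up rows are retained, so the same memory budget covers roughly twice as many distinct keys.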
>>>
>>> It would be great to hear other voices on this topic! We have quite a
>>> lot of controversial points, and I think that with the help of others
>>> it will be easier for us to come to a consensus.
>>>
>>> Best regards,
>>> Smirnov Alexander
>>>
>>> On Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com>
>>> wrote:
>>>>
>>>> Hi Alexander and Arvid,
>>>>
>>>> Thanks for the discussion, and sorry for my late response! We had an
>>>> internal discussion together with Jark and Leonard, and I'd like to
>>>> summarize our ideas. Instead of implementing the cache logic in the
>>>> table runtime layer or wrapping it around the user-provided table
>>>> function, we prefer to introduce some new APIs extending
>>>> TableFunction, with these concerns:
>>>>
>>>> 1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF
>>>> proc_time", because the result can no longer truly reflect the
>>>> content of the lookup table at the moment of querying. If users
>>>> choose to enable caching on the lookup table, they implicitly
>>>> indicate that this breakage is acceptable in exchange for
>>>> performance. So we prefer not to provide caching at the table
>>>> runtime level.
>>>>
>>>> 2. If we put the cache implementation in the framework (whether in a
>>>> runner or in a wrapper around TableFunction), we have to confront a
>>>> situation where table options in DDL control the behavior of the
>>>> framework, which has never happened before and should be treated
>>>> cautiously. Under the current design the behavior of the framework
>>>> should only be specified by configurations ("table.exec.xxx"), and
>>>> it is hard to apply such general configs to a specific table.
>>>>
>>>> 3. We have use cases where the lookup source loads all records into
>>>> memory and refreshes them periodically to achieve high lookup
>>>> performance (like the Hive connector in the community; this is also
>>>> widely used by our internal connectors). Wrapping the cache around
>>>> the user's TableFunction works fine for LRU caches, but I think we
>>>> would have to introduce a new interface for this all-caching
>>>> scenario, and the design would become more complex.
>>>>
>>>> 4. Providing the cache in the framework might introduce
>>>> compatibility issues for existing lookup sources: there might be two
>>>> caches with totally different strategies if the user configures the
>>>> table incorrectly (one in the framework and another implemented by
>>>> the lookup source).
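The all-caching scenario from point 3 (load everything, refresh periodically) could be sketched like this; AllCacheSketch and its loader are invented names, not the Hive connector or FLIP-221 code.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;

/**
 * Sketch of an "all cache": the whole table is loaded into memory and the
 * snapshot is swapped atomically on each periodic refresh, so lookups never
 * observe a partially reloaded table. Hypothetical names throughout.
 */
public class AllCacheSketch {
    private volatile Map<String, String> snapshot = Map.of();
    private final Supplier<Map<String, String>> loader; // e.g. a full table scan

    public AllCacheSketch(Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    /** Replaces the whole cached snapshot atomically. */
    public void reload() {
        snapshot = Map.copyOf(loader.get());
    }

    /** Schedules reload() at a fixed interval, as a periodic-refresh source would. */
    public ScheduledFuture<?> scheduleReload(ScheduledExecutorService executor, long periodMillis) {
        return executor.scheduleAtFixedRate(
                this::reload, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    public String lookup(String key) {
        return snapshot.get(key);
    }
}
```

Because the snapshot is replaced wholesale, this needs a different contract than a per-key LRU wrapper, which is the extra interface the message above is worried about.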
>>>>
>>>> As for the optimization mentioned by Alexander, I think filters and
>>>> projections should be pushed all the way down to the table function,
>>>> like what we do in the scan source, instead of into the runner with
>>>> the cache. The goal of using a cache is to reduce network I/O and
>>>> the pressure on the external system, and only applying these
>>>> optimizations to the cache seems not quite useful.
>>>>
>>>> I made some updates to the FLIP [1] to reflect our ideas. We prefer
>>>> to keep the cache implementation as a part of TableFunction, and we
>>>> could provide some helper classes (CachingTableFunction,
>>>> AllCachingTableFunction, CachingAsyncTableFunction) to developers
>>>> and standardize the metrics of the cache. Also, I made a POC [2] for
>>>> your reference.
>>>>
>>>> Looking forward to your ideas!
>>>>
>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>>>>
>>>> Best regards,
>>>>
>>>> Qingsheng
>>>>
>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов
>>>> <smirale...@gmail.com> wrote:
>>>>>
>>>>> Thanks for the response, Arvid!
>>>>>
>>>>> I have a few comments on your message.
>>>>>
>>>>>> but could also live with an easier solution as the first step:
>>>>>
>>>>> I think these two approaches (the one originally proposed by
>>>>> Qingsheng and mine) are mutually exclusive, because conceptually
>>>>> they pursue the same goal but differ in implementation details. If
>>>>> we go one way, moving to the other in the future will mean deleting
>>>>> the existing code and once again changing the API for connectors.
>>>>> So I think we should reach a consensus with the community first and
>>>>> then work on this FLIP together, i.e. divide the work into tasks
>>>>> for the different parts of the FLIP (for example, LRU cache
>>>>> unification / introducing the proposed set of metrics / further
>>>>> work…). WDYT, Qingsheng?
>>>>>
>>>>>> as the source will only receive the requests after filter
>>>>>
>>>>> Actually, if filters are applied to fields of the lookup table, we
>>>>> first have to make the requests, and only after that can we filter
>>>>> the responses, because lookup connectors don't have filter
>>>>> pushdown. So if filtering is done before caching, there will be far
>>>>> fewer rows in the cache.
>>>>>
>>>>>> @Alexander unfortunately, your architecture is not shared. I don't
>>>>>> know the solution to share images to be honest.
>>>>>
>>>>> Sorry for that, I'm a bit new to these kinds of conversations :) I
>>>>> have no write access to the Confluence wiki, so I created a Jira
>>>>> issue where I described the proposed changes in more detail:
>>>>> https://issues.apache.org/jira/browse/FLINK-27411.
>>>>>
>>>>> I'll be happy to get more feedback!
>>>>>
>>>>> Best,
>>>>> Smirnov Alexander
>>>>>
>>>>> On Mon, Apr 25, 2022 at 19:49, Arvid Heise <ar...@apache.org>
>>>>> wrote:
>>>>>>
>>>>>> Hi Qingsheng,
>>>>>>
>>>>>> Thanks for driving this; the inconsistency was not satisfying for
>>>>>> me.
>>>>>>
>>>>>> I second Alexander's idea, but could also live with an easier
>>>>>> solution as the first step: instead of making caching an
>>>>>> implementation detail of a TableFunction X, devise a caching layer
>>>>>> around X. So the proposal would be a CachingTableFunction that
>>>>>> delegates to X in case of misses and otherwise manages the cache.
>>>>>> Lifting it into the operator model as proposed would be even
>>>>>> better, but is probably unnecessary in the first step for a lookup
>>>>>> source (as the source will only receive the requests after the
>>>>>> filter; applying the projection may be more interesting, to save
>>>>>> memory).
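The delegate-on-miss layer could look roughly like this. CachingLookupSketch and its LRU policy are illustrative assumptions, not the actual CachingTableFunction proposed in the FLIP.

```java
import java.util.*;
import java.util.function.Function;

/**
 * Sketch of a caching layer around a lookup function: serve hits from an LRU
 * cache, delegate to the wrapped function only on a miss. Hypothetical names;
 * a real TableFunction wrapper would deal with RowData, not strings.
 */
public class CachingLookupSketch {
    private final Function<String, String> delegate;   // stands in for TableFunction X
    private final LinkedHashMap<String, String> cache; // access-order LRU
    private int misses;

    public CachingLookupSketch(Function<String, String> delegate, int maxRows) {
        this.delegate = delegate;
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxRows; // evict the least-recently-used entry
            }
        };
    }

    /** Serve from cache if present, otherwise delegate and remember the result. */
    public String lookup(String key) {
        String value = cache.get(key);
        if (value == null) {
            misses++;
            value = delegate.apply(key);
            cache.put(key, value);
        }
        return value;
    }

    public int misses() {
        return misses;
    }
}
```

The wrapped function never learns a cache exists, which is exactly why all FLIP changes could stay limited to options.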
>>>>>>
>>>>>> Another advantage is that all the changes of this FLIP would be
>>>>>> limited to options, with no need for new public interfaces.
>>>>>> Everything else remains an implementation detail of the Table
>>>>>> runtime. That means we can easily incorporate the optimization
>>>>>> potential that Alexander pointed out later.
>>>>>>
>>>>>> @Alexander unfortunately, your architecture is not shared. I don't
>>>>>> know a solution for sharing images, to be honest.
>>>>>>
>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов
>>>>>> <smirale...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Qingsheng! My name is Alexander; I'm not a committer yet, but
>>>>>>> I'd really like to become one, and this FLIP really interested
>>>>>>> me. I have actually worked on a similar feature in my company's
>>>>>>> Flink fork, and we would like to share our thoughts on it and
>>>>>>> make the code open source.
>>>>>>>
>>>>>>> I think there is a better alternative to introducing an abstract
>>>>>>> class for TableFunction (CachingTableFunction). As you know,
>>>>>>> TableFunction lives in the flink-table-common module, which
>>>>>>> provides only an API for working with tables, so it is very
>>>>>>> convenient to import in connectors. In turn, CachingTableFunction
>>>>>>> contains logic for runtime execution, so this class and
>>>>>>> everything connected with it should be located in another module,
>>>>>>> probably flink-table-runtime. But that would require connectors
>>>>>>> to depend on a module that contains a lot of runtime logic, which
>>>>>>> doesn't sound good.
>>>>>>>
>>>>>>> I suggest adding a new method 'getLookupConfig' to
>>>>>>> LookupTableSource or LookupRuntimeProvider, to allow connectors
>>>>>>> to pass only configurations to the planner, so they won't depend
>>>>>>> on the runtime implementation. Based on these configs, the
>>>>>>> planner will construct a lookup join operator with the
>>>>>>> corresponding runtime logic (ProcessFunctions in the module
>>>>>>> flink-table-runtime). The architecture looks like the pinned
>>>>>>> image (the LookupConfig class there is actually your
>>>>>>> CacheConfig).
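A minimal sketch of such a config-only handover might look as follows. Every name here (LookupConfig, ConfigurableLookupSource, planWithCache) is hypothetical; the point is only that the connector hands over plain data and the planner chooses the runtime.

```java
/**
 * Sketch of the 'getLookupConfig' idea: the connector exposes a plain config
 * object with no runtime dependency, and the planner decides the caching
 * strategy from it. Invented names, not the actual Flink API.
 */
public class LookupConfigSketch {

    /** Plain cache settings a connector could hand to the planner. */
    public record LookupConfig(long maxRows, long expireAfterWriteMs) {}

    /** What a connector would implement instead of extending a runtime class. */
    public interface ConfigurableLookupSource {
        LookupConfig getLookupConfig();
    }

    /** Planner-side decision, sketched as: caching is enabled iff maxRows > 0. */
    public static boolean planWithCache(ConfigurableLookupSource source) {
        return source.getLookupConfig().maxRows() > 0;
    }
}
```

The connector module then needs nothing from flink-table-runtime, which is the dependency concern raised above.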
>>>>>>>
>>>>>>> The classes in flink-table-planner that will be responsible for
>>>>>>> this are CommonPhysicalLookupJoin and its inheritors. The current
>>>>>>> classes for lookup join in flink-table-runtime are
>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc
>>>>>>> and AsyncLookupJoinRunnerWithCalc.
>>>>>>>
>>>>>>> I suggest adding classes LookupJoinCachingRunner,
>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
>>>>>>>
>>>>>>> And here comes another, more powerful advantage of such a
>>>>>>> solution. If we have the caching logic at a lower level, we can
>>>>>>> apply some optimizations to it. LookupJoinRunnerWithCalc is named
>>>>>>> like this because it uses the 'calc' function, which mostly
>>>>>>> consists of filters and projections.
>>>>>>>
>>>>>>> For example, when joining table A with lookup table B under the
>>>>>>> condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE
>>>>>>> B.salary > 1000', the 'calc' function will contain the filters
>>>>>>> A.age = B.age + 10 and B.salary > 1000.
>>>>>>>
>>>>>>> If we apply this function before storing records in the cache,
>>>>>>> the size of the cache will be significantly reduced: filters
>>>>>>> avoid storing useless records, and projections reduce the
>>>>>>> records' size. So the user can increase the initial maximum
>>>>>>> number of records in the cache.
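Applying the calc before insertion could be sketched as follows; Row, Projected and CalcBeforeCacheSketch are invented names for illustration, not the actual LookupJoinCachingRunnerWithCalc implementation.

```java
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Sketch of applying the 'calc' (filter + projection) before records enter the
 * cache: rows failing the filter are never stored, and stored rows keep only
 * the projected columns. Hypothetical names throughout.
 */
public class CalcBeforeCacheSketch {

    /** A looked-up row from table B: id, age, salary, plus a wide payload column. */
    public record Row(long id, int age, long salary, String payload) {}

    /** The narrower shape actually needed by the join output. */
    public record Projected(long id, int age) {}

    private final Predicate<Row> filter;            // e.g. B.salary > 1000
    private final Function<Row, Projected> project; // drop unused columns
    private final Map<Long, Projected> cache = new HashMap<>();

    public CalcBeforeCacheSketch(Predicate<Row> filter, Function<Row, Projected> project) {
        this.filter = filter;
        this.project = project;
    }

    /** Filter first (skip useless rows), then project (shrink stored rows). */
    public void store(Row row) {
        if (filter.test(row)) {
            cache.put(row.id(), project.apply(row));
        }
    }

    public int size() {
        return cache.size();
    }
}
```

Both savings combine: fewer entries (filter) and smaller entries (projection), so the same memory budget admits a higher max-rows setting.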
>>>>>>>
>>>>>>> What do you think about it?
>>>>>>>
>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>>
>>>>>>>> Yuan and I would like to start a discussion about FLIP-221 [1],
>>>>>>>> which introduces an abstraction for the lookup table cache and
>>>>>>>> its standard metrics.
>>>>>>>>
>>>>>>>> Currently each lookup table source has to implement its own
>>>>>>>> cache to store lookup results, and there isn't a standard set of
>>>>>>>> metrics for users and developers to tune their jobs with lookup
>>>>>>>> joins, which is a quite common use case in Flink Table / SQL.
>>>>>>>>
>>>>>>>> Therefore we propose some new APIs, including a cache, metrics,
>>>>>>>> wrapper classes of TableFunction and new table options. Please
>>>>>>>> take a look at the FLIP page [1] for more details. Any
>>>>>>>> suggestions and comments would be appreciated!
>>>>>>>>
>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>>
>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>
>> --
>> Best Regards,
>>
>> Qingsheng Ren
>>
>> Real-time Computing Team
>> Alibaba Cloud
>>
>> Email: renqs...@gmail.com
>>
>> --
>> Best regards,
>> Roman Boyko
>> e.: ro.v.bo...@gmail.com