Hi Becket, 

Thanks for your comments! 

1. We have removed the LookupCacheFactory in the latest design and added
open/close methods to the LookupCache for initialization.

2. Custom reload strategy is a great idea! We added a new interface 
FullCachingReloadTrigger for developers to implement their own reload 
strategies.

3. Fixed in the latest version.

4. cacheMissingKey is only meaningful if a cache is supplied, so we made it
an Optional<Boolean> to align with Optional<LookupCache>. To make this easier
to understand, we improved the builder of LookupFunctionProvider (now renamed
to PartialCachingLookupProvider) so that Builder#withCache requires both the
cache and cacheMissingKey as its arguments.
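
To illustrate point 4, here is a minimal sketch of how such a builder could couple the two values. The names SketchLookupCache and SketchLookupProvider are hypothetical stand-ins, not the classes in the FLIP, and the method shapes are assumptions made only for illustration:

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for a lookup cache; only the lifecycle methods are sketched. */
interface SketchLookupCache {
    void open();

    void close();
}

/** Hypothetical provider: the cache and cacheMissingKey are either both present or both absent. */
final class SketchLookupProvider {
    private final Optional<SketchLookupCache> cache;
    private final Optional<Boolean> cacheMissingKey;

    private SketchLookupProvider(
            Optional<SketchLookupCache> cache, Optional<Boolean> cacheMissingKey) {
        this.cache = cache;
        this.cacheMissingKey = cacheMissingKey;
    }

    Optional<SketchLookupCache> getCache() {
        return cache;
    }

    Optional<Boolean> cacheMissingKey() {
        return cacheMissingKey;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private SketchLookupCache cache; // null means "no cache supplied"
        private Boolean cacheMissingKey; // only ever set together with the cache

        /** Requiring both arguments makes it impossible to set one without the other. */
        Builder withCache(SketchLookupCache cache, boolean cacheMissingKey) {
            this.cache = Objects.requireNonNull(cache, "cache");
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        SketchLookupProvider build() {
            return new SketchLookupProvider(
                    Optional.ofNullable(cache), Optional.ofNullable(cacheMissingKey));
        }
    }
}
```

With this shape, a provider built without withCache reports both Optionals as empty, so callers never see a cacheMissingKey value without a cache.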

Best regards, 

Qingsheng

> On May 26, 2022, at 11:52, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Qingsheng,
> 
> Thanks for updating the FLIP. A few comments / questions below:
> 
> 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"? What
> is the difference between them? If they are the same, can we just use
> XXXFactory everywhere?
> 
> 2. Regarding the FullCachingLookupProvider, should the reloading policy
> also be pluggable? Periodic reloading can sometimes be tricky in
> practice. For example, if a user uses 24 hours as the cache refresh interval
> and some nightly batch job is delayed, the cache update may still see
> stale data.
> 
> 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be
> removed.
> 
> 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little
> confusing to me. If Optional<LookupCacheFactory> getCacheFactory() returns
> a non-empty factory, doesn't that already tell the framework to cache
> the missing keys? Also, why is this method returning an Optional<Boolean>
> instead of boolean?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com> wrote:
> 
>> Hi Lincoln and Jark,
>> 
>> Thanks for the comments! If the community reaches a consensus that we use
>> SQL hints instead of table options to decide whether to use sync or async
>> mode, it’s indeed not necessary to introduce the “lookup.async” option.
>> 
>> I think it’s a good idea to let the sync/async decision be made at the
>> query level, which could enable better optimization with more information
>> gathered by the planner. Is there any FLIP describing the issue in
>> FLINK-27625? I thought FLIP-234 only proposes adding a SQL hint for retry
>> on missing, rather than having the entire async mode controlled by hints.
>> 
>> Best regards,
>> 
>> Qingsheng
>> 
>>> On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>>> 
>>> Hi Jark,
>>> 
>>> Thanks for your reply!
>>> 
>>> Currently 'lookup.async' exists only in the HBase connector. I have no idea
>>> whether or when to remove it (we can discuss that in another issue for the
>>> HBase connector after FLINK-27625 is done); I would just not add it as a
>>> common option now.
>>> 
>>> Best,
>>> Lincoln Lee
>>> 
>>> 
>>> On Tue, May 24, 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:
>>> 
>>>> Hi Lincoln,
>>>> 
>>>> I have taken a look at FLIP-234, and I agree with you that the connectors
>>>> can provide both async and sync runtime providers simultaneously instead
>>>> of one of them.
>>>> At that point, "lookup.async" looks redundant. If this option is planned
>>>> to be removed in the long term, I think it makes sense not to introduce
>>>> it in this FLIP.
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>>>> 
>>>>> Hi Qingsheng,
>>>>> 
>>>>> Sorry for jumping into the discussion so late. It's a good idea that we
>>>>> can have a common table option. I have a minor comment on 'lookup.async':
>>>>> I would prefer not to make it a common option.
>>>>> 
>>>>> The table layer abstracts both sync and async lookup capabilities, and
>>>>> connector implementers can choose one or both. When only one capability
>>>>> is implemented (the status of most existing built-in connectors),
>>>>> 'lookup.async' will not be used. And when a connector has both
>>>>> capabilities, I think this choice is better made at the query level: for
>>>>> example, the table planner can choose the physical implementation of
>>>>> async lookup or sync lookup based on its cost model, or users can give a
>>>>> query hint based on their own better understanding. If there is another
>>>>> common table option 'lookup.async', it may confuse users in the long run.
>>>>> 
>>>>> So, I prefer to leave the 'lookup.async' option in a private place (for
>>>>> the current HBase connector) and not turn it into a common option.
>>>>> 
>>>>> WDYT?
>>>>> 
>>>>> Best,
>>>>> Lincoln Lee
>>>>> 
>>>>> 
>>>>> On Mon, May 23, 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Alexander,
>>>>>> 
>>>>>> Thanks for the review! We recently updated the FLIP and you can find
>>>>>> those changes in my latest email. Since some terminology has changed,
>>>>>> I’ll use the new concepts when replying to your comments.
>>>>>> 
>>>>>> 1. Builder vs ‘of’
>>>>>> I’m OK with using the builder pattern if we have additional optional
>>>>>> parameters for full caching mode (“rescan” previously). The
>>>>>> schedule-with-delay idea looks reasonable to me, but I think we need to
>>>>>> redesign the builder API of full caching to make it more descriptive for
>>>>>> developers. Would you mind sharing your ideas about the API? For
>>>>>> accessing the FLIP workspace you can just provide your account ID and
>>>>>> ping any PMC member including Jark.
>>>>>> 
>>>>>> 2. Common table options
>>>>>> We have had some discussions recently and propose introducing 8 common
>>>>>> table options for caching. The FLIP has been updated accordingly.
>>>>>> 
>>>>>> 3. Retries
>>>>>> I think we are on the same page :-)
>>>>>> 
>>>>>> For your additional concerns:
>>>>>> 1) The table option has been updated.
>>>>>> 2) We brought “lookup.cache” back for configuring whether to use
>>>>>> partial or full caching mode.
>>>>>> 
>>>>>> Best regards,
>>>>>> 
>>>>>> Qingsheng
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On May 19, 2022, at 17:25, Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Also I have a few additions:
>>>>>>> 1) maybe rename 'lookup.cache.maximum-size' to
>>>>>>> 'lookup.cache.max-rows'? I think it will be more clear that we talk
>>>>>>> not about bytes, but about the number of rows. Plus it fits more,
>>>>>>> considering my optimization with filters.
>>>>>>> 2) How will users enable rescanning? Are we going to separate caching
>>>>>>> and rescanning from the options point of view? Like initially we had
>>>>>>> one option 'lookup.cache' with values LRU / ALL. I think now we can
>>>>>>> make a boolean option 'lookup.rescan'. RescanInterval can be
>>>>>>> 'lookup.rescan.interval', etc.
>>>>>>> 
>>>>>>> Best regards,
>>>>>>> Alexander
>>>>>>> 
>>>>>>> On Thu, May 19, 2022 at 14:50, Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Qingsheng and Jark,
>>>>>>>> 
>>>>>>>> 1. Builders vs 'of'
>>>>>>>> I understand that builders are used when we have multiple parameters.
>>>>>>>> I suggested them because we could add parameters later. To prevent
>>>>>>>> Builder for ScanRuntimeProvider from looking redundant I can suggest
>>>>>>>> one more config now - "rescanStartTime".
>>>>>>>> It's a time in UTC (LocalTime class) when the first reload of cache
>>>>>>>> starts. This parameter can be thought of as 'initialDelay' (diff
>>>>>>>> between current time and rescanStartTime) in method
>>>>>>>> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very
>>>>>>>> useful when the dimension table is updated by some other scheduled job
>>>>>>>> at a certain time. Or when the user simply wants a second scan (first
>>>>>>>> cache reload) to be delayed. This option can be used even without
>>>>>>>> 'rescanInterval' - in this case 'rescanInterval' will be one day.
>>>>>>>> If you are fine with this option, I would be very glad if you would
>>>>>>>> give me access to edit the FLIP page, so I could add it myself.
>>>>>>>> 
>>>>>>>> 2. Common table options
>>>>>>>> I also think that FactoryUtil would be overloaded by all cache
>>>>>>>> options. But maybe unify all suggested options, not only for default
>>>>>>>> cache? I.e. class 'LookupOptions', that unifies default cache options,
>>>>>>>> rescan options, 'async', 'maxRetries'. WDYT?
>>>>>>>> 
>>>>>>>> 3. Retries
>>>>>>>> I'm fine with suggestion close to RetryUtils#tryTimes(times, call)
>>>>>>>> 
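
As a rough illustration of the 'rescanStartTime' idea in point 1 above, the initial delay could be derived as below. RescanDelaySketch and its methods are hypothetical names introduced only for this sketch; the only existing API used is the JDK's ScheduledExecutorService#scheduleWithFixedDelay:

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of the FLIP or of Flink. */
final class RescanDelaySketch {

    /** Time to wait from nowUtc until the next occurrence of rescanStartTime (both UTC wall-clock times). */
    static Duration initialDelay(LocalTime nowUtc, LocalTime rescanStartTime) {
        Duration delay = Duration.between(nowUtc, rescanStartTime);
        // A negative value means the start time has already passed today, so wait until tomorrow.
        return delay.isNegative() ? delay.plusDays(1) : delay;
    }

    /** Schedules the reload: first run at rescanStartTime, later runs rescanInterval after each run ends. */
    static ScheduledFuture<?> schedule(
            ScheduledExecutorService executor,
            Runnable reload,
            LocalTime nowUtc,
            LocalTime rescanStartTime,
            Duration rescanInterval) {
        return executor.scheduleWithFixedDelay(
                reload,
                initialDelay(nowUtc, rescanStartTime).toMillis(),
                rescanInterval.toMillis(),
                TimeUnit.MILLISECONDS);
    }
}
```

A caller would pass LocalTime.now(ZoneOffset.UTC) as nowUtc, and Duration.ofDays(1) as the interval when no 'rescanInterval' is configured, which matches the one-day default described above.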
>>>>>>>> [1]
>>>>>>>> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> Alexander
>>>>>>>> 
>>>>>>>> On Wed, May 18, 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Jark and Alexander,
>>>>>>>>> 
>>>>>>>>> Thanks for your comments! I’m also OK with introducing common table
>>>>>>>>> options. I prefer to introduce a new DefaultLookupCacheOptions class
>>>>>>>>> for holding these option definitions because putting all options into
>>>>>>>>> FactoryUtil would make it a bit “crowded” and not well categorized.
>>>>>>>>> 
>>>>>>>>> FLIP has been updated according to suggestions above:
>>>>>>>>> 1. Use static “of” method for constructing RescanRuntimeProvider
>>>>>>>>> considering both arguments are required.
>>>>>>>>> 2. Introduce new table options matching DefaultLookupCacheFactory
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Qingsheng
>>>>>>>>> 
>>>>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Alex,
>>>>>>>>>> 
>>>>>>>>>> 1) retry logic
>>>>>>>>>> I think we can extract some common retry logic into utilities,
>>>>>>>>>> e.g. RetryUtils#tryTimes(times, call).
>>>>>>>>>> This seems independent of this FLIP and can be reused by
>>>>>>>>>> DataStream users.
>>>>>>>>>> Maybe we can open an issue to discuss this and where to put it.
>>>>>>>>>> 
>>>>>>>>>> 2) cache ConfigOptions
>>>>>>>>>> I'm fine with defining cache config options in the framework.
>>>>>>>>>> A candidate place to put them is FactoryUtil, which also includes
>>>>>>>>>> the "sink.parallelism" and "format" options.
>>>>>>>>>> 
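
For reference, a utility with the tryTimes(times, call) shape mentioned in point 1 above might look roughly like this. RetrySketch is a hypothetical name, and the choice of Callable and of the exception thrown after the last attempt are assumptions, not something decided in this thread:

```java
import java.util.concurrent.Callable;

/** Hypothetical utility; the signature only mirrors the tryTimes(times, call) shape from the thread. */
final class RetrySketch {

    /** Runs call up to `times` times and returns the first successful result. */
    static <T> T tryTimes(int times, Callable<T> call) {
        if (times < 1) {
            throw new IllegalArgumentException("times must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Remember the failure; a connector could re-establish its connection here.
                last = e;
            }
        }
        // All attempts failed: surface the last failure as the cause.
        throw new IllegalStateException("Call failed after " + times + " attempts", last);
    }
}
```

This version retries on every exception; a connector that only wants to retry specific retriable failures would need an extra predicate or hook, which is the customization concern raised elsewhere in the thread.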
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Qingsheng,
>>>>>>>>>>> 
>>>>>>>>>>> Thank you for considering my comments.
>>>>>>>>>>> 
>>>>>>>>>>>> there might be custom logic before making retry, such as
>>>>>>>>>>>> re-establish the connection
>>>>>>>>>>> 
>>>>>>>>>>> Yes, I understand that. I meant that such logic can be placed in a
>>>>>>>>>>> separate function, that can be implemented by connectors. Just moving
>>>>>>>>>>> the retry logic would make connector's LookupFunction more concise +
>>>>>>>>>>> avoid duplicate code. However, it's a minor change. The decision is
>>>>>>>>>>> up to you.
>>>>>>>>>>> 
>>>>>>>>>>>> We decide not to provide common DDL options and let developers to
>>>>>>>>>>>> define their own options as we do now per connector.
>>>>>>>>>>> 
>>>>>>>>>>> What is the reason for that? One of the main goals of this FLIP was
>>>>>>>>>>> to unify the configs, wasn't it? I understand that the current cache
>>>>>>>>>>> design doesn't depend on ConfigOptions, like it did before. But we
>>>>>>>>>>> can still put these options into the framework, so connectors can
>>>>>>>>>>> reuse them and avoid code duplication, and, what is more significant,
>>>>>>>>>>> avoid possible differences in option naming. This can be pointed out
>>>>>>>>>>> in the documentation for connector developers.
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Alexander
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi Alexander,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the review and glad to see we are on the same page! I
>>>>>>>>>>>> think you forgot to cc the dev mailing list so I’m also quoting your
>>>>>>>>>>>> reply under this email.
>>>>>>>>>>>> 
>>>>>>>>>>>>> We can add 'maxRetryTimes' option into this class
>>>>>>>>>>>> 
>>>>>>>>>>>> In my opinion the retry logic should be implemented in lookup()
>>>>>>>>>>>> instead of in LookupFunction#eval(). Retrying is only meaningful under
>>>>>>>>>>>> some specific retriable failures, and there might be custom logic
>>>>>>>>>>>> before making retry, such as re-establish the connection
>>>>>>>>>>>> (JdbcRowDataLookupFunction is an example), so it's more handy to leave
>>>>>>>>>>>> it to the connector.
>>>>>>>>>>>> 
>>>>>>>>>>>>> I don't see DDL options, that were in previous version of FLIP. Do
>>>>>>>>>>>>> you have any special plans for them?
>>>>>>>>>>>> 
>>>>>>>>>>>> We decide not to provide common DDL options and let developers to
>>>>>>>>>>>> define their own options as we do now per connector.
>>>>>>>>>>>> 
>>>>>>>>>>>> The rest of comments sound great and I’ll update the FLIP. Hope we
>>>>>>>>>>>> can finalize our proposal soon!
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> 
>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Qingsheng and devs!
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I like the overall design of updated FLIP, however I have several
>>>>>>>>>>>>> suggestions and questions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction is a good
>>>>>>>>>>>>> idea. We can add 'maxRetryTimes' option into this class. 'eval' method
>>>>>>>>>>>>> of new LookupFunction is great for this purpose. The same is for
>>>>>>>>>>>>> 'async' case.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2) There might be other configs in future, such as 'cacheMissingKey'
>>>>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider.
>>>>>>>>>>>>> Maybe use Builder pattern in LookupFunctionProvider and
>>>>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one 'build' method
>>>>>>>>>>>>> instead of many 'of' methods in future)?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 3) What are the plans for existing TableFunctionProvider and
>>>>>>>>>>>>> AsyncTableFunctionProvider? I think they should be deprecated.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 4) Am I right that the current design does not assume usage of
>>>>>>>>>>>>> user-provided LookupCache in re-scanning? In this case, it is not very
>>>>>>>>>>>>> clear why we need methods such as 'invalidate' or 'putAll' in
>>>>>>>>>>>>> LookupCache.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 5) I don't see DDL options, that were in previous version of FLIP. Do
>>>>>>>>>>>>> you have any special plans for them?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> If you don't mind, I would be glad to be able to make small
>>>>>>>>>>>>> adjustments to the FLIP document too. I think it's worth mentioning
>>>>>>>>>>>>> exactly which optimizations are planned for the future.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, May 13, 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Alexander and devs,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thank you very much for the in-depth discussion! As Jark mentioned we
>>>>>>>>>>>>>> were inspired by Alexander's idea and made a refactor on our design.
>>>>>>>>>>>>>> FLIP-221 [1] has been updated to reflect our design now and we are
>>>>>>>>>>>>>> happy to hear more suggestions from you!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Compared to the previous design:
>>>>>>>>>>>>>> 1. The lookup cache serves at table runtime level and is integrated
>>>>>>>>>>>>>> as a component of LookupJoinRunner as discussed previously.
>>>>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect the new design.
>>>>>>>>>>>>>> 3. We separate the all-caching case individually and introduce a new
>>>>>>>>>>>>>> RescanRuntimeProvider to reuse the ability of scanning. We are
>>>>>>>>>>>>>> planning to support SourceFunction / InputFormat for now considering
>>>>>>>>>>>>>> the complexity of FLIP-27 Source API.
>>>>>>>>>>>>>> 4. A new interface LookupFunction is introduced to make the semantic
>>>>>>>>>>>>>> of lookup more straightforward for developers.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> For replying to Alexander:
>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat is deprecated or
>>>>>>>>>>>>>>> not. Am I right that it will be so in the future, but currently it's
>>>>>>>>>>>>>>> not?
>>>>>>>>>>>>>> Yes you are right. InputFormat is not deprecated for now. I think it
>>>>>>>>>>>>>> will be deprecated in the future but we don't have a clear plan for
>>>>>>>>>>>>>> that.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks again for the discussion on this FLIP and looking forward to
>>>>>>>>>>>>>> cooperating with you after we finalize the design and interfaces!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Glad to see that we came to a consensus on almost all points!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat is deprecated or
>>>>>>>>>>>>>>> not. Am I right that it will be so in the future, but currently it's
>>>>>>>>>>>>>>> not? Actually I also think that for the first version it's OK to use
>>>>>>>>>>>>>>> InputFormat in ALL cache realization, because supporting rescan
>>>>>>>>>>>>>>> ability seems like a very distant prospect. But for this decision we
>>>>>>>>>>>>>>> need a consensus among all discussion participants.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> In general, I have nothing to argue with in your statements. All
>>>>>>>>>>>>>>> of them correspond to my ideas. Looking ahead, it would be nice to
>>>>>>>>>>>>>>> work on this FLIP cooperatively. I've already done a lot of work on
>>>>>>>>>>>>>>> lookup join caching with an implementation very close to the one we
>>>>>>>>>>>>>>> are discussing, and want to share the results of this work. Anyway,
>>>>>>>>>>>>>>> looking forward to the FLIP update!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Thu, May 12, 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Alex,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for summarizing your points.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed it several
>>>>>>>>>>>>>>>> times and we have totally refactored the design.
>>>>>>>>>>>>>>>> I'm glad to say we have reached a consensus on many of your points!
>>>>>>>>>>>>>>>> Qingsheng is still working on updating the design docs, which may be
>>>>>>>>>>>>>>>> available in the next few days.
>>>>>>>>>>>>>>>> I will share some conclusions from our discussions:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1) we have refactored the design towards the "cache in framework" way.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 2) a "LookupCache" interface for users to customize and a default
>>>>>>>>>>>>>>>> implementation with a builder for ease of use.
>>>>>>>>>>>>>>>> This makes it possible to have both flexibility and conciseness.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 3) Filter pushdown is important for ALL and LRU lookup cache, esp.
>>>>>>>>>>>>>>>> for reducing IO.
>>>>>>>>>>>>>>>> Filter pushdown should be the final state and the unified way to
>>>>>>>>>>>>>>>> support pruning both ALL cache and LRU cache, so I think we should
>>>>>>>>>>>>>>>> make an effort in this direction. If we need to support filter
>>>>>>>>>>>>>>>> pushdown for ALL cache anyway, why not use it for LRU cache as well?
>>>>>>>>>>>>>>>> Either way, as we decided to implement the cache in the framework, we
>>>>>>>>>>>>>>>> have the chance to support filter on cache anytime. This is an
>>>>>>>>>>>>>>>> optimization and it doesn't affect the public API. I think we can
>>>>>>>>>>>>>>>> create a JIRA issue to discuss it when the FLIP is accepted.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 4) The idea to support ALL cache is similar to your proposal.
>>>>>>>>>>>>>>>> In the first version, we will only support InputFormat and
>>>>>>>>>>>>>>>> SourceFunction for cache all (invoke InputFormat in join operator).
>>>>>>>>>>>>>>>> For FLIP-27 source, we need to join a true source operator instead of
>>>>>>>>>>>>>>>> calling it embedded in the join operator.
>>>>>>>>>>>>>>>> However, this needs another FLIP to support the re-scan ability for
>>>>>>>>>>>>>>>> FLIP-27 Source, and this can be a large amount of work.
>>>>>>>>>>>>>>>> In order to not block this issue, we can put the effort of FLIP-27
>>>>>>>>>>>>>>>> source integration into future work and integrate
>>>>>>>>>>>>>>>> InputFormat & SourceFunction for now.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I think it's fine to use InputFormat & SourceFunction, as they are not
>>>>>>>>>>>>>>>> deprecated; otherwise, we would have to introduce another function
>>>>>>>>>>>>>>>> similar to them, which is meaningless. We need to plan FLIP-27 source
>>>>>>>>>>>>>>>> integration ASAP before InputFormat & SourceFunction are deprecated.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi Martijn!
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Got it. Therefore, the realization with InputFormat is not considered.
>>>>>>>>>>>>>>>>> Thanks for clearing that up!
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Thu, May 12, 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> With regards to:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> But if there are plans to refactor all connectors to FLIP-27
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old interfaces will
>>>>>>>>>>>>>>>>>> be deprecated and connectors will either be refactored to use the new
>>>>>>>>>>>>>>>>>> ones or dropped.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> The caching should work for connectors that are using FLIP-27
>>>>>>>>>>>>>>>>>> interfaces; we should not introduce new features for old interfaces.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Martijn
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi Jark!
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Sorry for the late response. I would like to make some comments and
>>>>>>>>>>>>>>>>>>> clarify my points.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think we can achieve both
>>>>>>>>>>>>>>>>>>> advantages this way: put the Cache interface in flink-table-common,
>>>>>>>>>>>>>>>>>>> but have implementations of it in flink-table-runtime. Therefore if a
>>>>>>>>>>>>>>>>>>> connector developer wants to use existing cache strategies and their
>>>>>>>>>>>>>>>>>>> implementations, they can just pass lookupConfig to the planner, but
>>>>>>>>>>>>>>>>>>> if they want to have their own cache implementation in their
>>>>>>>>>>>>>>>>>>> TableFunction, it will be possible to use the existing interface for
>>>>>>>>>>>>>>>>>>> this purpose (we can explicitly point this out in the documentation).
>>>>>>>>>>>>>>>>>>> In this way all configs and metrics will be unified. WDYT?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90% of
>>>>>>>>>>>>>>>>>>>> lookup requests that can never be cached
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 2) Let me clarify the logic of the filters optimization in the case of
>>>>>>>>>>>>>>>>>>> LRU cache. It looks like Cache<RowData, Collection<RowData>>. Here we
>>>>>>>>>>>>>>>>>>> always store the response of the dimension table in cache, even after
>>>>>>>>>>>>>>>>>>> applying the calc function. I.e. if there are no rows after applying
>>>>>>>>>>>>>>>>>>> filters to the result of the 'eval' method of TableFunction, we store
>>>>>>>>>>>>>>>>>>> the empty list by lookup keys. Therefore the cache line will be
>>>>>>>>>>>>>>>>>>> filled, but will require much less memory (in bytes). I.e. we don't
>>>>>>>>>>>>>>>>>>> completely filter out keys whose result was pruned, but significantly
>>>>>>>>>>>>>>>>>>> reduce the memory required to store this result. If the user knows
>>>>>>>>>>>>>>>>>>> about this behavior, he can increase the 'max-rows' option before the
>>>>>>>>>>>>>>>>>>> start of the job. But actually I came up with the idea that we can do
>>>>>>>>>>>>>>>>>>> this automatically by using the 'maximumWeight' and 'weigher' methods
>>>>>>>>>>>>>>>>>>> of GuavaCache [1]. Weight can be the size of the collection of rows
>>>>>>>>>>>>>>>>>>> (value of cache). Therefore the cache can automatically fit many more
>>>>>>>>>>>>>>>>>>> records than before.
>>>>>>>>>>>>>>>>>>> 
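
The weight-based bound described in point 2 above can be sketched without Guava as follows. This is a plain LinkedHashMap stand-in, not Guava's implementation, and WeightedLruSketch is a hypothetical name; the weight of an entry is simply the number of rows it holds, so an empty result costs nothing against the limit:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical LRU cache bounded by total row count instead of by number of keys. */
final class WeightedLruSketch<K, R> {
    private final long maxWeight;
    private long totalWeight;
    // accessOrder = true keeps the least recently used entry first in iteration order.
    private final LinkedHashMap<K, List<R>> entries = new LinkedHashMap<>(16, 0.75f, true);

    WeightedLruSketch(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    /** Returns the cached rows, or null if the key was never cached or has been evicted. */
    List<R> getIfPresent(K key) {
        return entries.get(key);
    }

    void put(K key, List<R> rows) {
        List<R> old = entries.put(key, rows);
        if (old != null) {
            totalWeight -= old.size();
        }
        totalWeight += rows.size();
        // Evict least recently used entries until the total row count fits again.
        Iterator<Map.Entry<K, List<R>>> it = entries.entrySet().iterator();
        while (totalWeight > maxWeight && it.hasNext()) {
            totalWeight -= it.next().getValue().size();
            it.remove();
        }
    }

    int keyCount() {
        return entries.size();
    }

    long weight() {
        return totalWeight;
    }
}
```

Because empty lists weigh zero here, keys whose rows were all filtered out never push the cache over its limit; a real implementation would probably add a small per-entry constant so the number of such keys stays bounded too.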
>>>>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and projects
>>>>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
>>>>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's
>>>>>>>>>>>>>>>>>>>> hard to implement.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> It's debatable how difficult it will be to implement filter pushdown.
>>>>>>>>>>>>>>>>>>> But I think the fact that currently there is no database connector
>>>>>>>>>>>>>>>>>>> with filter pushdown at least means that this feature won't be
>>>>>>>>>>>>>>>>>>> supported soon in connectors. Moreover, if we talk about other
>>>>>>>>>>>>>>>>>>> connectors (not in Flink repo), their databases might not support all
>>>>>>>>>>>>>>>>>>> Flink filters (or not support filters at all). I think users are
>>>>>>>>>>>>>>>>>>> interested in supporting cache filters optimization independently of
>>>>>>>>>>>>>>>>>>> supporting other features and solving more complex problems (or
>>>>>>>>>>>>>>>>>>> problems that cannot be solved at all).
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 3) I agree with your third statement. Actually in our internal version
>>>>>>>>>>>>>>>>>>> I also tried to unify the logic of scanning and reloading data from
>>>>>>>>>>>>>>>>>>> connectors. But unfortunately, I didn't find a way to unify the logic
>>>>>>>>>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat, SourceFunction, Source,...)
>>>>>>>>>>>>>>>>>>> and reuse it in reloading ALL cache. As a result I settled on using
>>>>>>>>>>>>>>>>>>> InputFormat, because it was used for scanning in all lookup
>>>>>>>>>>>>>>>>>>> connectors. (I didn't know that there are plans to deprecate
>>>>>>>>>>>>>>>>>>> InputFormat in favor of FLIP-27 Source). IMO usage of FLIP-27 source
>>>>>>>>>>>>>>>>>>> in ALL caching is not a good idea, because this source was designed to
>>>>>>>>>>>>>>>>>>> work in a distributed environment (SplitEnumerator on JobManager and
>>>>>>>>>>>>>>>>>>> SourceReaders on TaskManagers), not in one operator (lookup join
>>>>>>>>>>>>>>>>>>> operator in our case). There is even no direct way to pass splits from
>>>>>>>>>>>>>>>>>>> SplitEnumerator to SourceReader (this logic works through
>>>>>>>>>>>>>>>>>>> SplitEnumeratorContext, which requires
>>>>>>>>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Usage of
>>>>>>>>>>>>>>>>>>> InputFormat for ALL cache seems much clearer and easier. But if
>>>>>>>>>>>>>>>>>>> there are plans to refactor all connectors to FLIP-27, I have the
>>>>>>>>>>>>>>>>>>> following idea: maybe we can drop lookup join ALL cache in
>>>>>>>>>>>>>>>>>>> favor of simple join with multiple scanning of batch source? The point
>>>>>>>>>>>>>>>>>>> is that the only difference between lookup join ALL cache and simple
>>>>>>>>>>>>>>>>>>> join with batch source is that in the first case scanning is performed
>>>>>>>>>>>>>>>>>>> multiple times, in between which state (cache) is cleared (correct me
>>>>>>>>>>>>>>>>>>> if I'm wrong). So what if we extend the functionality of simple join
>>>>>>>>>>>>>>>>>>> to support state reloading + extend the functionality of scanning
>>>>>>>>>>>>>>>>>>> batch source multiple times (this one should be easy with new FLIP-27
>>>>>>>>>>>>>>>>>>> source, that unifies streaming/batch reading - we will need to change
>>>>>>>>>>>>>>>>>>> only SplitEnumerator, which will pass splits again after some TTL).
>>>>>>>>>>>>>>>>>>> WDYT? I must say that this looks like a long-term goal and will make
>>>>>>>>>>>>>>>>>>> the scope of this FLIP even larger than you said. Maybe we can limit
>>>>>>>>>>>>>>>>>>> ourselves to a simpler solution now (InputFormats).
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> So to sum up, my points are as follows:
>>>>>>>>>>>>>>>>>>> 1) There is a way to make both concise and flexible interfaces for
>>>>>>>>>>>>>>>>>>> caching in lookup join.
>>>>>>>>>>>>>>>>>>> 2) Cache filters optimization is important both in LRU and ALL caches.
>>>>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
>>>>>>>>>>>>>>>>>>> connectors, some of the connectors might not have the opportunity to
>>>>>>>>>>>>>>>>>>> support filter pushdown + as far as I know, currently filter pushdown
>>>>>>>>>>>>>>>>>>> works only for scanning (not lookup). So cache filters + projections
>>>>>>>>>>>>>>>>>>> optimization should be independent from other features.
>>>>>>>>>>>>>>>>>>> 4) ALL cache realization is a complex topic that involves multiple
>>>>>>>>>>>>>>>>>>> aspects of how Flink is developing. Dropping InputFormat in favor
>>>>>>>>>>>>>>>>>>> of FLIP-27 Source will make ALL cache realization really complex and
>>>>>>>>>>>>>>>>>>> not clear, so maybe instead of that we can extend the functionality of
>>>>>>>>>>>>>>>>>>> simple join, or keep InputFormat in the case of lookup join ALL
>>>>>>>>>>>>>>>>>>> cache?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Thu, May 5, 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> It's great to see the active discussion! I want to share
>>>>> my
>>>>>> ideas:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 1) implement the cache in the framework vs. in the connector base
>>>>>>>>>>>>>>>>>>>> I don't have a strong opinion on this. Both ways should
>>>>>> work (e.g.,
>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>> pruning, compatibility).
>>>>>>>>>>>>>>>>>>>> The framework way can provide more concise interfaces.
>>>>>>>>>>>>>>>>>>>> The connector base way can define more flexible cache
>>>>>>>>>>>>>>>>>>>> strategies/implementations.
>>>>>>>>>>>>>>>>>>>> We are still investigating a way to see if we can have
>>>>> both
>>>>>>>>>>>>>>>>> advantages.
>>>>>>>>>>>>>>>>>>>> We should reach a consensus that the way should be a
>>>> final
>>>>>> state,
>>>>>>>>>>>>>>>>> and we
>>>>>>>>>>>>>>>>>>>> are on the path to it.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 2) filters and projections pushdown:
>>>>>>>>>>>>>>>>>>>> I agree with Alex that the filter pushdown into cache
>>>> can
>>>>>> benefit a
>>>>>>>>>>>>>>>>> lot
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> ALL cache.
>>>>>>>>>>>>>>>>>>>> However, this is not true for LRU cache. Connectors use
>>>>>> cache to
>>>>>>>>>>>>>>>>> reduce
>>>>>>>>>>>>>>>>>>> IO
>>>>>>>>>>>>>>>>>>>> requests to databases for better throughput.
>>>>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will
>>>>>> have 90% of
>>>>>>>>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>> requests that can never be cached
>>>>>>>>>>>>>>>>>>>> and hit directly to the databases. That means the cache
>>>> is
>>>>>>>>>>>>>>>>> meaningless in
>>>>>>>>>>>>>>>>>>>> this case.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to do filters
>>>>>> and projects
>>>>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
>>>>>>>>>>>>>>>>> SupportsProjectionPushDown.
>>>>>>>>>>>>>>>>>>>> That JDBC/Hive/HBase haven't implemented the interfaces doesn't
>>>>>>>>>>>>>>>>>>>> mean they are hard to implement.
>>>>>>>>>>>>>>>>>>>> They should implement the pushdown interfaces to reduce
>>>> IO
>>>>>> and the
>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>> size.
>>>>>>>>>>>>>>>>>>>> That should be a final state that the scan source and
>>>>>> lookup source
>>>>>>>>>>>>>>>>> share
>>>>>>>>>>>>>>>>>>>> the exact pushdown implementation.
>>>>>>>>>>>>>>>>>>>> I don't see why we need to duplicate the pushdown logic
>>>> in
>>>>>> caches,
>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>> will complicate the lookup join design.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 3) ALL cache abstraction
>>>>>>>>>>>>>>>>>>>> All cache might be the most challenging part of this
>>>> FLIP.
>>>>>> We have
>>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>>>>>>>> provided a reload-lookup public interface.
>>>>>>>>>>>>>>>>>>>> Currently, we put the reload logic in the "eval" method
>>>> of
>>>>>>>>>>>>>>>>> TableFunction.
>>>>>>>>>>>>>>>>>>>> That's hard for some sources (e.g., Hive).
>>>>>>>>>>>>>>>>>>>> Ideally, connector implementation should share the logic
>>>>> of
>>>>>> reload
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> scan, i.e. ScanTableSource with
>>>>>> InputFormat/SourceFunction/FLIP-27
>>>>>>>>>>>>>>>>>>> Source.
>>>>>>>>>>>>>>>>>>>> However, InputFormat/SourceFunction are deprecated, and
>>>>> the
>>>>>> FLIP-27
>>>>>>>>>>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>> is deeply coupled with SourceOperator.
>>>>>>>>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin,
>>>>> this
>>>>>> may make
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> scope of this FLIP much larger.
>>>>>>>>>>>>>>>>>>>> We are still investigating how to abstract the ALL cache
>>>>>> logic and
>>>>>>>>>>>>>>>>> reuse
>>>>>>>>>>>>>>>>>>>> the existing source interfaces.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <
>>>>>> ro.v.bo...@gmail.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> It's a much more complicated activity and lies out of
>>>> the
>>>>>> scope of
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> improvement. Because such pushdowns should be done for
>>>>> all
>>>>>>>>>>>>>>>>>>> ScanTableSource
>>>>>>>>>>>>>>>>>>>>> implementations (not only for Lookup ones).
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <
>>>>>>>>>>>>>>>>> martijnvis...@apache.org>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> One question regarding "And Alexander correctly
>>>>> mentioned
>>>>>> that
>>>>>>>>>>>>>>>>> filter
>>>>>>>>>>>>>>>>>>>>>> pushdown still is not implemented for
>>>> jdbc/hive/hbase."
>>>>>> -> Would
>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>> alternative solution be to actually implement these
>>>>> filter
>>>>>>>>>>>>>>>>> pushdowns?
>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>> imagine that there are many more benefits to doing
>>>> that,
>>>>>> outside
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>>> caching and metrics.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Martijn Visser
>>>>>>>>>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
>>>>>>>>>>>>>>>>>>>>>> https://github.com/MartijnVisser
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
>>>>>> ro.v.bo...@gmail.com>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hi everyone!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving such a valuable improvement!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> I do think that single cache implementation would be
>>>> a
>>>>>> nice
>>>>>>>>>>>>>>>>>>> opportunity
>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>> users. And it will break the "FOR SYSTEM_TIME AS OF
>>>>>> proc_time"
>>>>>>>>>>>>>>>>>>> semantics
>>>>>>>>>>>>>>>>>>>>>>> anyway - doesn't matter how it will be implemented.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can say that:
>>>>>>>>>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity to cut off
>>>>> the
>>>>>> cache
>>>>>>>>>>>>>>>>> size
>>>>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>>> simply filtering unnecessary data. And the most handy
>>>>>> way to do
>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> apply
>>>>>>>>>>>>>>>>>>>>>>> it inside LookupRunners. It would be a bit harder to
>>>>>> pass it
>>>>>>>>>>>>>>>>>>> through the
>>>>>>>>>>>>>>>>>>>>>>> LookupJoin node to TableFunction. And Alexander
>>>>> correctly
>>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>> filter pushdown still is not implemented for
>>>>>> jdbc/hive/hbase.
>>>>>>>>>>>>>>>>>>>>>>> 2) The ability to set the different caching
>>>> parameters
>>>>>> for
>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>>>>>>> tables
>>>>>>>>>>>>>>>>>>>>>>> is quite important. So I would prefer to set it
>>>> through
>>>>>> DDL
>>>>>>>>>>>>>>>>> rather
>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>>> have similar TTL, strategy and other options for all
>>>>>> lookup
>>>>>>>>>>>>>>>>> tables.
>>>>>>>>>>>>>>>>>>>>>>> 3) Providing the cache into the framework really
>>>>>> deprives us of
>>>>>>>>>>>>>>>>>>>>>>> extensibility (users won't be able to implement their
>>>>> own
>>>>>>>>>>>>>>>>> cache).
>>>>>>>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>>>>>>> probably it might be solved by creating more
>>>> different
>>>>>> cache
>>>>>>>>>>>>>>>>>>> strategies
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>> a wider set of configurations.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> All these points are much closer to the schema
>>>> proposed
>>>>>> by
>>>>>>>>>>>>>>>>>>> Alexander.
>>>>>>>>>>>>>>>>>>>>>>> Qingsheng Ren, please correct me if I'm not right and
>>>>> all
>>>>>> these
>>>>>>>>>>>>>>>>>>>>>> facilities
>>>>>>>>>>>>>>>>>>>>>>> might be simply implemented in your architecture?
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>> Roman Boyko
>>>>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <
>>>>>>>>>>>>>>>>>>> martijnvis...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> I don't have much to chip in, but just wanted to
>>>>>> express that
>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>>>>>>>> appreciate the in-depth discussion on this topic
>>>> and I
>>>>>> hope
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> others
>>>>>>>>>>>>>>>>>>>>>>>> will join the conversation.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Martijn
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <
>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your detailed feedback! However, I have
>>>>>> questions
>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't get
>>>>>> something?).
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR
>>>>>> SYSTEM_TIME
>>>>>>>>>>>>>>>>> AS OF
>>>>>>>>>>>>>>>>>>>>>>>> proc_time"
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS
>>>> OF
>>>>>>>>>>>>>>>>> proc_time"
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>> fully implemented with caching, but as you said, users accept this
>>>>>>>>>>>>>>>>>>>>>>>>> consciously to achieve better performance (no one
>>>>>> proposed
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> enable
>>>>>>>>>>>>>>>>>>>>>>>>> caching by default, etc.). Or by users do you mean
>>>>>> other
>>>>>>>>>>>>>>>>>>> developers
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>> connectors? In this case developers explicitly
>>>>> specify
>>>>>>>>>>>>>>>>> whether
>>>>>>>>>>>>>>>>>>> their
>>>>>>>>>>>>>>>>>>>>>>>>> connector supports caching or not (in the list of
>>>>>> supported
>>>>>>>>>>>>>>>>>>>>>> options),
>>>>>>>>>>>>>>>>>>>>>>>>> no one makes them do that if they don't want to. So
>>>>>> what
>>>>>>>>>>>>>>>>>>> exactly is
>>>>>>>>>>>>>>>>>>>>>>>>> the difference between implementing caching in
>>>>> modules
>>>>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime and in flink-table-common from
>>>>> the
>>>>>>>>>>>>>>>>>>> considered
>>>>>>>>>>>>>>>>>>>>>>>>> point of view? How does it affect whether the
>>>>>>>>>>>>>>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF proc_time" are broken?
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> confront a situation that allows table options in
>>>>> DDL
>>>>>> to
>>>>>>>>>>>>>>>>>>> control
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> behavior of the framework, which has never happened
>>>>>>>>>>>>>>>>> previously
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>> be cautious
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> If we talk about main differences of semantics of
>>>> DDL
>>>>>>>>>>>>>>>>> options
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> config options("table.exec.xxx"), isn't it about
>>>>>> limiting
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> scope
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>> the options + importance for the user business
>>>> logic
>>>>>> rather
>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>>>>> specific location of corresponding logic in the
>>>>>> framework? I
>>>>>>>>>>>>>>>>>>> mean
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> in my design, for example, putting an option with
>>>>>> lookup
>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>>>> strategy in configurations would be the wrong
>>>>>> decision,
>>>>>>>>>>>>>>>>>>> because it
>>>>>>>>>>>>>>>>>>>>>>>>> directly affects the user's business logic (not
>>>> just
>>>>>>>>>>>>>>>>> performance
>>>>>>>>>>>>>>>>>>>>>>>>> optimization) + touches just several functions of
>>>> ONE
>>>>>> table
>>>>>>>>>>>>>>>>>>> (there
>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>> be multiple tables with different caches). Does it
>>>>>> really
>>>>>>>>>>>>>>>>>>> matter for
>>>>>>>>>>>>>>>>>>>>>>>>> the user (or someone else) where the logic is
>>>>> located,
>>>>>>>>>>>>>>>>> which is
>>>>>>>>>>>>>>>>>>>>>>>>> affected by the applied option?
>>>>>>>>>>>>>>>>>>>>>>>>> Also I can remember DDL option 'sink.parallelism',
>>>>>> which in
>>>>>>>>>>>>>>>>>>> some way
>>>>>>>>>>>>>>>>>>>>>>>>> "controls the behavior of the framework" and I
>>>> don't
>>>>>> see any
>>>>>>>>>>>>>>>>>>> problem
>>>>>>>>>>>>>>>>>>>>>>>>> here.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> introduce a new interface for this all-caching
>>>>>> scenario
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> design
>>>>>>>>>>>>>>>>>>>>>>>>> would become more complex
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> This is a subject for a separate discussion, but
>>>>>> actually
>>>>>>>>>>>>>>>>> in our
>>>>>>>>>>>>>>>>>>>>>>>>> internal version we solved this problem quite
>>>> easily
>>>>> -
>>>>>> we
>>>>>>>>>>>>>>>>> reused
>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat class (so there is no need for a new
>>>>> API).
>>>>>> The
>>>>>>>>>>>>>>>>>>> point is
>>>>>>>>>>>>>>>>>>>>>>>>> that currently all lookup connectors use
>>>> InputFormat
>>>>>> for
>>>>>>>>>>>>>>>>>>> scanning
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> data in batch mode: HBase, JDBC and even Hive - it
>>>>> uses
>>>>>>>>>>>>>>>>> class
>>>>>>>>>>>>>>>>>>>>>>>>> PartitionReader, that is actually just a wrapper
>>>>> around
>>>>>>>>>>>>>>>>>>> InputFormat.
>>>>>>>>>>>>>>>>>>>>>>>>> The advantage of this solution is the ability to
>>>>> reload
>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>> parallel (number of threads depends on number of
>>>>>>>>>>>>>>>>> InputSplits,
>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>>>>>>>>>> an upper limit). As a result cache reload time
>>>>>> significantly
>>>>>>>>>>>>>>>>>>> reduces
>>>>>>>>>>>>>>>>>>>>>>>>> (as well as time of input stream blocking). I know
>>>>> that
>>>>>>>>>>>>>>>>> usually
>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> try
>>>>>>>>>>>>>>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but
>>>>> maybe
>>>>>> this
>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal
>>>>>> solution,
>>>>>>>>>>>>>>>>> maybe
>>>>>>>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>>>>>>>> are better ones.
>>>>>>>>>>>>>>>>>>>>>>>>> 
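To make the parallel reload idea above concrete, here is a minimal sketch. It is not the internal implementation mentioned in the message and it does not use Flink's InputFormat or InputSplit classes: SplitReader, reload and maxThreads are hypothetical names chosen for illustration. It only shows the shape of the approach, one read task per split, a thread count capped by an upper limit, and a fresh map that is handed back once every split has been read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelReloadSketch {

    /** Hypothetical stand-in for reading every row of one input split. */
    interface SplitReader {
        Map<String, String> readAll();
    }

    /** Reads all splits in parallel and returns a freshly built cache snapshot. */
    static Map<String, String> reload(List<SplitReader> splits, int maxThreads) {
        // Thread count follows the number of splits but never exceeds the limit.
        int threads = Math.max(1, Math.min(splits.size(), maxThreads));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Map<String, String> fresh = new ConcurrentHashMap<>();
            List<Future<?>> pending = new ArrayList<>();
            for (SplitReader split : splits) {
                pending.add(pool.submit(() -> fresh.putAll(split.readAll())));
            }
            // Wait for every split before exposing the new snapshot.
            for (Future<?> f : pending) {
                f.get();
            }
            return fresh;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("cache reload interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("cache reload failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

A caller would replace its current cache reference with the returned map only after reload returns, so lookups never observe a half-loaded cache.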
>>>>>>>>>>>>>>>>>>>>>>>>>> Providing the cache in the framework might
>>>> introduce
>>>>>>>>>>>>>>>>>>> compatibility
>>>>>>>>>>>>>>>>>>>>>>>> issues
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> It's possible only in cases when the developer of
>>>> the
>>>>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>>>>>> won't
>>>>>>>>>>>>>>>>>>>>>>>>> properly refactor his code and will use new cache
>>>>>> options
>>>>>>>>>>>>>>>>>>>>>> incorrectly
>>>>>>>>>>>>>>>>>>>>>>>>> (i.e. explicitly provide the same options into 2
>>>>>> different
>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>>>>>>>> places). For correct behavior all he will need to
>>>> do
>>>>>> is to
>>>>>>>>>>>>>>>>>>> redirect
>>>>>>>>>>>>>>>>>>>>>>>>> existing options to the framework's LookupConfig (+
>>>>>> maybe
>>>>>>>>>>>>>>>>> add an
>>>>>>>>>>>>>>>>>>>>>> alias
>>>>>>>>>>>>>>>>>>>>>>>>> for options, if there was different naming),
>>>>> everything
>>>>>>>>>>>>>>>>> will be
>>>>>>>>>>>>>>>>>>>>>>>>> transparent for users. If the developer won't do
>>>>>>>>>>>>>>>>> refactoring at
>>>>>>>>>>>>>>>>>>> all,
>>>>>>>>>>>>>>>>>>>>>>>>> nothing will be changed for the connector because
>>>> of
>>>>>>>>>>>>>>>>> backward
>>>>>>>>>>>>>>>>>>>>>>>>> compatibility. Also if a developer wants to use his
>>>>> own
>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>> logic,
>>>>>>>>>>>>>>>>>>>>>>>>> he just can refuse to pass some of the configs into
>>>>> the
>>>>>>>>>>>>>>>>>>> framework,
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> instead make his own implementation with already
>>>>>> existing
>>>>>>>>>>>>>>>>>>> configs
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> metrics (but actually I think that it's a rare
>>>> case).
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> filters and projections should be pushed all the
>>>> way
>>>>>> down
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>> function, like what we do in the scan source
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> It's a great goal. But the truth is that the
>>>>> ONLY
>>>>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource
>>>>>>>>>>>>>>>>>>>>>>>>> (no database connector supports it currently). Also
>>>>>> for some
>>>>>>>>>>>>>>>>>>>>>> databases
>>>>>>>>>>>>>>>>>>>>>>>>> it's simply impossible to push down such complex
>>>>> filters
>>>>>>>>>>>>>>>>> that we
>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>>> in Flink.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> only applying these optimizations to the cache
>>>> seems
>>>>>> not
>>>>>>>>>>>>>>>>>>> quite
>>>>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Filters can cut off an arbitrarily large amount of
>>>>> data
>>>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>>>>>>>>>>>> dimension table. For a simple example, suppose in
>>>>>> dimension
>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>> 'users'
>>>>>>>>>>>>>>>>>>>>>>>>> we have column 'age' with values from 20 to 40, and
>>>>>> input
>>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by age of
>>>>>> users. If
>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>>> filter 'age > 30',
>>>>>>>>>>>>>>>>>>>>>>>>> there will be half as much data in cache. This means
>>>>> the
>>>>>> user
>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>> increase 'lookup.cache.max-rows' by almost 2 times.
>>>>> It
>>>>>> will
>>>>>>>>>>>>>>>>>>> gain a
>>>>>>>>>>>>>>>>>>>>>>>>> huge
>>>>>>>>>>>>>>>>>>>>>>>>> performance boost. Moreover, this optimization
>>>> starts
>>>>>> to
>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>>>>>> shine
>>>>>>>>>>>>>>>>>>>>>>>>> in 'ALL' cache, where tables without filters and
>>>>>> projections
>>>>>>>>>>>>>>>>>>> can't
>>>>>>>>>>>>>>>>>>>>>> fit
>>>>>>>>>>>>>>>>>>>>>>>>> in memory, but with them - can. This opens up
>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>> possibilities
>>>>>>>>>>>>>>>>>>>>>>>>> for users. And this doesn't sound like 'not quite
>>>>>> useful'.
>>>>>>>>>>>>>>>>>>>>>>>>> 
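The 'age > 30' example above can be checked with a small sketch. FilteredCacheSketch, offer and cachedRows are hypothetical names, the cache is a plain LinkedHashMap in access order rather than any Flink class, and the data set (2100 users with ages spread evenly over 20 to 40) is an assumption made only to get round numbers. The sketch illustrates cache occupancy only; it says nothing about how lookups for filtered-out keys are served.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntPredicate;

/** Hypothetical sketch: an LRU cache that stores only rows passing a filter. */
final class FilteredCacheSketch {
    private final IntPredicate ageFilter;
    private final Map<Integer, Integer> rows; // userId -> age, kept in LRU order

    FilteredCacheSketch(final int maxRows, IntPredicate ageFilter) {
        this.ageFilter = ageFilter;
        // accessOrder = true makes the eldest entry the least recently used one.
        this.rows = new LinkedHashMap<Integer, Integer>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Applies the filter before caching, so rejected rows never occupy a slot. */
    void offer(int userId, int age) {
        if (ageFilter.test(age)) {
            rows.put(userId, age);
        }
    }

    int size() {
        return rows.size();
    }

    /** Loads 2100 users with ages 20..40 (100 users per age) and counts cached rows. */
    static int cachedRows(IntPredicate ageFilter) {
        FilteredCacheSketch cache = new FilteredCacheSketch(10_000, ageFilter);
        for (int userId = 0; userId < 2100; userId++) {
            cache.offer(userId, 20 + userId % 21);
        }
        return cache.size();
    }
}
```

With no filter, cachedRows(age -> true) keeps all 2100 rows; with cachedRows(age -> age > 30) only the 1000 rows for ages 31 to 40 are kept, which is the "roughly half" effect described in the message.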
>>>>>>>>>>>>>>>>>>>>>>>>> It would be great to hear other voices regarding
>>>> this
>>>>>> topic!
>>>>>>>>>>>>>>>>>>> Because
>>>>>>>>>>>>>>>>>>>>>>>>> we have quite a lot of controversial points, and I
>>>>>> think
>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> help
>>>>>>>>>>>>>>>>>>>>>>>>> of others it will be easier for us to come to a
>>>>>> consensus.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alexander and Arvid,
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the discussion and sorry for my late
>>>>>> response!
>>>>>>>>>>>>>>>>> We
>>>>>>>>>>>>>>>>>>> had
>>>>>>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>>> internal discussion together with Jark and Leonard
>>>>> and
>>>>>> I’d
>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> summarize our ideas. Instead of implementing the
>>>>> cache
>>>>>>>>>>>>>>>>> logic in
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>> runtime layer or wrapping around the user-provided
>>>>>> table
>>>>>>>>>>>>>>>>>>> function,
>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>> prefer to introduce some new APIs extending
>>>>>> TableFunction
>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>>>>>>>> concerns:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR
>>>>>>>>>>>>>>>>> SYSTEM_TIME
>>>>>>>>>>>>>>>>>>> AS OF
>>>>>>>>>>>>>>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect the
>>>>>> content
>>>>>>>>>>>>>>>>> of the
>>>>>>>>>>>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>>>>>> table at the moment of querying. If users choose to
>>>>>> enable
>>>>>>>>>>>>>>>>>>> caching
>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> lookup table, they implicitly indicate that this
>>>>>> breakage is
>>>>>>>>>>>>>>>>>>>>>> acceptable
>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>> exchange for the performance. So we prefer not to
>>>>>> provide
>>>>>>>>>>>>>>>>>>> caching on
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> table runtime level.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 2. If we make the cache implementation in the
>>>>>> framework
>>>>>>>>>>>>>>>>>>> (whether
>>>>>>>>>>>>>>>>>>>>>> in a
>>>>>>>>>>>>>>>>>>>>>>>>> runner or a wrapper around TableFunction), we have
>>>> to
>>>>>>>>>>>>>>>>> confront a
>>>>>>>>>>>>>>>>>>>>>>>> situation
>>>>>>>>>>>>>>>>>>>>>>>>> that allows table options in DDL to control the
>>>>>> behavior of
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> framework,
>>>>>>>>>>>>>>>>>>>>>>>>> which has never happened previously and should be
>>>>>> cautious.
>>>>>>>>>>>>>>>>>>> Under
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> current design the behavior of the framework should
>>>>>> only be
>>>>>>>>>>>>>>>>>>>>>> specified
>>>>>>>>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s hard to
>>>>>> apply
>>>>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>>>>> general
>>>>>>>>>>>>>>>>>>>>>>>>> configs to a specific table.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 3. We have use cases where the lookup source loads and refreshes
>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>>>>>>>>>> periodically into the memory to achieve high lookup
>>>>>>>>>>>>>>>>> performance
>>>>>>>>>>>>>>>>>>>>>> (like
>>>>>>>>>>>>>>>>>>>>>>>> Hive
>>>>>>>>>>>>>>>>>>>>>>>>> connector in the community, and also widely used by
>>>>> our
>>>>>>>>>>>>>>>>> internal
>>>>>>>>>>>>>>>>>>>>>>>>> connectors). Wrapping the cache around the user’s
>>>>>>>>>>>>>>>>> TableFunction
>>>>>>>>>>>>>>>>>>>>>> works
>>>>>>>>>>>>>>>>>>>>>>>> fine
>>>>>>>>>>>>>>>>>>>>>>>>> for LRU caches, but I think we have to introduce a
>>>>> new
>>>>>>>>>>>>>>>>>>> interface for
>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>> all-caching scenario and the design would become
>>>> more
>>>>>>>>>>>>>>>>> complex.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Providing the cache in the framework might
>>>>>> introduce
>>>>>>>>>>>>>>>>>>>>>> compatibility
>>>>>>>>>>>>>>>>>>>>>>>>> issues to existing lookup sources like there might
>>>>>> exist two
>>>>>>>>>>>>>>>>>>> caches
>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>> totally different strategies if the user
>>>> incorrectly
>>>>>>>>>>>>>>>>> configures
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>> (one in the framework and another implemented by
>>>> the
>>>>>> lookup
>>>>>>>>>>>>>>>>>>> source).
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> As for the optimization mentioned by Alexander, I
>>>>>> think
>>>>>>>>>>>>>>>>>>> filters
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> projections should be pushed all the way down to
>>>> the
>>>>>> table
>>>>>>>>>>>>>>>>>>> function,
>>>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>> what we do in the scan source, instead of the
>>>> runner
>>>>>> with
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> cache.
>>>>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>>>>> goal of using cache is to reduce the network I/O
>>>> and
>>>>>>>>>>>>>>>>> pressure
>>>>>>>>>>>>>>>>>>> on the
>>>>>>>>>>>>>>>>>>>>>>>>> external system, and only applying these
>>>>> optimizations
>>>>>> to
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>>>>>>>>> not quite useful.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect our
>>>>>> ideas.
>>>>>>>>>>>>>>>>> We
>>>>>>>>>>>>>>>>>>>>>> prefer to
>>>>>>>>>>>>>>>>>>>>>>>>> keep the cache implementation as a part of
>>>>>> TableFunction,
>>>>>>>>>>>>>>>>> and we
>>>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>>>>> provide some helper classes (CachingTableFunction,
>>>>>>>>>>>>>>>>>>>>>>>> AllCachingTableFunction,
>>>>>>>>>>>>>>>>>>>>>>>>> CachingAsyncTableFunction) to developers and
>>>> regulate
>>>>>>>>>>>>>>>>> metrics
>>>>>>>>>>>>>>>>>>> of the
>>>>>>>>>>>>>>>>>>>>>>>> cache.
>>>>>>>>>>>>>>>>>>>>>>>>> Also, I made a POC[2] for your reference.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your ideas!
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов
>>>> <
>>>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the response, Arvid!
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> I have a few comments on your message.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> but could also live with an easier solution as
>>>> the
>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>> step:
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that these 2 ways are mutually exclusive
>>>>>>>>>>>>>>>>> (originally
>>>>>>>>>>>>>>>>>>>>>>> proposed
>>>>>>>>>>>>>>>>>>>>>>>>>>> by Qingsheng and mine), because conceptually they
>>>>>> follow
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>> goal, but implementation details are different.
>>>> If
>>>>> we
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> go one
>>>>>>>>>>>>>>>>>>>>>>> way,
>>>>>>>>>>>>>>>>>>>>>>>>>>> moving to another way in the future will mean
>>>>>> deleting
>>>>>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>>>>>>>>>> and once again changing the API for connectors.
>>>> So
>>>>> I
>>>>>>>>>>>>>>>>> think we
>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>> reach a consensus with the community about that
>>>> and
>>>>>> then
>>>>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>>>>>>>>>> together
>>>>>>>>>>>>>>>>>>>>>>>>>>> on this FLIP, i.e. divide the work on tasks for
>>>>>> different
>>>>>>>>>>>>>>>>>>> parts
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> flip (for example, LRU cache unification /
>>>>>> introducing
>>>>>>>>>>>>>>>>>>> proposed
>>>>>>>>>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng?
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> as the source will only receive the requests
>>>> after
>>>>>>>>>>>>>>>>> filter
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually if filters are applied to fields of the
>>>>>> lookup
>>>>>>>>>>>>>>>>>>> table, we
>>>>>>>>>>>>>>>>>>>>>>>>>>> firstly must do requests, and only after that we
>>>>> can
>>>>>>>>>>>>>>>>> filter
>>>>>>>>>>>>>>>>>>>>>>> responses,
>>>>>>>>>>>>>>>>>>>>>>>>>>> because lookup connectors don't have filter
>>>>>> pushdown. So
>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>> filtering
>>>>>>>>>>>>>>>>>>>>>>>>>>> is done before caching, there will be far fewer
>>>>> rows
>>>>>> in
>>>>>>>>>>>>>>>>>>> cache.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is
>>>> not
>>>>>>>>>>>>>>>>> shared.
>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>>>>>>>>> know the
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of
>>>>>>>>>>>>>>>>> conversations
>>>>>>>>>>>>>>>>>>> :)
>>>>>>>>>>>>>>>>>>>>>>>>>>> I have no write access to the confluence, so I
>>>>> made a
>>>>>>>>>>>>>>>>> Jira
>>>>>>>>>>>>>>>>>>> issue,
>>>>>>>>>>>>>>>>>>>>>>>>>>> where I described the proposed changes in more detail:
>>>>>>>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-27411.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> I will be happy to get more feedback!
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Apr 25, 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency was
>>>> not
>>>>>>>>>>>>>>>>>>> satisfying
>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>> me.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but could also
>>>>> live
>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>> easier
>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead of making
>>>>>> caching
>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>>> implementation
>>>>>>>>>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a
>>>> caching
>>>>>>>>>>>>>>>>> layer
>>>>>>>>>>>>>>>>>>>>>> around X.
>>>>>>>>>>>>>>>>>>>>>>>> So
>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposal would be a CachingTableFunction that
>>>>>>>>>>>>>>>>> delegates to
>>>>>>>>>>>>>>>>>>> X in
>>>>>>>>>>>>>>>>>>>>>>> case
>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>>> misses and else manages the cache. Lifting it
>>>> into
>>>>>> the
>>>>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>>>>>>>> model
>>>>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposed would be even better but is probably
>>>>>>>>>>>>>>>>> unnecessary
>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> first step
>>>>>>>>>>>>>>>>>>>>>>>>>>>> for a lookup source (as the source will only
>>>>> receive
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> requests
>>>>>>>>>>>>>>>>>>>>>>>>> after
>>>>>>>>>>>>>>>>>>>>>>>>>>>> filter; applying projection may be more
>>>>> interesting
>>>>>> to
>>>>>>>>>>>>>>>>> save
>>>>>>>>>>>>>>>>>>>>>>> memory).
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
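The "caching layer around X" described above could look roughly like the following sketch. It is written against plain Java types rather than Flink's TableFunction, so CachingLookupSketch, lookup and delegateCalls are hypothetical names, and the wrapped function stands in for TableFunction X. Eviction, TTL and metrics are left out on purpose; the only point is the delegation on a miss.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of a caching layer that delegates to a wrapped lookup on misses. */
final class CachingLookupSketch<K, V> {
    private final Function<K, List<V>> delegate; // stands in for TableFunction X
    private final Map<K, List<V>> cache = new HashMap<>();
    private int delegateCalls;

    CachingLookupSketch(Function<K, List<V>> delegate) {
        this.delegate = delegate;
    }

    /** Returns cached rows if present, otherwise asks the delegate and remembers the answer. */
    List<V> lookup(K key) {
        List<V> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        delegateCalls++;
        // Assumption: the delegate returns a non-null list; an empty list is cached
        // as well, so keys with no matching rows are also served from the cache.
        List<V> loaded = delegate.apply(key);
        cache.put(key, loaded);
        return loaded;
    }

    /** Number of lookups that actually reached the delegate. */
    int delegateCalls() {
        return delegateCalls;
    }
}
```

Because the wrapper only needs the delegate's lookup call, the wrapped function itself stays unaware of caching, which is the separation the message argues for.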
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another advantage is that all the changes in this
>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP would be limited to options, with no need for
>>>>>>>>>>>>>>>>>>>>>>>>>>>> new public interfaces. Everything else remains an
>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation detail of the Table runtime. That
>>>>>>>>>>>>>>>>>>>>>>>>>>>> means we can easily incorporate the optimization
>>>>>>>>>>>>>>>>>>>>>>>>>>>> potential that Alexander pointed out later.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture image
>>>>>>>>>>>>>>>>>>>>>>>>>>>> was not shared. To be honest, I don't know how to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> share images.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов
>>>>>>>>>>>>>>>>>>>>>>>>>>>> <smirale...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander. I'm not a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> committer yet, but I'd really like to become one,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and this FLIP really interests me. I have actually
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> worked on a similar feature in my company’s Flink
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this and make the code open source.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think there is a better alternative than
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introducing an abstract class for TableFunction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (CachingTableFunction). As you know, TableFunction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lives in the flink-table-common module, which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> provides only an API for working with tables, so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it’s very convenient to import in connectors.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CachingTableFunction, in turn, contains logic for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> runtime execution, so this class and everything
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connected with it should be located in another
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> module, probably flink-table-runtime. But this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would require connectors to depend on another
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> module that contains a lot of runtime logic, which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> doesn’t sound good.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding a new method ‘getLookupConfig’ to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupTableSource or LookupRuntimeProvider so that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connectors only pass configurations to the planner
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and therefore won’t depend on the runtime
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation. Based on these configs, the planner
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will construct a lookup join operator with the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> corresponding runtime logic (ProcessFunctions in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the flink-table-runtime module). The architecture
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> looks like the attached image (the LookupConfig
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class there is actually your CacheConfig).
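A rough sketch of how such a config-only contract might look, written with plain Java types rather than the real Flink interfaces. LookupConfigSketch, LookupSourceSketch, PlannerSketch and the two config fields are hypothetical names chosen for illustration; only getLookupConfig, LookupJoinRunner and LookupJoinCachingRunner come from the thread.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical config object, playing the role the thread assigns to CacheConfig. */
final class LookupConfigSketch {
    final long maxCachedRows;        // assumed option, for illustration only
    final Duration expireAfterWrite; // assumed option, for illustration only

    LookupConfigSketch(long maxCachedRows, Duration expireAfterWrite) {
        this.maxCachedRows = maxCachedRows;
        this.expireAfterWrite = expireAfterWrite;
    }
}

/** Hypothetical stand-in for LookupTableSource with the suggested extra method. */
interface LookupSourceSketch {
    /** Empty means the connector requests no caching. */
    default Optional<LookupConfigSketch> getLookupConfig() {
        return Optional.empty();
    }
}

/** Hypothetical planner side: it alone decides which runtime class to build. */
class PlannerSketch {
    static String chooseRunner(LookupSourceSketch source) {
        return source.getLookupConfig().isPresent()
                ? "LookupJoinCachingRunner"
                : "LookupJoinRunner";
    }

    public static void main(String[] args) {
        LookupSourceSketch plain = new LookupSourceSketch() {};
        LookupSourceSketch cached = new LookupSourceSketch() {
            @Override
            public Optional<LookupConfigSketch> getLookupConfig() {
                return Optional.of(new LookupConfigSketch(10_000, Duration.ofMinutes(10)));
            }
        };
        System.out.println(chooseRunner(plain));  // LookupJoinRunner
        System.out.println(chooseRunner(cached)); // LookupJoinCachingRunner
    }
}
```

The point of this shape is that the connector side only returns data; no class from the runtime module appears in its signature.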
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The classes in flink-table-planner that will be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> responsible for this are CommonPhysicalLookupJoin
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and its subclasses.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The current classes for lookup join in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime are LookupJoinRunner,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding classes LookupJoinCachingRunner,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> And here comes another, more powerful advantage of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> such a solution. If we have the caching logic at a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lower level, we can apply some optimizations to it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinRunnerWithCalc is named like this because
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it uses the ‘calc’ function, which mostly consists
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of filters and projections.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, when joining table A with lookup table
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> B on the condition ‘JOIN … ON A.id = B.id AND
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A.age = B.age + 10 WHERE B.salary > 1000’, the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ‘calc’ function will contain the filters
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A.age = B.age + 10 and B.salary > 1000.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If we apply this function before storing records in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the cache, the size of the cache will be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> significantly reduced: filters avoid storing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> useless records in the cache, and projections
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reduce the records’ size. So the initial max number
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of records in the cache can be increased by the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user.
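To make the cache-size argument concrete, here is a small sketch of applying a filter and a projection before anything is cached. It is not the Flink ‘calc’ code: CalcBeforeCacheSketch, Row and calc are hypothetical names, the row layout is invented, and only the predicate that refers to B alone (B.salary > 1000) is applied, on the assumption that a condition involving A cannot be evaluated independently of the probe row.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class CalcBeforeCacheSketch {
    /** Hypothetical full row of lookup table B, including a wide column the join never reads. */
    static final class Row {
        final int id;
        final int age;
        final int salary;
        final String notes;

        Row(int id, int age, int salary, String notes) {
            this.id = id;
            this.age = age;
            this.salary = salary;
            this.notes = notes;
        }
    }

    /** Filters and projects looked-up rows; the result is what would be put into the cache. */
    static <R, P> List<P> calc(List<R> fetched, Predicate<R> filter, Function<R, P> projection) {
        return fetched.stream().filter(filter).map(projection).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Rows the external system returned for B.id = 1.
        List<Row> fetched = List.of(
                new Row(1, 30, 500, "long text ..."),
                new Row(1, 30, 2000, "long text ..."),
                new Row(1, 40, 3000, "long text ..."));

        // Filter: B.salary > 1000. Projection: keep only id and age.
        List<List<Integer>> toCache =
                calc(fetched, r -> r.salary > 1000, r -> List.of(r.id, r.age));

        System.out.println(fetched.size() + " rows fetched, " + toCache.size() + " rows cached");
        System.out.println(toCache); // [[1, 30], [1, 40]]
    }
}
```

Here one of three rows is dropped by the filter and the remaining rows lose two of their four columns, which is the kind of saving the paragraph above describes.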
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think about it?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP-221[1], which introduces an abstraction of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lookup table cache and its standard metrics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Currently each lookup table source has to implement
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> its own cache to store lookup results, and there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> isn’t a standard set of metrics for users and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> developers to tune their jobs with lookup joins,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which is a quite common use case in Flink Table /
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SQL.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs including cache,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics, wrapper classes of TableFunction and new
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table options. Please take a look at the FLIP page
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] to get more details. Any suggestions and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> comments would be appreciated!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng Ren
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Real-time Computing Team
>>>>>>>>>>>>>>>>>>>>>>>>>> Alibaba Cloud
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>> Roman Boyko
>>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Qingsheng Ren
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Real-time Computing Team
>>>>>>>>>>>>>> Alibaba Cloud
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Email: renqs...@gmail.com
>>>>>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 
