After looking at the newly introduced ReloadTime and Becket's comment,
I agree with Becket that we should have a pluggable reloading strategy.
We can provide some common implementations, e.g., periodic reloading and
daily reloading.
But there will definitely be some connector- or business-specific reloading
strategies, e.g., being notified by a ZooKeeper watcher, or reloading once a
new Hive partition is complete.
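
To make the idea concrete, here is a minimal sketch of what a pluggable
reloading strategy could look like. All names in it (CacheReloadStrategy,
PeriodicReloadStrategy, TriggeredReloadStrategy, delayUntil, dailyAt) are
hypothetical and only for illustration; they are not taken from the FLIP.
The sketch assumes the framework hands the strategy a Runnable that performs
the actual cache reload.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical interface: the framework passes in the reload task and the
// strategy decides when to run it. This is not an API from the FLIP.
interface CacheReloadStrategy {
    void start(Runnable reloadTask);

    void stop();
}

// Common implementation: a fixed delay between reloads, after an initial delay.
final class PeriodicReloadStrategy implements CacheReloadStrategy {
    private final Duration initialDelay;
    private final Duration interval;
    private ScheduledExecutorService executor;

    PeriodicReloadStrategy(Duration initialDelay, Duration interval) {
        this.initialDelay = initialDelay;
        this.interval = interval;
    }

    // Daily reloading: first reload at reloadTime, then roughly once per day
    // (a fixed delay drifts by the duration of each reload).
    static PeriodicReloadStrategy dailyAt(LocalTime now, LocalTime reloadTime) {
        return new PeriodicReloadStrategy(delayUntil(now, reloadTime), Duration.ofDays(1));
    }

    // Time from 'now' until the next occurrence of 'target', wrapping past midnight.
    static Duration delayUntil(LocalTime now, LocalTime target) {
        Duration delay = Duration.between(now, target);
        return delay.isNegative() ? delay.plusDays(1) : delay;
    }

    Duration initialDelay() {
        return initialDelay;
    }

    @Override
    public void start(Runnable reloadTask) {
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                reloadTask, initialDelay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void stop() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}

// Connector-specific implementation: an external callback (for example a
// watcher or a "partition complete" notification) calls trigger().
final class TriggeredReloadStrategy implements CacheReloadStrategy {
    private volatile Runnable reloadTask;

    @Override
    public void start(Runnable reloadTask) {
        this.reloadTask = reloadTask;
    }

    @Override
    public void stop() {
        this.reloadTask = null;
    }

    // Runs the reload if the strategy has been started; otherwise does nothing.
    void trigger() {
        Runnable task = reloadTask;
        if (task != null) {
            task.run();
        }
    }
}
```

With something like this, periodic and daily reloading are just two
configurations of the same class, and a connector can supply its own
event-driven strategy without the framework knowing how the event arrives.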

Best,
Jark

On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote:

> Hi Qingsheng,
>
> Thanks for updating the FLIP. A few comments / questions below:
>
> 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"?
> What is the difference between them? If they are the same, can we just use
> XXXFactory everywhere?
>
> 2. Regarding the FullCachingLookupProvider, should the reloading policy
> also be pluggable? Periodic reloading can sometimes be tricky in
> practice. For example, if a user sets 24 hours as the cache refresh interval
> and some nightly batch job is delayed, the cache update may still see
> stale data.
>
> 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be
> removed.
>
> 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little
> confusing to me. If Optional<LookupCacheFactory> getCacheFactory() returns
> a non-empty factory, doesn't that already tell the framework to cache
> the missing keys? Also, why does this method return an Optional<Boolean>
> instead of a boolean?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com> wrote:
>
>> Hi Lincoln and Jark,
>>
>> Thanks for the comments! If the community reaches a consensus that we use
>> a SQL hint instead of table options to decide whether to use sync or async
>> mode, it’s indeed not necessary to introduce the “lookup.async” option.
>>
>> I think it’s a good idea to make the async decision at the query
>> level, which could enable better optimization with more information gathered
>> by the planner. Is there any FLIP describing the issue in FLINK-27625? I
>> thought FLIP-234 only proposes adding a SQL hint for retry on missing,
>> rather than having the entire async mode controlled by a hint.
>>
>> Best regards,
>>
>> Qingsheng
>>
>> > On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>> >
>> > Hi Jark,
>> >
>> > Thanks for your reply!
>> >
>> > Currently 'lookup.async' only exists in the HBase connector. I have no idea
>> > whether or when to remove it (we can discuss that in another issue for the
>> > HBase connector after FLINK-27625 is done); I just suggest not making it a
>> > common option now.
>> >
>> > Best,
>> > Lincoln Lee
>> >
>> >
>> > On Tue, May 24, 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:
>> >
>> >> Hi Lincoln,
>> >>
>> >> I have taken a look at FLIP-234, and I agree with you that the connectors can
>> >> provide both async and sync runtime providers simultaneously instead of one
>> >> of them.
>> >> At that point, "lookup.async" looks redundant. If this option is planned to
>> >> be removed in the long term, I think it makes sense not to introduce it in
>> >> this FLIP.
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>> >>
>> >>> Hi Qingsheng,
>> >>>
>> >>> Sorry for jumping into the discussion so late. It's a good idea that we can
>> >>> have common table options. I have a minor comment on 'lookup.async':
>> >>> I would not make it a common option.
>> >>>
>> >>> The table layer abstracts both sync and async lookup capabilities, and
>> >>> connector implementers can choose one or both. When only one capability
>> >>> is implemented (the status of most existing built-in connectors),
>> >>> 'lookup.async' will not be used. And when a connector has both
>> >>> capabilities, I think this choice is better made at the query level:
>> >>> for example, the table planner can choose the physical
>> >>> implementation of async lookup or sync lookup based on its cost model, or
>> >>> users can give a query hint based on their own better understanding. If
>> >>> there is another common table option 'lookup.async', it may confuse
>> >>> users in the long run.
>> >>>
>> >>> So, I prefer to leave the 'lookup.async' option private (for the
>> >>> current HBase connector) and not turn it into a common option.
>> >>>
>> >>> WDYT?
>> >>>
>> >>> Best,
>> >>> Lincoln Lee
>> >>>
>> >>>
>> >>> On Mon, May 23, 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
>> >>>
>> >>>> Hi Alexander,
>> >>>>
>> >>>> Thanks for the review! We recently updated the FLIP and you can find those
>> >>>> changes in my latest email. Since some terminology has changed, I’ll
>> >>>> use the new concepts when replying to your comments.
>> >>>>
>> >>>> 1. Builder vs ‘of’
>> >>>> I’m OK with using the builder pattern if we have additional optional
>> >>>> parameters for full caching mode (“rescan” previously). The
>> >>>> schedule-with-delay idea looks reasonable to me, but I think we need to
>> >>>> redesign the builder API of full caching to make it more descriptive for
>> >>>> developers. Would you mind sharing your ideas about the API? To get access
>> >>>> to the FLIP workspace you can just provide your account ID and ping any
>> >>>> PMC member, including Jark.
>> >>>>
>> >>>> 2. Common table options
>> >>>> We have had some discussions recently and propose introducing 8 common
>> >>>> table options for caching. The FLIP has been updated accordingly.
>> >>>>
>> >>>> 3. Retries
>> >>>> I think we are on the same page :-)
>> >>>>
>> >>>> For your additional concerns:
>> >>>> 1) The table option has been updated.
>> >>>> 2) We brought “lookup.cache” back for configuring whether to use partial or
>> >>>> full caching mode.
>> >>>>
>> >>>> Best regards,
>> >>>>
>> >>>> Qingsheng
>> >>>>
>> >>>>
>> >>>>
>> >>>>> On May 19, 2022, at 17:25, Александр Смирнов <smirale...@gmail.com> wrote:
>> >>>>>
>> >>>>> Also I have a few additions:
>> >>>>> 1) maybe rename 'lookup.cache.maximum-size' to
>> >>>>> 'lookup.cache.max-rows'? I think it will make it clearer that we are
>> >>>>> talking not about bytes, but about the number of rows. Plus it fits better,
>> >>>>> considering my optimization with filters.
>> >>>>> 2) How will users enable rescanning? Are we going to separate caching
>> >>>>> and rescanning from the options point of view? Initially we had
>> >>>>> one option 'lookup.cache' with values LRU / ALL. I think now we can
>> >>>>> make a boolean option 'lookup.rescan'. RescanInterval can be
>> >>>>> 'lookup.rescan.interval', etc.
>> >>>>>
>> >>>>> Best regards,
>> >>>>> Alexander
>> >>>>>
>> >>>>> On Thu, May 19, 2022 at 14:50, Александр Смирнов <smirale...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Hi Qingsheng and Jark,
>> >>>>>>
>> >>>>>> 1. Builders vs 'of'
>> >>>>>> I understand that builders are used when we have multiple parameters.
>> >>>>>> I suggested them because we could add parameters later. To prevent the
>> >>>>>> Builder for ScanRuntimeProvider from looking redundant I can suggest
>> >>>>>> one more config now - "rescanStartTime".
>> >>>>>> It's a time in UTC (LocalTime class) when the first reload of the cache
>> >>>>>> starts. This parameter can be thought of as 'initialDelay' (the diff
>> >>>>>> between the current time and rescanStartTime) in the method
>> >>>>>> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very
>> >>>>>> useful when the dimension table is updated by some other scheduled job
>> >>>>>> at a certain time, or when the user simply wants the second scan (the
>> >>>>>> first cache reload) to be delayed. This option can be used even without
>> >>>>>> 'rescanInterval' - in this case 'rescanInterval' will be one day.
>> >>>>>> If you are fine with this option, I would be very glad if you would
>> >>>>>> give me access to edit the FLIP page, so I could add it myself.
>> >>>>>>
>> >>>>>> 2. Common table options
>> >>>>>> I also think that FactoryUtil would be overloaded by all the cache
>> >>>>>> options. But maybe unify all suggested options, not only those for the
>> >>>>>> default cache? I.e. a class 'LookupOptions' that unifies default cache
>> >>>>>> options, rescan options, 'async', 'maxRetries'. WDYT?
>> >>>>>>
>> >>>>>> 3. Retries
>> >>>>>> I'm fine with a suggestion close to RetryUtils#tryTimes(times, call)
>> >>>>>>
>> >>>>>> [1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
>> >>>>>>
>> >>>>>> Best regards,
>> >>>>>> Alexander
>> >>>>>>
>> >>>>>> On Wed, May 18, 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>> Hi Jark and Alexander,
>> >>>>>>>
>> >>>>>>> Thanks for your comments! I’m also OK with introducing common table
>> >>>>>>> options. I prefer to introduce a new DefaultLookupCacheOptions class for
>> >>>>>>> holding these option definitions because putting all options into
>> >>>>>>> FactoryUtil would make it a bit “crowded” and not well categorized.
>> >>>>>>>
>> >>>>>>> The FLIP has been updated according to the suggestions above:
>> >>>>>>> 1. Use a static “of” method for constructing RescanRuntimeProvider,
>> >>>>>>> considering both arguments are required.
>> >>>>>>> 2. Introduce new table options matching DefaultLookupCacheFactory
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Qingsheng
>> >>>>>>>
>> >>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi Alex,
>> >>>>>>>>
>> >>>>>>>> 1) retry logic
>> >>>>>>>> I think we can extract some common retry logic into utilities, e.g.
>> >>>>>>>> RetryUtils#tryTimes(times, call).
>> >>>>>>>> This seems independent of this FLIP and can be reused by DataStream
>> >>>>>>>> users.
>> >>>>>>>> Maybe we can open an issue to discuss this and where to put it.
>> >>>>>>>>
>> >>>>>>>> 2) cache ConfigOptions
>> >>>>>>>> I'm fine with defining cache config options in the framework.
>> >>>>>>>> A candidate place to put them is FactoryUtil, which also includes the
>> >>>>>>>> "sink.parallelism" and "format" options.
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Jark
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi Qingsheng,
>> >>>>>>>>>
>> >>>>>>>>> Thank you for considering my comments.
>> >>>>>>>>>
>> >>>>>>>>>> there might be custom logic before making retry, such as re-establish the connection
>> >>>>>>>>>
>> >>>>>>>>> Yes, I understand that. I meant that such logic can be placed in a
>> >>>>>>>>> separate function that can be implemented by connectors. Just moving
>> >>>>>>>>> the retry logic would make the connector's LookupFunction more concise
>> >>>>>>>>> and avoid duplicate code. However, it's a minor change. The decision is
>> >>>>>>>>> up to you.
>> >>>>>>>>>
>> >>>>>>>>>> We decide not to provide common DDL options and let developers to define their own options as we do now per connector.
>> >>>>>>>>>
>> >>>>>>>>> What is the reason for that? One of the main goals of this FLIP was to
>> >>>>>>>>> unify the configs, wasn't it? I understand that the current cache design
>> >>>>>>>>> doesn't depend on ConfigOptions, as it did before. But we can still put
>> >>>>>>>>> these options into the framework, so connectors can reuse them and
>> >>>>>>>>> avoid code duplication and, more significantly, avoid possibly
>> >>>>>>>>> divergent option naming. This point can be made in the
>> >>>>>>>>> documentation for connector developers.
>> >>>>>>>>>
>> >>>>>>>>> Best regards,
>> >>>>>>>>> Alexander
>> >>>>>>>>>
>> >>>>>>>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Alexander,
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks for the review and glad to see we are on the same page! I
>> >>>>>>>>>> think you forgot to cc the dev mailing list so I’m also quoting your
>> >>>>>>>>>> reply under this email.
>> >>>>>>>>>>
>> >>>>>>>>>>> We can add 'maxRetryTimes' option into this class
>> >>>>>>>>>>
>> >>>>>>>>>> In my opinion the retry logic should be implemented in lookup()
>> >>>>>>>>>> instead of in LookupFunction#eval(). Retrying is only meaningful under
>> >>>>>>>>>> some specific retriable failures, and there might be custom logic before
>> >>>>>>>>>> making a retry, such as re-establishing the connection
>> >>>>>>>>>> (JdbcRowDataLookupFunction is an example), so it's handier to leave it
>> >>>>>>>>>> to the connector.
>> >>>>>>>>>>
>> >>>>>>>>>>> I don't see DDL options, that were in previous version of FLIP. Do
>> >>>>>>>>>>> you have any special plans for them?
>> >>>>>>>>>>
>> >>>>>>>>>> We decided not to provide common DDL options and to let developers
>> >>>>>>>>>> define their own options per connector, as we do now.
>> >>>>>>>>>>
>> >>>>>>>>>> The rest of the comments sound great and I’ll update the FLIP. Hope we
>> >>>>>>>>>> can finalize our proposal soon!
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>>
>> >>>>>>>>>> Qingsheng
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Hi Qingsheng and devs!
>> >>>>>>>>>>>
>> >>>>>>>>>>> I like the overall design of the updated FLIP; however, I have several
>> >>>>>>>>>>> suggestions and questions.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction is a good
>> >>>>>>>>>>> idea. We can add a 'maxRetryTimes' option to this class. The 'eval'
>> >>>>>>>>>>> method of the new LookupFunction is great for this purpose. The same
>> >>>>>>>>>>> goes for the 'async' case.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 2) There might be other configs in the future, such as 'cacheMissingKey'
>> >>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider.
>> >>>>>>>>>>> Maybe use the Builder pattern in LookupFunctionProvider and
>> >>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one 'build' method
>> >>>>>>>>>>> instead of many 'of' methods in the future)?
>> >>>>>>>>>>>
>> >>>>>>>>>>> 3) What are the plans for existing TableFunctionProvider and
>> >>>>>>>>>>> AsyncTableFunctionProvider? I think they should be deprecated.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 4) Am I right that the current design does not assume usage of a
>> >>>>>>>>>>> user-provided LookupCache in re-scanning? In this case, it is not very
>> >>>>>>>>>>> clear why we need methods such as 'invalidate' or 'putAll' in
>> >>>>>>>>>>> LookupCache.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 5) I don't see the DDL options that were in the previous version of the
>> >>>>>>>>>>> FLIP. Do you have any special plans for them?
>> >>>>>>>>>>>
>> >>>>>>>>>>> If you don't mind, I would be glad to be able to make small
>> >>>>>>>>>>> adjustments to the FLIP document too. I think it's worth mentioning
>> >>>>>>>>>>> exactly which optimizations are planned for the future.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best regards,
>> >>>>>>>>>>> Smirnov Alexander
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, May 13, 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Hi Alexander and devs,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thank you very much for the in-depth discussion! As Jark
>> >>>>>>>>>>>> mentioned, we were inspired by Alexander's idea and refactored our
>> >>>>>>>>>>>> design. FLIP-221 [1] has been updated to reflect our design now and we
>> >>>>>>>>>>>> are happy to hear more suggestions from you!
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Compared to the previous design:
>> >>>>>>>>>>>> 1. The lookup cache serves at the table runtime level and is
>> >>>>>>>>>>>> integrated as a component of LookupJoinRunner, as discussed previously.
>> >>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect the new design.
>> >>>>>>>>>>>> 3. We separate out the all-caching case and introduce a
>> >>>>>>>>>>>> new RescanRuntimeProvider to reuse the ability of scanning. We are
>> >>>>>>>>>>>> planning to support SourceFunction / InputFormat for now, considering
>> >>>>>>>>>>>> the complexity of the FLIP-27 Source API.
>> >>>>>>>>>>>> 4. A new interface LookupFunction is introduced to make the
>> >>>>>>>>>>>> semantics of lookup more straightforward for developers.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> In reply to Alexander:
>> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is deprecated
>> >>>>>>>>>>>>> or not. Am I right that it will be so in the future, but currently it's not?
>> >>>>>>>>>>>> Yes, you are right. InputFormat is not deprecated for now. I think
>> >>>>>>>>>>>> it will be deprecated in the future but we don't have a clear plan for that.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks again for the discussion on this FLIP; looking forward
>> >>>>>>>>>>>> to cooperating with you after we finalize the design and interfaces!
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Qingsheng
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard!
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Glad to see that we came to a consensus on almost all points!
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is deprecated or
>> >>>>>>>>>>>>> not. Am I right that it will be so in the future, but currently it's
>> >>>>>>>>>>>>> not? Actually I also think that for the first version it's OK to use
>> >>>>>>>>>>>>> InputFormat in the ALL cache implementation, because supporting rescan
>> >>>>>>>>>>>>> ability seems like a very distant prospect. But for this decision we
>> >>>>>>>>>>>>> need a consensus among all discussion participants.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> In general, I have nothing to argue with in your statements. All
>> >>>>>>>>>>>>> of them correspond to my ideas. Looking ahead, it would be nice to work
>> >>>>>>>>>>>>> on this FLIP cooperatively. I've already done a lot of work on lookup
>> >>>>>>>>>>>>> join caching with an implementation very close to the one we are
>> >>>>>>>>>>>>> discussing, and want to share the results of this work. Anyway, looking
>> >>>>>>>>>>>>> forward to the FLIP update!
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>> Smirnov Alexander
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Thu, May 12, 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Hi Alex,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Thanks for summarizing your points.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed it several
>> >>>>>>>>>>>>>> times and we have totally refactored the design.
>> >>>>>>>>>>>>>> I'm glad to say we have reached a consensus on many of your points!
>> >>>>>>>>>>>>>> Qingsheng is still working on updating the design docs, which may be
>> >>>>>>>>>>>>>> available in the next few days.
>> >>>>>>>>>>>>>> I will share some conclusions from our discussions:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 1) we have refactored the design towards the "cache in framework" way.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 2) a "LookupCache" interface for users to customize and a default
>> >>>>>>>>>>>>>> implementation with a builder for ease of use.
>> >>>>>>>>>>>>>> This makes it possible to have both flexibility and conciseness.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 3) Filter pushdown is important for ALL and LRU lookup cache, esp. for
>> >>>>>>>>>>>>>> reducing IO.
>> >>>>>>>>>>>>>> Filter pushdown should be the final state and the unified way to
>> >>>>>>>>>>>>>> support pruning both ALL cache and LRU cache,
>> >>>>>>>>>>>>>> so I think we should make an effort in this direction. If we need to
>> >>>>>>>>>>>>>> support filter pushdown for ALL cache anyway, why not use
>> >>>>>>>>>>>>>> it for LRU cache as well? Either way, as we have decided to implement
>> >>>>>>>>>>>>>> the cache in the framework, we have the chance to support
>> >>>>>>>>>>>>>> filtering on the cache anytime. This is an optimization and it doesn't
>> >>>>>>>>>>>>>> affect the public API. I think we can create a JIRA issue to
>> >>>>>>>>>>>>>> discuss it when the FLIP is accepted.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 4) The idea to support ALL cache is similar to your proposal.
>> >>>>>>>>>>>>>> In the first version, we will only support InputFormat and
>> >>>>>>>>>>>>>> SourceFunction for cache all (invoke InputFormat in join operator).
>> >>>>>>>>>>>>>> For FLIP-27 source, we need to join a true source operator instead of
>> >>>>>>>>>>>>>> calling it embedded in the join operator.
>> >>>>>>>>>>>>>> However, this needs another FLIP to support the re-scan ability for
>> >>>>>>>>>>>>>> FLIP-27 Source, and this can be a large piece of work.
>> >>>>>>>>>>>>>> In order to not block this issue, we can put the effort of FLIP-27
>> >>>>>>>>>>>>>> source integration into future work and integrate
>> >>>>>>>>>>>>>> InputFormat & SourceFunction for now.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I think it's fine to use InputFormat & SourceFunction, as they are not
>> >>>>>>>>>>>>>> deprecated; otherwise, we would have to introduce another function
>> >>>>>>>>>>>>>> similar to them, which is meaningless. We need to plan FLIP-27 source
>> >>>>>>>>>>>>>> integration ASAP before InputFormat & SourceFunction are deprecated.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>> Jark
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Hi Martijn!
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Got it. Therefore, the implementation with InputFormat is not
>> >>>>>>>>>>>>>>> considered. Thanks for clearing that up!
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>> Smirnov Alexander
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On Thu, May 12, 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Hi,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> With regards to:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> But if there are plans to refactor all connectors to FLIP-27
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old interfaces will
>> >>>>>>>>>>>>>>>> be deprecated and connectors will either be refactored to use the new
>> >>>>>>>>>>>>>>>> ones or dropped.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> The caching should work for connectors that are using FLIP-27
>> >>>>>>>>>>>>>>>> interfaces; we should not introduce new features for old interfaces.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Martijn
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Hi Jark!
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Sorry for the late response. I would like to make some comments and
>> >>>>>>>>>>>>>>>>> clarify my points.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think we can achieve both
>> >>>>>>>>>>>>>>>>> advantages this way: put the Cache interface in flink-table-common,
>> >>>>>>>>>>>>>>>>> but have implementations of it in flink-table-runtime. Therefore if a
>> >>>>>>>>>>>>>>>>> connector developer wants to use existing cache strategies and their
>> >>>>>>>>>>>>>>>>> implementations, they can just pass lookupConfig to the planner, but if
>> >>>>>>>>>>>>>>>>> they want to have their own cache implementation in their
>> >>>>>>>>>>>>>>>>> TableFunction, it will be possible for them to use the existing
>> >>>>>>>>>>>>>>>>> interface for this purpose (we can explicitly point this out in the
>> >>>>>>>>>>>>>>>>> documentation). In this way all configs and metrics will be unified.
>> >>>>>>>>>>>>>>>>> WDYT?
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90% of
>> >>>>>>>>>>>>>>>>>> lookup requests that can never be cached
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> 2) Let me clarify the logic of the filters optimization in the case of
>> >>>>>>>>>>>>>>>>> LRU cache. It looks like Cache<RowData, Collection<RowData>>. Here we
>> >>>>>>>>>>>>>>>>> always store the response of the dimension table in the cache, even
>> >>>>>>>>>>>>>>>>> after applying the calc function. I.e. if there are no rows after
>> >>>>>>>>>>>>>>>>> applying filters to the result of the 'eval' method of TableFunction,
>> >>>>>>>>>>>>>>>>> we store the empty list by lookup keys. Therefore the cache line will
>> >>>>>>>>>>>>>>>>> be filled, but will require much less memory (in bytes). I.e. we don't
>> >>>>>>>>>>>>>>>>> completely filter out keys whose result was pruned, but significantly
>> >>>>>>>>>>>>>>>>> reduce the memory required to store this result. If the user knows
>> >>>>>>>>>>>>>>>>> about this behavior, they can increase the 'max-rows' option before the
>> >>>>>>>>>>>>>>>>> start of the job. But actually I came up with the idea that we can do
>> >>>>>>>>>>>>>>>>> this automatically by using the 'maximumWeight' and 'weigher' methods
>> >>>>>>>>>>>>>>>>> of GuavaCache [1]. The weight can be the size of the collection of rows
>> >>>>>>>>>>>>>>>>> (the value of the cache). Therefore the cache can automatically fit
>> >>>>>>>>>>>>>>>>> many more records than before.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and projects
>> >>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
>> >>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's
>> >>>>>>>>>>>>>>>>>> hard to implement.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> It's debatable how difficult it will be to implement filter pushdown.
>> >>>>>>>>>>>>>>>>> But I think the fact that currently there is no database connector
>> >>>>>>>>>>>>>>>>> with filter pushdown at least means that this feature won't be
>> >>>>>>>>>>>>>>>>> supported soon in connectors. Moreover, if we talk about other
>> >>>>>>>>>>>>>>>>> connectors (not in the Flink repo), their databases might not support
>> >>>>>>>>>>>>>>>>> all Flink filters (or not support filters at all). I think users are
>> >>>>>>>>>>>>>>>>> interested in supporting the cache filters optimization independently
>> >>>>>>>>>>>>>>>>> of supporting other features and solving more complex problems (or
>> >>>>>>>>>>>>>>>>> problems that are unsolvable at all).
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> 3) I agree with your third statement. Actually in our internal version
>> >>>>>>>>>>>>>>>>> I also tried to unify the logic of scanning and reloading data from
>> >>>>>>>>>>>>>>>>> connectors. But unfortunately, I didn't find a way to unify the logic
>> >>>>>>>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat, SourceFunction, Source,...)
>> >>>>>>>>>>>>>>>>> and reuse it in reloading ALL cache. As a result I settled on using
>> >>>>>>>>>>>>>>>>> InputFormat, because it was used for scanning in all lookup
>> >>>>>>>>>>>>>>>>> connectors. (I didn't know that there are plans to deprecate
>> >>>>>>>>>>>>>>>>> InputFormat in favor of FLIP-27 Source). IMO usage of FLIP-27 source
>> >>>>>>>>>>>>>>>>> in ALL caching is not a good idea, because this source was designed to
>> >>>>>>>>>>>>>>>>> work in a distributed environment (SplitEnumerator on JobManager and
>> >>>>>>>>>>>>>>>>> SourceReaders on TaskManagers), not in one operator (the lookup join
>> >>>>>>>>>>>>>>>>> operator in our case). There is not even a direct way to pass splits
>> >>>>>>>>>>>>>>>>> from SplitEnumerator to SourceReader (this logic works through
>> >>>>>>>>>>>>>>>>> SplitEnumeratorContext, which requires
>> >>>>>>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Usage of
>> >>>>>>>>>>>>>>>>> InputFormat for ALL cache seems much clearer and easier. But if
>> >>>>>>>>>>>>>>>>> there are plans to refactor all connectors to FLIP-27, I have the
>> >>>>>>>>>>>>>>>>> following idea: maybe we can give up the lookup join ALL cache in
>> >>>>>>>>>>>>>>>>> favor of a simple join with multiple scanning of a batch source? The
>> >>>>>>>>>>>>>>>>> point is that the only difference between lookup join ALL cache and a
>> >>>>>>>>>>>>>>>>> simple join with a batch source is that in the first case scanning is
>> >>>>>>>>>>>>>>>>> performed multiple times, in between which state (cache) is cleared
>> >>>>>>>>>>>>>>>>> (correct me if I'm wrong). So what if we extend the functionality of
>> >>>>>>>>>>>>>>>>> simple join to support state reloading + extend the functionality of
>> >>>>>>>>>>>>>>>>> scanning a batch source multiple times (this one should be easy with
>> >>>>>>>>>>>>>>>>> the new FLIP-27 source, which unifies streaming/batch reading - we
>> >>>>>>>>>>>>>>>>> will need to change only SplitEnumerator, which will pass splits again
>> >>>>>>>>>>>>>>>>> after some TTL).
>> >>>>>>>>>>>>>>>>> WDYT? I must say that this looks like a long-term goal and will make
>> >>>>>>>>>>>>>>>>> the scope of this FLIP even larger than you said. Maybe we can limit
>> >>>>>>>>>>>>>>>>> ourselves to a simpler solution now (InputFormats).
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> So to sum up, my points are as follows:
>> >>>>>>>>>>>>>>>>> 1) There is a way to make both concise and flexible interfaces for
>> >>>>>>>>>>>>>>>>> caching in lookup join.
>> >>>>>>>>>>>>>>>>> 2) Cache filters optimization is important both in LRU and ALL caches.
>> >>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
>> >>>>>>>>>>>>>>>>> connectors, some of the connectors might not have the opportunity to
>> >>>>>>>>>>>>>>>>> support filter pushdown and, as far as I know, filter pushdown
>> >>>>>>>>>>>>>>>>> currently works only for scanning (not lookup). So cache filters +
>> >>>>>>>>>>>>>>>>> projections optimization should be independent of other features.
>> >>>>>>>>>>>>>>>>> 4) ALL cache implementation is a complex topic that involves multiple
>> >>>>>>>>>>>>>>>>> aspects of how Flink is developing. Giving up InputFormat in favor
>> >>>>>>>>>>>>>>>>> of FLIP-27 Source will make the ALL cache implementation really complex
>> >>>>>>>>>>>>>>>>> and unclear, so maybe instead of that we can extend the functionality
>> >>>>>>>>>>>>>>>>> of simple join, or keep InputFormat in the case of lookup join ALL
>> >>>>>>>>>>>>>>>>> cache?
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>> Smirnov Alexander
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> [1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> It's great to see the active discussion! I want to
>> share
>> >>> my
>> >>>> ideas:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> 1) implement the cache in framework vs. connectors base
>> >>>>>>>>>>>>>>>>>> I don't have a strong opinion on this. Both ways should
>> >>>> work (e.g.,
>> >>>>>>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>>>> pruning, compatibility).
>> >>>>>>>>>>>>>>>>>> The framework way can provide more concise interfaces.
>> >>>>>>>>>>>>>>>>>> The connector base way can define more flexible cache
>> >>>>>>>>>>>>>>>>>> strategies/implementations.
>> >>>>>>>>>>>>>>>>>> We are still investigating a way to see if we can have
>> >>> both
>> >>>>>>>>>>>>>>> advantages.
>> >>>>>>>>>>>>>>>>>> We should reach a consensus that the way should be a
>> >> final
>> >>>> state,
>> >>>>>>>>>>>>>>> and we
>> >>>>>>>>>>>>>>>>>> are on the path to it.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> 2) filters and projections pushdown:
>> >>>>>>>>>>>>>>>>>> I agree with Alex that the filter pushdown into cache
>> >> can
>> >>>> benefit a
>> >>>>>>>>>>>>>>> lot
>> >>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>> ALL cache.
>> >>>>>>>>>>>>>>>>>> However, this is not true for LRU cache. Connectors use
>> >>>> cache to
>> >>>>>>>>>>>>>>> reduce
>> >>>>>>>>>>>>>>>>> IO
>> >>>>>>>>>>>>>>>>>> requests to databases for better throughput.
>> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will
>> >>>> have 90% of
>> >>>>>>>>>>>>>>>>> lookup
>> >>>>>>>>>>>>>>>>>> requests that can never be cached
>> >>>>>>>>>>>>>>>>>> and hit directly to the databases. That means the cache
>> >> is
>> >>>>>>>>>>>>>>> meaningless in
>> >>>>>>>>>>>>>>>>>> this case.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to do
>> filters
>> >>>> and projects
>> >>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
>> >>>>>>>>>>>>>>> SupportsProjectionPushDown.
>> >>>>>>>>>>>>>>>>>> The fact that JDBC/Hive/HBase haven't implemented these interfaces
>> >>>>>>>>>>>>>>>>>> doesn't mean they are hard to implement.
>> >>>>>>>>>>>>>>>>>> They should implement the pushdown interfaces to reduce
>> >> IO
>> >>>> and the
>> >>>>>>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>>>> size.
>> >>>>>>>>>>>>>>>>>> That should be a final state that the scan source and
>> >>>> lookup source
>> >>>>>>>>>>>>>>> share
>> >>>>>>>>>>>>>>>>>> the exact pushdown implementation.
>> >>>>>>>>>>>>>>>>>> I don't see why we need to duplicate the pushdown logic
>> >> in
>> >>>> caches,
>> >>>>>>>>>>>>>>> which
>> >>>>>>>>>>>>>>>>>> will complicate the lookup join design.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> 3) ALL cache abstraction
>> >>>>>>>>>>>>>>>>>> The ALL cache might be the most challenging part of this
>> >> FLIP.
>> >>>> We have
>> >>>>>>>>>>>>>>> never
>> >>>>>>>>>>>>>>>>>> provided a reload-lookup public interface.
>> >>>>>>>>>>>>>>>>>> Currently, we put the reload logic in the "eval" method
>> >> of
>> >>>>>>>>>>>>>>> TableFunction.
>> >>>>>>>>>>>>>>>>>> That's hard for some sources (e.g., Hive).
>> >>>>>>>>>>>>>>>>>> Ideally, connector implementation should share the
>> logic
>> >>> of
>> >>>> reload
>> >>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>> scan, i.e. ScanTableSource with
>> >>>> InputFormat/SourceFunction/FLIP-27
>> >>>>>>>>>>>>>>>>> Source.
>> >>>>>>>>>>>>>>>>>> However, InputFormat/SourceFunction are deprecated, and
>> >>> the
>> >>>> FLIP-27
>> >>>>>>>>>>>>>>>>> source
>> >>>>>>>>>>>>>>>>>> is deeply coupled with SourceOperator.
>> >>>>>>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin,
>> >>> this
>> >>>> may make
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> scope of this FLIP much larger.
>> >>>>>>>>>>>>>>>>>> We are still investigating how to abstract the ALL
>> cache
>> >>>> logic and
>> >>>>>>>>>>>>>>> reuse
>> >>>>>>>>>>>>>>>>>> the existing source interfaces.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>>>>>> Jark
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <
>> >>>> ro.v.bo...@gmail.com>
>> >>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> It's a much more complicated activity and lies out of
>> >> the
>> >>>> scope of
>> >>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>> improvement. Because such pushdowns should be done for
>> >>> all
>> >>>>>>>>>>>>>>>>> ScanTableSource
>> >>>>>>>>>>>>>>>>>>> implementations (not only for Lookup ones).
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <
>> >>>>>>>>>>>>>>> martijnvis...@apache.org>
>> >>>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> One question regarding "And Alexander correctly
>> >>> mentioned
>> >>>> that
>> >>>>>>>>>>>>>>> filter
>> >>>>>>>>>>>>>>>>>>>> pushdown still is not implemented for
>> >> jdbc/hive/hbase."
>> >>>> -> Would
>> >>>>>>>>>>>>>>> an
>> >>>>>>>>>>>>>>>>>>>> alternative solution be to actually implement these
>> >>> filter
>> >>>>>>>>>>>>>>> pushdowns?
>> >>>>>>>>>>>>>>>>> I
>> >>>>>>>>>>>>>>>>>>>> can
>> >>>>>>>>>>>>>>>>>>>> imagine that there are many more benefits to doing
>> >> that,
>> >>>> outside
>> >>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>> lookup
>> >>>>>>>>>>>>>>>>>>>> caching and metrics.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Martijn Visser
>> >>>>>>>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
>> >>>>>>>>>>>>>>>>>>>> https://github.com/MartijnVisser
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
>> >>>> ro.v.bo...@gmail.com>
>> >>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Hi everyone!
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Thanks for driving such a valuable improvement!
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> I do think that single cache implementation would be
>> >> a
>> >>>> nice
>> >>>>>>>>>>>>>>>>> opportunity
>> >>>>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>> users. And it will break the "FOR SYSTEM_TIME AS OF
>> >>>> proc_time"
>> >>>>>>>>>>>>>>>>> semantics
>> >>>>>>>>>>>>>>>>>>>>> anyway - doesn't matter how it will be implemented.
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can say that:
>> >>>>>>>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity to cut off
>> >>> the
>> >>>> cache
>> >>>>>>>>>>>>>>> size
>> >>>>>>>>>>>>>>>>> by
>> >>>>>>>>>>>>>>>>>>>>> simply filtering unnecessary data. And the most
>> handy
>> >>>> way to do
>> >>>>>>>>>>>>>>> it
>> >>>>>>>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>>>>>> to apply
>> >>>>>>>>>>>>>>>>>>>>> it inside LookupRunners. It would be a bit harder to
>> >>>> pass it
>> >>>>>>>>>>>>>>>>> through the
>> >>>>>>>>>>>>>>>>>>>>> LookupJoin node to TableFunction. And Alexander
>> >>> correctly
>> >>>>>>>>>>>>>>> mentioned
>> >>>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>>>> filter pushdown still is not implemented for
>> >>>> jdbc/hive/hbase.
>> >>>>>>>>>>>>>>>>>>>>> 2) The ability to set the different caching
>> >> parameters
>> >>>> for
>> >>>>>>>>>>>>>>> different
>> >>>>>>>>>>>>>>>>>>>> tables
>> >>>>>>>>>>>>>>>>>>>>> is quite important. So I would prefer to set it
>> >> through
>> >>>> DDL
>> >>>>>>>>>>>>>>> rather
>> >>>>>>>>>>>>>>>>> than
>> >>>>>>>>>>>>>>>>>>>>> have similar ttl, strategy and other options for
>> all
>> >>>> lookup
>> >>>>>>>>>>>>>>> tables.
>> >>>>>>>>>>>>>>>>>>>>> 3) Providing the cache in the framework really
>> >>>> deprives us of
>> >>>>>>>>>>>>>>>>>>>>> extensibility (users won't be able to implement
>> their
>> >>> own
>> >>>>>>>>>>>>>>> cache).
>> >>>>>>>>>>>>>>>>> But
>> >>>>>>>>>>>>>>>>>>>> most
>> >>>>>>>>>>>>>>>>>>>>> probably it might be solved by creating more
>> >> different
>> >>>> cache
>> >>>>>>>>>>>>>>>>> strategies
>> >>>>>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>>> a wider set of configurations.
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> All these points are much closer to the schema
>> >> proposed
>> >>>> by
>> >>>>>>>>>>>>>>>>> Alexander.
>> >>>>>>>>>>>>>>>>>>>>> Qingsheng Ren, please correct me if I'm not right and
>> >>> all
>> >>>> these
>> >>>>>>>>>>>>>>>>>>>> facilities
>> >>>>>>>>>>>>>>>>>>>>> might be simply implemented in your architecture?
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>>>>>> Roman Boyko
>> >>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <
>> >>>>>>>>>>>>>>>>> martijnvis...@apache.org>
>> >>>>>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> I don't have much to chip in, but just wanted to
>> >>>> express that
>> >>>>>>>>>>>>>>> I
>> >>>>>>>>>>>>>>>>> really
>> >>>>>>>>>>>>>>>>>>>>>> appreciate the in-depth discussion on this topic
>> >> and I
>> >>>> hope
>> >>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>>> others
>> >>>>>>>>>>>>>>>>>>>>>> will join the conversation.
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> Martijn
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <
>> >>>>>>>>>>>>>>>>> smirale...@gmail.com>
>> >>>>>>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Thanks for your detailed feedback! However, I have
>> >>>> questions
>> >>>>>>>>>>>>>>>>> about
>> >>>>>>>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't get
>> >>>> something?).
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR
>> >>>> SYSTEM_TIME
>> >>>>>>>>>>>>>>> AS OF
>> >>>>>>>>>>>>>>>>>>>>>> proc_time”
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS
>> >> OF
>> >>>>>>>>>>>>>>> proc_time"
>> >>>>>>>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>>>>>> not
>> >>>>>>>>>>>>>>>>>>>>>>> fully implemented with caching, but as you said, users accept this
>> >>>>>>>>>>>>>>>>>>>>>>> consciously to achieve better performance (no one
>> >>>> proposed
>> >>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>> enable
>> >>>>>>>>>>>>>>>>>>>>>>> caching by default, etc.). Or by users do you mean
>> >>>> other
>> >>>>>>>>>>>>>>>>> developers
>> >>>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>>>> connectors? In this case developers explicitly
>> >>> specify
>> >>>>>>>>>>>>>>> whether
>> >>>>>>>>>>>>>>>>> their
>> >>>>>>>>>>>>>>>>>>>>>>> connector supports caching or not (in the list of
>> >>>> supported
>> >>>>>>>>>>>>>>>>>>>> options),
>> >>>>>>>>>>>>>>>>>>>>>>> no one makes them do that if they don't want to.
>> So
>> >>>> what
>> >>>>>>>>>>>>>>>>> exactly is
>> >>>>>>>>>>>>>>>>>>>>>>> the difference between implementing caching in
>> >>> modules
>> >>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime and in flink-table-common from
>> >>> the
>> >>>>>>>>>>>>>>>>> considered
>> >>>>>>>>>>>>>>>>>>>>>>> point of view? How does it affect
>> >>>> breaking/non-breaking
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF proc_time"?
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> confront a situation that allows table options in
>> >>> DDL
>> >>>> to
>> >>>>>>>>>>>>>>>>> control
>> >>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> behavior of the framework, which has never
>> happened
>> >>>>>>>>>>>>>>> previously
>> >>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>>> should
>> >>>>>>>>>>>>>>>>>>>>>>> be cautious
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> If we talk about main differences of semantics of
>> >> DDL
>> >>>>>>>>>>>>>>> options
>> >>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>>>>> config options("table.exec.xxx"), isn't it about
>> >>>> limiting
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> scope
>> >>>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>>>> the options + importance for the user business
>> >> logic
>> >>>> rather
>> >>>>>>>>>>>>>>> than
>> >>>>>>>>>>>>>>>>>>>>>>> specific location of corresponding logic in the
>> >>>> framework? I
>> >>>>>>>>>>>>>>>>> mean
>> >>>>>>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>>>>>> in my design, for example, putting an option with
>> >>>> lookup
>> >>>>>>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>>>>>>>>> strategy in configurations would be the wrong
>> >>>> decision,
>> >>>>>>>>>>>>>>>>> because it
>> >>>>>>>>>>>>>>>>>>>>>>> directly affects the user's business logic (not
>> >> just
>> >>>>>>>>>>>>>>> performance
>> >>>>>>>>>>>>>>>>>>>>>>> optimization) + touches just several functions of
>> >> ONE
>> >>>> table
>> >>>>>>>>>>>>>>>>> (there
>> >>>>>>>>>>>>>>>>>>>> can
>> >>>>>>>>>>>>>>>>>>>>>>> be multiple tables with different caches). Does it
>> >>>> really
>> >>>>>>>>>>>>>>>>> matter for
>> >>>>>>>>>>>>>>>>>>>>>>> the user (or someone else) where the logic is
>> >>> located,
>> >>>>>>>>>>>>>>> which is
>> >>>>>>>>>>>>>>>>>>>>>>> affected by the applied option?
>> >>>>>>>>>>>>>>>>>>>>>>> Also I can remember DDL option 'sink.parallelism',
>> >>>> which in
>> >>>>>>>>>>>>>>>>> some way
>> >>>>>>>>>>>>>>>>>>>>>>> "controls the behavior of the framework" and I
>> >> don't
>> >>>> see any
>> >>>>>>>>>>>>>>>>> problem
>> >>>>>>>>>>>>>>>>>>>>>>> here.
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> introduce a new interface for this all-caching
>> >>>> scenario
>> >>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>> design
>> >>>>>>>>>>>>>>>>>>>>>>> would become more complex
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> This is a subject for a separate discussion, but
>> >>>> actually
>> >>>>>>>>>>>>>>> in our
>> >>>>>>>>>>>>>>>>>>>>>>> internal version we solved this problem quite
>> >> easily
>> >>> -
>> >>>> we
>> >>>>>>>>>>>>>>> reused
>> >>>>>>>>>>>>>>>>>>>>>>> InputFormat class (so there is no need for a new
>> >>> API).
>> >>>> The
>> >>>>>>>>>>>>>>>>> point is
>> >>>>>>>>>>>>>>>>>>>>>>> that currently all lookup connectors use
>> >> InputFormat
>> >>>> for
>> >>>>>>>>>>>>>>>>> scanning
>> >>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> data in batch mode: HBase, JDBC and even Hive - it
>> >>> uses
>> >>>>>>>>>>>>>>> class
>> >>>>>>>>>>>>>>>>>>>>>>> PartitionReader, that is actually just a wrapper
>> >>> around
>> >>>>>>>>>>>>>>>>> InputFormat.
>> >>>>>>>>>>>>>>>>>>>>>>> The advantage of this solution is the ability to
>> >>> reload
>> >>>>>>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>>> data
>> >>>>>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>>>>> parallel (number of threads depends on number of
>> >>>>>>>>>>>>>>> InputSplits,
>> >>>>>>>>>>>>>>>>> but
>> >>>>>>>>>>>>>>>>>>>> has
>> >>>>>>>>>>>>>>>>>>>>>>> an upper limit). As a result cache reload time
>> >>>> significantly
>> >>>>>>>>>>>>>>>>> reduces
>> >>>>>>>>>>>>>>>>>>>>>>> (as well as time of input stream blocking). I know
>> >>> that
>> >>>>>>>>>>>>>>> usually
>> >>>>>>>>>>>>>>>>> we
>> >>>>>>>>>>>>>>>>>>>> try
>> >>>>>>>>>>>>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but
>> >>> maybe
>> >>>> this
>> >>>>>>>>>>>>>>> one
>> >>>>>>>>>>>>>>>>> can
>> >>>>>>>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal
>> >>>> solution,
>> >>>>>>>>>>>>>>> maybe
>> >>>>>>>>>>>>>>>>>>>> there
>> >>>>>>>>>>>>>>>>>>>>>>> are better ones.
>> >>>>>>>>>>>>>>>>>>>>>>>
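
A minimal sketch of the parallel reload idea described above, under stated assumptions: `ParallelCacheReloader` and `readSplit` are hypothetical names, not Flink API, and `readSplit` stands in for reading one InputSplit through an InputFormat. The pool size is the number of splits capped by an upper limit, and everything is loaded into a fresh map that could replace the old cache once loading has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical helper (not part of Flink): reloads a full cache split by split. */
final class ParallelCacheReloader {

    /**
     * Reads every split with at most maxThreads worker threads and returns a fresh map.
     * Keys and values must be non-null because a ConcurrentHashMap is used.
     */
    static <S, K, V> Map<K, V> reload(
            List<S> splits, Function<S, Map<K, V>> readSplit, int maxThreads) {
        Map<K, V> fresh = new ConcurrentHashMap<>();
        // Thread count depends on the number of splits but has an upper limit.
        int threads = Math.max(1, Math.min(splits.size(), maxThreads));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (S split : splits) {
                Runnable task = () -> fresh.putAll(readSplit.apply(split));
                pending.add(pool.submit(task));
            }
            // Wait for all splits; a failed split fails the whole reload.
            for (Future<?> f : pending) {
                try {
                    f.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Cache reload interrupted", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Reading a split failed", e.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
        return fresh;
    }
}
```

A caller would swap the returned map in for the old cache in one step, so that lookups are blocked only for the duration of the swap rather than for the whole reload.
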
>> >>>>>>>>>>>>>>>>>>>>>>>> Providing the cache in the framework might
>> >> introduce
>> >>>>>>>>>>>>>>>>> compatibility
>> >>>>>>>>>>>>>>>>>>>>>> issues
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> It's possible only in cases when the developer of
>> >> the
>> >>>>>>>>>>>>>>> connector
>> >>>>>>>>>>>>>>>>>>>> won't
>> >>>>>>>>>>>>>>>>>>>>>>> properly refactor his code and will use new cache
>> >>>> options
>> >>>>>>>>>>>>>>>>>>>> incorrectly
>> >>>>>>>>>>>>>>>>>>>>>>> (i.e. explicitly provide the same options into 2
>> >>>> different
>> >>>>>>>>>>>>>>> code
>> >>>>>>>>>>>>>>>>>>>>>>> places). For correct behavior all he will need to
>> >> do
>> >>>> is to
>> >>>>>>>>>>>>>>>>> redirect
>> >>>>>>>>>>>>>>>>>>>>>>> existing options to the framework's LookupConfig
>> (+
>> >>>> maybe
>> >>>>>>>>>>>>>>> add an
>> >>>>>>>>>>>>>>>>>>>> alias
>> >>>>>>>>>>>>>>>>>>>>>>> for options, if there was different naming),
>> >>> everything
>> >>>>>>>>>>>>>>> will be
>> >>>>>>>>>>>>>>>>>>>>>>> transparent for users. If the developer won't do
>> >>>>>>>>>>>>>>> refactoring at
>> >>>>>>>>>>>>>>>>> all,
>> >>>>>>>>>>>>>>>>>>>>>>> nothing will be changed for the connector because
>> >> of
>> >>>>>>>>>>>>>>> backward
>> >>>>>>>>>>>>>>>>>>>>>>> compatibility. Also if a developer wants to use
>> his
>> >>> own
>> >>>>>>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>>> logic,
>> >>>>>>>>>>>>>>>>>>>>>>> he can simply choose not to pass some of the configs
>> into
>> >>> the
>> >>>>>>>>>>>>>>>>> framework,
>> >>>>>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>>>>> instead make his own implementation with already
>> >>>> existing
>> >>>>>>>>>>>>>>>>> configs
>> >>>>>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>>>>> metrics (but actually I think that it's a rare
>> >> case).
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> filters and projections should be pushed all the
>> >> way
>> >>>> down
>> >>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>> table
>> >>>>>>>>>>>>>>>>>>>>>>> function, like what we do in the scan source
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> It's a great goal. But the truth is that the
>> >>> ONLY
>> >>>>>>>>>>>>>>> connector
>> >>>>>>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource
>> >>>>>>>>>>>>>>>>>>>>>>> (no database connector supports it currently).
>> Also
>> >>>> for some
>> >>>>>>>>>>>>>>>>>>>> databases
>> >>>>>>>>>>>>>>>>>>>>>>> it's simply impossible to pushdown such complex
>> >>> filters
>> >>>>>>>>>>>>>>> that we
>> >>>>>>>>>>>>>>>>> have
>> >>>>>>>>>>>>>>>>>>>>>>> in Flink.
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> only applying these optimizations to the cache
>> >> seems
>> >>>> not
>> >>>>>>>>>>>>>>>>> quite
>> >>>>>>>>>>>>>>>>>>>>> useful
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Filters can cut off an arbitrarily large amount of
>> >>> data
>> >>>>>>>>>>>>>>> from the
>> >>>>>>>>>>>>>>>>>>>>>>> dimension table. For a simple example, suppose in
>> >>>> dimension
>> >>>>>>>>>>>>>>>>> table
>> >>>>>>>>>>>>>>>>>>>>>>> 'users'
>> >>>>>>>>>>>>>>>>>>>>>>> we have column 'age' with values from 20 to 40,
>> and
>> >>>> input
>> >>>>>>>>>>>>>>> stream
>> >>>>>>>>>>>>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by age of
>> >>>> users. If
>> >>>>>>>>>>>>>>> we
>> >>>>>>>>>>>>>>>>> have
>> >>>>>>>>>>>>>>>>>>>>>>> filter 'age > 30',
>> >>>>>>>>>>>>>>>>>>>>>>> there will be half as much data in the cache. This means
>> >>> the
>> >>>> user
>> >>>>>>>>>>>>>>> can
>> >>>>>>>>>>>>>>>>>>>>>>> increase 'lookup.cache.max-rows' by almost 2 times. This will give a huge
>> >>>>>>>>>>>>>>>>>>>>>>> performance boost. Moreover, this optimization
>> >> starts
>> >>>> to
>> >>>>>>>>>>>>>>> really
>> >>>>>>>>>>>>>>>>>>>> shine
>> >>>>>>>>>>>>>>>>>>>>>>> in 'ALL' cache, where tables without filters and
>> >>>> projections
>> >>>>>>>>>>>>>>>>> can't
>> >>>>>>>>>>>>>>>>>>>> fit
>> >>>>>>>>>>>>>>>>>>>>>>> in memory, but with them - can. This opens up
>> >>>> additional
>> >>>>>>>>>>>>>>>>>>>> possibilities
>> >>>>>>>>>>>>>>>>>>>>>>> for users. And this doesn't sound like 'not quite
>> >>>> useful'.
>> >>>>>>>>>>>>>>>>>>>>>>>
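
The 'age > 30' example above can be sketched as follows. This is only an illustration under stated assumptions: `FilteringLruLookupCache` is a hypothetical class, not Flink API, `loader` stands in for the request to the external table, and the size limit counts keys rather than rows. Rows rejected by the filter are never stored, and an empty result is cached too, so a key whose rows were all filtered out does not trigger another request.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical LRU lookup cache that applies a filter before storing rows. */
final class FilteringLruLookupCache<K, R> {
    private final Function<K, List<R>> loader; // stands in for the external lookup
    private final Predicate<R> filter;         // e.g. row -> row.age > 30
    private final Map<K, List<R>> cache;
    private int loads = 0;

    FilteringLruLookupCache(int maxKeys, Function<K, List<R>> loader, Predicate<R> filter) {
        this.loader = loader;
        this.filter = filter;
        // Access-ordered LinkedHashMap that evicts the least recently used key.
        this.cache = new LinkedHashMap<K, List<R>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<R>> eldest) {
                return size() > maxKeys;
            }
        };
    }

    /** Returns the filtered rows for a key, querying the loader only on a cache miss. */
    List<R> lookup(K key) {
        List<R> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        loads++;
        List<R> kept = new ArrayList<>();
        for (R row : loader.apply(key)) {
            if (filter.test(row)) {
                kept.add(row);
            }
        }
        cache.put(key, kept); // an empty list is cached as well
        return kept;
    }

    /** Number of requests that reached the loader. */
    int loadCount() {
        return loads;
    }
}
```
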
>> >>>>>>>>>>>>>>>>>>>>>>> It would be great to hear other voices regarding
>> >> this
>> >>>> topic!
>> >>>>>>>>>>>>>>>>> Because
>> >>>>>>>>>>>>>>>>>>>>>>> we have quite a lot of controversial points, and I
>> >>>> think
>> >>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> help
>> >>>>>>>>>>>>>>>>>>>>>>> of others it will be easier for us to come to a
>> >>>> consensus.
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Hi Alexander and Arvid,
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the discussion and sorry for my late
>> >>>> response!
>> >>>>>>>>>>>>>>> We
>> >>>>>>>>>>>>>>>>> had
>> >>>>>>>>>>>>>>>>>>>> an
>> >>>>>>>>>>>>>>>>>>>>>>> internal discussion together with Jark and Leonard
>> >>> and
>> >>>> I’d
>> >>>>>>>>>>>>>>> like
>> >>>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>>>>>>> summarize our ideas. Instead of implementing the
>> >>> cache
>> >>>>>>>>>>>>>>> logic in
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>> table
>> >>>>>>>>>>>>>>>>>>>>>>> runtime layer or wrapping around the user-provided
>> >>>> table
>> >>>>>>>>>>>>>>>>> function,
>> >>>>>>>>>>>>>>>>>>>> we
>> >>>>>>>>>>>>>>>>>>>>>>> prefer to introduce some new APIs extending
>> >>>> TableFunction
>> >>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>> these
>> >>>>>>>>>>>>>>>>>>>>>>> concerns:
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR
>> >>>>>>>>>>>>>>> SYSTEM_TIME
>> >>>>>>>>>>>>>>>>> AS OF
>> >>>>>>>>>>>>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect the
>> >>>> content
>> >>>>>>>>>>>>>>> of the
>> >>>>>>>>>>>>>>>>>>>> lookup
>> >>>>>>>>>>>>>>>>>>>>>>> table at the moment of querying. If users choose
>> to
>> >>>> enable
>> >>>>>>>>>>>>>>>>> caching
>> >>>>>>>>>>>>>>>>>>>> on
>> >>>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> lookup table, they implicitly indicate that this
>> >>>> breakage is
>> >>>>>>>>>>>>>>>>>>>> acceptable
>> >>>>>>>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>>>>> exchange for the performance. So we prefer not to
>> >>>> provide
>> >>>>>>>>>>>>>>>>> caching on
>> >>>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> table runtime level.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> 2. If we make the cache implementation in the
>> >>>> framework
>> >>>>>>>>>>>>>>>>> (whether
>> >>>>>>>>>>>>>>>>>>>> in a
>> >>>>>>>>>>>>>>>>>>>>>>> runner or a wrapper around TableFunction), we have
>> >> to
>> >>>>>>>>>>>>>>> confront a
>> >>>>>>>>>>>>>>>>>>>>>> situation
>> >>>>>>>>>>>>>>>>>>>>>>> that allows table options in DDL to control the
>> >>>> behavior of
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>> framework,
>> >>>>>>>>>>>>>>>>>>>>>>> which has never happened previously and should be
>> >>>> cautious.
>> >>>>>>>>>>>>>>>>> Under
>> >>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> current design the behavior of the framework
>> should
>> >>>> only be
>> >>>>>>>>>>>>>>>>>>>> specified
>> >>>>>>>>>>>>>>>>>>>>> by
>> >>>>>>>>>>>>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s hard
>> to
>> >>>> apply
>> >>>>>>>>>>>>>>> these
>> >>>>>>>>>>>>>>>>>>>> general
>> >>>>>>>>>>>>>>>>>>>>>>> configs to a specific table.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> 3. We have use cases where the lookup source loads and refreshes
>> >>>>>>>>>>>>>>> all
>> >>>>>>>>>>>>>>>>>>>> records
>> >>>>>>>>>>>>>>>>>>>>>>> periodically into the memory to achieve high
>> lookup
>> >>>>>>>>>>>>>>> performance
>> >>>>>>>>>>>>>>>>>>>> (like
>> >>>>>>>>>>>>>>>>>>>>>> Hive
>> >>>>>>>>>>>>>>>>>>>>>>> connector in the community, and also widely used
>> by
>> >>> our
>> >>>>>>>>>>>>>>> internal
>> >>>>>>>>>>>>>>>>>>>>>>> connectors). Wrapping the cache around the user’s
>> >>>>>>>>>>>>>>> TableFunction
>> >>>>>>>>>>>>>>>>>>>> works
>> >>>>>>>>>>>>>>>>>>>>>> fine
>> >>>>>>>>>>>>>>>>>>>>>>> for LRU caches, but I think we have to introduce a
>> >>> new
>> >>>>>>>>>>>>>>>>> interface for
>> >>>>>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>>>>> all-caching scenario and the design would become
>> >> more
>> >>>>>>>>>>>>>>> complex.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> 4. Providing the cache in the framework might
>> >>>> introduce
>> >>>>>>>>>>>>>>>>>>>> compatibility
>> >>>>>>>>>>>>>>>>>>>>>>> issues to existing lookup sources like there might
>> >>>> exist two
>> >>>>>>>>>>>>>>>>> caches
>> >>>>>>>>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>>>>>>>> totally different strategies if the user
>> >> incorrectly
>> >>>>>>>>>>>>>>> configures
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>> table
>> >>>>>>>>>>>>>>>>>>>>>>> (one in the framework and another implemented by
>> >> the
>> >>>> lookup
>> >>>>>>>>>>>>>>>>> source).
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> As for the optimization mentioned by Alexander, I
>> >>>> think
>> >>>>>>>>>>>>>>>>> filters
>> >>>>>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>>>>> projections should be pushed all the way down to
>> >> the
>> >>>> table
>> >>>>>>>>>>>>>>>>> function,
>> >>>>>>>>>>>>>>>>>>>>> like
>> >>>>>>>>>>>>>>>>>>>>>>> what we do in the scan source, instead of the
>> >> runner
>> >>>> with
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> cache.
>> >>>>>>>>>>>>>>>>>>>>> The
>> >>>>>>>>>>>>>>>>>>>>>>> goal of using cache is to reduce the network I/O
>> >> and
>> >>>>>>>>>>>>>>> pressure
>> >>>>>>>>>>>>>>>>> on the
>> >>>>>>>>>>>>>>>>>>>>>>> external system, and only applying these
>> >>> optimizations
>> >>>> to
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>>>>>>> seems
>> >>>>>>>>>>>>>>>>>>>>>>> not quite useful.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect our
>> >>>> ideas.
>> >>>>>>>>>>>>>>> We
>> >>>>>>>>>>>>>>>>>>>> prefer to
>> >>>>>>>>>>>>>>>>>>>>>>> keep the cache implementation as a part of
>> >>>> TableFunction,
>> >>>>>>>>>>>>>>> and we
>> >>>>>>>>>>>>>>>>>>>> could
>> >>>>>>>>>>>>>>>>>>>>>>> provide some helper classes (CachingTableFunction,
>> >>>>>>>>>>>>>>>>>>>>>> AllCachingTableFunction,
>> >>>>>>>>>>>>>>>>>>>>>>> CachingAsyncTableFunction) to developers and
>> >> regulate
>> >>>>>>>>>>>>>>> metrics
>> >>>>>>>>>>>>>>>>> of the
>> >>>>>>>>>>>>>>>>>>>>>> cache.
>> >>>>>>>>>>>>>>>>>>>>>>> Also, I made a POC[2] for your reference.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your ideas!
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >>>>>>>>>>>>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов
>> >> <
>> >>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
>> >>>>>>>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the response, Arvid!
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> I have a few comments on your message.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> but could also live with an easier solution as
>> >> the
>> >>>>>>>>>>>>>>> first
>> >>>>>>>>>>>>>>>>> step:
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> I think that these 2 ways (the one originally proposed by Qingsheng
>> >>>>>>>>>>>>>>>>>>>>>>>>> and mine) are mutually exclusive, because conceptually
>> they
>> >>>> follow
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> same
>> >>>>>>>>>>>>>>>>>>>>>>>>> goal, but implementation details are different. If we go one way,
>> >>>>>>>>>>>>>>>>>>>>>>>>> moving to another way in the future will mean
>> >>>> deleting
>> >>>>>>>>>>>>>>>>> existing
>> >>>>>>>>>>>>>>>>>>>> code
>> >>>>>>>>>>>>>>>>>>>>>>>>> and once again changing the API for connectors.
>> >> So
>> >>> I
>> >>>>>>>>>>>>>>> think we
>> >>>>>>>>>>>>>>>>>>>> should
>> >>>>>>>>>>>>>>>>>>>>>>>>> reach a consensus with the community about that
>> >> and
>> >>>> then
>> >>>>>>>>>>>>>>> work
>> >>>>>>>>>>>>>>>>>>>>> together
>> >>>>>>>>>>>>>>>>>>>>>>>>> on this FLIP, i.e. divide the work into tasks for
>> >>>> different
>> >>>>>>>>>>>>>>>>> parts
>> >>>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>>>> flip (for example, LRU cache unification /
>> >>>> introducing
>> >>>>>>>>>>>>>>>>> proposed
>> >>>>>>>>>>>>>>>>>>>> set
>> >>>>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng?
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> as the source will only receive the requests
>> >> after
>> >>>>>>>>>>>>>>> filter
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Actually if filters are applied to fields of the
>> >>>> lookup
>> >>>>>>>>>>>>>>>>> table, we
>> >>>>>>>>>>>>>>>>>>>>>>>>> must first make the requests, and only after that we
>> >>> can
>> >>>>>>>>>>>>>>> filter
>> >>>>>>>>>>>>>>>>>>>>> responses,
>> >>>>>>>>>>>>>>>>>>>>>>>>> because lookup connectors don't have filter
>> >>>> pushdown. So
>> >>>>>>>>>>>>>>> if
>> >>>>>>>>>>>>>>>>>>>>> filtering
>> >>>>>>>>>>>>>>>>>>>>>>>>> is done before caching, there will be far fewer
>> >>> rows
>> >>>> in
>> >>>>>>>>>>>>>>>>> cache.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is
>> >> not
>> >>>>>>>>>>>>>>> shared.
>> >>>>>>>>>>>>>>>>> I
>> >>>>>>>>>>>>>>>>>>>> don't
>> >>>>>>>>>>>>>>>>>>>>>>> know the
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of
>> >>>>>>>>>>>>>>> conversations
>> >>>>>>>>>>>>>>>>> :)
>> >>>>>>>>>>>>>>>>>>>>>>>>> I have no write access to the confluence, so I
>> >>> made a
>> >>>>>>>>>>>>>>> Jira
>> >>>>>>>>>>>>>>>>> issue,
>> >>>>>>>>>>>>>>>>>>>>>>>>> where I described the proposed changes in more detail
>> >>>> -
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >> https://issues.apache.org/jira/browse/FLINK-27411.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> I will be happy to get more feedback!
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng,
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency was
>> >> not
>> >>>>>>>>>>>>>>>>> satisfying
>> >>>>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>>> me.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but could also
>> >>> live
>> >>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>> an
>> >>>>>>>>>>>>>>>>>>>>> easier
>> >>>>>>>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead of making
>> >>>> caching
>> >>>>>>>>>>>>>>> an
>> >>>>>>>>>>>>>>>>>>>>>>> implementation
>> >>>>>>>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a
>> >> caching
>> >>>>>>>>>>>>>>> layer
>> >>>>>>>>>>>>>>>>>>>> around X.
>> >>>>>>>>>>>>>>>>>>>>>> So
>> >>>>>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>>>>> proposal would be a CachingTableFunction that
>> >>>>>>>>>>>>>>> delegates to
>> >>>>>>>>>>>>>>>>> X in
>> >>>>>>>>>>>>>>>>>>>>> case
>> >>>>>>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>>>>>>> misses and else manages the cache. Lifting it
>> >> into
>> >>>> the
>> >>>>>>>>>>>>>>>>> operator
>> >>>>>>>>>>>>>>>>>>>>>> model
>> >>>>>>>>>>>>>>>>>>>>>>> as
>> >>>>>>>>>>>>>>>>>>>>>>>>>> proposed would be even better but is probably
>> >>>>>>>>>>>>>>> unnecessary
>> >>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> first step
>> >>>>>>>>>>>>>>>>>>>>>>>>>> for a lookup source (as the source will only
>> >>> receive
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> requests
>> >>>>>>>>>>>>>>>>>>>>>>> after
>> >>>>>>>>>>>>>>>>>>>>>>>>>> filter; applying projection may be more
>> >>> interesting
>> >>>> to
>> >>>>>>>>>>>>>>> save
>> >>>>>>>>>>>>>>>>>>>>> memory).
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
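A minimal Java sketch of the wrapper idea described above: a caching layer that answers from its cache and delegates to X only on a miss. The class name `CachingLookup`, the `Function`-based delegate, and the hit/miss counters are illustrative assumptions; this is not Flink's TableFunction API and not necessarily the design the FLIP ends up with.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of "a caching layer around X". The delegate stands in
// for the connector's lookup function; eviction and TTL are left out.
final class CachingLookup<K, V> {
    private final Function<K, Collection<V>> delegate; // plays the role of "X"
    private final Map<K, Collection<V>> cache = new HashMap<>();
    private long hits;
    private long misses;

    CachingLookup(Function<K, Collection<V>> delegate) {
        this.delegate = delegate;
    }

    Collection<V> lookup(K key) {
        Collection<V> cached = cache.get(key);
        if (cached != null) {
            hits++;
            return cached; // served from the cache, X is not called
        }
        misses++;
        Collection<V> loaded = delegate.apply(key); // miss: delegate to X
        cache.put(key, loaded); // an empty (non-null) result is cached as well
        return loaded;
    }

    long hits() {
        return hits;
    }

    long misses() {
        return misses;
    }
}
```

Because the wrapper only sees the key and the rows returned by X, X itself needs no knowledge of the cache. In this sketch an empty result is cached too, so repeated lookups of a missing key do not reach X again.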
>> >>>>>>>>>>>>>>>>>>>>>>>>>> Another advantage is that all the changes of
>> >> this
>> >>>> FLIP
>> >>>>>>>>>>>>>>>>> would be
>> >>>>>>>>>>>>>>>>>>>>>>> limited to
>> >>>>>>>>>>>>>>>>>>>>>>>>>> options, no need for new public interfaces.
>> >>>> Everything
>> >>>>>>>>>>>>>>> else
>> >>>>>>>>>>>>>>>>>>>>> remains
>> >>>>>>>>>>>>>>>>>>>>>> an
>> >>>>>>>>>>>>>>>>>>>>>>>>>> implementation of Table runtime. That means we
>> >> can
>> >>>>>>>>>>>>>>> easily
>> >>>>>>>>>>>>>>>>>>>>>> incorporate
>> >>>>>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>>>>> optimization potential that Alexander pointed
>> >> out
>> >>>>>>>>>>>>>>> later.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is
>> >> not
>> >>>>>>>>>>>>>>> shared.
>> >>>>>>>>>>>>>>>>> I
>> >>>>>>>>>>>>>>>>>>>> don't
>> >>>>>>>>>>>>>>>>>>>>>>> know the
>> >>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр
>> >> Смирнов
>> >>> <
>> >>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not a
>> >>>>>>>>>>>>>>> committer
>> >>>>>>>>>>>>>>>>> yet,
>> >>>>>>>>>>>>>>>>>>>> but
>> >>>>>>>>>>>>>>>>>>>>>> I'd
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> really like to become one. And this FLIP
>> really
>> >>>>>>>>>>>>>>>>> interested
>> >>>>>>>>>>>>>>>>>>>> me.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Actually I have worked on a similar feature in
>> >> my
>> >>>>>>>>>>>>>>>>> company’s
>> >>>>>>>>>>>>>>>>>>>>> Flink
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts
>> >> on
>> >>>>>>>>>>>>>>> this and
>> >>>>>>>>>>>>>>>>>>>> make
>> >>>>>>>>>>>>>>>>>>>>>> code
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> open source.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I think there is a better alternative than
>> >>>>>>>>>>>>>>> introducing an
>> >>>>>>>>>>>>>>>>>>>>> abstract
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> class for TableFunction
>> (CachingTableFunction).
>> >>> As
>> >>>>>>>>>>>>>>> you
>> >>>>>>>>>>>>>>>>> know,
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> TableFunction exists in the flink-table-common
>> >>>>>>>>>>>>>>> module,
>> >>>>>>>>>>>>>>>>> which
>> >>>>>>>>>>>>>>>>>>>>>>> provides
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> only an API for working with tables – it’s
>> very
>> >>>>>>>>>>>>>>>>> convenient
>> >>>>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>>>> importing
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> in connectors. In turn, CachingTableFunction
>> >>>> contains
>> >>>>>>>>>>>>>>>>> logic
>> >>>>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> runtime execution,  so this class and
>> >> everything
>> >>>>>>>>>>>>>>>>> connected
>> >>>>>>>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>>>>>> it
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> should be located in another module, probably
>> >> in
>> >>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> But this will require connectors to depend on
>> >>>> another
>> >>>>>>>>>>>>>>>>> module,
>> >>>>>>>>>>>>>>>>>>>>>> which
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> contains a lot of runtime logic, which doesn’t
>> >>>> sound
>> >>>>>>>>>>>>>>>>> good.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding a new method
>> ‘getLookupConfig’
>> >>> to
>> >>>>>>>>>>>>>>>>>>>>>> LookupTableSource
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow connectors
>> to
>> >>>> only
>> >>>>>>>>>>>>>>> pass
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> configurations to the planner, therefore they
>> >>> won’t
>> >>>>>>>>>>>>>>>>> depend on
>> >>>>>>>>>>>>>>>>>>>>>>> runtime
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> realization. Based on these configs planner
>> >> will
>> >>>>>>>>>>>>>>>>> construct a
>> >>>>>>>>>>>>>>>>>>>>>> lookup
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> join operator with corresponding runtime logic
>> >>>>>>>>>>>>>>>>>>>> (ProcessFunctions
>> >>>>>>>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture
>> looks
>> >>>> like
>> >>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>> pinned
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> image (LookupConfig class there is actually
>> >> your
>> >>>>>>>>>>>>>>>>>>>> CacheConfig).
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Classes in flink-table-planner, that will be
>> >>>>>>>>>>>>>>> responsible
>> >>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>>>> –
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and its subclasses.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Current classes for lookup join in
>> >>>>>>>>>>>>>>> flink-table-runtime
>> >>>>>>>>>>>>>>>>> -
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner,
>> >>>>>>>>>>>>>>>>>>>>> LookupJoinRunnerWithCalc,
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding classes
>> >> LookupJoinCachingRunner,
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> And here comes another more powerful advantage
>> >> of
>> >>>>>>>>>>>>>>> such a
>> >>>>>>>>>>>>>>>>>>>>> solution.
>> >>>>>>>>>>>>>>>>>>>>>>> If
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> we have caching logic on a lower level, we can
>> >>>> apply
>> >>>>>>>>>>>>>>> some
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> optimizations to it. LookupJoinRunnerWithCalc
>> >> was
>> >>>>>>>>>>>>>>> named
>> >>>>>>>>>>>>>>>>> like
>> >>>>>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> because it uses the ‘calc’ function, which
>> >>> actually
>> >>>>>>>>>>>>>>>>> mostly
>> >>>>>>>>>>>>>>>>>>>>>> consists
>> >>>>>>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> filters and projections.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> For example, when joining table A with lookup table
>> >> B
>> >>>>>>>>>>>>>>>>> on the condition
>> >>>>>>>>>>>>>>>>>>>>> ‘JOIN …
>> >>>>>>>>>>>>>>>>>>>>>>> ON
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10 WHERE
>> >>> B.salary >
>> >>>>>>>>>>>>>>> 1000’
>> >>>>>>>>>>>>>>>>>>>>> the ‘calc’
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> function will contain filters A.age = B.age +
>> >> 10
>> >>>> and
>> >>>>>>>>>>>>>>>>>>>> B.salary >
>> >>>>>>>>>>>>>>>>>>>>>>> 1000.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> If we apply this function before storing
>> >> records
>> >>> in
>> >>>>>>>>>>>>>>>>> cache,
>> >>>>>>>>>>>>>>>>>>>> size
>> >>>>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> cache will be significantly reduced: filters =
>> >>>> avoid
>> >>>>>>>>>>>>>>>>> storing
>> >>>>>>>>>>>>>>>>>>>>>> useless
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> records in cache, projections = reduce
>> records’
>> >>>>>>>>>>>>>>> size. So
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>> initial
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> max number of records in cache can be
>> increased
>> >>> by
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> user.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
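To make the cache-size argument above concrete, here is a small Java sketch that applies a filter and a projection to looked-up rows before they are stored. All names (`CalcBeforeCache`, `FullRow`, `SlimRow`) are hypothetical and only illustrate the idea; the filter shown is the lookup-side condition B.salary > 1000 from the example, and the real ‘calc’ function in flink-table-runtime is generated code, not a pair of lambdas.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: filter and project rows from lookup table B before
// caching them, so the cache holds fewer and smaller records.
final class CalcBeforeCache {
    // Full row as returned by the external system (assumed shape).
    static final class FullRow {
        final int id;
        final int age;
        final int salary;
        final String notes;

        FullRow(int id, int age, int salary, String notes) {
            this.id = id;
            this.age = age;
            this.salary = salary;
            this.notes = notes;
        }
    }

    // Projected row: only the columns the join still needs.
    static final class SlimRow {
        final int id;
        final int age;

        SlimRow(int id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    private final Function<Integer, List<FullRow>> source;
    private final Predicate<FullRow> filter;
    private final Function<FullRow, SlimRow> projection;
    private final Map<Integer, List<SlimRow>> cache = new HashMap<>();

    CalcBeforeCache(
            Function<Integer, List<FullRow>> source,
            Predicate<FullRow> filter,
            Function<FullRow, SlimRow> projection) {
        this.source = source;
        this.filter = filter;
        this.projection = projection;
    }

    List<SlimRow> lookup(int id) {
        // On a miss, fetch the rows, drop those failing the filter, then project.
        return cache.computeIfAbsent(id, key -> {
            List<SlimRow> kept = new ArrayList<>();
            for (FullRow row : source.apply(key)) {
                if (filter.test(row)) {
                    kept.add(projection.apply(row));
                }
            }
            return kept;
        });
    }

    // Total number of rows currently held in the cache.
    int cachedRowCount() {
        int count = 0;
        for (List<SlimRow> rows : cache.values()) {
            count += rows.size();
        }
        return count;
    }
}
```

With a filter such as `row -> row.salary > 1000`, rows that could never match are not stored at all, and the projection drops columns like `notes` that the join does not read.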
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think about it?
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi devs,
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion
>> >>> about
>> >>>>>>>>>>>>>>>>>>>> FLIP-221[1],
>> >>>>>>>>>>>>>>>>>>>>>>> which
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> introduces an abstraction of lookup table
>> cache
>> >>> and
>> >>>>>>>>>>>>>>> its
>> >>>>>>>>>>>>>>>>>>>> standard
>> >>>>>>>>>>>>>>>>>>>>>>> metrics.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Currently each lookup table source should
>> >>>> implement
>> >>>>>>>>>>>>>>>>> their
>> >>>>>>>>>>>>>>>>>>>> own
>> >>>>>>>>>>>>>>>>>>>>>>> cache to
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> store lookup results, and there isn’t a
>> >> standard
>> >>> of
>> >>>>>>>>>>>>>>>>> metrics
>> >>>>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>>>> users and
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> developers to tune their jobs with lookup
>> >>> joins,
>> >>>>>>>>>>>>>>> which
>> >>>>>>>>>>>>>>>>> is a
>> >>>>>>>>>>>>>>>>>>>>>> quite
>> >>>>>>>>>>>>>>>>>>>>>>> common
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> use case in Flink table / SQL.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs including
>> >>>> cache,
>> >>>>>>>>>>>>>>>>>>>> metrics,
>> >>>>>>>>>>>>>>>>>>>>>>> wrapper
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> classes of TableFunction and new table
>> options.
>> >>>>>>>>>>>>>>> Please
>> >>>>>>>>>>>>>>>>> take a
>> >>>>>>>>>>>>>>>>>>>>> look
>> >>>>>>>>>>>>>>>>>>>>>>> at the
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP page [1] to get more details. Any
>> >>> suggestions
>> >>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>> comments
>> >>>>>>>>>>>>>>>>>>>>>>> would be
>> >>>>>>>>>>>>>>>>>>>>>>>>>>> appreciated!
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>
>> >>>
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>>>>>>>>>> Best Regards,
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Qingsheng Ren
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Real-time Computing Team
>> >>>>>>>>>>>>>>>>>>>>>>>> Alibaba Cloud
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>>>>>> Roman Boyko
>> >>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> --
>> >>>>>>>>>>>> Best Regards,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Qingsheng Ren
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Real-time Computing Team
>> >>>>>>>>>>>> Alibaba Cloud
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Email: renqs...@gmail.com
>> >>>>>>>>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>>
>>
