Thanks Qingsheng and Alexander for the update. Current API and Options design 
of this FLIP look good enough from my side,.
If no more concerns about the thread, I think we can start a VOTE thread later.

Best,
Leonard


> 2022年5月18日 下午5:04,Qingsheng Ren <renqs...@gmail.com> 写道:
> 
> Hi Jark and Alexander,
> 
> Thanks for your comments! I’m also OK to introduce common table options. I
> prefer to introduce a new DefaultLookupCacheOptions class for holding these
> option definitions because putting all options into FactoryUtil would make
> it a bit ”crowded” and not well categorized.
> 
> FLIP has been updated according to suggestions above:
> 1. Use static “of” method for constructing RescanRuntimeProvider
> considering both arguments are required.
> 2. Introduce new table options matching DefaultLookupCacheFactory
> 
> Best,
> Qingsheng
> 
> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote:
> 
>> Hi Alex,
>> 
>> 1) retry logic
>> I think we can extract some common retry logic into utilities, e.g.
>> RetryUtils#tryTimes(times, call).
>> This seems independent of this FLIP and can be reused by DataStream users.
>> Maybe we can open an issue to discuss this and where to put it.
>> 
>> 2) cache ConfigOptions
>> I'm fine with defining cache config options in the framework.
>> A candidate place to put is FactoryUtil which also includes
>> "sink.parallelism", "format" options.
>> 
>> Best,
>> Jark
>> 
>> 
>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com>
>> wrote:
>> 
>>> Hi Qingsheng,
>>> 
>>> Thank you for considering my comments.
>>> 
>>>> there might be custom logic before making retry, such as re-establish
>>> the connection
>>> 
>>> Yes, I understand that. I meant that such logic can be placed in a
>>> separate function, that can be implemented by connectors. Just moving
>>> the retry logic would make connector's LookupFunction more concise +
>>> avoid duplicate code. However, it's a minor change. The decision is up
>>> to you.
>>> 
>>>> We decide not to provide common DDL options and let developers to
>>> define their own options as we do now per connector.
>>> 
>>> What is the reason for that? One of the main goals of this FLIP was to
>>> unify the configs, wasn't it? I understand that current cache design
>>> doesn't depend on ConfigOptions, like was before. But still we can put
>>> these options into the framework, so connectors can reuse them and
>>> avoid code duplication, and, what is more significant, avoid possible
>>> different options naming. This moment can be pointed out in
>>> documentation for connector developers.
>>> 
>>> Best regards,
>>> Alexander
>>> 
>>> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren <renqs...@gmail.com>:
>>>> 
>>>> Hi Alexander,
>>>> 
>>>> Thanks for the review and glad to see we are on the same page! I think
>>> you forgot to cc the dev mailing list so I’m also quoting your reply under
>>> this email.
>>>> 
>>>>> We can add 'maxRetryTimes' option into this class
>>>> 
>>>> In my opinion the retry logic should be implemented in lookup() instead
>>> of in LookupFunction#eval(). Retrying is only meaningful under some
>>> specific retriable failures, and there might be custom logic before making
>>> retry, such as re-establish the connection (JdbcRowDataLookupFunction is an
>>> example), so it's more handy to leave it to the connector.
>>>> 
>>>>> I don't see DDL options, that were in previous version of FLIP. Do
>>> you have any special plans for them?
>>>> 
>>>> We decide not to provide common DDL options and let developers to
>>> define their own options as we do now per connector.
>>>> 
>>>> The rest of comments sound great and I’ll update the FLIP. Hope we can
>>> finalize our proposal soon!
>>>> 
>>>> Best,
>>>> 
>>>> Qingsheng
>>>> 
>>>> 
>>>>> On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com>
>>> wrote:
>>>>> 
>>>>> Hi Qingsheng and devs!
>>>>> 
>>>>> I like the overall design of updated FLIP, however I have several
>>>>> suggestions and questions.
>>>>> 
>>>>> 1) Introducing LookupFunction as a subclass of TableFunction is a good
>>>>> idea. We can add 'maxRetryTimes' option into this class. 'eval' method
>>>>> of new LookupFunction is great for this purpose. The same is for
>>>>> 'async' case.
>>>>> 
>>>>> 2) There might be other configs in future, such as 'cacheMissingKey'
>>>>> in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider.
>>>>> Maybe use Builder pattern in LookupFunctionProvider and
>>>>> RescanRuntimeProvider for more flexibility (use one 'build' method
>>>>> instead of many 'of' methods in future)?
>>>>> 
>>>>> 3) What are the plans for existing TableFunctionProvider and
>>>>> AsyncTableFunctionProvider? I think they should be deprecated.
>>>>> 
>>>>> 4) Am I right that the current design does not assume usage of
>>>>> user-provided LookupCache in re-scanning? In this case, it is not very
>>>>> clear why do we need methods such as 'invalidate' or 'putAll' in
>>>>> LookupCache.
>>>>> 
>>>>> 5) I don't see DDL options, that were in previous version of FLIP. Do
>>>>> you have any special plans for them?
>>>>> 
>>>>> If you don't mind, I would be glad to be able to make small
>>>>> adjustments to the FLIP document too. I think it's worth mentioning
>>>>> about what exactly optimizations are planning in the future.
>>>>> 
>>>>> Best regards,
>>>>> Smirnov Alexander
>>>>> 
>>>>> пт, 13 мая 2022 г. в 20:27, Qingsheng Ren <renqs...@gmail.com>:
>>>>>> 
>>>>>> Hi Alexander and devs,
>>>>>> 
>>>>>> Thank you very much for the in-depth discussion! As Jark mentioned
>>> we were inspired by Alexander's idea and made a refactor on our design.
>>> FLIP-221 [1] has been updated to reflect our design now and we are happy to
>>> hear more suggestions from you!
>>>>>> 
>>>>>> Compared to the previous design:
>>>>>> 1. The lookup cache serves at table runtime level and is integrated
>>> as a component of LookupJoinRunner as discussed previously.
>>>>>> 2. Interfaces are renamed and re-designed to reflect the new design.
>>>>>> 3. We separate the all-caching case individually and introduce a new
>>> RescanRuntimeProvider to reuse the ability of scanning. We are planning to
>>> support SourceFunction / InputFormat for now considering the complexity of
>>> FLIP-27 Source API.
>>>>>> 4. A new interface LookupFunction is introduced to make the semantic
>>> of lookup more straightforward for developers.
>>>>>> 
>>>>>> For replying to Alexander:
>>>>>>> However I'm a little confused whether InputFormat is deprecated or
>>> not. Am I right that it will be so in the future, but currently it's not?
>>>>>> Yes you are right. InputFormat is not deprecated for now. I think it
>>> will be deprecated in the future but we don't have a clear plan for that.
>>>>>> 
>>>>>> Thanks again for the discussion on this FLIP and looking forward to
>>> cooperating with you after we finalize the design and interfaces!
>>>>>> 
>>>>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>> 
>>>>>> Best regards,
>>>>>> 
>>>>>> Qingsheng
>>>>>> 
>>>>>> 
>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <
>>> smirale...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Jark, Qingsheng and Leonard!
>>>>>>> 
>>>>>>> Glad to see that we came to a consensus on almost all points!
>>>>>>> 
>>>>>>> However I'm a little confused whether InputFormat is deprecated or
>>>>>>> not. Am I right that it will be so in the future, but currently it's
>>>>>>> not? Actually I also think that for the first version it's OK to use
>>>>>>> InputFormat in ALL cache realization, because supporting rescan
>>>>>>> ability seems like a very distant prospect. But for this decision we
>>>>>>> need a consensus among all discussion participants.
>>>>>>> 
>>>>>>> In general, I don't have something to argue with your statements.
>>> All
>>>>>>> of them correspond my ideas. Looking ahead, it would be nice to work
>>>>>>> on this FLIP cooperatively. I've already done a lot of work on
>>> lookup
>>>>>>> join caching with realization very close to the one we are
>>> discussing,
>>>>>>> and want to share the results of this work. Anyway looking forward
>>> for
>>>>>>> the FLIP update!
>>>>>>> 
>>>>>>> Best regards,
>>>>>>> Smirnov Alexander
>>>>>>> 
>>>>>>> чт, 12 мая 2022 г. в 17:38, Jark Wu <imj...@gmail.com>:
>>>>>>>> 
>>>>>>>> Hi Alex,
>>>>>>>> 
>>>>>>>> Thanks for summarizing your points.
>>>>>>>> 
>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed it
>>> several times
>>>>>>>> and we have totally refactored the design.
>>>>>>>> I'm glad to say we have reached a consensus on many of your points!
>>>>>>>> Qingsheng is still working on updating the design docs and maybe
>>> can be
>>>>>>>> available in the next few days.
>>>>>>>> I will share some conclusions from our discussions:
>>>>>>>> 
>>>>>>>> 1) we have refactored the design towards to "cache in framework"
>>> way.
>>>>>>>> 
>>>>>>>> 2) a "LookupCache" interface for users to customize and a default
>>>>>>>> implementation with builder for users to easy-use.
>>>>>>>> This can both make it possible to both have flexibility and
>>> conciseness.
>>>>>>>> 
>>>>>>>> 3) Filter pushdown is important for ALL and LRU lookup cache, esp
>>> reducing
>>>>>>>> IO.
>>>>>>>> Filter pushdown should be the final state and the unified way to
>>> both
>>>>>>>> support pruning ALL cache and LRU cache,
>>>>>>>> so I think we should make effort in this direction. If we need to
>>> support
>>>>>>>> filter pushdown for ALL cache anyway, why not use
>>>>>>>> it for LRU cache as well? Either way, as we decide to implement
>>> the cache
>>>>>>>> in the framework, we have the chance to support
>>>>>>>> filter on cache anytime. This is an optimization and it doesn't
>>> affect the
>>>>>>>> public API. I think we can create a JIRA issue to
>>>>>>>> discuss it when the FLIP is accepted.
>>>>>>>> 
>>>>>>>> 4) The idea to support ALL cache is similar to your proposal.
>>>>>>>> In the first version, we will only support InputFormat,
>>> SourceFunction for
>>>>>>>> cache all (invoke InputFormat in join operator).
>>>>>>>> For FLIP-27 source, we need to join a true source operator instead
>>> of
>>>>>>>> calling it embedded in the join operator.
>>>>>>>> However, this needs another FLIP to support the re-scan ability
>>> for FLIP-27
>>>>>>>> Source, and this can be a large work.
>>>>>>>> In order to not block this issue, we can put the effort of FLIP-27
>>> source
>>>>>>>> integration into future work and integrate
>>>>>>>> InputFormat&SourceFunction for now.
>>>>>>>> 
>>>>>>>> I think it's fine to use InputFormat&SourceFunction, as they are
>>> not
>>>>>>>> deprecated, otherwise, we have to introduce another function
>>>>>>>> similar to them which is meaningless. We need to plan FLIP-27
>>> source
>>>>>>>> integration ASAP before InputFormat & SourceFunction are
>>> deprecated.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <
>>> smirale...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Martijn!
>>>>>>>>> 
>>>>>>>>> Got it. Therefore, the realization with InputFormat is not
>>> considered.
>>>>>>>>> Thanks for clearing that up!
>>>>>>>>> 
>>>>>>>>> Best regards,
>>>>>>>>> Smirnov Alexander
>>>>>>>>> 
>>>>>>>>> чт, 12 мая 2022 г. в 14:23, Martijn Visser <mart...@ververica.com
>>>> :
>>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> With regards to:
>>>>>>>>>> 
>>>>>>>>>>> But if there are plans to refactor all connectors to FLIP-27
>>>>>>>>>> 
>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old
>>> interfaces will be
>>>>>>>>>> deprecated and connectors will either be refactored to use the
>>> new ones
>>>>>>>>> or
>>>>>>>>>> dropped.
>>>>>>>>>> 
>>>>>>>>>> The caching should work for connectors that are using FLIP-27
>>> interfaces,
>>>>>>>>>> we should not introduce new features for old interfaces.
>>>>>>>>>> 
>>>>>>>>>> Best regards,
>>>>>>>>>> 
>>>>>>>>>> Martijn
>>>>>>>>>> 
>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <
>>> smirale...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Jark!
>>>>>>>>>>> 
>>>>>>>>>>> Sorry for the late response. I would like to make some comments
>>> and
>>>>>>>>>>> clarify my points.
>>>>>>>>>>> 
>>>>>>>>>>> 1) I agree with your first statement. I think we can achieve
>>> both
>>>>>>>>>>> advantages this way: put the Cache interface in
>>> flink-table-common,
>>>>>>>>>>> but have implementations of it in flink-table-runtime.
>>> Therefore if a
>>>>>>>>>>> connector developer wants to use existing cache strategies and
>>> their
>>>>>>>>>>> implementations, he can just pass lookupConfig to the planner,
>>> but if
>>>>>>>>>>> he wants to have its own cache implementation in his
>>> TableFunction, it
>>>>>>>>>>> will be possible for him to use the existing interface for this
>>>>>>>>>>> purpose (we can explicitly point this out in the
>>> documentation). In
>>>>>>>>>>> this way all configs and metrics will be unified. WDYT?
>>>>>>>>>>> 
>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have
>>> 90% of
>>>>>>>>>>> lookup requests that can never be cached
>>>>>>>>>>> 
>>>>>>>>>>> 2) Let me clarify the logic filters optimization in case of LRU
>>> cache.
>>>>>>>>>>> It looks like Cache<RowData, Collection<RowData>>. Here we
>>> always
>>>>>>>>>>> store the response of the dimension table in cache, even after
>>>>>>>>>>> applying calc function. I.e. if there are no rows after applying
>>>>>>>>>>> filters to the result of the 'eval' method of TableFunction, we
>>> store
>>>>>>>>>>> the empty list by lookup keys. Therefore the cache line will be
>>>>>>>>>>> filled, but will require much less memory (in bytes). I.e. we
>>> don't
>>>>>>>>>>> completely filter keys, by which result was pruned, but
>>> significantly
>>>>>>>>>>> reduce required memory to store this result. If the user knows
>>> about
>>>>>>>>>>> this behavior, he can increase the 'max-rows' option before the
>>> start
>>>>>>>>>>> of the job. But actually I came up with the idea that we can do
>>> this
>>>>>>>>>>> automatically by using the 'maximumWeight' and 'weigher'
>>> methods of
>>>>>>>>>>> GuavaCache [1]. Weight can be the size of the collection of rows
>>>>>>>>>>> (value of cache). Therefore cache can automatically fit much
>>> more
>>>>>>>>>>> records than before.
>>>>>>>>>>> 
>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and
>>> projects
>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
>>> SupportsProjectionPushDown.
>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean
>>> it's
>>>>>>>>> hard
>>>>>>>>>>> to implement.
>>>>>>>>>>> 
>>>>>>>>>>> It's debatable how difficult it will be to implement filter
>>> pushdown.
>>>>>>>>>>> But I think the fact that currently there is no database
>>> connector
>>>>>>>>>>> with filter pushdown at least means that this feature won't be
>>>>>>>>>>> supported soon in connectors. Moreover, if we talk about other
>>>>>>>>>>> connectors (not in Flink repo), their databases might not
>>> support all
>>>>>>>>>>> Flink filters (or not support filters at all). I think users are
>>>>>>>>>>> interested in supporting cache filters optimization
>>> independently of
>>>>>>>>>>> supporting other features and solving more complex problems (or
>>>>>>>>>>> unsolvable at all).
>>>>>>>>>>> 
>>>>>>>>>>> 3) I agree with your third statement. Actually in our internal
>>> version
>>>>>>>>>>> I also tried to unify the logic of scanning and reloading data
>>> from
>>>>>>>>>>> connectors. But unfortunately, I didn't find a way to unify the
>>> logic
>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat, SourceFunction,
>>> Source,...)
>>>>>>>>>>> and reuse it in reloading ALL cache. As a result I settled on
>>> using
>>>>>>>>>>> InputFormat, because it was used for scanning in all lookup
>>>>>>>>>>> connectors. (I didn't know that there are plans to deprecate
>>>>>>>>>>> InputFormat in favor of FLIP-27 Source). IMO usage of FLIP-27
>>> source
>>>>>>>>>>> in ALL caching is not good idea, because this source was
>>> designed to
>>>>>>>>>>> work in distributed environment (SplitEnumerator on JobManager
>>> and
>>>>>>>>>>> SourceReaders on TaskManagers), not in one operator (lookup join
>>>>>>>>>>> operator in our case). There is even no direct way to pass
>>> splits from
>>>>>>>>>>> SplitEnumerator to SourceReader (this logic works through
>>>>>>>>>>> SplitEnumeratorContext, which requires
>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents).
>>> Usage of
>>>>>>>>>>> InputFormat for ALL cache seems much more clearer and easier.
>>> But if
>>>>>>>>>>> there are plans to refactor all connectors to FLIP-27, I have
>>> the
>>>>>>>>>>> following ideas: maybe we can refuse from lookup join ALL cache
>>> in
>>>>>>>>>>> favor of simple join with multiple scanning of batch source?
>>> The point
>>>>>>>>>>> is that the only difference between lookup join ALL cache and
>>> simple
>>>>>>>>>>> join with batch source is that in the first case scanning is
>>> performed
>>>>>>>>>>> multiple times, in between which state (cache) is cleared
>>> (correct me
>>>>>>>>>>> if I'm wrong). So what if we extend the functionality of simple
>>> join
>>>>>>>>>>> to support state reloading + extend the functionality of
>>> scanning
>>>>>>>>>>> batch source multiple times (this one should be easy with new
>>> FLIP-27
>>>>>>>>>>> source, that unifies streaming/batch reading - we will need to
>>> change
>>>>>>>>>>> only SplitEnumerator, which will pass splits again after some
>>> TTL).
>>>>>>>>>>> WDYT? I must say that this looks like a long-term goal and will
>>> make
>>>>>>>>>>> the scope of this FLIP even larger than you said. Maybe we can
>>> limit
>>>>>>>>>>> ourselves to a simpler solution now (InputFormats).
>>>>>>>>>>> 
>>>>>>>>>>> So to sum up, my points is like this:
>>>>>>>>>>> 1) There is a way to make both concise and flexible interfaces
>>> for
>>>>>>>>>>> caching in lookup join.
>>>>>>>>>>> 2) Cache filters optimization is important both in LRU and ALL
>>> caches.
>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
>>>>>>>>>>> connectors, some of the connectors might not have the
>>> opportunity to
>>>>>>>>>>> support filter pushdown + as I know, currently filter pushdown
>>> works
>>>>>>>>>>> only for scanning (not lookup). So cache filters + projections
>>>>>>>>>>> optimization should be independent from other features.
>>>>>>>>>>> 4) ALL cache realization is a complex topic that involves
>>> multiple
>>>>>>>>>>> aspects of how Flink is developing. Refusing from InputFormat
>>> in favor
>>>>>>>>>>> of FLIP-27 Source will make ALL cache realization really
>>> complex and
>>>>>>>>>>> not clear, so maybe instead of that we can extend the
>>> functionality of
>>>>>>>>>>> simple join or not refuse from InputFormat in case of lookup
>>> join ALL
>>>>>>>>>>> cache?
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> [1]
>>>>>>>>>>> 
>>>>>>>>> 
>>> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>>>>>>>>>>> 
>>>>>>>>>>> чт, 5 мая 2022 г. в 20:34, Jark Wu <imj...@gmail.com>:
>>>>>>>>>>>> 
>>>>>>>>>>>> It's great to see the active discussion! I want to share my
>>> ideas:
>>>>>>>>>>>> 
>>>>>>>>>>>> 1) implement the cache in framework vs. connectors base
>>>>>>>>>>>> I don't have a strong opinion on this. Both ways should work
>>> (e.g.,
>>>>>>>>> cache
>>>>>>>>>>>> pruning, compatibility).
>>>>>>>>>>>> The framework way can provide more concise interfaces.
>>>>>>>>>>>> The connector base way can define more flexible cache
>>>>>>>>>>>> strategies/implementations.
>>>>>>>>>>>> We are still investigating a way to see if we can have both
>>>>>>>>> advantages.
>>>>>>>>>>>> We should reach a consensus that the way should be a final
>>> state,
>>>>>>>>> and we
>>>>>>>>>>>> are on the path to it.
>>>>>>>>>>>> 
>>>>>>>>>>>> 2) filters and projections pushdown:
>>>>>>>>>>>> I agree with Alex that the filter pushdown into cache can
>>> benefit a
>>>>>>>>> lot
>>>>>>>>>>> for
>>>>>>>>>>>> ALL cache.
>>>>>>>>>>>> However, this is not true for LRU cache. Connectors use cache
>>> to
>>>>>>>>> reduce
>>>>>>>>>>> IO
>>>>>>>>>>>> requests to databases for better throughput.
>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have
>>> 90% of
>>>>>>>>>>> lookup
>>>>>>>>>>>> requests that can never be cached
>>>>>>>>>>>> and hit directly to the databases. That means the cache is
>>>>>>>>> meaningless in
>>>>>>>>>>>> this case.
>>>>>>>>>>>> 
>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to do filters and
>>> projects
>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
>>>>>>>>> SupportsProjectionPushDown.
>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean
>>> it's
>>>>>>>>> hard
>>>>>>>>>>> to
>>>>>>>>>>>> implement.
>>>>>>>>>>>> They should implement the pushdown interfaces to reduce IO and
>>> the
>>>>>>>>> cache
>>>>>>>>>>>> size.
>>>>>>>>>>>> That should be a final state that the scan source and lookup
>>> source
>>>>>>>>> share
>>>>>>>>>>>> the exact pushdown implementation.
>>>>>>>>>>>> I don't see why we need to duplicate the pushdown logic in
>>> caches,
>>>>>>>>> which
>>>>>>>>>>>> will complex the lookup join design.
>>>>>>>>>>>> 
>>>>>>>>>>>> 3) ALL cache abstraction
>>>>>>>>>>>> All cache might be the most challenging part of this FLIP. We
>>> have
>>>>>>>>> never
>>>>>>>>>>>> provided a reload-lookup public interface.
>>>>>>>>>>>> Currently, we put the reload logic in the "eval" method of
>>>>>>>>> TableFunction.
>>>>>>>>>>>> That's hard for some sources (e.g., Hive).
>>>>>>>>>>>> Ideally, connector implementation should share the logic of
>>> reload
>>>>>>>>> and
>>>>>>>>>>>> scan, i.e. ScanTableSource with
>>> InputFormat/SourceFunction/FLIP-27
>>>>>>>>>>> Source.
>>>>>>>>>>>> However, InputFormat/SourceFunction are deprecated, and the
>>> FLIP-27
>>>>>>>>>>> source
>>>>>>>>>>>> is deeply coupled with SourceOperator.
>>>>>>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin, this
>>> may make
>>>>>>>>> the
>>>>>>>>>>>> scope of this FLIP much larger.
>>>>>>>>>>>> We are still investigating how to abstract the ALL cache logic
>>> and
>>>>>>>>> reuse
>>>>>>>>>>>> the existing source interfaces.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jark
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com
>>>> 
>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> It's a much more complicated activity and lies out of the
>>> scope of
>>>>>>>>> this
>>>>>>>>>>>>> improvement. Because such pushdowns should be done for all
>>>>>>>>>>> ScanTableSource
>>>>>>>>>>>>> implementations (not only for Lookup ones).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <
>>>>>>>>> martijnvis...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> One question regarding "And Alexander correctly mentioned
>>> that
>>>>>>>>> filter
>>>>>>>>>>>>>> pushdown still is not implemented for jdbc/hive/hbase." ->
>>> Would
>>>>>>>>> an
>>>>>>>>>>>>>> alternative solution be to actually implement these filter
>>>>>>>>> pushdowns?
>>>>>>>>>>> I
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>> imagine that there are many more benefits to doing that,
>>> outside
>>>>>>>>> of
>>>>>>>>>>> lookup
>>>>>>>>>>>>>> caching and metrics.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Martijn Visser
>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
>>>>>>>>>>>>>> https://github.com/MartijnVisser
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
>>> ro.v.bo...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi everyone!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for driving such a valuable improvement!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I do think that single cache implementation would be a nice
>>>>>>>>>>> opportunity
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>> users. And it will break the "FOR SYSTEM_TIME AS OF
>>> proc_time"
>>>>>>>>>>> semantics
>>>>>>>>>>>>>>> anyway - doesn't matter how it will be implemented.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can say that:
>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity to cut off the
>>> cache
>>>>>>>>> size
>>>>>>>>>>> by
>>>>>>>>>>>>>>> simply filtering unnecessary data. And the most handy way
>>> to do
>>>>>>>>> it
>>>>>>>>>>> is
>>>>>>>>>>>>>> apply
>>>>>>>>>>>>>>> it inside LookupRunners. It would be a bit harder to pass it
>>>>>>>>>>> through the
>>>>>>>>>>>>>>> LookupJoin node to TableFunction. And Alexander correctly
>>>>>>>>> mentioned
>>>>>>>>>>> that
>>>>>>>>>>>>>>> filter pushdown still is not implemented for
>>> jdbc/hive/hbase.
>>>>>>>>>>>>>>> 2) The ability to set the different caching parameters for
>>>>>>>>> different
>>>>>>>>>>>>>> tables
>>>>>>>>>>>>>>> is quite important. So I would prefer to set it through DDL
>>>>>>>>> rather
>>>>>>>>>>> than
>>>>>>>>>>>>>>> have similar ttla, strategy and other options for all lookup
>>>>>>>>> tables.
>>>>>>>>>>>>>>> 3) Providing the cache into the framework really deprives
>>> us of
>>>>>>>>>>>>>>> extensibility (users won't be able to implement their own
>>>>>>>>> cache).
>>>>>>>>>>> But
>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>> probably it might be solved by creating more different cache
>>>>>>>>>>> strategies
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> a wider set of configurations.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> All these points are much closer to the schema proposed by
>>>>>>>>>>> Alexander.
>>>>>>>>>>>>>>> Qingshen Ren, please correct me if I'm not right and all
>>> these
>>>>>>>>>>>>>> facilities
>>>>>>>>>>>>>>> might be simply implemented in your architecture?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>> Roman Boyko
>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <
>>>>>>>>>>> martijnvis...@apache.org>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I don't have much to chip in, but just wanted to express
>>> that
>>>>>>>>> I
>>>>>>>>>>> really
>>>>>>>>>>>>>>>> appreciate the in-depth discussion on this topic and I hope
>>>>>>>>> that
>>>>>>>>>>>>>> others
>>>>>>>>>>>>>>>> will join the conversation.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Martijn
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <
>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for your detailed feedback! However, I have
>>> questions
>>>>>>>>>>> about
>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't get something?).
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR SYSTEM_TIME
>>>>>>>>> AS OF
>>>>>>>>>>>>>>>> proc_time”
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS OF
>>>>>>>>> proc_time"
>>>>>>>>>>> is
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>> fully implemented with caching, but as you said, users go
>>>>>>>>> on it
>>>>>>>>>>>>>>>>> consciously to achieve better performance (no one proposed
>>>>>>>>> to
>>>>>>>>>>> enable
>>>>>>>>>>>>>>>>> caching by default, etc.). Or by users do you mean other
>>>>>>>>>>> developers
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> connectors? In this case developers explicitly specify
>>>>>>>>> whether
>>>>>>>>>>> their
>>>>>>>>>>>>>>>>> connector supports caching or not (in the list of
>>> supported
>>>>>>>>>>>>>> options),
>>>>>>>>>>>>>>>>> no one makes them do that if they don't want to. So what
>>>>>>>>>>> exactly is
>>>>>>>>>>>>>>>>> the difference between implementing caching in modules
>>>>>>>>>>>>>>>>> flink-table-runtime and in flink-table-common from the
>>>>>>>>>>> considered
>>>>>>>>>>>>>>>>> point of view? How does it affect on breaking/non-breaking
>>>>>>>>> the
>>>>>>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF proc_time"?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> confront a situation that allows table options in DDL to
>>>>>>>>>>> control
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> behavior of the framework, which has never happened
>>>>>>>>> previously
>>>>>>>>>>> and
>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>> be cautious
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> If we talk about main differences of semantics of DDL
>>>>>>>>> options
>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> config options("table.exec.xxx"), isn't it about limiting
>>>>>>>>> the
>>>>>>>>>>> scope
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> the options + importance for the user business logic
>>> rather
>>>>>>>>> than
>>>>>>>>>>>>>>>>> specific location of corresponding logic in the
>>> framework? I
>>>>>>>>>>> mean
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> in my design, for example, putting an option with lookup
>>>>>>>>> cache
>>>>>>>>>>>>>>>>> strategy in configurations would  be the wrong decision,
>>>>>>>>>>> because it
>>>>>>>>>>>>>>>>> directly affects the user's business logic (not just
>>>>>>>>> performance
>>>>>>>>>>>>>>>>> optimization) + touches just several functions of ONE
>>> table
>>>>>>>>>>> (there
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>> be multiple tables with different caches). Does it really
>>>>>>>>>>> matter for
>>>>>>>>>>>>>>>>> the user (or someone else) where the logic is located,
>>>>>>>>> which is
>>>>>>>>>>>>>>>>> affected by the applied option?
>>>>>>>>>>>>>>>>> Also I can remember DDL option 'sink.parallelism', which
>>> in
>>>>>>>>>>> some way
>>>>>>>>>>>>>>>>> "controls the behavior of the framework" and I don't see
>>> any
>>>>>>>>>>> problem
>>>>>>>>>>>>>>>>> here.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> introduce a new interface for this all-caching scenario
>>>>>>>>> and
>>>>>>>>>>> the
>>>>>>>>>>>>>>> design
>>>>>>>>>>>>>>>>> would become more complex
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> This is a subject for a separate discussion, but actually
>>>>>>>>> in our
>>>>>>>>>>>>>>>>> internal version we solved this problem quite easily - we
>>>>>>>>> reused
>>>>>>>>>>>>>>>>> InputFormat class (so there is no need for a new API). The
>>>>>>>>>>> point is
>>>>>>>>>>>>>>>>> that currently all lookup connectors use InputFormat for
>>>>>>>>>>> scanning
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> data in batch mode: HBase, JDBC and even Hive - it uses
>>>>>>>>> class
>>>>>>>>>>>>>>>>> PartitionReader, that is actually just a wrapper around
>>>>>>>>>>> InputFormat.
>>>>>>>>>>>>>>>>> The advantage of this solution is the ability to reload
>>>>>>>>> cache
>>>>>>>>>>> data
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> parallel (number of threads depends on number of
>>>>>>>>> InputSplits,
>>>>>>>>>>> but
>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>> an upper limit). As a result cache reload time
>>> significantly
>>>>>>>>>>> reduces
>>>>>>>>>>>>>>>>> (as well as time of input stream blocking). I know that
>>>>>>>>> usually
>>>>>>>>>>> we
>>>>>>>>>>>>>> try
>>>>>>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but maybe
>>> this
>>>>>>>>> one
>>>>>>>>>>> can
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal solution,
>>>>>>>>> maybe
>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>> are better ones.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Providing the cache in the framework might introduce
>>>>>>>>>>> compatibility
>>>>>>>>>>>>>>>> issues
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> It's possible only in cases when the developer of the
>>>>>>>>> connector
>>>>>>>>>>>>>> won't
>>>>>>>>>>>>>>>>> properly refactor his code and will use new cache options
>>>>>>>>>>>>>> incorrectly
>>>>>>>>>>>>>>>>> (i.e. explicitly provide the same options into 2 different
>>>>>>>>> code
>>>>>>>>>>>>>>>>> places). For correct behavior all he will need to do is to
>>>>>>>>>>> redirect
>>>>>>>>>>>>>>>>> existing options to the framework's LookupConfig (+ maybe
>>>>>>>>> add an
>>>>>>>>>>>>>> alias
>>>>>>>>>>>>>>>>> for options, if there was different naming), everything
>>>>>>>>> will be
>>>>>>>>>>>>>>>>> transparent for users. If the developer won't do
>>>>>>>>> refactoring at
>>>>>>>>>>> all,
>>>>>>>>>>>>>>>>> nothing will be changed for the connector because of
>>>>>>>>> backward
>>>>>>>>>>>>>>>>> compatibility. Also if a developer wants to use his own
>>>>>>>>> cache
>>>>>>>>>>> logic,
>>>>>>>>>>>>>>>>> he just can refuse to pass some of the configs into the
>>>>>>>>>>> framework,
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> instead make his own implementation with already existing
>>>>>>>>>>> configs
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> metrics (but actually I think that it's a rare case).
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> filters and projections should be pushed all the way down
>>>>>>>>> to
>>>>>>>>>>> the
>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>> function, like what we do in the scan source
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> It's the great purpose. But the truth is that the ONLY
>>>>>>>>> connector
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource
>>>>>>>>>>>>>>>>> (no database connector supports it currently). Also for
>>> some
>>>>>>>>>>>>>> databases
>>>>>>>>>>>>>>>>> it's simply impossible to pushdown such complex filters
>>>>>>>>> that we
>>>>>>>>>>> have
>>>>>>>>>>>>>>>>> in Flink.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> only applying these optimizations to the cache seems not
>>>>>>>>>>> quite
>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Filters can cut off an arbitrarily large amount of data
>>>>>>>>> from the
>>>>>>>>>>>>>>>>> dimension table. For a simple example, suppose in
>>> dimension
>>>>>>>>>>> table
>>>>>>>>>>>>>>>>> 'users'
>>>>>>>>>>>>>>>>> we have column 'age' with values from 20 to 40, and input
>>>>>>>>> stream
>>>>>>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by age of users.
>>> If
>>>>>>>>> we
>>>>>>>>>>> have
>>>>>>>>>>>>>>>>> filter 'age > 30',
>>>>>>>>>>>>>>>>> there will be twice less data in cache. This means the
>>> user
>>>>>>>>> can
>>>>>>>>>>>>>>>>> increase 'lookup.cache.max-rows' by almost 2 times. It
>>> will
>>>>>>>>>>> gain a
>>>>>>>>>>>>>>>>> huge
>>>>>>>>>>>>>>>>> performance boost. Moreover, this optimization starts to
>>>>>>>>> really
>>>>>>>>>>>>>> shine
>>>>>>>>>>>>>>>>> in 'ALL' cache, where tables without filters and
>>> projections
>>>>>>>>>>> can't
>>>>>>>>>>>>>> fit
>>>>>>>>>>>>>>>>> in memory, but with them - can. This opens up additional
>>>>>>>>>>>>>> possibilities
>>>>>>>>>>>>>>>>> for users. And this doesn't sound as 'not quite useful'.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> It would be great to hear other voices regarding this
>>> topic!
>>>>>>>>>>> Because
>>>>>>>>>>>>>>>>> we have quite a lot of controversial points, and I think
>>>>>>>>> with
>>>>>>>>>>> the
>>>>>>>>>>>>>> help
>>>>>>>>>>>>>>>>> of others it will be easier for us to come to a consensus.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> пт, 29 апр. 2022 г. в 22:33, Qingsheng Ren <
>>>>>>>>> renqs...@gmail.com
>>>>>>>>>>>> :
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Alexander and Arvid,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the discussion and sorry for my late response!
>>>>>>>>> We
>>>>>>>>>>> had
>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>> internal discussion together with Jark and Leonard and I’d
>>>>>>>>> like
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> summarize our ideas. Instead of implementing the cache
>>>>>>>>> logic in
>>>>>>>>>>> the
>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>> runtime layer or wrapping around the user-provided table
>>>>>>>>>>> function,
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> prefer to introduce some new APIs extending TableFunction
>>>>>>>>> with
>>>>>>>>>>> these
>>>>>>>>>>>>>>>>> concerns:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR
>>>>>>>>> SYSTEM_TIME
>>>>>>>>>>> AS OF
>>>>>>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect the content
>>>>>>>>> of the
>>>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>> table at the moment of querying. If users choose to enable
>>>>>>>>>>> caching
>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> lookup table, they implicitly indicate that this breakage
>>> is
>>>>>>>>>>>>>> acceptable
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> exchange for the performance. So we prefer not to provide
>>>>>>>>>>> caching on
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> table runtime level.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 2. If we make the cache implementation in the framework
>>>>>>>>>>> (whether
>>>>>>>>>>>>>> in a
>>>>>>>>>>>>>>>>> runner or a wrapper around TableFunction), we have to
>>>>>>>>> confront a
>>>>>>>>>>>>>>>> situation
>>>>>>>>>>>>>>>>> that allows table options in DDL to control the behavior
>>> of
>>>>>>>>> the
>>>>>>>>>>>>>>>> framework,
>>>>>>>>>>>>>>>>> which has never happened previously and should be
>>> cautious.
>>>>>>>>>>> Under
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> current design the behavior of the framework should only
>>> be
>>>>>>>>>>>>>> specified
>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s hard to apply
>>>>>>>>> these
>>>>>>>>>>>>>> general
>>>>>>>>>>>>>>>>> configs to a specific table.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 3. We have use cases that lookup source loads and refresh
>>>>>>>>> all
>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>> periodically into the memory to achieve high lookup
>>>>>>>>> performance
>>>>>>>>>>>>>> (like
>>>>>>>>>>>>>>>> Hive
>>>>>>>>>>>>>>>>> connector in the community, and also widely used by our
>>>>>>>>> internal
>>>>>>>>>>>>>>>>> connectors). Wrapping the cache around the user’s
>>>>>>>>> TableFunction
>>>>>>>>>>>>>> works
>>>>>>>>>>>>>>>> fine
>>>>>>>>>>>>>>>>> for LRU caches, but I think we have to introduce a new
>>>>>>>>>>> interface for
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> all-caching scenario and the design would become more
>>>>>>>>> complex.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 4. Providing the cache in the framework might introduce
>>>>>>>>>>>>>> compatibility
>>>>>>>>>>>>>>>>> issues to existing lookup sources like there might exist
>>> two
>>>>>>>>>>> caches
>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>> totally different strategies if the user incorrectly
>>>>>>>>> configures
>>>>>>>>>>> the
>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>> (one in the framework and another implemented by the
>>> lookup
>>>>>>>>>>> source).
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> As for the optimization mentioned by Alexander, I think
>>>>>>>>>>> filters
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> projections should be pushed all the way down to the table
>>>>>>>>>>> function,
>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>> what we do in the scan source, instead of the runner with
>>>>>>>>> the
>>>>>>>>>>> cache.
>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>> goal of using cache is to reduce the network I/O and
>>>>>>>>> pressure
>>>>>>>>>>> on the
>>>>>>>>>>>>>>>>> external system, and only applying these optimizations to
>>>>>>>>> the
>>>>>>>>>>> cache
>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>> not quite useful.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect our ideas.
>>>>>>>>> We
>>>>>>>>>>>>>> prefer to
>>>>>>>>>>>>>>>>> keep the cache implementation as a part of TableFunction,
>>>>>>>>> and we
>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>> provide some helper classes (CachingTableFunction,
>>>>>>>>>>>>>>>> AllCachingTableFunction,
>>>>>>>>>>>>>>>>> CachingAsyncTableFunction) to developers and regulate
>>>>>>>>> metrics
>>>>>>>>>>> of the
>>>>>>>>>>>>>>>> cache.
>>>>>>>>>>>>>>>>> Also, I made a POC[2] for your reference.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Looking forward to your ideas!
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <
>>>>>>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks for the response, Arvid!
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I have few comments on your message.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> but could also live with an easier solution as the
>>>>>>>>> first
>>>>>>>>>>> step:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I think that these 2 ways are mutually exclusive
>>>>>>>>> (originally
>>>>>>>>>>>>>>> proposed
>>>>>>>>>>>>>>>>>>> by Qingsheng and mine), because conceptually they follow
>>>>>>>>> the
>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>> goal, but implementation details are different. If we
>>>>>>>>> will
>>>>>>>>>>> go one
>>>>>>>>>>>>>>> way,
>>>>>>>>>>>>>>>>>>> moving to another way in the future will mean deleting
>>>>>>>>>>> existing
>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>> and once again changing the API for connectors. So I
>>>>>>>>> think we
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>> reach a consensus with the community about that and then
>>>>>>>>> work
>>>>>>>>>>>>>>> together
>>>>>>>>>>>>>>>>>>> on this FLIP, i.e. divide the work on tasks for
>>> different
>>>>>>>>>>> parts
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> flip (for example, LRU cache unification / introducing
>>>>>>>>>>> proposed
>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> as the source will only receive the requests after
>>>>>>>>> filter
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Actually if filters are applied to fields of the lookup
>>>>>>>>>>> table, we
>>>>>>>>>>>>>>>>>>> firstly must do requests, and only after that we can
>>>>>>>>> filter
>>>>>>>>>>>>>>> responses,
>>>>>>>>>>>>>>>>>>> because lookup connectors don't have filter pushdown. So
>>>>>>>>> if
>>>>>>>>>>>>>>> filtering
>>>>>>>>>>>>>>>>>>> is done before caching, there will be much less rows in
>>>>>>>>>>> cache.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not
>>>>>>>>> shared.
>>>>>>>>>>> I
>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> know the
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of
>>>>>>>>> conversations
>>>>>>>>>>> :)
>>>>>>>>>>>>>>>>>>> I have no write access to the confluence, so I made a
>>>>>>>>> Jira
>>>>>>>>>>> issue,
>>>>>>>>>>>>>>>>>>> where described the proposed changes in more details -
>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-27411.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Will happy to get more feedback!
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> пн, 25 апр. 2022 г. в 19:49, Arvid Heise <
>>>>>>>>> ar...@apache.org>:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Qingsheng,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency was not
>>>>>>>>>>> satisfying
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> me.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but could also live
>>>>>>>>> with
>>>>>>>>>>> an
>>>>>>>>>>>>>>> easier
>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead of making caching
>>>>>>>>> an
>>>>>>>>>>>>>>>>> implementation
>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a caching
>>>>>>>>> layer
>>>>>>>>>>>>>> around X.
>>>>>>>>>>>>>>>> So
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> proposal would be a CachingTableFunction that
>>>>>>>>> delegates to
>>>>>>>>>>> X in
>>>>>>>>>>>>>>> case
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> misses and else manages the cache. Lifting it into the
>>>>>>>>>>> operator
>>>>>>>>>>>>>>>> model
>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>> proposed would be even better but is probably
>>>>>>>>> unnecessary
>>>>>>>>>>> in
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> first step
>>>>>>>>>>>>>>>>>>>> for a lookup source (as the source will only receive
>>>>>>>>> the
>>>>>>>>>>>>>> requests
>>>>>>>>>>>>>>>>> after
>>>>>>>>>>>>>>>>>>>> filter; applying projection may be more interesting to
>>>>>>>>> save
>>>>>>>>>>>>>>> memory).
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Another advantage is that all the changes of this FLIP
>>>>>>>>>>> would be
>>>>>>>>>>>>>>>>> limited to
>>>>>>>>>>>>>>>>>>>> options, no need for new public interfaces. Everything
>>>>>>>>> else
>>>>>>>>>>>>>>> remains
>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>> implementation of Table runtime. That means we can
>>>>>>>>> easily
>>>>>>>>>>>>>>>> incorporate
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> optimization potential that Alexander pointed out
>>>>>>>>> later.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not
>>>>>>>>> shared.
>>>>>>>>>>> I
>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> know the
>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <
>>>>>>>>>>>>>>>>> smirale...@gmail.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not a
>>>>>>>>> committer
>>>>>>>>>>> yet,
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>> I'd
>>>>>>>>>>>>>>>>>>>>> really like to become one. And this FLIP really
>>>>>>>>>>> interested
>>>>>>>>>>>>>> me.
>>>>>>>>>>>>>>>>>>>>> Actually I have worked on a similar feature in my
>>>>>>>>>>> company’s
>>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts on
>>>>>>>>> this and
>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>>>> open source.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I think there is a better alternative than
>>>>>>>>> introducing an
>>>>>>>>>>>>>>> abstract
>>>>>>>>>>>>>>>>>>>>> class for TableFunction (CachingTableFunction). As
>>>>>>>>> you
>>>>>>>>>>> know,
>>>>>>>>>>>>>>>>>>>>> TableFunction exists in the flink-table-common
>>>>>>>>> module,
>>>>>>>>>>> which
>>>>>>>>>>>>>>>>> provides
>>>>>>>>>>>>>>>>>>>>> only an API for working with tables – it’s very
>>>>>>>>>>> convenient
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> importing
>>>>>>>>>>>>>>>>>>>>> in connectors. In turn, CachingTableFunction contains
>>>>>>>>>>> logic
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>> runtime execution,  so this class and everything
>>>>>>>>>>> connected
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>> should be located in another module, probably in
>>>>>>>>>>>>>>>>> flink-table-runtime.
>>>>>>>>>>>>>>>>>>>>> But this will require connectors to depend on another
>>>>>>>>>>> module,
>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> contains a lot of runtime logic, which doesn’t sound
>>>>>>>>>>> good.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I suggest adding a new method ‘getLookupConfig’ to
>>>>>>>>>>>>>>>> LookupTableSource
>>>>>>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow connectors to only
>>>>>>>>> pass
>>>>>>>>>>>>>>>>>>>>> configurations to the planner, therefore they won’t
>>>>>>>>>>> depend on
>>>>>>>>>>>>>>>>> runtime
>>>>>>>>>>>>>>>>>>>>> realization. Based on these configs planner will
>>>>>>>>>>> construct a
>>>>>>>>>>>>>>>> lookup
>>>>>>>>>>>>>>>>>>>>> join operator with corresponding runtime logic
>>>>>>>>>>>>>> (ProcessFunctions
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture looks like
>>>>>>>>> in
>>>>>>>>>>> the
>>>>>>>>>>>>>>> pinned
>>>>>>>>>>>>>>>>>>>>> image (LookupConfig class there is actually yours
>>>>>>>>>>>>>> CacheConfig).
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Classes in flink-table-planner, that will be
>>>>>>>>> responsible
>>>>>>>>>>> for
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> –
>>>>>>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and his inheritors.
>>>>>>>>>>>>>>>>>>>>> Current classes for lookup join in
>>>>>>>>> flink-table-runtime
>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner,
>>>>>>>>>>>>>>> LookupJoinRunnerWithCalc,
>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I suggest adding classes LookupJoinCachingRunner,
>>>>>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> And here comes another more powerful advantage of
>>>>>>>>> such a
>>>>>>>>>>>>>>> solution.
>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>> we have caching logic on a lower level, we can apply
>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>> optimizations to it. LookupJoinRunnerWithCalc was
>>>>>>>>> named
>>>>>>>>>>> like
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> because it uses the ‘calc’ function, which actually
>>>>>>>>>>> mostly
>>>>>>>>>>>>>>>> consists
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> filters and projections.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> For example, in join table A with lookup table B
>>>>>>>>>>> condition
>>>>>>>>>>>>>>> ‘JOIN …
>>>>>>>>>>>>>>>>> ON
>>>>>>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10 WHERE B.salary >
>>>>>>>>> 1000’
>>>>>>>>>>>>>>> ‘calc’
>>>>>>>>>>>>>>>>>>>>> function will contain filters A.age = B.age + 10 and
>>>>>>>>>>>>>> B.salary >
>>>>>>>>>>>>>>>>> 1000.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> If we apply this function before storing records in
>>>>>>>>>>> cache,
>>>>>>>>>>>>>> size
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> cache will be significantly reduced: filters = avoid
>>>>>>>>>>> storing
>>>>>>>>>>>>>>>> useless
>>>>>>>>>>>>>>>>>>>>> records in cache, projections = reduce records’
>>>>>>>>> size. So
>>>>>>>>>>> the
>>>>>>>>>>>>>>>> initial
>>>>>>>>>>>>>>>>>>>>> max number of records in cache can be increased by
>>>>>>>>> the
>>>>>>>>>>> user.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> What do you think about it?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion about
>>>>>>>>>>>>>> FLIP-221[1],
>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> introduces an abstraction of lookup table cache and
>>>>>>>>> its
>>>>>>>>>>>>>> standard
>>>>>>>>>>>>>>>>> metrics.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Currently each lookup table source should implement
>>>>>>>>>>> their
>>>>>>>>>>>>>> own
>>>>>>>>>>>>>>>>> cache to
>>>>>>>>>>>>>>>>>>>>> store lookup results, and there isn’t a standard of
>>>>>>>>>>> metrics
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> users and
>>>>>>>>>>>>>>>>>>>>> developers to tuning their jobs with lookup joins,
>>>>>>>>> which
>>>>>>>>>>> is a
>>>>>>>>>>>>>>>> quite
>>>>>>>>>>>>>>>>> common
>>>>>>>>>>>>>>>>>>>>> use case in Flink table / SQL.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs including cache,
>>>>>>>>>>>>>> metrics,
>>>>>>>>>>>>>>>>> wrapper
>>>>>>>>>>>>>>>>>>>>> classes of TableFunction and new table options.
>>>>>>>>> Please
>>>>>>>>>>> take a
>>>>>>>>>>>>>>> look
>>>>>>>>>>>>>>>>> at the
>>>>>>>>>>>>>>>>>>>>> FLIP page [1] to get more details. Any suggestions
>>>>>>>>> and
>>>>>>>>>>>>>> comments
>>>>>>>>>>>>>>>>> would be
>>>>>>>>>>>>>>>>>>>>> appreciated!
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Qingsheng Ren
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Real-time Computing Team
>>>>>>>>>>>>>>>>>> Alibaba Cloud
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Roman Boyko
>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Best Regards,
>>>>>> 
>>>>>> Qingsheng Ren
>>>>>> 
>>>>>> Real-time Computing Team
>>>>>> Alibaba Cloud
>>>>>> 
>>>>>> Email: renqs...@gmail.com
>>>> 
>>> 
>> 

Reply via email to