Hi devs, Thanks for the in-depth discussion! We recently updated some designs of FLIP-221 [1]:
1. Introduce a new interface “FullCachingReloadTrigger” for developers to customize the reload strategy. The previous design was time-based only and not flexible enough. Developers can implement any logic on this interface to trigger a full caching reload.
2. LookupFunctionProvider / AsyncLookupFunctionProvider are renamed to PartialCachingLookupProvider / AsyncPartialCachingLookupProvider, in order to be symmetric with “FullCachingLookupProvider”.
3. Remove the lookup option “lookup.async”, because FLIP-234 is planning to move the decision of whether to use async mode to the planner.
4. LookupCacheMetricGroup is renamed to CacheMetricGroup because it’s under flink-metrics-core and all caching implementations can reuse it.

A POC has been pushed to my GitHub repo [2] to reflect the update. Some implementations of FullCachingReloadTrigger may be quite naive and only illustrate how the interface works. Looking forward to your comments!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221-framework

> On Jun 1, 2022, at 04:58, Jing Ge <j...@ververica.com> wrote:
> 
> Hi Jark,
> 
> Thanks for clarifying it. It would be fine as long as we could provide the
> no-cache solution. I was just wondering if the client-side cache could
> really help when HBase is used, since the data to look up should be huge.
> Depending on how much data will be cached on the client side, the data that
> should be LRU in e.g. LruBlockCache will not be LRU anymore. In the worst
> case scenario, once the cached data at the client side is expired, the
> request will hit disk, which will cause extra latency temporarily, if I am
> not mistaken.
> 
> Best regards,
> Jing
> 
> On Mon, May 30, 2022 at 9:59 AM Jark Wu <imj...@gmail.com> wrote:
> 
>> Hi Jing Ge,
>> 
>> What do you mean about the "impact on the block cache used by HBase"?
>> In my understanding, the connector cache and the HBase cache are two totally
>> different things. The connector cache is a local/client cache, and the HBase
>> cache is a server cache.
>> 
>>> does it make sense to have a no-cache solution as one of the
>>> default solutions so that customers will have no effort for the migration
>>> if they want to stick with HBase cache
>> 
>> The implementation migration should be transparent to users. Take the HBase
>> connector as an example: it already supports a lookup cache, but it is
>> disabled by default. After migration, the connector still disables the cache
>> by default (i.e. the no-cache solution), so there is no migration effort for
>> users.
>> 
>> The HBase cache and the connector cache are two different things, and the
>> HBase cache can't simply replace the connector cache. One of the most
>> important usages of the connector cache is reducing I/O requests/responses
>> and improving throughput, which can't be achieved by just using a server
>> cache.
>> 
>> Best,
>> Jark
>> 
>> On Fri, 27 May 2022 at 22:42, Jing Ge <j...@ververica.com> wrote:
>> 
>>> Thanks all for the valuable discussion. The new feature looks very
>>> interesting.
>>> 
>>> According to the FLIP description: "*Currently we have JDBC, Hive and HBase
>>> connector implemented lookup table source. All existing implementations
>>> will be migrated to the current design and the migration will be
>>> transparent to end users*." I was only wondering if we should pay attention
>>> to HBase and similar DBs. Since, commonly, the lookup data will be huge
>>> while using HBase, partial caching will be used in this case, if I am not
>>> mistaken, which might have an impact on the block cache used by HBase, e.g.
>>> LruBlockCache.
>>> Another question is that, since HBase provides a sophisticated cache
>>> solution, does it make sense to have a no-cache solution as one of the
>>> default solutions so that customers will have no effort for the migration
>>> if they want to stick with the HBase cache?
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> On Fri, May 27, 2022 at 11:19 AM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> I think the problems now are as below:
>>>> 1. The AllCache and PartialCache interfaces are not uniform: one needs to
>>>> provide a LookupProvider, the other needs to provide a CacheBuilder.
>>>> 2. The AllCache definition is not flexible. For example, PartialCache can
>>>> use any custom storage while AllCache cannot; AllCache storage (in memory
>>>> or on disk) also needs a flexible strategy.
>>>> 3. AllCache cannot customize the ReloadStrategy; currently there is only
>>>> ScheduledReloadStrategy.
>>>> 
>>>> In order to solve the above problems, the following are my ideas.
>>>> 
>>>> ## Top level cache interfaces:
>>>> 
>>>> ```
>>>> public interface CacheLookupProvider
>>>>         extends LookupTableSource.LookupRuntimeProvider {
>>>> 
>>>>     CacheBuilder createCacheBuilder();
>>>> }
>>>> 
>>>> public interface CacheBuilder {
>>>>     Cache create();
>>>> }
>>>> 
>>>> public interface Cache {
>>>> 
>>>>     /**
>>>>      * Returns the value associated with key in this cache, or null if
>>>>      * there is no cached value for key.
>>>>      */
>>>>     @Nullable
>>>>     Collection<RowData> getIfPresent(RowData key);
>>>> 
>>>>     /** Returns the number of key-value mappings in the cache. */
>>>>     long size();
>>>> }
>>>> ```
>>>> 
>>>> ## Partial cache
>>>> 
>>>> ```
>>>> public interface PartialCacheLookupFunction extends CacheLookupProvider {
>>>> 
>>>>     @Override
>>>>     PartialCacheBuilder createCacheBuilder();
>>>> 
>>>>     /** Creates an {@link LookupFunction} instance.
>>>>      */
>>>>     LookupFunction createLookupFunction();
>>>> }
>>>> 
>>>> public interface PartialCacheBuilder extends CacheBuilder {
>>>>     PartialCache create();
>>>> }
>>>> 
>>>> public interface PartialCache extends Cache {
>>>> 
>>>>     /**
>>>>      * Associates the specified value rows with the specified key row
>>>>      * in the cache. If the cache previously contained a value associated
>>>>      * with the key, the old value is replaced by the specified value.
>>>>      *
>>>>      * @param key key row with which the specified value is to be associated
>>>>      * @param value value rows to be associated with the specified key
>>>>      * @return the previous value rows associated with key, or null if
>>>>      *     there was no mapping for key
>>>>      */
>>>>     Collection<RowData> put(RowData key, Collection<RowData> value);
>>>> 
>>>>     /** Discards any cached value for the specified key. */
>>>>     void invalidate(RowData key);
>>>> }
>>>> ```
>>>> 
>>>> ## All cache
>>>> 
>>>> ```
>>>> public interface AllCacheLookupProvider extends CacheLookupProvider {
>>>> 
>>>>     void registerReloadStrategy(ScheduledExecutorService executorService,
>>>>             Reloader reloader);
>>>> 
>>>>     ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
>>>> 
>>>>     @Override
>>>>     AllCacheBuilder createCacheBuilder();
>>>> }
>>>> 
>>>> public interface AllCacheBuilder extends CacheBuilder {
>>>>     AllCache create();
>>>> }
>>>> 
>>>> public interface AllCache extends Cache {
>>>> 
>>>>     void putAll(Iterator<Map<RowData, RowData>> allEntries);
>>>> 
>>>>     void clearAll();
>>>> }
>>>> 
>>>> public interface Reloader {
>>>>     void reload();
>>>> }
>>>> ```
>>>> 
>>>> Best,
>>>> Jingsong
>>>> 
>>>> On Fri, May 27, 2022 at 11:10 AM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Thanks Qingsheng and all for your discussion.
>>>>> 
>>>>> Very sorry to jump in so late.
>>>>> 
>>>>> Maybe I missed something?
>>>>> My first impression when I saw the cache interface was: why don't we
>>>>> provide an interface similar to the Guava cache [1]? On top of the Guava
>>>>> cache, Caffeine also adds extensions for asynchronous calls [2], and
>>>>> there is bulk loading in Caffeine too.
>>>>> 
>>>>> I am also confused about why we go first from LookupCacheFactory.Builder
>>>>> and then to a Factory to create the Cache.
>>>>> 
>>>>> [1] https://github.com/google/guava
>>>>> [2] https://github.com/ben-manes/caffeine/wiki/Population
>>>>> 
>>>>> Best,
>>>>> Jingsong
>>>>> 
>>>>> On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote:
>>>>> 
>>>>>> After looking at the newly introduced ReloadTime and Becket's comment,
>>>>>> I agree with Becket that we should have a pluggable reloading strategy.
>>>>>> We can provide some common implementations, e.g., periodic reloading and
>>>>>> daily reloading.
>>>>>> But there will definitely be some connector- or business-specific
>>>>>> reloading strategies, e.g. notify by a ZooKeeper watcher, or reload once
>>>>>> a new Hive partition is complete.
>>>>>> 
>>>>>> Best,
>>>>>> Jark
>>>>>> 
>>>>>> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Qingsheng,
>>>>>>> 
>>>>>>> Thanks for updating the FLIP. A few comments / questions below:
>>>>>>> 
>>>>>>> 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"?
>>>>>>> What is the difference between them? If they are the same, can we just
>>>>>>> use XXXFactory everywhere?
>>>>>>> 
>>>>>>> 2. Regarding the FullCachingLookupProvider, should the reloading policy
>>>>>>> also be pluggable? Periodic reloading can sometimes be tricky in
>>>>>>> practice. For example, if a user uses 24 hours as the cache refresh
>>>>>>> interval and some nightly batch job is delayed, the cache update may
>>>>>>> still see stale data.
>>>>>>> 
>>>>>>> 3.
>>>>>>> In DefaultLookupCacheFactory, it looks like InitialCapacity should be
>>>>>>> removed.
>>>>>>> 
>>>>>>> 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a
>>>>>>> little confusing to me. If Optional<LookupCacheFactory>
>>>>>>> getCacheFactory() returns a non-empty factory, doesn't that already
>>>>>>> instruct the framework to cache the missing keys? Also, why is this
>>>>>>> method returning an Optional<Boolean> instead of a boolean?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Lincoln and Jark,
>>>>>>>> 
>>>>>>>> Thanks for the comments! If the community reaches a consensus that we
>>>>>>>> use a SQL hint instead of table options to decide whether to use sync
>>>>>>>> or async mode, it’s indeed not necessary to introduce the
>>>>>>>> “lookup.async” option.
>>>>>>>> 
>>>>>>>> I think it’s a good idea to let the async decision be made at the
>>>>>>>> query level, which could enable better optimization with more
>>>>>>>> information gathered by the planner. Is there any FLIP describing the
>>>>>>>> issue in FLINK-27625? I thought FLIP-234 only proposes adding a SQL
>>>>>>>> hint for retry on missing, instead of the entire async mode being
>>>>>>>> controlled by a hint.
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> 
>>>>>>>> Qingsheng
>>>>>>>> 
>>>>>>>>> On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Jark,
>>>>>>>>> 
>>>>>>>>> Thanks for your reply!
>>>>>>>>> 
>>>>>>>>> Currently 'lookup.async' only lives in the HBase connector, and I
>>>>>>>>> have no idea whether or when to remove it (we can discuss it in
>>>>>>>>> another issue for the HBase connector after FLINK-27625 is done);
>>>>>>>>> let's just not add it into a common option now.
>>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Lincoln Lee >>>>>>>>> >>>>>>>>> >>>>>>>>> Jark Wu <imj...@gmail.com> 于2022年5月24日周二 20:14写道: >>>>>>>>> >>>>>>>>>> Hi Lincoln, >>>>>>>>>> >>>>>>>>>> I have taken a look at FLIP-234, and I agree with you that the >>>>>>>> connectors >>>>>>>>>> can >>>>>>>>>> provide both async and sync runtime providers simultaneously >>>> instead >>>>>>>> of one >>>>>>>>>> of them. >>>>>>>>>> At that point, "lookup.async" looks redundant. If this option >> is >>>>>>>> planned to >>>>>>>>>> be removed >>>>>>>>>> in the long term, I think it makes sense not to introduce it >> in >>>> this >>>>>>>> FLIP. >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Jark >>>>>>>>>> >>>>>>>>>> On Tue, 24 May 2022 at 11:08, Lincoln Lee < >>> lincoln.8...@gmail.com >>>>> >>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Qingsheng, >>>>>>>>>>> >>>>>>>>>>> Sorry for jumping into the discussion so late. It's a good >> idea >>>>>> that >>>>>>>> we >>>>>>>>>> can >>>>>>>>>>> have a common table option. I have a minor comments on >>>>>> 'lookup.async' >>>>>>>>>> that >>>>>>>>>>> not make it a common option: >>>>>>>>>>> >>>>>>>>>>> The table layer abstracts both sync and async lookup >>>> capabilities, >>>>>>>>>>> connectors implementers can choose one or both, in the case >> of >>>>>>>>>> implementing >>>>>>>>>>> only one capability(status of the most of existing builtin >>>>>> connectors) >>>>>>>>>>> 'lookup.async' will not be used. And when a connector has >> both >>>>>>>>>>> capabilities, I think this choice is more suitable for making >>>>>>>> decisions >>>>>>>>>> at >>>>>>>>>>> the query level, for example, table planner can choose the >>>> physical >>>>>>>>>>> implementation of async lookup or sync lookup based on its >> cost >>>>>>>> model, or >>>>>>>>>>> users can give query hint based on their own better >>>>>> understanding. If >>>>>>>>>>> there is another common table option 'lookup.async', it may >>>> confuse >>>>>>>> the >>>>>>>>>>> users in the long run. 
>>>>>>>>>>> >>>>>>>>>>> So, I prefer to leave the 'lookup.async' option in private >>> place >>>>>> (for >>>>>>>> the >>>>>>>>>>> current hbase connector) and not turn it into a common >> option. >>>>>>>>>>> >>>>>>>>>>> WDYT? >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Lincoln Lee >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Qingsheng Ren <renqs...@gmail.com> 于2022年5月23日周一 14:54写道: >>>>>>>>>>> >>>>>>>>>>>> Hi Alexander, >>>>>>>>>>>> >>>>>>>>>>>> Thanks for the review! We recently updated the FLIP and you >>> can >>>>>> find >>>>>>>>>>> those >>>>>>>>>>>> changes from my latest email. Since some terminologies has >>>>>> changed so >>>>>>>>>>> I’ll >>>>>>>>>>>> use the new concept for replying your comments. >>>>>>>>>>>> >>>>>>>>>>>> 1. Builder vs ‘of’ >>>>>>>>>>>> I’m OK to use builder pattern if we have additional optional >>>>>>>> parameters >>>>>>>>>>>> for full caching mode (“rescan” previously). The >>>>>> schedule-with-delay >>>>>>>>>> idea >>>>>>>>>>>> looks reasonable to me, but I think we need to redesign the >>>>>> builder >>>>>>>> API >>>>>>>>>>> of >>>>>>>>>>>> full caching to make it more descriptive for developers. >> Would >>>> you >>>>>>>> mind >>>>>>>>>>>> sharing your ideas about the API? For accessing the FLIP >>>> workspace >>>>>>>> you >>>>>>>>>>> can >>>>>>>>>>>> just provide your account ID and ping any PMC member >> including >>>>>> Jark. >>>>>>>>>>>> >>>>>>>>>>>> 2. Common table options >>>>>>>>>>>> We have some discussions these days and propose to >> introduce 8 >>>>>> common >>>>>>>>>>>> table options about caching. It has been updated on the >> FLIP. >>>>>>>>>>>> >>>>>>>>>>>> 3. Retries >>>>>>>>>>>> I think we are on the same page :-) >>>>>>>>>>>> >>>>>>>>>>>> For your additional concerns: >>>>>>>>>>>> 1) The table option has been updated. >>>>>>>>>>>> 2) We got “lookup.cache” back for configuring whether to use >>>>>> partial >>>>>>>> or >>>>>>>>>>>> full caching mode. 
>>>>>>>>>>>> >>>>>>>>>>>> Best regards, >>>>>>>>>>>> >>>>>>>>>>>> Qingsheng >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> On May 19, 2022, at 17:25, Александр Смирнов < >>>>>> smirale...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Also I have a few additions: >>>>>>>>>>>>> 1) maybe rename 'lookup.cache.maximum-size' to >>>>>>>>>>>>> 'lookup.cache.max-rows'? I think it will be more clear that >>> we >>>>>> talk >>>>>>>>>>>>> not about bytes, but about the number of rows. Plus it fits >>>> more, >>>>>>>>>>>>> considering my optimization with filters. >>>>>>>>>>>>> 2) How will users enable rescanning? Are we going to >> separate >>>>>>>> caching >>>>>>>>>>>>> and rescanning from the options point of view? Like >> initially >>>> we >>>>>> had >>>>>>>>>>>>> one option 'lookup.cache' with values LRU / ALL. I think >> now >>> we >>>>>> can >>>>>>>>>>>>> make a boolean option 'lookup.rescan'. RescanInterval can >> be >>>>>>>>>>>>> 'lookup.rescan.interval', etc. >>>>>>>>>>>>> >>>>>>>>>>>>> Best regards, >>>>>>>>>>>>> Alexander >>>>>>>>>>>>> >>>>>>>>>>>>> чт, 19 мая 2022 г. в 14:50, Александр Смирнов < >>>>>> smirale...@gmail.com >>>>>>>>>>> : >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Qingsheng and Jark, >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1. Builders vs 'of' >>>>>>>>>>>>>> I understand that builders are used when we have multiple >>>>>>>>>> parameters. >>>>>>>>>>>>>> I suggested them because we could add parameters later. To >>>>>> prevent >>>>>>>>>>>>>> Builder for ScanRuntimeProvider from looking redundant I >> can >>>>>>>> suggest >>>>>>>>>>>>>> one more config now - "rescanStartTime". >>>>>>>>>>>>>> It's a time in UTC (LocalTime class) when the first reload >>> of >>>>>> cache >>>>>>>>>>>>>> starts. This parameter can be thought of as 'initialDelay' >>>> (diff >>>>>>>>>>>>>> between current time and rescanStartTime) in method >>>>>>>>>>>>>> ScheduleExecutorService#scheduleWithFixedDelay [1] . 
It >> can >>> be >>>>>> very >>>>>>>>>>>>>> useful when the dimension table is updated by some other >>>>>> scheduled >>>>>>>>>> job >>>>>>>>>>>>>> at a certain time. Or when the user simply wants a second >>> scan >>>>>>>>>> (first >>>>>>>>>>>>>> cache reload) be delayed. This option can be used even >>> without >>>>>>>>>>>>>> 'rescanInterval' - in this case 'rescanInterval' will be >> one >>>>>> day. >>>>>>>>>>>>>> If you are fine with this option, I would be very glad if >>> you >>>>>> would >>>>>>>>>>>>>> give me access to edit FLIP page, so I could add it myself >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2. Common table options >>>>>>>>>>>>>> I also think that FactoryUtil would be overloaded by all >>> cache >>>>>>>>>>>>>> options. But maybe unify all suggested options, not only >> for >>>>>>>> default >>>>>>>>>>>>>> cache? I.e. class 'LookupOptions', that unifies default >>> cache >>>>>>>>>> options, >>>>>>>>>>>>>> rescan options, 'async', 'maxRetries'. WDYT? >>>>>>>>>>>>>> >>>>>>>>>>>>>> 3. Retries >>>>>>>>>>>>>> I'm fine with suggestion close to >> RetryUtils#tryTimes(times, >>>>>> call) >>>>>>>>>>>>>> >>>>>>>>>>>>>> [1] >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>> >>>> >>> >> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best regards, >>>>>>>>>>>>>> Alexander >>>>>>>>>>>>>> >>>>>>>>>>>>>> ср, 18 мая 2022 г. в 16:04, Qingsheng Ren < >>> renqs...@gmail.com >>>>> : >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Jark and Alexander, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for your comments! I’m also OK to introduce common >>>> table >>>>>>>>>>>> options. I prefer to introduce a new >> DefaultLookupCacheOptions >>>>>> class >>>>>>>>>> for >>>>>>>>>>>> holding these option definitions because putting all options >>>> into >>>>>>>>>>>> FactoryUtil would make it a bit ”crowded” and not well >>>>>> categorized. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> FLIP has been updated according to suggestions above: >>>>>>>>>>>>>>> 1. Use static “of” method for constructing >>>>>> RescanRuntimeProvider >>>>>>>>>>>> considering both arguments are required. >>>>>>>>>>>>>>> 2. Introduce new table options matching >>>>>> DefaultLookupCacheFactory >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> Qingsheng >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu < >> imj...@gmail.com> >>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Alex, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1) retry logic >>>>>>>>>>>>>>>> I think we can extract some common retry logic into >>>> utilities, >>>>>>>>>> e.g. >>>>>>>>>>>> RetryUtils#tryTimes(times, call). >>>>>>>>>>>>>>>> This seems independent of this FLIP and can be reused by >>>>>>>>>> DataStream >>>>>>>>>>>> users. >>>>>>>>>>>>>>>> Maybe we can open an issue to discuss this and where to >>> put >>>>>> it. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2) cache ConfigOptions >>>>>>>>>>>>>>>> I'm fine with defining cache config options in the >>>> framework. >>>>>>>>>>>>>>>> A candidate place to put is FactoryUtil which also >>> includes >>>>>>>>>>>> "sink.parallelism", "format" options. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>> Jark >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов < >>>>>>>>>>> smirale...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Qingsheng, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thank you for considering my comments. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> there might be custom logic before making retry, such >> as >>>>>>>>>>>> re-establish the connection >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Yes, I understand that. I meant that such logic can be >>>>>> placed in >>>>>>>>>> a >>>>>>>>>>>>>>>>> separate function, that can be implemented by >> connectors. 
>>>>>> Just >>>>>>>>>>> moving >>>>>>>>>>>>>>>>> the retry logic would make connector's LookupFunction >>> more >>>>>>>>>> concise >>>>>>>>>>> + >>>>>>>>>>>>>>>>> avoid duplicate code. However, it's a minor change. The >>>>>> decision >>>>>>>>>> is >>>>>>>>>>>> up >>>>>>>>>>>>>>>>> to you. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> We decide not to provide common DDL options and let >>>>>> developers >>>>>>>>>> to >>>>>>>>>>>> define their own options as we do now per connector. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> What is the reason for that? One of the main goals of >>> this >>>>>> FLIP >>>>>>>>>> was >>>>>>>>>>>> to >>>>>>>>>>>>>>>>> unify the configs, wasn't it? I understand that current >>>> cache >>>>>>>>>>> design >>>>>>>>>>>>>>>>> doesn't depend on ConfigOptions, like was before. But >>> still >>>>>> we >>>>>>>>>> can >>>>>>>>>>>> put >>>>>>>>>>>>>>>>> these options into the framework, so connectors can >> reuse >>>>>> them >>>>>>>>>> and >>>>>>>>>>>>>>>>> avoid code duplication, and, what is more significant, >>>> avoid >>>>>>>>>>> possible >>>>>>>>>>>>>>>>> different options naming. This moment can be pointed >> out >>> in >>>>>>>>>>>>>>>>> documentation for connector developers. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best regards, >>>>>>>>>>>>>>>>> Alexander >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren < >>>>>> renqs...@gmail.com>: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Alexander, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the review and glad to see we are on the >> same >>>>>> page! >>>>>>>> I >>>>>>>>>>>> think you forgot to cc the dev mailing list so I’m also >>> quoting >>>>>> your >>>>>>>>>>> reply >>>>>>>>>>>> under this email. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> We can add 'maxRetryTimes' option into this class >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> In my opinion the retry logic should be implemented in >>>>>> lookup() >>>>>>>>>>>> instead of in LookupFunction#eval(). 
Retrying is only >>> meaningful >>>>>>>> under >>>>>>>>>>> some >>>>>>>>>>>> specific retriable failures, and there might be custom logic >>>>>> before >>>>>>>>>>> making >>>>>>>>>>>> retry, such as re-establish the connection >>>>>> (JdbcRowDataLookupFunction >>>>>>>>>> is >>>>>>>>>>> an >>>>>>>>>>>> example), so it's more handy to leave it to the connector. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I don't see DDL options, that were in previous >> version >>> of >>>>>>>> FLIP. >>>>>>>>>>> Do >>>>>>>>>>>> you have any special plans for them? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> We decide not to provide common DDL options and let >>>>>> developers >>>>>>>>>> to >>>>>>>>>>>> define their own options as we do now per connector. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The rest of comments sound great and I’ll update the >>> FLIP. >>>>>> Hope >>>>>>>>>> we >>>>>>>>>>>> can finalize our proposal soon! >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Qingsheng >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов < >>>>>>>>>>> smirale...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Qingsheng and devs! >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I like the overall design of updated FLIP, however I >>> have >>>>>>>>>> several >>>>>>>>>>>>>>>>>>> suggestions and questions. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1) Introducing LookupFunction as a subclass of >>>>>> TableFunction >>>>>>>>>> is a >>>>>>>>>>>> good >>>>>>>>>>>>>>>>>>> idea. We can add 'maxRetryTimes' option into this >>> class. >>>>>>>> 'eval' >>>>>>>>>>>> method >>>>>>>>>>>>>>>>>>> of new LookupFunction is great for this purpose. The >>> same >>>>>> is >>>>>>>>>> for >>>>>>>>>>>>>>>>>>> 'async' case. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 2) There might be other configs in future, such as >>>>>>>>>>>> 'cacheMissingKey' >>>>>>>>>>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in >>>>>>>>>>>> ScanRuntimeProvider. >>>>>>>>>>>>>>>>>>> Maybe use Builder pattern in LookupFunctionProvider >> and >>>>>>>>>>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one >>>> 'build' >>>>>>>>>>> method >>>>>>>>>>>>>>>>>>> instead of many 'of' methods in future)? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 3) What are the plans for existing >>> TableFunctionProvider >>>>>> and >>>>>>>>>>>>>>>>>>> AsyncTableFunctionProvider? I think they should be >>>>>> deprecated. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 4) Am I right that the current design does not assume >>>>>> usage of >>>>>>>>>>>>>>>>>>> user-provided LookupCache in re-scanning? In this >> case, >>>> it >>>>>> is >>>>>>>>>> not >>>>>>>>>>>> very >>>>>>>>>>>>>>>>>>> clear why do we need methods such as 'invalidate' or >>>>>> 'putAll' >>>>>>>>>> in >>>>>>>>>>>>>>>>>>> LookupCache. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 5) I don't see DDL options, that were in previous >>> version >>>>>> of >>>>>>>>>>> FLIP. >>>>>>>>>>>> Do >>>>>>>>>>>>>>>>>>> you have any special plans for them? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> If you don't mind, I would be glad to be able to make >>>> small >>>>>>>>>>>>>>>>>>> adjustments to the FLIP document too. I think it's >>> worth >>>>>>>>>>> mentioning >>>>>>>>>>>>>>>>>>> about what exactly optimizations are planning in the >>>>>> future. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Best regards, >>>>>>>>>>>>>>>>>>> Smirnov Alexander >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> пт, 13 мая 2022 г. в 20:27, Qingsheng Ren < >>>>>> renqs...@gmail.com >>>>>>>>>>> : >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Alexander and devs, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thank you very much for the in-depth discussion! 
As >>> Jark >>>>>>>>>>>> mentioned we were inspired by Alexander's idea and made a >>>>>> refactor on >>>>>>>>>> our >>>>>>>>>>>> design. FLIP-221 [1] has been updated to reflect our design >>> now >>>>>> and >>>>>>>> we >>>>>>>>>>> are >>>>>>>>>>>> happy to hear more suggestions from you! >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Compared to the previous design: >>>>>>>>>>>>>>>>>>>> 1. The lookup cache serves at table runtime level >> and >>> is >>>>>>>>>>>> integrated as a component of LookupJoinRunner as discussed >>>>>>>> previously. >>>>>>>>>>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect >>> the >>>>>> new >>>>>>>>>>>> design. >>>>>>>>>>>>>>>>>>>> 3. We separate the all-caching case individually and >>>>>>>>>> introduce a >>>>>>>>>>>> new RescanRuntimeProvider to reuse the ability of scanning. >> We >>>> are >>>>>>>>>>> planning >>>>>>>>>>>> to support SourceFunction / InputFormat for now considering >>> the >>>>>>>>>>> complexity >>>>>>>>>>>> of FLIP-27 Source API. >>>>>>>>>>>>>>>>>>>> 4. A new interface LookupFunction is introduced to >>> make >>>>>> the >>>>>>>>>>>> semantic of lookup more straightforward for developers. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> For replying to Alexander: >>>>>>>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat >> is >>>>>>>>>> deprecated >>>>>>>>>>>> or not. Am I right that it will be so in the future, but >>>> currently >>>>>>>> it's >>>>>>>>>>> not? >>>>>>>>>>>>>>>>>>>> Yes you are right. InputFormat is not deprecated for >>>> now. >>>>>> I >>>>>>>>>>> think >>>>>>>>>>>> it will be deprecated in the future but we don't have a >> clear >>>> plan >>>>>>>> for >>>>>>>>>>> that. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks again for the discussion on this FLIP and >>> looking >>>>>>>>>> forward >>>>>>>>>>>> to cooperating with you after we finalize the design and >>>>>> interfaces! 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>> >>>> >>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Best regards, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Qingsheng >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов < >>>>>>>>>>>> smirale...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard! >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Glad to see that we came to a consensus on almost >> all >>>>>>>> points! >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat >> is >>>>>>>>>> deprecated >>>>>>>>>>>> or >>>>>>>>>>>>>>>>>>>>> not. Am I right that it will be so in the future, >> but >>>>>>>>>> currently >>>>>>>>>>>> it's >>>>>>>>>>>>>>>>>>>>> not? Actually I also think that for the first >> version >>>>>> it's >>>>>>>> OK >>>>>>>>>>> to >>>>>>>>>>>> use >>>>>>>>>>>>>>>>>>>>> InputFormat in ALL cache realization, because >>>> supporting >>>>>>>>>> rescan >>>>>>>>>>>>>>>>>>>>> ability seems like a very distant prospect. But for >>>> this >>>>>>>>>>>> decision we >>>>>>>>>>>>>>>>>>>>> need a consensus among all discussion participants. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> In general, I don't have something to argue with >> your >>>>>>>>>>>> statements. All >>>>>>>>>>>>>>>>>>>>> of them correspond my ideas. Looking ahead, it >> would >>> be >>>>>> nice >>>>>>>>>> to >>>>>>>>>>>> work >>>>>>>>>>>>>>>>>>>>> on this FLIP cooperatively. I've already done a lot >>> of >>>>>> work >>>>>>>>>> on >>>>>>>>>>>> lookup >>>>>>>>>>>>>>>>>>>>> join caching with realization very close to the one >>> we >>>>>> are >>>>>>>>>>>> discussing, >>>>>>>>>>>>>>>>>>>>> and want to share the results of this work. 
Anyway >>>>>> looking >>>>>>>>>>>> forward for >>>>>>>>>>>>>>>>>>>>> the FLIP update! >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Best regards, >>>>>>>>>>>>>>>>>>>>> Smirnov Alexander >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> чт, 12 мая 2022 г. в 17:38, Jark Wu < >>> imj...@gmail.com >>>>> : >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hi Alex, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks for summarizing your points. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have >>>>>> discussed >>>>>>>>>> it >>>>>>>>>>>> several times >>>>>>>>>>>>>>>>>>>>>> and we have totally refactored the design. >>>>>>>>>>>>>>>>>>>>>> I'm glad to say we have reached a consensus on >> many >>> of >>>>>> your >>>>>>>>>>>> points! >>>>>>>>>>>>>>>>>>>>>> Qingsheng is still working on updating the design >>> docs >>>>>> and >>>>>>>>>>>> maybe can be >>>>>>>>>>>>>>>>>>>>>> available in the next few days. >>>>>>>>>>>>>>>>>>>>>> I will share some conclusions from our >> discussions: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 1) we have refactored the design towards to "cache >>> in >>>>>>>>>>>> framework" way. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 2) a "LookupCache" interface for users to >> customize >>>> and >>>>>> a >>>>>>>>>>>> default >>>>>>>>>>>>>>>>>>>>>> implementation with builder for users to easy-use. >>>>>>>>>>>>>>>>>>>>>> This can both make it possible to both have >>>> flexibility >>>>>> and >>>>>>>>>>>> conciseness. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 3) Filter pushdown is important for ALL and LRU >>> lookup >>>>>>>>>> cache, >>>>>>>>>>>> esp reducing >>>>>>>>>>>>>>>>>>>>>> IO. >>>>>>>>>>>>>>>>>>>>>> Filter pushdown should be the final state and the >>>>>> unified >>>>>>>>>> way >>>>>>>>>>>> to both >>>>>>>>>>>>>>>>>>>>>> support pruning ALL cache and LRU cache, >>>>>>>>>>>>>>>>>>>>>> so I think we should make effort in this >> direction. 
If we need to support filter pushdown for the ALL cache anyway, why not use it for the LRU cache as well? Either way, as we have decided to implement the cache in the framework, we have the chance to support filters on the cache at any time. This is an optimization and it doesn't affect the public API. I think we can create a JIRA issue to discuss it once the FLIP is accepted.

4) The idea to support the ALL cache is similar to your proposal. In the first version, we will only support InputFormat and SourceFunction for cache-all (invoking the InputFormat in the join operator). For the FLIP-27 source, we need to join a true source operator instead of calling it embedded in the join operator. However, this needs another FLIP to support the re-scan ability for the FLIP-27 Source, and that can be a large amount of work. In order not to block this issue, we can put the effort of FLIP-27 source integration into future work and integrate InputFormat & SourceFunction for now.

I think it's fine to use InputFormat & SourceFunction, as they are not deprecated; otherwise, we would have to introduce another function similar to them, which would be meaningless. We need to plan the FLIP-27 source integration ASAP, before InputFormat & SourceFunction are deprecated.
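Points 1) and 2) above describe a "LookupCache" interface in the framework plus a builder-configured default implementation. A rough, self-contained sketch of what that shape could look like (the names `LookupCache`, `DefaultLookupCache`, and the builder options are illustrative assumptions, not the final FLIP-221 API; a plain access-ordered LinkedHashMap stands in for the real eviction logic):

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical framework-side cache interface: one lookup key maps to the
// collection of matching rows (possibly empty, for a cached miss).
interface LookupCache<K, V> {
    Collection<V> getIfPresent(K key); // null means "not in cache"
    void put(K key, Collection<V> rows);
    long size();
}

// Hypothetical default implementation: a size-bounded LRU cache
// configured through a builder, as points 1-2 suggest.
final class DefaultLookupCache<K, V> implements LookupCache<K, V> {
    private final Map<K, Collection<V>> cache;

    private DefaultLookupCache(final int maximumSize) {
        // access-order LinkedHashMap gives least-recently-used eviction
        this.cache = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maximumSize;
            }
        };
    }

    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    @Override
    public Collection<V> getIfPresent(K key) {
        return cache.get(key);
    }

    @Override
    public void put(K key, Collection<V> rows) {
        cache.put(key, rows);
    }

    @Override
    public long size() {
        return cache.size();
    }

    public static final class Builder<K, V> {
        private int maximumSize = 10_000;

        public Builder<K, V> maximumSize(int n) {
            this.maximumSize = n;
            return this;
        }

        public DefaultLookupCache<K, V> build() {
            return new DefaultLookupCache<>(maximumSize);
        }
    }
}
```

With a shape like this, a connector only hands the cache (or its builder configuration) to the framework, so configs and metrics stay unified while the lookup TableFunction itself remains cache-agnostic.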
Best,
Jark

On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Martijn!

Got it. Therefore, the realization with InputFormat is not considered. Thanks for clearing that up!

Best regards,
Smirnov Alexander

On Thu, May 12, 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:

Hi,

With regards to:

> But if there are plans to refactor all connectors to FLIP-27

Yes, FLIP-27 is the target for all connectors. The old interfaces will be deprecated, and connectors will either be refactored to use the new ones or dropped.

The caching should work for connectors that are using the FLIP-27 interfaces; we should not introduce new features for old interfaces.
Best regards,

Martijn

On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Jark!

Sorry for the late response. I would like to make some comments and clarify my points.

1) I agree with your first statement. I think we can achieve both advantages this way: put the Cache interface in flink-table-common, but have implementations of it in flink-table-runtime. Therefore, if a connector developer wants to use the existing cache strategies and their implementations, he can just pass a lookupConfig to the planner; but if he wants to have his own cache implementation in his TableFunction, it will be possible for him to use the existing interface for this purpose (we can explicitly point this out in the documentation). In this way all configs and metrics will be unified. WDYT?

> If a filter can prune 90% of data in the cache, we will have 90% of lookup requests that can never be cached

2) Let me clarify the logic of the filter optimization in the case of the LRU cache.
It looks like Cache<RowData, Collection<RowData>>. Here we always store the response of the dimension table in the cache, even after applying the calc function. I.e., if there are no rows left after applying filters to the result of the 'eval' method of the TableFunction, we store an empty list under the lookup keys. Therefore the cache line will be filled, but it will require much less memory (in bytes). I.e., we don't completely filter out the keys whose results were pruned, but we significantly reduce the memory required to store those results. If the user knows about this behavior, he can increase the 'max-rows' option before the start of the job. But actually I came up with the idea that we can do this automatically by using the 'maximumWeight' and 'weigher' methods of the Guava Cache [1]. The weight can be the size of the collection of rows (the cache value). Therefore the cache can automatically fit many more records than before.

> Flink SQL has provided a standard way to do filters and projects pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
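The maximumWeight/weigher idea above can be sketched without Guava: bound the cache by total row count rather than entry count, so that cached empty results (keys fully pruned by filters) cost almost nothing. This is an illustrative stdlib-only stand-in for Guava's CacheBuilder.maximumWeight(...).weigher(...), not a proposed API:

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// LRU cache bounded by total weight instead of entry count. The weight of
// an entry is the number of rows in its value plus one, so an empty result
// (a cached "miss" pruned by filters) is nearly free, mimicking Guava's
// maximumWeight/weigher combination.
final class WeightedLookupCache<K, V> {
    private final long maximumWeight;
    private long currentWeight = 0;
    private final LinkedHashMap<K, Collection<V>> cache =
            new LinkedHashMap<>(16, 0.75f, true); // access order => LRU

    WeightedLookupCache(long maximumWeight) {
        this.maximumWeight = maximumWeight;
    }

    private long weigh(Collection<V> rows) {
        return rows.size() + 1;
    }

    Collection<V> getIfPresent(K key) {
        return cache.get(key);
    }

    void put(K key, Collection<V> rows) {
        Collection<V> old = cache.put(key, rows);
        if (old != null) {
            currentWeight -= weigh(old);
        }
        currentWeight += weigh(rows);
        // evict least-recently-used entries until back under the weight budget
        Iterator<Map.Entry<K, Collection<V>>> it = cache.entrySet().iterator();
        while (currentWeight > maximumWeight && it.hasNext()) {
            Map.Entry<K, Collection<V>> eldest = it.next();
            if (eldest.getKey().equals(key)) {
                continue; // never evict the entry we just inserted
            }
            currentWeight -= weigh(eldest.getValue());
            it.remove();
        }
    }

    long weight() {
        return currentWeight;
    }

    int size() {
        return cache.size();
    }
}
```

Under this weighing, a job whose filters prune many rows can hold far more keys within the same memory budget, which is exactly the "automatically fit more records" effect described above.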
> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard to implement.

It's debatable how difficult it will be to implement filter pushdown. But I think the fact that currently there is no database connector with filter pushdown at least means that this feature won't be supported in connectors soon. Moreover, if we talk about other connectors (not in the Flink repo), their databases might not support all Flink filters (or not support filters at all). I think users are interested in supporting the cache filter optimization independently of supporting other features and solving problems that are more complex (or not solvable at all).

3) I agree with your third statement. Actually, in our internal version I also tried to unify the logic of scanning and reloading data from connectors. But unfortunately, I didn't find a way to unify the logic of all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...) and reuse it for reloading the ALL cache. As a result I settled on using InputFormat, because it was used for scanning in all lookup connectors.
(I didn't know that there are plans to deprecate InputFormat in favor of the FLIP-27 Source.) IMO, using the FLIP-27 source for ALL caching is not a good idea, because this source was designed to work in a distributed environment (SplitEnumerator on the JobManager and SourceReaders on the TaskManagers), not in one operator (the lookup join operator in our case). There is not even a direct way to pass splits from the SplitEnumerator to a SourceReader (this logic works through the SplitEnumeratorContext, which requires OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Using InputFormat for the ALL cache seems much clearer and easier. But if there are plans to refactor all connectors to FLIP-27, I have the following idea: maybe we can give up the lookup join ALL cache in favor of a simple join with multiple scans of the batch source? The point is that the only difference between the lookup join ALL cache and a simple join with a batch source is that in the first case scanning is performed multiple times, in between which the state (cache) is cleared (correct me if I'm wrong).
So what if we extend the functionality of the simple join to support state reloading, and extend the functionality of scanning the batch source to run multiple times (the latter should be easy with the new FLIP-27 source, which unifies streaming/batch reading; we would only need to change the SplitEnumerator so that it passes the splits again after some TTL)? WDYT? I must say that this looks like a long-term goal and will make the scope of this FLIP even larger than you said. Maybe we can limit ourselves to a simpler solution now (InputFormats).

So to sum up, my points are these:
1) There is a way to make the interfaces for caching in lookup join both concise and flexible.
2) The cache filter optimization is important for both the LRU and ALL caches.
3) It is unclear when filter pushdown will be supported in the Flink connectors; some connectors might not even have the opportunity to support filter pushdown, and as far as I know, filter pushdown currently works only for scanning (not lookup). So the cache filter + projection optimization should be independent of other features.
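The "re-scan after some TTL" idea is essentially a periodic full reload of the cached table. A minimal sketch of such an ALL cache (the `Supplier` stands in for whatever rows an InputFormat scan would actually produce; the class and method names are illustrative only):

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Full ("ALL") cache that is replaced wholesale on every reload,
// instead of being filled lazily per lookup key like an LRU cache.
final class FullCache<K, V> {
    private final Supplier<Map<K, V>> scanner; // stands in for an InputFormat scan
    private final AtomicReference<Map<K, V>> snapshot = new AtomicReference<>(Map.of());

    FullCache(Supplier<Map<K, V>> scanner) {
        this.scanner = scanner;
    }

    // Re-scan the source and atomically swap in the new snapshot.
    void reload() {
        snapshot.set(Map.copyOf(scanner.get()));
    }

    V lookup(K key) {
        return snapshot.get().get(key);
    }

    // Optional time-based trigger, i.e. "pass the splits again after some TTL".
    ScheduledExecutorService scheduleReload(long intervalMillis) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleWithFixedDelay(
                this::reload, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return exec;
    }
}
```

A time-based trigger like `scheduleReload` is only one option; the same `reload()` hook could equally be driven by any custom condition.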
4) The ALL cache realization is a complex topic that involves multiple aspects of how Flink is developing. Moving from InputFormat to the FLIP-27 Source will make the ALL cache realization really complex and unclear, so maybe instead of that we can extend the functionality of the simple join, or keep InputFormat in the case of the lookup join ALL cache?

Best regards,
Smirnov Alexander

[1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)

On Thu, May 5, 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:

It's great to see the active discussion! I want to share my ideas:

1) Implement the cache in the framework vs. the connector base
I don't have a strong opinion on this. Both ways should work (e.g., cache pruning, compatibility). The framework way can provide more concise interfaces.
The connector base way can define more flexible cache strategies/implementations. We are still investigating whether we can have both advantages. We should reach a consensus that the chosen way is the final state, and that we are on the path to it.

2) Filters and projections pushdown:
I agree with Alex that filter pushdown into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use the cache to reduce IO requests to databases for better throughput. If a filter can prune 90% of the data in the cache, we will have 90% of lookup requests that can never be cached and hit the databases directly. That means the cache is meaningless in this case.

IMO, Flink SQL has provided a standard way to do filter and projection pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. Jdbc/hive/HBase haven't implemented the interfaces; that doesn't mean it's hard to implement.
They should implement the pushdown interfaces to reduce IO and the cache size. The final state should be that the scan source and the lookup source share the exact same pushdown implementation. I don't see why we need to duplicate the pushdown logic in the caches, which would complicate the lookup join design.

3) ALL cache abstraction
The ALL cache might be the most challenging part of this FLIP. We have never provided a public reload-lookup interface. Currently, we put the reload logic in the "eval" method of the TableFunction. That's hard for some sources (e.g., Hive). Ideally, connector implementations should share the logic of reload and scan, i.e., a ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source is deeply coupled with the SourceOperator. If we want to invoke the FLIP-27 source in the LookupJoin, this may make the scope of this FLIP much larger.
We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.

Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns should be done for all ScanTableSource implementations (not only for the Lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.
Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!

Thanks for driving such a valuable improvement!

I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how it is implemented.

Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data. And the most handy way to do that is to apply it inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction.
And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy and other options for all lookup tables.
3) Providing the cache in the framework really deprives us of extensibility (users won't be able to implement their own cache). But most probably this can be solved by creating more cache strategies and a wider set of configurations.

All these points are much closer to the schema proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong, and all these facilities can be simply implemented in your architecture?
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

I don't have much to chip in, but I just wanted to express that I really appreciate the in-depth discussion on this topic and I hope that others will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).
> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully preserved with caching, but as you said, users accept this consciously to achieve better performance (no one proposed to enable caching by default, etc.). Or by users do you mean other developers of connectors? In that case, developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to. So what exactly is the difference between implementing caching in the flink-table-runtime module and in flink-table-common from this point of view? How does it affect breaking (or not breaking) the semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious

If we talk about the main semantic differences between DDL options and config options ("table.exec.xxx"), isn't it about limiting the scope of the options and their importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting an option with the lookup cache strategy into the configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches just several functions of ONE table (there can be multiple tables with different caches).
Does it really matter for the user (or anyone else) where the logic affected by the applied option is located? Also, I can recall the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.

> introduce a new interface for this all-caching scenario and the design would become more complex

This is a subject for a separate discussion, but actually in our internal version we solved this problem quite easily: we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive, which uses the class PartitionReader, actually just a wrapper around InputFormat.
The advantage of this solution is the ability to reload the cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, the cache reload time is significantly reduced (as well as the time the input stream is blocked). I know that we usually try to avoid concurrency in Flink code, but maybe this one can be an exception. BTW, I don't say that it's an ideal solution; maybe there are better ones.

> Providing the cache in the framework might introduce compatibility issues

That is possible only when the developer of the connector doesn't refactor his code properly and uses the new cache options incorrectly (i.e., explicitly provides the same options in two different places in the code).
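The parallel reload described above, one loading task per split merged into a single snapshot, can be sketched with a plain bounded thread pool. Here `readSplit` and the list-of-pairs "split" are hypothetical stand-ins for opening a real InputSplit with an InputFormat:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelReloader {
    // Stand-in for reading one InputSplit: here a split is just a list of
    // (key, value) pairs; a real implementation would open the InputFormat.
    static Map<String, String> readSplit(List<String[]> split) {
        Map<String, String> rows = new HashMap<>();
        for (String[] kv : split) {
            rows.put(kv[0], kv[1]);
        }
        return rows;
    }

    // Load all splits concurrently (bounded thread pool) and merge the results
    // into one cache snapshot.
    static Map<String, String> reload(List<List<String[]>> splits, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Callable<Map<String, String>>> tasks = new ArrayList<>();
            for (List<String[]> split : splits) {
                tasks.add(() -> readSplit(split));
            }
            Map<String, String> merged = new HashMap<>();
            for (Future<Map<String, String>> f : pool.invokeAll(tasks)) {
                merged.putAll(f.get());
            }
            return merged;
        } catch (Exception e) {
            throw new RuntimeException("cache reload failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The wall-clock reload time then scales roughly with the largest split rather than the total table size, which is the "input stream is blocked for less time" effect mentioned above.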
For correct behavior, all he will need to do is redirect the existing options to the framework's LookupConfig (+ maybe add an alias for options, if there was different naming); everything will be transparent for users. If the developer doesn't do the refactoring at all, nothing will change for the connector because of backward compatibility. Also, if a developer wants to use his own cache logic, he can simply refuse to pass some of the configs into the framework and instead make his own implementation with the already existing configs and metrics (but actually I think that's a rare case).

> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

That's a great goal.
But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down such complex filters as we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose in the dimension table 'users' we have a column 'age' with values from 20 to 40, and an input stream 'clicks' that is roughly uniformly distributed by age of users. If we have the filter 'age > 30', there will be about half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2 times, which will give a huge performance boost.
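The 'age > 30' example above can be made concrete with a toy count: when the filter runs before rows enter the cache, roughly half of the uniformly distributed rows are never stored. This is an illustration only; the method and names are invented, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class FilteredCache {

    /** Counts rows that would be stored when the filter runs before caching. */
    public static int cachedRows(List<Integer> ages, IntPredicate filter) {
        int stored = 0;
        for (int age : ages) {
            if (filter.test(age)) {
                stored++; // only rows passing the filter are cached
            }
        }
        return stored;
    }

    public static void main(String[] args) {
        List<Integer> ages = new ArrayList<>();
        for (int age = 20; age <= 40; age++) {
            ages.add(age); // uniform ages 20..40, as in the example
        }
        int withoutFilter = cachedRows(ages, a -> true);
        int withFilter = cachedRows(ages, a -> a > 30); // filter 'age > 30'
        System.out.println(withoutFilter + " -> " + withFilter); // 21 -> 10
    }
}
```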
Moreover, this optimization really starts to shine with the 'ALL' cache, where tables without filters and projections can't fit in memory, but with them they can. This opens up additional possibilities for users. And that doesn't sound like 'not quite useful'.

It would be great to hear other voices regarding this topic! We have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander

On Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas.
Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF proc_time", because it couldn't truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.

2.
If we make the cache implementation part of the framework (whether in a runner or a wrapper around TableFunction), we have to confront a situation where table options in DDL control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.

3. We have use cases where the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community, and also widely used by our internal connectors).
Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.

4. Providing the cache in the framework might introduce compatibility issues for existing lookup sources: there might exist two caches with totally different strategies if the user incorrectly configures the table (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of to the runner with the cache.
The goal of using a cache is to reduce network I/O and pressure on the external system, and only applying these optimizations to the cache seems not quite useful.

I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and regulate the metrics of the cache. Also, I made a POC [2] for your reference.

Looking forward to your ideas!
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.

> but could also live with an easier solution as the first step:

I think that these 2 ways (the one originally proposed by Qingsheng and mine) are mutually exclusive, because conceptually they follow the same goal, but the implementation details are different.
If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work together on this FLIP, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?

> as the source will only receive the requests after filter

Actually, if filters are applied to fields of the lookup table, we first must do the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown.
So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know the
> solution to share images to be honest.

Sorry for that, I'm a bit new to such kinds of conversations :) I have no write access to the Confluence, so I made a Jira issue where I described the proposed changes in more detail: https://issues.apache.org/jira/browse/FLINK-27411.

Will be happy to get more feedback!

Best,
Smirnov Alexander

On Mon, Apr 25, 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:

Hi Qingsheng,

Thanks for driving this; the inconsistency was not satisfying for me.
I second Alexander's idea, but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting to save memory).
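The delegate-on-miss layer described above might look roughly like this. LookupFn is a simplified stand-in for the wrapped TableFunction X; none of these names are actual FLIP-221 or Flink APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CachingLookup {

    /** Stand-in for the wrapped table function X. */
    public interface LookupFn {
        List<String> eval(String key);
    }

    private final LookupFn delegate;
    private final Map<String, List<String>> cache = new HashMap<>();
    private int delegateCalls = 0;

    public CachingLookup(LookupFn delegate) {
        this.delegate = delegate;
    }

    /** Serves from the cache on a hit; delegates and remembers on a miss. */
    public List<String> eval(String key) {
        return cache.computeIfAbsent(key, k -> {
            delegateCalls++;
            return delegate.eval(k);
        });
    }

    public int delegateCalls() {
        return delegateCalls;
    }

    public static void main(String[] args) {
        CachingLookup lookup = new CachingLookup(k -> Arrays.asList(k + "-row"));
        lookup.eval("a");
        lookup.eval("a"); // second call is a cache hit, no delegation
        System.out.println(lookup.delegateCalls()); // 1
    }
}
```

A real implementation would also need an eviction policy (LRU, TTL) and the standard cache metrics the FLIP proposes; this sketch only shows the delegation shape.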
Another advantage is that all the changes of this FLIP would be limited to options; no need for new public interfaces. Everything else remains an implementation detail of the Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.

@Alexander unfortunately, your architecture is not shared. I don't know the solution to share images, to be honest.

On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd really like to become one. And this FLIP really interested me.
Actually, I have worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.

I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables, so it's very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime.
But that would require connectors to depend on another module that contains a lot of runtime logic, which doesn't sound good.

I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to only pass configurations to the planner, so they won't depend on the runtime implementation. Based on these configs the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the module flink-table-runtime). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).

The classes in flink-table-planner that would be responsible for this are CommonPhysicalLookupJoin and its inheritors.
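The config-only handoff proposed above might be sketched as follows. Everything here is a simplified stand-in for illustration: the real LookupTableSource interface exists in Flink, but 'getLookupConfig', the LookupConfig fields, and the runner names are the proposal's hypothetical additions, not current APIs.

```java
public class LookupConfigSketch {

    /** Pure configuration holder; could live in an API-only module. */
    public static class LookupConfig {
        public final long cacheMaxRows;
        public final long cacheTtlMillis;

        public LookupConfig(long cacheMaxRows, long cacheTtlMillis) {
            this.cacheMaxRows = cacheMaxRows;
            this.cacheTtlMillis = cacheTtlMillis;
        }
    }

    /** What a connector would expose; no runtime dependency is involved. */
    public interface LookupTableSource {
        LookupConfig getLookupConfig();
    }

    /** The planner side: picks a runtime runner based on the config alone. */
    public static String chooseRunner(LookupTableSource source) {
        LookupConfig config = source.getLookupConfig();
        return config.cacheMaxRows > 0 ? "LookupJoinCachingRunner" : "LookupJoinRunner";
    }

    public static void main(String[] args) {
        LookupTableSource cached = () -> new LookupConfig(10_000, 60_000);
        System.out.println(chooseRunner(cached)); // LookupJoinCachingRunner
    }
}
```

The point of the shape is the dependency direction: the connector module only produces a plain config object, and all caching classes stay behind the planner/runtime boundary.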
The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc.

I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.

And here comes another, more powerful advantage of such a solution. If we have the caching logic at a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which actually mostly consists of filters and projections.
For example, when joining table A with lookup table B on the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters avoid storing useless records in the cache, and projections reduce the records' size. So the initial max number of records in the cache can be increased by the user.

What do you think about it?
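The filter-and-project-before-caching idea from the example above might look like this. One caveat is baked into the sketch: only predicates over the lookup table alone (here B.salary > 1000) can be applied before caching, since a condition like A.age = B.age + 10 depends on the probe row. The row layouts and names are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class CalcBeforeCache {

    /** Full row of lookup table B. */
    public static class UserRow {
        public final int id;
        public final int age;
        public final long salary;
        public final String address; // never needed after the join

        public UserRow(int id, int age, long salary, String address) {
            this.id = id;
            this.age = age;
            this.salary = salary;
            this.address = address;
        }
    }

    /** Projected row actually stored in the cache: only the joined-on fields. */
    public static class CachedRow {
        public final int id;
        public final int age;

        public CachedRow(int id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    private final Map<Integer, CachedRow> cache = new HashMap<>();

    /** Filters and projects before caching; rejected rows are never stored. */
    public void store(UserRow row) {
        if (row.salary > 1000) { // filter: B.salary > 1000
            cache.put(row.id, new CachedRow(row.id, row.age)); // projection
        }
    }

    public int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        CalcBeforeCache cache = new CalcBeforeCache();
        cache.store(new UserRow(1, 35, 2000, "n/a"));
        cache.store(new UserRow(2, 25, 500, "n/a")); // filtered out, never cached
        System.out.println(cache.size()); // 1
    }
}
```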
On 2022/04/19 02:47:11 Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction of the lookup table cache and its standard metrics.

Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.

Therefore we propose some new APIs including the cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] to get more details.
Any suggestions and comments would be appreciated!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,

Qingsheng

--
Best Regards,

Qingsheng Ren

Real-time Computing Team
Alibaba Cloud

Email: renqs...@gmail.com

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com