After looking at the newly introduced ReloadTime and Becket's comment, I agree with Becket that we should have a pluggable reloading strategy. We can provide some common implementations, e.g., periodic reloading and daily reloading. But there will definitely be some connector- or business-specific reloading strategies, e.g., reloading when notified by a ZooKeeper watcher, or reloading once a new Hive partition is complete.
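A pluggable reloading strategy could be sketched roughly as below. Note this is only an illustration of the idea under discussion: the interface and class names (`CacheReloadStrategy`, `PeriodicReloadStrategy`) are hypothetical, not the FLIP's final API.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical pluggable reload strategy: decides WHEN the cache reload task runs. */
interface CacheReloadStrategy extends AutoCloseable {
    /** Registers the reload task; the strategy schedules or triggers it. */
    void open(Runnable reloadTask) throws Exception;
}

/** One possible common implementation: reload at a fixed period. */
class PeriodicReloadStrategy implements CacheReloadStrategy {
    private final Duration period;
    private ScheduledExecutorService executor;

    PeriodicReloadStrategy(Duration period) {
        this.period = period;
    }

    @Override
    public void open(Runnable reloadTask) {
        executor = Executors.newSingleThreadScheduledExecutor();
        // Initial delay 0: load the cache immediately, then again after each period.
        executor.scheduleWithFixedDelay(reloadTask, 0, period.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
```

A ZooKeeper-watcher or Hive-partition strategy would implement the same interface and simply invoke `reloadTask` from its notification callback instead of from a timer.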
Best, Jark On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote: > Hi Qingsheng, > > Thanks for updating the FLIP. A few comments / questions below: > > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"? > What is the difference between them? If they are the same, can we just use > XXXFactory everywhere? > > 2. Regarding the FullCachingLookupProvider, should the reloading policy > also be pluggable? Periodical reloading could sometimes be tricky in > practice. For example, if a user uses 24 hours as the cache refresh interval > and some nightly batch job is delayed, the cache update may still see the > stale data. > > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be > removed. > > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little > confusing to me. If Optional<LookupCacheFactory> getCacheFactory() returns > a non-empty factory, doesn't that already indicate that the framework should cache > the missing keys? Also, why is this method returning an Optional<Boolean> > instead of boolean? > > Thanks, > > Jiangjie (Becket) Qin > > > > On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com> wrote: > >> Hi Lincoln and Jark, >> >> Thanks for the comments! If the community reaches a consensus that we use >> SQL hints instead of table options to decide whether to use sync or async >> mode, it's indeed not necessary to introduce the "lookup.async" option. >> >> I think it's a good idea to let the decision on async be made at the query >> level, which could enable better optimization with more information gathered >> by the planner. Is there any FLIP describing the issue in FLINK-27625? I >> thought FLIP-234 only proposes adding a SQL hint for retry on missing data, >> rather than having the entire async mode controlled by a hint. >> >> Best regards, >> >> Qingsheng >> >> > On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com> wrote: >> > >> > Hi Jark, >> > >> > Thanks for your reply!
>> > >> > Currently 'lookup.async' only exists in the HBase connector, and I have no idea >> > whether or when to remove it (we can discuss it in another issue for the >> > HBase connector after FLINK-27625 is done), so let's just not add it as a >> common >> > option now. >> > >> > Best, >> > Lincoln Lee >> > >> > >> > Jark Wu <imj...@gmail.com> wrote on Tue, 24 May 2022 at 20:14: >> > >> >> Hi Lincoln, >> >> >> >> I have taken a look at FLIP-234, and I agree with you that the >> connectors >> >> can >> >> provide both async and sync runtime providers simultaneously instead >> of one >> >> of them. >> >> At that point, "lookup.async" looks redundant. If this option is >> planned to >> >> be removed >> >> in the long term, I think it makes sense not to introduce it in this >> FLIP. >> >> >> >> Best, >> >> Jark >> >> >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com> >> wrote: >> >> >> >>> Hi Qingsheng, >> >>> >> >>> Sorry for jumping into the discussion so late. It's a good idea that >> we >> >> can >> >>> have a common table option. I have a minor comment on 'lookup.async': >> >> let's >> >>> not make it a common option. >> >>> >> >>> The table layer abstracts both sync and async lookup capabilities, and >> >>> connector implementers can choose one or both. In the case of >> >> implementing >> >>> only one capability (the status of most existing builtin connectors), >> >>> 'lookup.async' will not be used. And when a connector has both >> >>> capabilities, I think this choice is more suitable for making >> decisions >> >> at >> >>> the query level; for example, the table planner can choose the physical >> >>> implementation of async lookup or sync lookup based on its cost >> model, or >> >>> users can give a query hint based on their own better understanding. If >> >>> there is another common table option 'lookup.async', it may confuse >> the >> >>> users in the long run.
>> >>> >> >>> So, I prefer to leave the 'lookup.async' option in a private place (for >> the >> >>> current HBase connector) and not turn it into a common option. >> >>> >> >>> WDYT? >> >>> >> >>> Best, >> >>> Lincoln Lee >> >>> >> >>> >> >>> Qingsheng Ren <renqs...@gmail.com> wrote on Mon, 23 May 2022 at 14:54: >> >>> >> >>>> Hi Alexander, >> >>>> >> >>>> Thanks for the review! We recently updated the FLIP and you can find >> >>> those >> >>>> changes in my latest email. Since some terminology has changed, >> >>> I'll >> >>>> use the new concepts when replying to your comments. >> >>>> >> >>>> 1. Builder vs 'of' >> >>>> I'm OK with using the builder pattern if we have additional optional >> parameters >> >>>> for full caching mode ("rescan" previously). The schedule-with-delay >> >> idea >> >>>> looks reasonable to me, but I think we need to redesign the builder >> API >> >>> of >> >>>> full caching to make it more descriptive for developers. Would you >> mind >> >>>> sharing your ideas about the API? For accessing the FLIP workspace >> you >> >>> can >> >>>> just provide your account ID and ping any PMC member including Jark. >> >>>> >> >>>> 2. Common table options >> >>>> We have had some discussions these days and propose to introduce 8 common >> >>>> table options about caching. It has been updated in the FLIP. >> >>>> >> >>>> 3. Retries >> >>>> I think we are on the same page :-) >> >>>> >> >>>> For your additional concerns: >> >>>> 1) The table option has been updated. >> >>>> 2) We got "lookup.cache" back for configuring whether to use partial >> or >> >>>> full caching mode. >> >>>> >> >>>> Best regards, >> >>>> >> >>>> Qingsheng >> >>>> >> >>>> >> >>>> >> >>>>> On May 19, 2022, at 17:25, Александр Смирнов <smirale...@gmail.com> >> >>>> wrote: >> >>>>> >> >>>>> Also I have a few additions: >> >>>>> 1) Maybe rename 'lookup.cache.maximum-size' to >> >>>>> 'lookup.cache.max-rows'? I think it will be clearer that we are talking >> >>>>> about the number of rows, not bytes.
Plus it fits more, >> >>>>> considering my optimization with filters. >> >>>>> 2) How will users enable rescanning? Are we going to separate >> caching >> >>>>> and rescanning from the options point of view? Initially we had >> >>>>> one option 'lookup.cache' with values LRU / ALL. I think now we can >> >>>>> make a boolean option 'lookup.rescan'. RescanInterval can be >> >>>>> 'lookup.rescan.interval', etc. >> >>>>> >> >>>>> Best regards, >> >>>>> Alexander >> >>>>> >> >>>>> On Thu, 19 May 2022 at 14:50, Александр Смирнов <smirale...@gmail.com >> >>> : >> >>>>>> >> >>>>>> Hi Qingsheng and Jark, >> >>>>>> >> >>>>>> 1. Builders vs 'of' >> >>>>>> I understand that builders are used when we have multiple >> >> parameters. >> >>>>>> I suggested them because we could add parameters later. To prevent the >> >>>>>> Builder for ScanRuntimeProvider from looking redundant, I can >> suggest >> >>>>>> one more config now - "rescanStartTime". >> >>>>>> It's a time in UTC (LocalTime class) when the first reload of the cache >> >>>>>> starts. This parameter can be thought of as the 'initialDelay' (the diff >> >>>>>> between the current time and rescanStartTime) in the method >> >>>>>> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very >> >>>>>> useful when the dimension table is updated by some other scheduled >> >> job >> >>>>>> at a certain time. Or when the user simply wants the second scan >> >> (first >> >>>>>> cache reload) to be delayed. This option can be used even without >> >>>>>> 'rescanInterval' - in this case 'rescanInterval' will be one day. >> >>>>>> If you are fine with this option, I would be very glad if you would >> >>>>>> give me access to edit the FLIP page, so I could add it myself. >> >>>>>> >> >>>>>> 2. Common table options >> >>>>>> I also think that FactoryUtil would be overloaded by all the cache >> >>>>>> options. But maybe unify all suggested options, not only those for the >> default >> >>>>>> cache? I.e.
a class 'LookupOptions' that unifies the default cache >> options, >> >>>>>> rescan options, 'async', and 'maxRetries'. WDYT? >> >>>>>> >> >>>>>> 3. Retries >> >>>>>> I'm fine with a suggestion close to RetryUtils#tryTimes(times, call). >> >>>>>> >> >>>>>> [1] >> >>>> >> >>> >> >> >> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- >> >>>>>> >> >>>>>> Best regards, >> >>>>>> Alexander >> >>>>>> >> >>>>>> On Wed, 18 May 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com>: >> >>>>>>> >> >>>>>>> Hi Jark and Alexander, >> >>>>>>> >> >>>>>>> Thanks for your comments! I'm also OK with introducing common table >> >>>> options. I prefer to introduce a new DefaultLookupCacheOptions class >> >> for >> >>>> holding these option definitions, because putting all options into >> >>>> FactoryUtil would make it a bit "crowded" and not well categorized. >> >>>>>>> >> >>>>>>> The FLIP has been updated according to the suggestions above: >> >>>>>>> 1. Use a static "of" method for constructing RescanRuntimeProvider, >> >>>> considering both arguments are required. >> >>>>>>> 2. Introduce new table options matching DefaultLookupCacheFactory. >> >>>>>>> >> >>>>>>> Best, >> >>>>>>> Qingsheng >> >>>>>>> >> >>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote: >> >>>>>>>> >> >>>>>>>> Hi Alex, >> >>>>>>>> >> >>>>>>>> 1) retry logic >> >>>>>>>> I think we can extract some common retry logic into utilities, >> >> e.g. >> >>>> RetryUtils#tryTimes(times, call). >> >>>>>>>> This seems independent of this FLIP and can be reused by >> >> DataStream >> >>>> users. >> >>>>>>>> Maybe we can open an issue to discuss this and where to put it. >> >>>>>>>> >> >>>>>>>> 2) cache ConfigOptions >> >>>>>>>> I'm fine with defining cache config options in the framework. >> >>>>>>>> A candidate place to put them is FactoryUtil, which also includes the >> >>>> "sink.parallelism" and "format" options.
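The "rescanStartTime" proposal above boils down to computing an initial delay for `ScheduledExecutorService#scheduleWithFixedDelay`. A minimal sketch of that computation (the option name and helper class are only illustrations of the proposal, not an agreed API):

```java
import java.time.Duration;
import java.time.LocalTime;

public class RescanSchedule {
    /**
     * Millis to wait from 'nowUtc' until the next occurrence of 'rescanStartTime' (UTC).
     * If the start time has already passed today, wait until the same time tomorrow.
     */
    public static long initialDelayMillis(LocalTime rescanStartTime, LocalTime nowUtc) {
        long diff = Duration.between(nowUtc, rescanStartTime).toMillis();
        return diff >= 0 ? diff : diff + Duration.ofDays(1).toMillis();
    }
}
```

The result would then be used as `executor.scheduleWithFixedDelay(reloadTask, initialDelay, rescanIntervalMillis, TimeUnit.MILLISECONDS)`, so the first reload lines up with, say, the completion of a nightly batch job.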
>> >>>>>>>> >> >>>>>>>> Best, >> >>>>>>>> Jark >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов < >> >>> smirale...@gmail.com> >> >>>> wrote: >> >>>>>>>>> >> >>>>>>>>> Hi Qingsheng, >> >>>>>>>>> >> >>>>>>>>> Thank you for considering my comments. >> >>>>>>>>> >> >>>>>>>>>> there might be custom logic before making retry, such as >> >>>> re-establish the connection >> >>>>>>>>> >> >>>>>>>>> Yes, I understand that. I meant that such logic can be placed in a >> >>>>>>>>> separate function that can be implemented by connectors. Just >> >>> moving >> >>>>>>>>> the retry logic would make the connector's LookupFunction more >> >> concise >> >>> and >> >>>>>>>>> avoid duplicate code. However, it's a minor change. The decision >> >> is >> >>>> up >> >>>>>>>>> to you. >> >>>>>>>>> >> >>>>>>>>>> We decide not to provide common DDL options and let developers >> >> to >> >>>> define their own options as we do now per connector. >> >>>>>>>>> >> >>>>>>>>> What is the reason for that? One of the main goals of this FLIP >> >> was >> >>>> to >> >>>>>>>>> unify the configs, wasn't it? I understand that the current cache >> >>> design >> >>>>>>>>> doesn't depend on ConfigOptions, like it did before. But we can still >> >>>> put >> >>>>>>>>> these options into the framework, so connectors can reuse them >> >> and >> >>>>>>>>> avoid code duplication and, more significantly, avoid possibly >> >>>>>>>>> divergent option naming. This could be pointed out in the >> >>>>>>>>> documentation for connector developers. >> >>>>>>>>> >> >>>>>>>>> Best regards, >> >>>>>>>>> Alexander >> >>>>>>>>> >> >>>>>>>>> On Tue, 17 May 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com>: >> >>>>>>>>>> >> >>>>>>>>>> Hi Alexander, >> >>>>>>>>>> >> >>>>>>>>>> Thanks for the review and glad to see we are on the same page! I >> >>>> think you forgot to cc the dev mailing list, so I'm also quoting your >> >>> reply >> >>>> under this email.
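The `RetryUtils#tryTimes(times, call)` utility mentioned in the thread could look roughly like this. This is a sketch of the proposal only; no such class exists in Flink at this point, and the signature is an assumption.

```java
import java.util.concurrent.Callable;

public final class RetryUtils {
    private RetryUtils() {}

    /** Invokes 'call' up to 'times' times, rethrowing the last failure if all attempts fail. */
    public static <T> T tryTimes(int times, Callable<T> call) throws Exception {
        if (times <= 0) {
            throw new IllegalArgumentException("times must be positive");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // A connector could hook custom recovery here, e.g. re-establishing its connection.
                last = e;
            }
        }
        throw last;
    }
}
```

A connector's lookup function would then wrap its I/O call, e.g. `RetryUtils.tryTimes(maxRetryTimes, () -> queryDatabase(key))`, keeping retry bookkeeping out of the connector code.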
>> >>>>>>>>>> >> >>>>>>>>>>> We can add 'maxRetryTimes' option into this class >> >>>>>>>>>> >> >>>>>>>>>> In my opinion the retry logic should be implemented in lookup() >> >>>> instead of in LookupFunction#eval(). Retrying is only meaningful >> under >> >>> some >> >>>> specific retriable failures, and there might be custom logic before >> >>> making >> >>>> retry, such as re-establish the connection (JdbcRowDataLookupFunction >> >> is >> >>> an >> >>>> example), so it's more handy to leave it to the connector. >> >>>>>>>>>> >> >>>>>>>>>>> I don't see DDL options, that were in previous version of >> FLIP. >> >>> Do >> >>>> you have any special plans for them? >> >>>>>>>>>> >> >>>>>>>>>> We decide not to provide common DDL options and let developers >> >> to >> >>>> define their own options as we do now per connector. >> >>>>>>>>>> >> >>>>>>>>>> The rest of comments sound great and I’ll update the FLIP. Hope >> >> we >> >>>> can finalize our proposal soon! >> >>>>>>>>>> >> >>>>>>>>>> Best, >> >>>>>>>>>> >> >>>>>>>>>> Qingsheng >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов < >> >>> smirale...@gmail.com> >> >>>> wrote: >> >>>>>>>>>>> >> >>>>>>>>>>> Hi Qingsheng and devs! >> >>>>>>>>>>> >> >>>>>>>>>>> I like the overall design of updated FLIP, however I have >> >> several >> >>>>>>>>>>> suggestions and questions. >> >>>>>>>>>>> >> >>>>>>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction >> >> is a >> >>>> good >> >>>>>>>>>>> idea. We can add 'maxRetryTimes' option into this class. >> 'eval' >> >>>> method >> >>>>>>>>>>> of new LookupFunction is great for this purpose. The same is >> >> for >> >>>>>>>>>>> 'async' case. >> >>>>>>>>>>> >> >>>>>>>>>>> 2) There might be other configs in future, such as >> >>>> 'cacheMissingKey' >> >>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in >> >>>> ScanRuntimeProvider. 
>> >>>>>>>>>>> Maybe use the Builder pattern in LookupFunctionProvider and >> >>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one 'build' >> >>> method >> >>>>>>>>>>> instead of many 'of' methods in the future)? >> >>>>>>>>>>> >> >>>>>>>>>>> 3) What are the plans for the existing TableFunctionProvider and >> >>>>>>>>>>> AsyncTableFunctionProvider? I think they should be deprecated. >> >>>>>>>>>>> >> >>>>>>>>>>> 4) Am I right that the current design does not assume usage of a >> >>>>>>>>>>> user-provided LookupCache in re-scanning? In this case, it is >> >> not >> >>>> very >> >>>>>>>>>>> clear why we need methods such as 'invalidate' or 'putAll' >> >> in >> >>>>>>>>>>> LookupCache. >> >>>>>>>>>>> >> >>>>>>>>>>> 5) I don't see the DDL options that were in the previous version of the >> >>> FLIP. >> >>>> Do >> >>>>>>>>>>> you have any special plans for them? >> >>>>>>>>>>> >> >>>>>>>>>>> If you don't mind, I would be glad to be able to make small >> >>>>>>>>>>> adjustments to the FLIP document too. I think it's worth >> >>> mentioning >> >>>>>>>>>>> exactly what optimizations are planned for the future. >> >>>>>>>>>>> >> >>>>>>>>>>> Best regards, >> >>>>>>>>>>> Smirnov Alexander >> >>>>>>>>>>> >> >>>>>>>>>>> On Fri, 13 May 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com >> >>> : >> >>>>>>>>>>>> >> >>>>>>>>>>>> Hi Alexander and devs, >> >>>>>>>>>>>> >> >>>>>>>>>>>> Thank you very much for the in-depth discussion! As Jark >> >>>> mentioned, we were inspired by Alexander's idea and refactored our >> >>>> design. FLIP-221 [1] has been updated to reflect our design now and we >> >>> are >> >>>> happy to hear more suggestions from you! >> >>>>>>>>>>>> >> >>>>>>>>>>>> Compared to the previous design: >> >>>>>>>>>>>> 1. The lookup cache serves at the table runtime level and is >> >>>> integrated as a component of LookupJoinRunner, as discussed previously. >> >>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect the new >> >>>> design. >> >>>>>>>>>>>> 3.
We separate the all-caching case individually and >> >> introduce a >> >>>> new RescanRuntimeProvider to reuse the ability of scanning. We are >> >>> planning >> >>>> to support SourceFunction / InputFormat for now considering the >> >>> complexity >> >>>> of FLIP-27 Source API. >> >>>>>>>>>>>> 4. A new interface LookupFunction is introduced to make the >> >>>> semantic of lookup more straightforward for developers. >> >>>>>>>>>>>> >> >>>>>>>>>>>> For replying to Alexander: >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is >> >> deprecated >> >>>> or not. Am I right that it will be so in the future, but currently >> it's >> >>> not? >> >>>>>>>>>>>> Yes you are right. InputFormat is not deprecated for now. I >> >>> think >> >>>> it will be deprecated in the future but we don't have a clear plan >> for >> >>> that. >> >>>>>>>>>>>> >> >>>>>>>>>>>> Thanks again for the discussion on this FLIP and looking >> >> forward >> >>>> to cooperating with you after we finalize the design and interfaces! >> >>>>>>>>>>>> >> >>>>>>>>>>>> [1] >> >>>> >> >>> >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric >> >>>>>>>>>>>> >> >>>>>>>>>>>> Best regards, >> >>>>>>>>>>>> >> >>>>>>>>>>>> Qingsheng >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов < >> >>>> smirale...@gmail.com> wrote: >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard! >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> Glad to see that we came to a consensus on almost all >> points! >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is >> >> deprecated >> >>>> or >> >>>>>>>>>>>>> not. Am I right that it will be so in the future, but >> >> currently >> >>>> it's >> >>>>>>>>>>>>> not? 
Actually I also think that for the first version it's OK >> to >> >>>> use >> >>>>>>>>>>>>> InputFormat in the ALL cache implementation, because supporting the rescan >> >>>>>>>>>>>>> ability seems like a very distant prospect. But for this >> >>>> decision we >> >>>>>>>>>>>>> need a consensus among all discussion participants. >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> In general, I don't have anything to argue with in your >> >>>> statements. All >> >>>>>>>>>>>>> of them correspond to my ideas. Looking ahead, it would be nice >> >> to >> >>>> work >> >>>>>>>>>>>>> on this FLIP cooperatively. I've already done a lot of work >> >> on >> >>>> lookup >> >>>>>>>>>>>>> join caching with an implementation very close to the one we are >> >>>> discussing, >> >>>>>>>>>>>>> and want to share the results of this work. Anyway, looking >> >>>> forward to >> >>>>>>>>>>>>> the FLIP update! >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> Best regards, >> >>>>>>>>>>>>> Smirnov Alexander >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> On Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com>: >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Hi Alex, >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Thanks for summarizing your points. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed >> >> it >> >>>> several times >> >>>>>>>>>>>>>> and we have totally refactored the design. >> >>>>>>>>>>>>>> I'm glad to say we have reached a consensus on many of your >> >>>> points! >> >>>>>>>>>>>>>> Qingsheng is still working on updating the design docs, which >> >>>> may be >> >>>>>>>>>>>>>> available in the next few days. >> >>>>>>>>>>>>>> I will share some conclusions from our discussions: >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> 1) we have refactored the design towards the "cache in >> >>>> framework" way. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> 2) a "LookupCache" interface for users to customize and a >> >>>> default >> >>>>>>>>>>>>>> implementation with a builder that is easy to use.
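Points 1) and 2) above could be sketched roughly as follows: a minimal cache interface that users can implement themselves, plus a default size-bounded LRU implementation constructed via a builder. All names and methods here are illustrative assumptions; the final FLIP-221 API may differ.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative user-customizable cache interface, keyed by the join key. */
interface LookupCache<K, V> {
    Collection<V> getIfPresent(K key);
    void put(K key, Collection<V> rows);
}

/** Default implementation: a size-bounded LRU cache built via a builder. */
class DefaultLookupCache<K, V> implements LookupCache<K, V> {
    private final Map<K, Collection<V>> cache;

    private DefaultLookupCache(int maxRows) {
        // accessOrder=true turns LinkedHashMap into an LRU structure.
        this.cache = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    public static <K, V> Builder<K, V> newBuilder() {
        return new Builder<>();
    }

    @Override
    public Collection<V> getIfPresent(K key) {
        return cache.get(key);
    }

    @Override
    public void put(K key, Collection<V> rows) {
        cache.put(key, rows);
    }

    static class Builder<K, V> {
        private int maxRows = 10_000;

        Builder<K, V> maximumRows(int maxRows) {
            this.maxRows = maxRows;
            return this;
        }

        DefaultLookupCache<K, V> build() {
            return new DefaultLookupCache<>(maxRows);
        }
    }
}
```

A builder leaves room to add optional knobs later (TTL, weigher, etc.) without the explosion of `of(...)` overloads discussed earlier in the thread.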
>> >>>>>>>>>>>>>> This can both make it possible to both have flexibility and >> >>>> conciseness. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> 3) Filter pushdown is important for ALL and LRU lookup >> >> cache, >> >>>> esp reducing >> >>>>>>>>>>>>>> IO. >> >>>>>>>>>>>>>> Filter pushdown should be the final state and the unified >> >> way >> >>>> to both >> >>>>>>>>>>>>>> support pruning ALL cache and LRU cache, >> >>>>>>>>>>>>>> so I think we should make effort in this direction. If we >> >> need >> >>>> to support >> >>>>>>>>>>>>>> filter pushdown for ALL cache anyway, why not use >> >>>>>>>>>>>>>> it for LRU cache as well? Either way, as we decide to >> >>> implement >> >>>> the cache >> >>>>>>>>>>>>>> in the framework, we have the chance to support >> >>>>>>>>>>>>>> filter on cache anytime. This is an optimization and it >> >>> doesn't >> >>>> affect the >> >>>>>>>>>>>>>> public API. I think we can create a JIRA issue to >> >>>>>>>>>>>>>> discuss it when the FLIP is accepted. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> 4) The idea to support ALL cache is similar to your >> >> proposal. >> >>>>>>>>>>>>>> In the first version, we will only support InputFormat, >> >>>> SourceFunction for >> >>>>>>>>>>>>>> cache all (invoke InputFormat in join operator). >> >>>>>>>>>>>>>> For FLIP-27 source, we need to join a true source operator >> >>>> instead of >> >>>>>>>>>>>>>> calling it embedded in the join operator. >> >>>>>>>>>>>>>> However, this needs another FLIP to support the re-scan >> >>> ability >> >>>> for FLIP-27 >> >>>>>>>>>>>>>> Source, and this can be a large work. >> >>>>>>>>>>>>>> In order to not block this issue, we can put the effort of >> >>>> FLIP-27 source >> >>>>>>>>>>>>>> integration into future work and integrate >> >>>>>>>>>>>>>> InputFormat&SourceFunction for now. 
>> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> I think it's fine to use InputFormat & SourceFunction, as they >> >>>> are not >> >>>>>>>>>>>>>> deprecated; otherwise, we would have to introduce another >> function >> >>>>>>>>>>>>>> similar to them, which is meaningless. We need to plan >> >> FLIP-27 >> >>>> source >> >>>>>>>>>>>>>> integration ASAP before InputFormat & SourceFunction are >> >>>> deprecated. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Best, >> >>>>>>>>>>>>>> Jark >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов < >> >>>> smirale...@gmail.com> >> >>>>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Hi Martijn! >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Got it. Therefore, the implementation with InputFormat is not >> >>>> considered. >> >>>>>>>>>>>>>>> Thanks for clearing that up! >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Best regards, >> >>>>>>>>>>>>>>> Smirnov Alexander >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> On Thu, 12 May 2022 at 14:23, Martijn Visser < >> >>>> mart...@ververica.com>: >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Hi, >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> With regards to: >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> But if there are plans to refactor all connectors to >> >>> FLIP-27 >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old >> >>>> interfaces will be >> >>>>>>>>>>>>>>>> deprecated and connectors will either be refactored to >> use >> >>>> the new ones >> >>>>>>>>>>>>>>> or >> >>>>>>>>>>>>>>>> dropped. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> The caching should work for connectors that are using >> >>> FLIP-27 >> >>>> interfaces, >> >>>>>>>>>>>>>>>> we should not introduce new features for old interfaces.
>> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Best regards, >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Martijn >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов < >> >>>> smirale...@gmail.com> >> >>>>>>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Hi Jark! >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Sorry for the late response. I would like to make some >> >>>> comments and >> >>>>>>>>>>>>>>>>> clarify my points. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think we can >> >>> achieve >> >>>> both >> >>>>>>>>>>>>>>>>> advantages this way: put the Cache interface in >> >>>> flink-table-common, >> >>>>>>>>>>>>>>>>> but have implementations of it in flink-table-runtime. >> >>>> Therefore if a >> >>>>>>>>>>>>>>>>> connector developer wants to use existing cache >> >> strategies >> >>>> and their >> >>>>>>>>>>>>>>>>> implementations, he can just pass lookupConfig to the >> >>>> planner, but if >> >>>>>>>>>>>>>>>>> he wants to have its own cache implementation in his >> >>>> TableFunction, it >> >>>>>>>>>>>>>>>>> will be possible for him to use the existing interface >> >> for >> >>>> this >> >>>>>>>>>>>>>>>>> purpose (we can explicitly point this out in the >> >>>> documentation). In >> >>>>>>>>>>>>>>>>> this way all configs and metrics will be unified. WDYT? >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will >> >>>> have 90% of >> >>>>>>>>>>>>>>>>> lookup requests that can never be cached >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> 2) Let me clarify the logic filters optimization in case >> >> of >> >>>> LRU cache. >> >>>>>>>>>>>>>>>>> It looks like Cache<RowData, Collection<RowData>>. Here >> >> we >> >>>> always >> >>>>>>>>>>>>>>>>> store the response of the dimension table in cache, even >> >>>> after >> >>>>>>>>>>>>>>>>> applying calc function. I.e. 
if there are no rows after >> >>>> applying >> >>>>>>>>>>>>>>>>> filters to the result of the 'eval' method of >> >>> TableFunction, >> >>>> we store >> >>>>>>>>>>>>>>>>> the empty list by lookup keys. Therefore the cache line >> >>> will >> >>>> be >> >>>>>>>>>>>>>>>>> filled, but will require much less memory (in bytes). >> >> I.e. >> >>>> we don't >> >>>>>>>>>>>>>>>>> completely filter keys, by which result was pruned, but >> >>>> significantly >> >>>>>>>>>>>>>>>>> reduce required memory to store this result. If the user >> >>>> knows about >> >>>>>>>>>>>>>>>>> this behavior, he can increase the 'max-rows' option >> >> before >> >>>> the start >> >>>>>>>>>>>>>>>>> of the job. But actually I came up with the idea that we >> >>> can >> >>>> do this >> >>>>>>>>>>>>>>>>> automatically by using the 'maximumWeight' and 'weigher' >> >>>> methods of >> >>>>>>>>>>>>>>>>> GuavaCache [1]. Weight can be the size of the collection >> >> of >> >>>> rows >> >>>>>>>>>>>>>>>>> (value of cache). Therefore cache can automatically fit >> >>> much >> >>>> more >> >>>>>>>>>>>>>>>>> records than before. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and >> >>>> projects >> >>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and >> >>>> SupportsProjectionPushDown. >> >>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, >> >> don't >> >>>> mean it's >> >>>>>>>>>>>>>>> hard >> >>>>>>>>>>>>>>>>> to implement. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> It's debatable how difficult it will be to implement >> >> filter >> >>>> pushdown. >> >>>>>>>>>>>>>>>>> But I think the fact that currently there is no database >> >>>> connector >> >>>>>>>>>>>>>>>>> with filter pushdown at least means that this feature >> >> won't >> >>>> be >> >>>>>>>>>>>>>>>>> supported soon in connectors. 
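The `maximumWeight`/`weigher` idea above — bounding the cache by the total number of cached rows rather than by the number of keys, so that cached empty results (pruned by filters) cost almost nothing — can be sketched without Guava as well. This plain-Java version only illustrates the accounting; Guava's `CacheBuilder` is the actual candidate mentioned in the thread, and the class below is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Caches lookup results bounded by total rows across all entries, not by entry count. */
class RowWeightedCache<K, V> {
    private final long maxTotalRows;
    private long totalRows;
    private final Map<K, Collection<V>> cache = new HashMap<>();
    private final Deque<K> insertionOrder = new ArrayDeque<>();

    RowWeightedCache(long maxTotalRows) {
        this.maxTotalRows = maxTotalRows;
    }

    Collection<V> getIfPresent(K key) {
        return cache.get(key);
    }

    void put(K key, Collection<V> rows) {
        Collection<V> old = cache.put(key, rows);
        if (old != null) {
            totalRows -= old.size();
        } else {
            insertionOrder.addLast(key);
        }
        // Empty results (all rows pruned by filters) weigh 0, so caching misses stays cheap.
        totalRows += rows.size();
        // Evict oldest entries (FIFO here for simplicity) until under the row budget.
        while (totalRows > maxTotalRows && insertionOrder.size() > 1) {
            K eldest = insertionOrder.pollFirst();
            totalRows -= cache.remove(eldest).size();
        }
    }

    long totalRows() {
        return totalRows;
    }
}
```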
Moreover, if we talk about >> >>>> other >> >>>>>>>>>>>>>>>>> connectors (not in Flink repo), their databases might >> not >> >>>> support all >> >>>>>>>>>>>>>>>>> Flink filters (or not support filters at all). I think >> >>> users >> >>>> are >> >>>>>>>>>>>>>>>>> interested in supporting cache filters optimization >> >>>> independently of >> >>>>>>>>>>>>>>>>> supporting other features and solving more complex >> >> problems >> >>>> (or >> >>>>>>>>>>>>>>>>> unsolvable at all). >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> 3) I agree with your third statement. Actually in our >> >>>> internal version >> >>>>>>>>>>>>>>>>> I also tried to unify the logic of scanning and >> reloading >> >>>> data from >> >>>>>>>>>>>>>>>>> connectors. But unfortunately, I didn't find a way to >> >> unify >> >>>> the logic >> >>>>>>>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat, >> SourceFunction, >> >>>> Source,...) >> >>>>>>>>>>>>>>>>> and reuse it in reloading ALL cache. As a result I >> >> settled >> >>>> on using >> >>>>>>>>>>>>>>>>> InputFormat, because it was used for scanning in all >> >> lookup >> >>>>>>>>>>>>>>>>> connectors. (I didn't know that there are plans to >> >>> deprecate >> >>>>>>>>>>>>>>>>> InputFormat in favor of FLIP-27 Source). IMO usage of >> >>>> FLIP-27 source >> >>>>>>>>>>>>>>>>> in ALL caching is not good idea, because this source was >> >>>> designed to >> >>>>>>>>>>>>>>>>> work in distributed environment (SplitEnumerator on >> >>>> JobManager and >> >>>>>>>>>>>>>>>>> SourceReaders on TaskManagers), not in one operator >> >> (lookup >> >>>> join >> >>>>>>>>>>>>>>>>> operator in our case). There is even no direct way to >> >> pass >> >>>> splits from >> >>>>>>>>>>>>>>>>> SplitEnumerator to SourceReader (this logic works >> through >> >>>>>>>>>>>>>>>>> SplitEnumeratorContext, which requires >> >>>>>>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send >> >> AddSplitEvents). 
Usage of >> >>>>>>>>>>>>>>>>> InputFormat for the ALL cache seems much clearer and >> >>>> easier. But if >> >>>>>>>>>>>>>>>>> there are plans to refactor all connectors to FLIP-27, I >> >>>> have the >> >>>>>>>>>>>>>>>>> following idea: maybe we can drop the lookup join ALL >> >>>> cache in >> >>>>>>>>>>>>>>>>> favor of a simple join with multiple scans of the batch >> >>> source? >> >>>> The point >> >>>>>>>>>>>>>>>>> is that the only difference between the lookup join ALL >> cache >> >>>> and a simple >> >>>>>>>>>>>>>>>>> join with a batch source is that in the first case >> scanning >> >>> is >> >>>> performed >> >>>>>>>>>>>>>>>>> multiple times, in between which the state (cache) is >> cleared >> >>>> (correct me >> >>>>>>>>>>>>>>>>> if I'm wrong). So what if we extend the functionality of the >> >>>> simple join >> >>>>>>>>>>>>>>>>> to support state reloading, and extend the functionality of >> >>>> scanning the >> >>>>>>>>>>>>>>>>> batch source multiple times (this one should be easy with the >> >>>> new FLIP-27 >> >>>>>>>>>>>>>>>>> source, which unifies streaming/batch reading - we would >> >> need >> >>>> to change >> >>>>>>>>>>>>>>>>> only the SplitEnumerator, which would pass splits again after >> >>>> some TTL)? >> >>>>>>>>>>>>>>>>> WDYT? I must say that this looks like a long-term goal >> >> and >> >>>> would make >> >>>>>>>>>>>>>>>>> the scope of this FLIP even larger than you said. Maybe >> >> we >> >>>> can limit >> >>>>>>>>>>>>>>>>> ourselves to a simpler solution now (InputFormats). >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> So to sum up, my points are as follows: >> >>>>>>>>>>>>>>>>> 1) There is a way to make both concise and flexible >> >>>> interfaces for >> >>>>>>>>>>>>>>>>> caching in lookup join. >> >>>>>>>>>>>>>>>>> 2) Cache filters optimization is important both in LRU >> >> and >> >>>> ALL caches.
>> >>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported >> >> in >> >>>> Flink >> >>>>>>>>>>>>>>>>> connectors; some of the connectors might not have the >> >>>> opportunity to >> >>>>>>>>>>>>>>>>> support filter pushdown at all, and as far as I know, filter >> >>>> pushdown currently works >> >>>>>>>>>>>>>>>>> only for scanning (not lookup). So the cache filters + >> >>>> projections >> >>>>>>>>>>>>>>>>> optimization should be independent of other features. >> >>>>>>>>>>>>>>>>> 4) The ALL cache implementation is a complex topic that >> involves >> >>>> multiple >> >>>>>>>>>>>>>>>>> aspects of how Flink is developing. Abandoning >> >>>> InputFormat in favor >> >>>>>>>>>>>>>>>>> of the FLIP-27 Source would make the ALL cache implementation really >> >>>> complex and >> >>>>>>>>>>>>>>>>> unclear, so maybe instead of that we can extend the >> >>>> functionality of the >> >>>>>>>>>>>>>>>>> simple join, or keep InputFormat in the case of the >> >>> lookup >> >>>> join ALL >> >>>>>>>>>>>>>>>>> cache? >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Best regards, >> >>>>>>>>>>>>>>>>> Smirnov Alexander >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> [1] >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>> >> >>> >> >> >> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher) >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com>: >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> It's great to see the active discussion! I want to >> share >> >>> my >> >>>> ideas: >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> 1) implement the cache in the framework vs. the connector base >> >>>>>>>>>>>>>>>>>> I don't have a strong opinion on this. Both ways should >> >>>> work (e.g., >> >>>>>>>>>>>>>>> cache >> >>>>>>>>>>>>>>>>>> pruning, compatibility).
The framework way can provide more concise interfaces. The connector base way can define more flexible cache strategies/implementations. We are still investigating a way to see if we can have both advantages. We should reach a consensus that the chosen way should be a final state, and that we are on the path to it.

2) filters and projections pushdown:
I agree with Alex that filter pushdown into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use the cache to reduce IO requests to databases for better throughput. If a filter can prune 90% of the data in the cache, we will have 90% of lookup requests that can never be cached and hit the databases directly. That means the cache is meaningless in this case.

IMO, Flink SQL has provided a standard way to do filter and projection pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. That jdbc/hive/HBase haven't implemented the interfaces doesn't mean it's hard to implement. They should implement the pushdown interfaces to reduce IO and the cache size.
The final state should be that the scan source and lookup source share the exact same pushdown implementation. I don't see why we need to duplicate the pushdown logic in caches, which would complicate the lookup join design.

3) ALL cache abstraction
The ALL cache might be the most challenging part of this FLIP. We have never provided a reload-lookup public interface. Currently, we put the reload logic in the "eval" method of TableFunction. That's hard for some sources (e.g., Hive). Ideally, connector implementations should share the logic of reload and scan, i.e. ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source is deeply coupled with SourceOperator. If we want to invoke the FLIP-27 source in LookupJoin, this may make the scope of this FLIP much larger. We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.
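To make the reload-lookup idea above (and the pluggable reloading strategy discussed later in the thread) concrete, here is a minimal sketch of a full cache whose reload policy is swappable. All names below (`FullCacheSketch`, `ReloadTrigger`) are invented for illustration and are not the FLIP-221 API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch only -- none of these names are the FLIP-221 API.
class FullCacheSketch {
    /** Pluggable policy deciding when to rebuild the cache (periodic, ZooKeeper watcher, partition-complete, ...). */
    interface ReloadTrigger {
        boolean shouldReload(long lastReloadMillis, long nowMillis);
    }

    /** One common implementation: periodic reloading. */
    static ReloadTrigger periodic(long intervalMillis) {
        return (last, now) -> now - last >= intervalMillis;
    }

    private final Supplier<Map<String, List<String>>> scanAll; // stands in for the shared scan logic
    private final ReloadTrigger trigger;
    private Map<String, List<String>> cache = new HashMap<>();
    private boolean loaded = false;
    private long lastReload = 0L;

    FullCacheSketch(Supplier<Map<String, List<String>>> scanAll, ReloadTrigger trigger) {
        this.scanAll = scanAll;
        this.trigger = trigger;
    }

    /** Lookups only hit the in-memory snapshot; the external system is read only on (re)load. */
    List<String> lookup(String key, long nowMillis) {
        if (!loaded || trigger.shouldReload(lastReload, nowMillis)) {
            cache = scanAll.get();
            lastReload = nowMillis;
            loaded = true;
        }
        return cache.getOrDefault(key, List.of());
    }
}
```

A connector- or business-specific strategy (e.g. reload when a Hive partition is complete) would just supply a different `ReloadTrigger` implementation, which is exactly what makes the policy pluggable.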
Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns should be done for all ScanTableSource implementations (not only for lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!
Thanks for driving such a valuable improvement!

I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway - it doesn't matter how it is implemented.

Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data. And the handiest way to do it is to apply it inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy and other options for all lookup tables.
3) Providing the cache in the framework really deprives us of extensibility (users won't be able to implement their own cache).
But most probably that might be solved by creating more cache strategies and a wider set of configurations.

All these points are much closer to the scheme proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong - can all these facilities be simply implemented in your architecture?

Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

I don't have much to chip in, but just wanted to express that I really appreciate the in-depth discussion on this topic and I hope that others will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback!
However, I have questions about some of your statements (maybe I didn't get something?).

> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully honored with caching, but as you said, users accept that consciously to achieve better performance (no one proposed to enable caching by default, etc.). Or by users do you mean other developers of connectors? In that case, developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to. So what exactly is the difference between implementing caching in the flink-table-runtime module and in flink-table-common from this point of view? How does it affect breaking (or not breaking) the semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious

If we talk about the main difference in semantics between DDL options and config options ("table.exec.xxx"), isn't it about limiting the scope of the options + their importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting an option with the lookup cache strategy in configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) + touches just several functions of ONE table (there can be multiple tables with different caches). Does it really matter for the user (or someone else) where the logic affected by the applied option is located?
Also I can recall the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.

> introduce a new interface for this all-caching scenario and the design would become more complex

This is a subject for a separate discussion, but actually in our internal version we solved this problem quite easily - we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive - it uses the class PartitionReader, which is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, cache reload time is significantly reduced (as well as the time the input stream is blocked).
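The parallel reload described above (one reader per InputSplit, with an upper bound on the thread count) can be sketched with plain JDK executors. The split and reader types below are simplified stand-ins, not Flink's actual InputFormat/InputSplit API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch of a parallel ALL-cache reload: one task per split, bounded thread count.
class ParallelReloadSketch {
    static <K, V> ConcurrentMap<K, V> reload(
            List<Integer> splits,                   // stand-in for InputSplits
            Function<Integer, Map<K, V>> readSplit, // stand-in for reading one split via InputFormat
            int maxThreads) throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, Math.min(splits.size(), maxThreads)));
        try {
            ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
            List<Future<?>> futures = new ArrayList<>();
            for (Integer split : splits) {
                futures.add(pool.submit(() -> cache.putAll(readSplit.apply(split))));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for all splits and propagate any failure
            }
            return cache; // the caller can swap this snapshot in as the new cache
        } finally {
            pool.shutdown();
        }
    }
}
```

Building the new snapshot off to the side and swapping it in only after every split has loaded is what keeps the input-stream blocking time short.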
I know that we usually try to avoid using concurrency in Flink code, but maybe this one can be an exception. BTW I don't say that it's an ideal solution; maybe there are better ones.

> Providing the cache in the framework might introduce compatibility issues

That's possible only in cases where the developer of the connector doesn't properly refactor his code and uses the new cache options incorrectly (i.e. explicitly provides the same options in 2 different code places). For correct behavior, all he needs to do is redirect the existing options to the framework's LookupConfig (+ maybe add aliases for options, if there was different naming); everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector because of backward compatibility.
Also, if a developer wants to use his own cache logic, he can simply refuse to pass some of the configs to the framework, and instead make his own implementation with the already existing configs and metrics (but actually I think that's a rare case).

> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

That's a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table.
For a simple example, suppose in the dimension table 'users' we have a column 'age' with values from 20 to 40, and an input stream 'clicks' that is ~uniformly distributed by age of users. If we have the filter 'age > 30', there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2 times, which will give a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables without filters and projections can't fit in memory, but with them - can. This opens up additional possibilities for users. And this doesn't sound like 'not quite useful'.

It would be great to hear other voices on this topic! Because we have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander

On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time", because it couldn't truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for the performance. So we prefer not to provide caching at the table runtime level.

2.
If we make the cache implementation in the framework (whether in a runner or a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.

3. We have use cases where the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community, and also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we have to introduce a new interface for this all-caching scenario, and the design would become more complex.

4.
Providing the cache in the framework might introduce compatibility issues to existing lookup sources: there might exist two caches with totally different strategies if the user incorrectly configures the table (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of to the runner with the cache. The goal of using a cache is to reduce the network I/O and the pressure on the external system, and only applying these optimizations to the cache seems not quite useful.

I made some updates to the FLIP[1] to reflect our ideas.
We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and regulate the metrics of the cache. Also, I made a POC[2] for your reference.

Looking forward to your ideas!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.
> but could also live with an easier solution as the first step:

I think that these 2 ways (the one originally proposed by Qingsheng and mine) are mutually exclusive, because conceptually they follow the same goal, but the implementation details are different. If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work together on this FLIP, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?
> as the source will only receive the requests after filter

Actually, if filters are applied to fields of the lookup table, we first must do the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.

Sorry for that, I'm a bit new to such kinds of conversations :) I have no write access to the confluence, so I made a Jira issue, where I described the proposed changes in more detail - https://issues.apache.org/jira/browse/FLINK-27411.

Will be happy to get more feedback!

Best,
Smirnov Alexander

On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:

Hi Qingsheng,

Thanks for driving this; the inconsistency was not satisfying for me.

I second Alexander's idea, but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying projection may be more interesting to save memory).
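The caching layer around a table function X described here - serve from the cache, delegate to X on a miss - can be sketched with JDK types only. The class and method names below are illustrative, not the actual CachingTableFunction from the POC.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative LRU caching layer around a lookup function "X"; names are invented, not Flink API.
class CachingLookupSketch<K, V> {
    private final Function<K, List<V>> delegate; // the wrapped lookup ("TableFunction X")
    private final Map<K, List<V>> cache;
    private long hits;
    private long misses;

    CachingLookupSketch(Function<K, List<V>> delegate, int maxRows) {
        this.delegate = delegate;
        // An access-ordered LinkedHashMap gives a minimal LRU eviction policy.
        this.cache = new LinkedHashMap<K, List<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    List<V> lookup(K key) {
        List<V> cached = cache.get(key);
        if (cached != null) {
            hits++;
            return cached;
        }
        misses++;
        List<V> rows = delegate.apply(key); // external I/O happens only on a miss
        cache.put(key, rows);               // empty results are cached too ("missing keys")
        return rows;
    }

    long hitCount() { return hits; }
    long missCount() { return misses; }
}
```

Because the wrapper owns the cache, hit/miss counters (and hence standard cache metrics) can be reported uniformly regardless of which connector the delegate comes from.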
Another advantage is that all the changes of this FLIP would be limited to options; no need for new public interfaces. Everything else remains an implementation detail of the Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.

@Alexander unfortunately, your architecture is not shared. I don't know the solution to share images, to be honest.

On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd really like to become one. And this FLIP really interested me. Actually, I have worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.
I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables, so it's very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But that would require connectors to depend on another module containing a lot of runtime logic, which doesn't sound good.

I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to pass only configurations to the planner, so that they won't depend on the runtime implementation.
Based on these configs, the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the flink-table-runtime module). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).

The classes in flink-table-planner responsible for this are CommonPhysicalLookupJoin and its subclasses. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc, and AsyncLookupJoinRunnerWithCalc.

I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.

And here comes another, more powerful advantage of such a solution: if we have the caching logic on a lower level, we can apply some optimizations to it.
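As a rough illustration of this config-passing contract, the connector could return a plain config object and the planner would translate it into one of the runner classes. Every name and field below is an assumption for the sketch, not the actual FLIP-221 API:

```java
import java.time.Duration;

// Hypothetical 'getLookupConfig' payload: the connector only describes the
// desired caching behavior; the planner (CommonPhysicalLookupJoin) decides
// which runtime runner to instantiate. All names here are illustrative.
public class LookupConfig {

    public enum CacheStrategy { NONE, LRU, ALL }

    private final CacheStrategy strategy;
    private final long maxCachedRows;
    private final Duration ttl;

    public LookupConfig(CacheStrategy strategy, long maxCachedRows, Duration ttl) {
        this.strategy = strategy;
        this.maxCachedRows = maxCachedRows;
        this.ttl = ttl;
    }

    public CacheStrategy strategy() { return strategy; }

    // Roughly what the planner could do with the config: map it (plus the
    // presence of a 'calc' function) to the runner implementing that behavior.
    public static String chooseRunner(LookupConfig config, boolean hasCalc) {
        if (config.strategy() == CacheStrategy.NONE) {
            return hasCalc ? "LookupJoinRunnerWithCalc" : "LookupJoinRunner";
        }
        return hasCalc ? "LookupJoinCachingRunnerWithCalc" : "LookupJoinCachingRunner";
    }
}
```

The point of the indirection is that the connector module never references the runner classes, so it keeps depending only on flink-table-common.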
LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which actually consists mostly of filters and projections.

For example, when joining table A with lookup table B using the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters avoid storing useless records in the cache, and projections reduce the records' size. So the initial maximum number of records in the cache can be increased by the user.

What do you think about it?
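The filter-before-cache part of this optimization can be sketched as follows. Only the lookup-side filter (B.salary > 1000) is shown, since it does not depend on the probe row; the row type and helper name are illustrative, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Sketch: run the calc (filter + projection) on fetched rows *before* they
// enter the cache, so filtered-out rows are never stored and cached rows
// carry only the projected fields.
public class CalcBeforeCache {

    public static <R, P> List<P> applyCalc(
            List<R> fetchedRows, Predicate<R> filter, Function<R, P> projection) {
        List<P> toCache = new ArrayList<>();
        for (R row : fetchedRows) {
            if (filter.test(row)) {                 // e.g. B.salary > 1000
                toCache.add(projection.apply(row)); // drop fields unused downstream
            }
        }
        return toCache;
    }
}
```

Under the example condition, a fetch returning rows with salaries 500 and 2000 would cache only one projected row instead of two full ones, which is why the effective max cache size grows for the user.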
On 2022/04/19 02:47:11 Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction of the lookup table cache and its standard metrics.

Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink table / SQL.

Therefore we propose some new APIs, including cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,

Qingsheng

--
Best Regards,

Qingsheng Ren

Real-time Computing Team
Alibaba Cloud

Email: renqs...@gmail.com

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com