Hey Becket,

+1 from my side
Piotrek

On 14 Jan 2019, at 14:43, Becket Qin <becket....@gmail.com> wrote:

> Hi Seth,
>
> Thanks for the feedback. Re-caching makes sense to me. Piotr and I had some offline discussion and we generally reached consensus on the following API:
>
> {
>   /**
>    * Cache this table to the built-in table service or the specified customized table service.
>    *
>    * This method provides a hint to Flink that the current table may be reused later, so a cache should be created to avoid regenerating this table.
>    *
>    * The following code snippet gives an example of how this method could be used.
>    *
>    * {{{
>    *   val t = tEnv.fromCollection(data).as('country, 'color, 'count)
>    *
>    *   val t1 = t.filter('count < 100).cache()
>    *   // t1 is cached after it is computed for the first time.
>    *   val x = t1.collect().size
>    *
>    *   // When t1 is used again to compute t2, it may not be re-computed.
>    *   val t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)
>    *   val res2 = t2.collect()
>    *   res2.foreach(println)
>    *
>    *   // Similarly, when t1 is used again to compute t3, it may not be re-computed.
>    *   val t3 = t1.groupBy('color).select('color, 'count.avg as 'avg)
>    *   val res3 = t3.collect()
>    *   res3.foreach(println)
>    * }}}
>    *
>    * @note The Flink optimizer may decide not to use the cache if doing so would accelerate the processing, or if the cache is no longer available, for example because it has been invalidated.
>    * @note The table cache may be created lazily, i.e. the cache may only be created the first time the cached table is computed.
>    * @note The table cache will be cleared when the user program exits.
>    *
>    * @return the current table with a cache hint. The original table reference is not modified by the execution of this method.
>    */
>   def cache(): Table
>
>   /**
>    * Manually invalidate the cache of this table to release the physical resources. Users are not required to invoke this method to release physical resources unless they want to; the table caches are cleared when the user program exits.
>    *
>    * @note After invalidation, the cache may be re-created if this table is used again.
>    */
>   def invalidateCache(): Unit
> }
>
> In the future, after we introduce automatic caching, the table may also be automatically cached.
>
> In summary, the final state we are looking at is the following:
> 1. A table could be cached either manually or automatically.
> 2. If a cache exists, Flink may or may not use it, depending on whether that will accelerate the execution.
> 3. In some rare use cases, a hint could be used to explicitly ask Flink to ignore the cache.
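>
> To make points 2 and 3 concrete, here is a rough sketch of how the pieces could fit together (the "ignoreCache" hint name is only a placeholder from the earlier discussion, nothing here is a settled API):
>
>   val t = tEnv.fromCollection(data).as('country, 'color, 'count)
>   val t1 = t.filter('count < 100).cache()
>
>   // Point 2: the optimizer may answer this from the cache or recompute t1, whichever it expects to be faster.
>   val res = t1.groupBy('country).select('country, 'count.sum as 'sum).collect()
>
>   // Point 3 (rare case): explicitly ask Flink to ignore the cache and re-run the original DAG.
>   val fresh = t1.hint("ignoreCache").groupBy('color).select('color, 'count.avg as 'avg).collect()
>
>   // Release the physical resources; the cache may be re-created if t1 is used again.
>   t1.invalidateCache()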
>
> I'll document all the discussions we have had around the API. If there are no further concerns over this API, I'll convert it to a FLIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Jan 10, 2019 at 9:08 PM Seth Wiesman <s...@data-artisans.com> wrote:
>
>> I spoke to Piotr a little bit offline and I wanted to comment with a summary of our discussion and what I believe is the most intuitive cache model from a user's perspective.
>>
>> (I am making up some class names here, not looking to bike-shed; feel free to change the names however you see fit.)
>>
>> A cache is by definition an optimization, something used to store intermediate results for faster / more performant downstream computation. Therefore, as a Flink user I would not expect it to change the semantics of my application, I would expect it to be rebuildable, and I do not expect to know how it works under the hood.
>>
>> With these principles in mind, I feel the most intuitive API would be as follows:
>>
>> // Some table
>> Table a = . . .
>>
>> // Signal that we would like to cache the table;
>> // this is lazy and does not force any computation.
>> CachedTable cachedA = a.cache()
>>
>> // The first operation against the cache.
>> // This count will trigger reading input
>> // data and building the cache.
>> cachedA.count()
>>
>> // Operates against the cache; no operations
>> // before a.cache() are performed.
>> cachedA.sum()
>>
>> // This does not operate against the cache;
>> // it will trigger reading data from the source
>> // and performing a full computation.
>> a.min()
>>
>> // Invalidates the cache, releasing all
>> // underlying resources.
>> cachedA.invalidateCache()
>>
>> // Rebuilds the cache. Since caches are recomputable,
>> // this should not be an error; it will simply be a more
>> // expensive operation than if we had not invalidated the cache.
>> cachedA.min()
>>
>> This model leads to 2 nice properties:
>>
>> 1) The same cache can be shared across multiple invocations of Table#cache. Because the cache can always be rebuilt, one code path invalidating the cache will not break others. Caches are simply an optimization, and rebuilding the cache is not an error but an expected property; semantics never change.
>>
>> 2) When automatic caching is implemented it can follow this same model.
>>   a) A single cache is created when the optimizer determines it is necessary.
>>   b) If the user decides to explicitly cache a table which has already been implicitly cached under the hood, then calling Table#cache will just return that pre-built cache.
>>   c) If either the user or the optimizer decides to invalidate the cache, then neither code path will break the other; the cache is simply destroyed and will be rebuilt the next time it is needed.
>>
>> Of course caches are still automatically cleaned up when user sessions are terminated.
>>
>> Seth
>>
>> On 2018/12/11 04:10:21, Becket Qin <b...@gmail.com> wrote:
>>
>>> Hi Piotrek,
>>>
>>> Thanks for the reply. Thinking about it again, I might have misunderstood your proposal in earlier emails. Returning a CachedTable might not be a bad idea.
>>>
>>> I was more concerned about the semantics and their intuitiveness when a CachedTable is returned, i.e. if cache() returns a CachedTable, what are the semantics of the following code:
>>> {
>>>   val cachedTable = a.cache()
>>>   val b = cachedTable.select(...)
>>>   val c = a.select(...)
>>> }
>>> What is the difference between b and c? At first glance, I see two options:
>>>
>>> Semantic 1. b uses cachedTable because the user demanded so; c uses the original DAG because the user demanded so. In this case, the optimizer has no chance to optimize.
>>> Semantic 2. b uses cachedTable because the user demanded so; c leaves it to the optimizer to choose whether the cache or the DAG should be used. In this case, users lose the option to NOT use the cache.
>>>
>>> As you can see, neither of the options seems perfect. However, I guess you and Till are proposing a third option:
>>>
>>> Semantic 3. b leaves it to the optimizer to choose whether the cache or the DAG should be used; c always uses the DAG.
>>>
>>> This does address all the concerns. It is just that from an intuitiveness perspective, I find it a little weird to ask users to explicitly use a CachedTable while the optimizer might choose to ignore it. That was why I did not think about that semantic. But given there is material benefit, I think this semantic is acceptable.
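>>>
>>> To spell semantic 3 out on the snippet above (just an illustration of the intended behaviour, not a final API):
>>>
>>>   val cachedTable = a.cache()
>>>   // b: the optimizer decides whether to read from the cache or to re-run the DAG that produced a.
>>>   val b = cachedTable.select(...)
>>>   // c: always computed from the original DAG; the cache is never used.
>>>   val c = a.select(...)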
>>>
>>>> 1. If we want to let the optimiser make decisions whether to use the cache or not, then why do we need a "void cache()" method at all? Would it "increase" the chance of using the cache? That sounds strange. What would be the mechanism for deciding whether to use the cache or not? If we want to introduce such kind of automated optimisation of "plan node deduplication", I would turn it on globally, not per table, and let the optimiser do all of the work.
>>>> 2. We do not have statistics at the moment for any use/not-use-cache decision.
>>>> 3. Even if we had, I would be veeerryy sceptical whether such cost based optimisations would work properly and I would still insist first on providing an explicit caching mechanism (`CachedTable cache()`).
>>>
>>> We are absolutely on the same page here. An explicit cache() method is necessary not only because the optimizer may not be able to make the right decision, but also because of the nature of interactive programming. For example, if users write the following code in the Scala shell:
>>>   val b = a.select(...)
>>>   val c = b.select(...)
>>>   val d = c.select(...).writeToSink(...)
>>>   tEnv.execute()
>>> There is no way the optimizer will know whether b or c will be used in later code, unless users hint explicitly.
>>>
>>>> At the same time I'm not sure if you have responded to our objections of `void cache()` being implicit/having side effects, which me, Jark, Fabian, Till and I think also Shaoxuan are supporting.
>>>
>>> Are there any other side effects if we use semantic 3 mentioned above?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> Sorry for not responding for a long time.
>>>>
>>>> Regarding case 1:
>>>>
>>>> There wouldn't be an "a.unCache()" method; I would expect only `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn't affect `cachedTableA2`. Just as in any other database, dropping or modifying one independent table/materialised view does not affect others.
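>>>>
>>>> For example, something along these lines (dropCache() is just the method name I would expect, nothing is settled):
>>>>
>>>>   val cachedTableA1 = a.cache()
>>>>   val cachedTableA2 = a.cache()
>>>>   cachedTableA1.dropCache()          // releases only the cache behind cachedTableA1
>>>>   val d = cachedTableA2.filter(...)  // cachedTableA2 can still be used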
>>>>
>>>>> What I meant is that assuming there is already a cached table, ideally users need not specify whether the next query should read from the cache or use the original DAG. This should be decided by the optimizer.
>>>>
>>>> 1. If we want to let the optimiser make decisions whether to use the cache or not, then why do we need a "void cache()" method at all? Would it "increase" the chance of using the cache? That sounds strange. What would be the mechanism for deciding whether to use the cache or not? If we want to introduce such kind of automated optimisation of "plan node deduplication", I would turn it on globally, not per table, and let the optimiser do all of the work.
>>>> 2. We do not have statistics at the moment for any use/not-use-cache decision.
>>>> 3. Even if we had, I would be veeerryy sceptical whether such cost based optimisations would work properly and I would still insist first on providing an explicit caching mechanism (`CachedTable cache()`).
>>>> 4. As Till wrote, having an explicit `CachedTable cache()` doesn't contradict future work on automated cost-based caching.
>>>>
>>>> At the same time I'm not sure if you have responded to our objections of `void cache()` being implicit/having side effects, which me, Jark, Fabian, Till and I think also Shaoxuan are supporting.
>>>>
>>>> Piotrek
>>>>
>>>> On 5 Dec 2018, at 12:42, Becket Qin <be...@gmail.com> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> It is true that after the first job submission, there will be no ambiguity in terms of whether a cached table is used or not. That is the same for a cache() that does not return a CachedTable.
>>>>>
>>>>>> Conceptually one could think of cache() as introducing a caching operator from which you need to consume if you want to benefit from the caching functionality.
>>>>>
>>>>> I am thinking a little differently. I think it is a hint (as you mentioned later) instead of a new operator. I'd like to be careful about the semantics of the API. A hint is a property set on an existing operator, but it is not itself an operator as it does not really manipulate the data.
>>>>>
>>>>>> I agree, ideally the optimizer makes this kind of decision about which intermediate result should be cached. But especially when executing ad-hoc queries the user might better know which results need to be cached because Flink might not see the full DAG. In that sense, I would consider the cache() method as a hint for the optimizer. Of course, in the future we might add functionality which tries to automatically cache results (e.g. caching the latest intermediate results until so and so much space is used). But this should hopefully not contradict with `CachedTable cache()`.
>>>>>
>>>>> I agree that the cache() method is needed for exactly the reason you mentioned, i.e. Flink cannot predict what users are going to write later, so users need to tell Flink explicitly that this table will be used later. What I meant is that assuming there is already a cached table, ideally users need not specify whether the next query should read from the cache or use the original DAG. This should be decided by the optimizer.
>>>>>
>>>>> To explain the difference between returning / not returning a CachedTable, I want to compare the following two cases:
>>>>>
>>>>> *Case 1: returning a CachedTable*
>>>>> b = a.map(...)
>>>>> val cachedTableA1 = a.cache()
>>>>> val cachedTableA2 = a.cache()
>>>>> b.print() // Just to make sure a is cached.
>>>>>
>>>>> c = a.filter(...) // Does the user specify that the original DAG is used? Or does the optimizer decide whether the DAG or the cache should be used?
>>>>> d = cachedTableA1.filter() // The user specifies that the cached table is used.
>>>>>
>>>>> a.unCache() // Can cachedTableA still be used afterwards?
>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
>>>>>
>>>>> *Case 2: not returning a CachedTable*
>>>>> b = a.map()
>>>>> a.cache()
>>>>> a.cache() // no-op
>>>>> b.print() // Just to make sure a is cached
>>>>>
>>>>> c = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>> d = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>>
>>>>> a.unCache()
>>>>> a.unCache() // no-op
>>>>>
>>>>> In case 1, semantics-wise, the optimizer loses the option to choose between the DAG and the cache, and the unCache() call becomes tricky.
>>>>> In case 2, users do not need to worry about whether the cache or the DAG is used, and the unCache() semantics are clear. However, the caveat is that users cannot explicitly ignore the cache.
>>>>>
>>>>> In order to address the issues mentioned in case 2, and inspired by the discussion so far, I am thinking about using a hint to allow users to explicitly ignore the cache. Although we do not have hints yet, we probably should have them. So the code becomes:
>>>>>
>>>>> *Case 3: returning this table*
>>>>> b = a.map()
>>>>> a.cache()
>>>>> a.cache() // no-op
>>>>> b.print() // Just to make sure a is cached
>>>>>
>>>>> c = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>> d = a.hint("ignoreCache").filter(...) // The DAG will be used instead of the cache.
>>>>>
>>>>> a.unCache()
>>>>> a.unCache() // no-op
>>>>>
>>>>> We could also let cache() return this table to allow chained method calls. Do you think this API addresses the concerns?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <im...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> All the recent discussions are focused on whether there is a problem if cache() does not return a Table.
>>>>>> It seems that returning a Table explicitly is more clear (and safer?).
>>>>>>
>>>>>> So are there any problems if cache() returns a Table? @Becket
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <tr...@apache.org> wrote:
>>>>>>
>>>>>>> It's true that b, c, d and e will all read from the original DAG that generates a. But all subsequent operators (when running multiple queries) which reference cachedTableA should not need to reproduce `a` but directly consume the intermediate result.
>>>>>>>
>>>>>>> Conceptually one could think of cache() as introducing a caching operator from which you need to consume if you want to benefit from the caching functionality.
>>>>>>>
>>>>>>> I agree, ideally the optimizer makes this kind of decision about which intermediate result should be cached. But especially when executing ad-hoc queries the user might better know which results need to be cached because Flink might not see the full DAG. In that sense, I would consider the cache() method as a hint for the optimizer. Of course, in the future we might add functionality which tries to automatically cache results (e.g. caching the latest intermediate results until so and so much space is used). But this should hopefully not contradict with `CachedTable cache()`.
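>>>>>>>
>>>>>>> As a rough sketch of what I mean by consuming from the caching operator across multiple queries (method names as in the examples above, nothing here is final):
>>>>>>>
>>>>>>>   val cachedTableA = a.cache()
>>>>>>>   // First query: computes a and materializes the intermediate result behind cachedTableA.
>>>>>>>   cachedTableA.select(...).writeToSink(...)
>>>>>>>   tEnv.execute()
>>>>>>>   // Second query: references cachedTableA, so it consumes the materialized result instead of reproducing a.
>>>>>>>   cachedTableA.filter(...).writeToSink(...)
>>>>>>>   tEnv.execute()
>>>>>>>   // This query references a directly, so it reads from the original DAG.
>>>>>>>   a.select(...).writeToSink(...)
>>>>>>>   tEnv.execute()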
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <be...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>>
>>>>>>>> Thanks for the clarification. I am still a little confused.
>>>>>>>>
>>>>>>>> If cache() returns a CachedTable, the example might become:
>>>>>>>>
>>>>>>>> b = a.map(...)
>>>>>>>> c = a.map(...)
>>>>>>>>
>>>>>>>> cachedTableA = a.cache()
>>>>>>>> d = cachedTableA.map(...)
>>>>>>>> e = a.map()
>>>>>>>>
>>>>>>>> In the above case, if cache() is lazily evaluated, b, c, d and e are all going to be reading from the original DAG that generates a. But with a naive expectation, d should be reading from the cache. This does not seem to solve the potential confusion you raised, right?
>>>>>>>>
>>>>>>>> Just to be clear, my understanding is all based on the assumption that the tables are immutable. Therefore, after a.cache(), the *cachedTableA* and the original table *a* should be completely interchangeable.
>>>>>>>>
>>>>>>>> That said, I think a valid argument is optimization. There are indeed cases where reading from the original DAG could be faster than reading from the cache. For example:
>>>>>>>>
>>>>>>>> a.filter('f1 > 100)
>>>>>>>> a.cache()
>>>>>>>> b = a.filter('f1 < 100)
>>>>>>>>
>>>>>>>> Ideally the optimizer should be intelligent enough to decide which way is faster, without user intervention. In this case, it will identify that b would just be an empty table, and thus skip reading from the cache completely. But I agree that returning a CachedTable would give users control over when to use the cache, even though I still feel that letting the optimizer handle this is a better option in the long run.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <tr...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Yes, you are right Becket that it still depends on the actual execution of the job whether a consumer reads from a cached result or not.
>>>>>>>>>
>>>>>>>>> My point was actually about the properties of a (cached vs. non-cached) and not about the execution. I would not make cache trigger the execution of the job because one loses some flexibility by eagerly triggering the execution.
>>>>>>>>>
>>>>>>>>> I tried to argue for an explicit CachedTable which is returned by the cache() method, like Piotr did, in order to make the API more explicit.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <be...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Till,
>>>>>>>>>>
>>>>>>>>>> That is a good example. Just a minor correction: in this case, b, c and d will all consume from a non-cached a. This is because the cache will only be created on the very first job submission that generates the table to be cached.
>>>>>>>>>>
>>>>>>>>>> If I understand correctly, this example is about whether the .cache() method should be eagerly or lazily evaluated. In other words, if the cache() method actually triggers a job that creates the cache, there will be no such confusion.
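>>>>>>>>>>
>>>>>>>>>> In other words, roughly (just a sketch of the two evaluation options, reusing the names from your example):
>>>>>>>>>>
>>>>>>>>>>   cachedTableA = a.cache()
>>>>>>>>>>   d = cachedTableA.map(...)
>>>>>>>>>>   // Lazy evaluation: cache() only marks a; the cache is materialized as a side effect of the very first job submission that computes a, so in that first job b, c and d all still read from the non-cached a.
>>>>>>>>>>   // Eager evaluation: cache() itself submits a job that materializes a, so d would read from the cache right away.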
>>>>>>>>>>
>>>>>>>>>> Is that right?
>>>>>>>>>>
>>>>>>>>>> In the example, although d will not consume from the cached table even though it looks like it is supposed to, from a correctness perspective the code will still return the correct result, assuming that tables are immutable.
>>>>>>>>>>
>>>>>>>>>> Personally I feel it is OK because users probably won't really worry about whether the table is cached or not. And a lazy cache could avoid some unnecessary caching if a cached table is never actually needed in the user application. But I am not opposed to eager evaluation of the cache.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>> On Mon [message truncated...]