Hi Seth,

Thanks for the feedback. Re-caching makes sense to me. Piotr and I had some
offline discussion and we generally reached consensus on the following API:

{
  /**
    * Cache this table to the built-in table service or the specified
    * customized table service.
    *
    * This method provides a hint to Flink that the current table may be
    * reused later so a cache should be created to avoid regenerating this
    * table.
    *
    * The following code snippet gives an example of how this method could
    * be used.
    *
    * {{{
    *   val t = tEnv.fromCollection(data).as('country, 'color, 'count)
    *
    *   val t1 = t.filter('count < 100).cache()
    *   // t1 is cached after it is computed for the first time.
    *   val x = t1.collect().size
    *
    *   // When t1 is used again to compute t2, it may not be re-computed.
    *   val t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)
    *   val res2 = t2.collect()
    *   res2.foreach(println)
    *
    *   // Similarly when t1 is used again to compute t3, it may not be
    *   // re-computed.
    *   val t3 = t1.groupBy('color).select('color, 'count.avg as 'avg)
    *   val res3 = t3.collect()
    *   res3.foreach(println)
    *
    * }}}
    *
    * @note The Flink optimizer may decide not to use the cache if doing so
    * would accelerate the processing, or if the cache is no longer
    * available, e.g. because it has been invalidated.
    * @note The table cache could be created lazily. That means the cache
    * may be created the first time the cached table is computed.
    * @note The table cache will be cleared when the user program exits.
    *
    * @return the current table with a cache hint. The original table
    * reference is not modified by the execution of this method.
    */
  def cache(): Table

  /**
    * Manually invalidate the cache of this table to release the physical
    * resources. Users are not required to invoke this method to release
    * physical resources unless they want to; the table caches are cleared
    * when the user program exits.
    *
    * @note After invalidation, the cache may be re-created if this table
    * is used again.
    */
  def invalidateCache(): Unit
}
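
To illustrate how the two methods above interact, here is a rough sketch
that extends the example in the doc comment (the data and column names are
just placeholders):

  val t = tEnv.fromCollection(data).as('country, 'color, 'count)
  val t1 = t.filter('count < 100).cache()

  // The cache may be created lazily, i.e. the first time t1 is computed.
  val x = t1.collect().size

  // Subsequent uses of t1 may be served from the cache.
  val t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)
  t2.collect().foreach(println)

  // Manually release the physical resources backing the cache. This is
  // optional; caches are cleared anyway when the user program exits.
  t1.invalidateCache()

  // Using t1 again is still correct: the cache may simply be re-created.
  val t3 = t1.groupBy('color).select('color, 'count.avg as 'avg)
  t3.collect().foreach(println)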

In the future, after we introduce automatic caching, the table may also be
automatically cached.

In summary, the final state we are looking at is the following:
1. A table could be cached either manually or automatically.
2. If a cache exists, Flink may or may not use it, depending on whether
that will accelerate the execution.
3. In some rare use cases, a hint could be used to explicitly ask Flink to
ignore the cache (see the sketch below).
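
As a rough sketch of point 3 (note that the hint mechanism does not exist
yet; hint("ignoreCache") below is purely illustrative):

  val t1 = t.filter('count < 100).cache()

  // Flink decides whether t2 is computed from the cache or from the
  // original DAG, depending on which it estimates to be faster.
  val t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)

  // Hypothetical hint: explicitly ask Flink to ignore the cache and
  // recompute from the original DAG.
  val t3 = t1.hint("ignoreCache")
             .groupBy('color).select('color, 'count.avg as 'avg)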

I'll document all the discussions we have had around the API. If there are
no further concerns over this API, I'll convert it to a FLIP.

Thanks,

Jiangjie (Becket) Qin

On Thu, Jan 10, 2019 at 9:08 PM Seth Wiesman <s...@data-artisans.com> wrote:

> I spoke to Piotr a little bit offline and I wanted to comment with a
> summary of our discussion and what I believe is the most intuitive cache
> model from a user's perspective.
>
> (I am making up some class names here, not looking to bike-shed; feel free
> to change the names however you see fit).
>
> A cache is by definition an optimization, something used to store
> intermediate results for faster / more performant downstream computation.
> Therefore, as a Flink user I would not expect it to change the semantics of
> my application, I would expect it to be rebuildable, and I do not expect to
> know how it works under the hood. With these principles in mind I feel the
> most intuitive API would be as follows:
>
> // Some table
> Table a = . . .
>
> // Signal that we would like to cache the table
> // this is lazy and does not force any computation.
> CachedTable cachedA = a.cache()
>
> // The first operation against the cache.
> // This count will trigger reading input
> // data and building the cache.
> cachedA.count()
>
> // Operates against the cache, no operations
> // before a.cache are performed.
> cachedA.sum()
>
> // This does not operate against the cache,
> // it will trigger reading data from source
> // and performing a full computation
> a.min()
>
> // Invalidates the cache, releasing all
> // underlying resources
> cachedA.invalidateCache()
>
> // Rebuilds the cache. Since caches are recomputable
> // this should not be an error, it will simply be a more
> // expensive operation than if we had not invalidated the cache.
> cachedA.min()
>
> This model leads to 2 nice properties:
>
> 1) The same cache can be shared across multiple invocations of
> Table#cache. Because the cache can always be rebuilt, one code path
> invalidating the cache will not break others. Caches are simply an
> optimization, and rebuilding the cache is not an error but an expected
> property; semantics never change.
>
> 2) When automatic caching is implemented it can follow this same model.
>    a) A single cache is created when the optimizer determines it is
> necessary.
>    b) If the user decides to explicitly cache a table which has already
> been implicitly cached under the hood then calling Table#cache will just
> return that pre-built cache.
>    c) If either the user or the optimizer decides to invalidate the cache
> then neither code path will break the other; the cache is simply destroyed
> and will be rebuilt the next time it is needed.
>
> Of course caches are still automatically cleaned up when user sessions are
> terminated.
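>
> To make property 1 concrete, here is a minimal sketch (again with made-up
> names; count/sum/min stand in for arbitrary operations):
>
> val a: Table = ...
>
> // Two independent code paths cache the same table; both handles
> // refer to the same underlying cache.
> val cachedA1 = a.cache()
> val cachedA2 = a.cache()
>
> cachedA1.count() // builds the cache on first use
> cachedA2.sum()   // served from the same cache
>
> // One path invalidates the cache, releasing its resources...
> cachedA1.invalidateCache()
>
> // ...but the other path is not broken: the cache is transparently
> // rebuilt, just more expensive than if it had not been invalidated.
> cachedA2.min()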
>
> Seth
>
> On 2018/12/11 04:10:21, Becket Qin <b...@gmail.com> wrote:
> > Hi Piotrek,
> >
> > Thanks for the reply. Thought about it again, I might have misunderstood
> > your proposal in earlier emails. Returning a CachedTable might not be a
> > bad idea.
> >
> > I was more concerned about the semantics and their intuitiveness when a
> > CachedTable is returned, i.e. if cache() returns a CachedTable, what are
> > the semantics of the following code:
> > {
> >   val cachedTable = a.cache()
> >   val b = cachedTable.select(...)
> >   val c = a.select(...)
> > }
> > What is the difference between b and c? At first glance, I see two
> > options:
> >
> > Semantic 1. b uses cachedTable as the user demanded. c uses the original
> > DAG as the user demanded. In this case, the optimizer has no chance to
> > optimize.
> > Semantic 2. b uses cachedTable as the user demanded. c leaves the
> > optimizer to choose whether the cache or the DAG should be used. In this
> > case, the user loses the option to NOT use the cache.
> >
> > As you can see, neither of the options seems perfect. However, I guess
> > you and Till are proposing a third option:
> >
> > Semantic 3. b leaves the optimizer to choose whether the cache or the
> > DAG should be used. c always uses the DAG.
> >
> > This does address all the concerns. It is just that, from an
> > intuitiveness perspective, I found that asking the user to explicitly
> > use a CachedTable which the optimizer might choose to ignore is a little
> > weird. That was why I did not think about that semantic. But given there
> > is material benefit, I think this semantic is acceptable.
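> >
> > To spell semantic 3 out in code (a sketch only; the API is not final):
> >
> > val cachedTable = a.cache()
> >
> > // The optimizer is free to serve b either from the cache or by
> > // re-running the DAG that produced a, whichever it estimates faster.
> > val b = cachedTable.select(...)
> >
> > // c always runs against the original DAG and never reads the cache.
> > val c = a.select(...)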
> >
> > > 1. If we want to let optimiser make decisions whether to use cache
> > > or not, then why do we need “void cache()” method at all? Would it
> > > “increase” the chance of using the cache? That sounds strange. What
> > > would be the mechanism of deciding whether to use the cache or not?
> > > If we want to introduce such kind of automated optimisations of
> > > “plan nodes deduplication” I would turn it on globally, not per
> > > table, and let the optimiser do all of the work.
> > > 2. We do not have statistics at the moment for any use/not use cache
> > > decision.
> > > 3. Even if we had, I would be veeerryy sceptical whether such cost
> > > based optimisations would work properly and I would still insist
> > > first on providing explicit caching mechanism (`CachedTable cache()`)
> >
> > We are absolutely on the same page here. An explicit cache() method is
> > necessary not only because the optimizer may not be able to make the
> > right decision, but also because of the nature of interactive
> > programming. For example, if users write the following code in the
> > Scala shell:
> >   val b = a.select(...)
> >   val c = b.select(...)
> >   val d = c.select(...).writeToSink(...)
> >   tEnv.execute()
> > There is no way the optimizer will know whether b or c will be used in
> > later code, unless users hint explicitly.
> >
> > > At the same time I’m not sure if you have responded to our objections
> > > to `void cache()` being implicit/having side effects, which me, Jark,
> > > Fabian, Till and I think also Shaoxuan are supporting.
> >
> > Are there any other side effects if we use semantic 3 mentioned above?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <pi...@data-artisans.com>
> > wrote:
> >
> > > Hi Becket,
> > >
> > > Sorry for not responding for a long time.
> > >
> > > Regarding case 1:
> > >
> > > There wouldn’t be an “a.unCache()” method; I would expect only
> > > `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect
> > > `cachedTableA2`. Just as in any other database, dropping/modifying
> > > one independent table/materialised view does not affect others.
> > >
> > > > What I meant is that assuming there is already a cached table,
> > > > ideally users need not specify whether the next query should read
> > > > from the cache or use the original DAG. This should be decided by
> > > > the optimizer.
> > >
> > > 1. If we want to let optimiser make decisions whether to use cache
> > > or not, then why do we need “void cache()” method at all? Would it
> > > “increase” the chance of using the cache? That sounds strange. What
> > > would be the mechanism of deciding whether to use the cache or not?
> > > If we want to introduce such kind of automated optimisations of
> > > “plan nodes deduplication” I would turn it on globally, not per
> > > table, and let the optimiser do all of the work.
> > > 2. We do not have statistics at the moment for any use/not use cache
> > > decision.
> > > 3. Even if we had, I would be veeerryy sceptical whether such cost
> > > based optimisations would work properly and I would still insist
> > > first on providing explicit caching mechanism (`CachedTable cache()`)
> > > 4. As Till wrote, having explicit `CachedTable cache()` doesn’t
> > > contradict future work on automated cost based caching.
> > >
> > > At the same time I’m not sure if you have responded to our objections
> > > to `void cache()` being implicit/having side effects, which me, Jark,
> > > Fabian, Till and I think also Shaoxuan are supporting.
> > >
> > > Piotrek
> > >
> > > > On 5 Dec 2018, at 12:42, Becket Qin <be...@gmail.com> wrote:
> > > > Hi Till,
> > > >
> > > > It is true that after the first job submission, there will be no
> > > > ambiguity in terms of whether a cached table is used or not. That is
> > > > the same for the cache() without returning a CachedTable.
> > > >
> > > > > Conceptually one could think of cache() as introducing a caching
> > > > > operator from which you need to consume if you want to benefit
> > > > > from the caching functionality.
> > > >
> > > > I am thinking a little differently. I think it is a hint (as you
> > > > mentioned later) instead of a new operator. I'd like to be careful
> > > > about the semantics of the API. A hint is a property set on an
> > > > existing operator, but is not itself an operator as it does not
> > > > really manipulate the data.
> > > >
> > > > > I agree, ideally the optimizer makes this kind of decision which
> > > > > intermediate result should be cached. But especially when
> > > > > executing ad-hoc queries the user might better know which results
> > > > > need to be cached because Flink might not see the full DAG. In
> > > > > that sense, I would consider the cache() method as a hint for the
> > > > > optimizer. Of course, in the future we might add functionality
> > > > > which tries to automatically cache results (e.g. caching the
> > > > > latest intermediate results until so and so much space is used).
> > > > > But this should hopefully not contradict with `CachedTable
> > > > > cache()`.
> > > >
> > > > I agree that the cache() method is needed for exactly the reason
> > > > you mentioned, i.e. Flink cannot predict what users are going to
> > > > write later, so users need to tell Flink explicitly that this table
> > > > will be used later. What I meant is that assuming there is already
> > > > a cached table, ideally users need not specify whether the next
> > > > query should read from the cache or use the original DAG. This
> > > > should be decided by the optimizer.
> > > >
> > > > To explain the difference between returning / not returning a
> > > > CachedTable, I want to compare the following two cases:
> > > >
> > > > *Case 1: returning a CachedTable*
> > > > b = a.map(...)
> > > > val cachedTableA1 = a.cache()
> > > > val cachedTableA2 = a.cache()
> > > > b.print() // Just to make sure a is cached.
> > > >
> > > > c = a.filter(...) // Does the user specify that the original DAG is
> > > >                   // used? Or does the optimizer decide whether the
> > > >                   // DAG or the cache should be used?
> > > > d = cachedTableA1.filter() // User specifies that the cached table
> > > >                            // is used.
> > > >
> > > > a.unCache() // Can cachedTableA still be used afterwards?
> > > > cachedTableA1.uncache() // Can cachedTableA2 still be used?
> > > >
> > > > *Case 2: not returning a CachedTable*
> > > > b = a.map()
> > > > a.cache()
> > > > a.cache() // no-op
> > > > b.print() // Just to make sure a is cached
> > > >
> > > > c = a.filter(...) // Optimizer decides whether the cache or DAG
> > > >                   // should be used
> > > > d = a.filter(...) // Optimizer decides whether the cache or DAG
> > > >                   // should be used
> > > >
> > > > a.unCache()
> > > > a.unCache() // no-op
> > > >
> > > > In case 1, semantics-wise, the optimizer loses the option to choose
> > > > between the DAG and the cache. And the unCache() call becomes
> > > > tricky.
> > > > In case 2, users do not need to worry about whether the cache or
> > > > the DAG is used. And the unCache() semantics are clear. However,
> > > > the caveat is that users cannot explicitly ignore the cache.
> > > >
> > > > In order to address the issues mentioned in case 2, and inspired by
> > > > the discussion so far, I am thinking about using a hint to allow
> > > > users to explicitly ignore the cache. Although we do not have hints
> > > > yet, we probably should have one. So the code becomes:
> > > >
> > > > *Case 3: returning this table*
> > > > b = a.map()
> > > > a.cache()
> > > > a.cache() // no-op
> > > > b.print() // Just to make sure a is cached
> > > >
> > > > c = a.filter(...) // Optimizer decides whether the cache or DAG
> > > >                   // should be used
> > > > d = a.hint("ignoreCache").filter(...) // DAG will be used instead
> > > >                                       // of the cache.
> > > >
> > > > a.unCache()
> > > > a.unCache() // no-op
> > > >
> > > > We could also let cache() return this table to allow chained method
> > > > calls. Do you think this API addresses the concerns?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >>
> > > > On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <im...@gmail.com> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > All the recent discussions are focused on whether there is a
> > > > > problem if cache() does not return a Table.
> > > > > It seems that returning a Table explicitly is more clear (and
> > > > > safe?).
> > > > >
> > > > > So are there any problems if cache() returns a Table? @Becket
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <tr...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > It's true that b, c, d and e will all read from the original
> > > > > > DAG that generates a. But all subsequent operators (when
> > > > > > running multiple queries) which reference cachedTableA should
> > > > > > not need to reproduce `a` but directly consume the intermediate
> > > > > > result.
> > > > > >
> > > > > > Conceptually one could think of cache() as introducing a
> > > > > > caching operator from which you need to consume if you want to
> > > > > > benefit from the caching functionality.
> > > > > >
> > > > > > I agree, ideally the optimizer makes this kind of decision
> > > > > > which intermediate result should be cached. But especially when
> > > > > > executing ad-hoc queries the user might better know which
> > > > > > results need to be cached because Flink might not see the full
> > > > > > DAG. In that sense, I would consider the cache() method as a
> > > > > > hint for the optimizer. Of course, in the future we might add
> > > > > > functionality which tries to automatically cache results (e.g.
> > > > > > caching the latest intermediate results until so and so much
> > > > > > space is used). But this should hopefully not contradict with
> > > > > > `CachedTable cache()`.
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <be...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi Till,
> > > > > > >
> > > > > > > Thanks for the clarification. I am still a little confused.
> > > > > > >
> > > > > > > If cache() returns a CachedTable, the example might become:
> > > > > > >
> > > > > > > b = a.map(...)
> > > > > > > c = a.map(...)
> > > > > > >
> > > > > > > cachedTableA = a.cache()
> > > > > > > d = cachedTableA.map(...)
> > > > > > > e = a.map()
> > > > > > >
> > > > > > > In the above case, if cache() is lazily evaluated, b, c, d
> > > > > > > and e are all going to be reading from the original DAG that
> > > > > > > generates a. But with a naive expectation, d should be
> > > > > > > reading from the cache. This seems not to solve the potential
> > > > > > > confusion you raised, right?
> > > > > > >
> > > > > > > Just to be clear, my understanding is all based on the
> > > > > > > assumption that the tables are immutable. Therefore, after
> > > > > > > a.cache(), the *cachedTableA* and the original table *a*
> > > > > > > should be completely interchangeable.
> > > > > > >
> > > > > > > That said, I think a valid argument is optimization. There
> > > > > > > are indeed cases where reading from the original DAG could be
> > > > > > > faster than reading from the cache. For example, in the
> > > > > > > following example:
> > > > > > >
> > > > > > > a.filter(f1' > 100)
> > > > > > > a.cache()
> > > > > > > b = a.filter(f1' < 100)
> > > > > > >
> > > > > > > Ideally the optimizer should be intelligent enough to decide
> > > > > > > which way is faster, without user intervention. In this case,
> > > > > > > it will identify that b would just be an empty table, thus
> > > > > > > skip reading from the cache completely. But I agree that
> > > > > > > returning a CachedTable would give the user control over when
> > > > > > > to use the cache, even though I still feel that letting the
> > > > > > > optimizer handle this is a better option in the long run.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann
> > > > > > > <tr...@apache.org> wrote:
> > > > > > > >
> > > > > > > > Yes, you are right Becket that it still depends on the
> > > > > > > > actual execution of the job whether a consumer reads from
> > > > > > > > a cached result or not.
> > > > > > > >
> > > > > > > > My point was actually about the properties of a (cached
> > > > > > > > vs. non-cached) and not about the execution. I would not
> > > > > > > > make cache trigger the execution of the job because one
> > > > > > > > loses some flexibility by eagerly triggering the
> > > > > > > > execution.
> > > > > > > >
> > > > > > > > I tried to argue for an explicit CachedTable which is
> > > > > > > > returned by the cache() method, like Piotr did, in order
> > > > > > > > to make the API more explicit.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Till
> > > > > > > >
> > > > > > > > On Mon, Dec 3, 2018 at 4:23 PM Becket Qin
> > > > > > > > <be...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi Till,
> > > > > > > > >
> > > > > > > > > That is a good example. Just a minor correction: in this
> > > > > > > > > case, b, c and d will all consume from a non-cached a.
> > > > > > > > > This is because the cache will only be created on the
> > > > > > > > > very first job submission that generates the table to be
> > > > > > > > > cached.
> > > > > > > > >
> > > > > > > > > If I understand correctly, this example is about whether
> > > > > > > > > the .cache() method should be eagerly evaluated or lazily
> > > > > > > > > evaluated. In other words, if the cache() method actually
> > > > > > > > > triggers a job that creates the cache, there will be no
> > > > > > > > > such confusion. Is that right?
> > > > > > > > >
> > > > > > > > > In the example, although d will not consume from the
> > > > > > > > > cached Table while it looks supposed to, from a
> > > > > > > > > correctness perspective the code will still return the
> > > > > > > > > correct result, assuming that tables are immutable.
> > > > > > > > >
> > > > > > > > > Personally I feel it is OK because users probably won't
> > > > > > > > > really worry about whether the table is cached or not.
> > > > > > > > > And a lazy cache could avoid some unnecessary caching if
> > > > > > > > > a cached table is never created in the user application.
> > > > > > > > > But I am not opposed to doing eager evaluation of
> > > > > > > > > cache().
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > >
> > > > > > > > > On Mon
> [message truncated...]
