Hey Becket,

+1 From my side

Piotrek

> On 14 Jan 2019, at 14:43, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Seth,
> 
> Thanks for the feedback. Re-caching makes sense to me. Piotr and I had some
> offline discussion and we generally reached consensus on the following API:
> 
> {
>  /**
>    * Cache this table to builtin table service or the specified customized
> table service.
>    *
>    * This method provides a hint to Flink that the current table maybe
> reused later so a
>    * cache should be created to avoid regenerating this table.
>    *
>    * The following code snippet gives an example of how this method could
> be used.
>    *
>    * {{{
>    *   val t = tEnv.fromCollection(data).as('country, 'color, 'count)
>    *
>    *   val t1 = t.filter('count < 100).cache()
>    *   // t1 is cached after it is computed for the first time.
>    *   val x = t1.collect().size
>    *
>    *   // When t1 is used again to compute t2, it may not be re-computed.
>    *   val t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)
>    *   val res2 = t2.collect()
>    *   res2.foreach(println)
>    *
>    *   // Similarly when t1 is used again to compute t3, it may not be
> re-computed.
>    *   val t3 = t1.groupBy('color).select('color, 'count.avg as 'avg)
>    *   val res3 = t3.collect()
>    *   res3.foreach(println)
>    *
>    * }}}
>    *
>    * @note Flink optimizer may decide to not use the cache if doing that
> will accelerate the
>    * processing, or if the cache is no longer available for reasons such
> as the cache has
>    * been invalidated.
>    * @note The table cache could be create lazily. That means the cache
> may be created at
>    * the first time when the cached table is computed.
>    * @note The table cache will be cleared when the user program exits.
>    *
>    * @return the current table with a cache hint. The original table
> reference is not modified
>    *               by the execution of this method.
>    */
>  def cache(): Table
> 
>  /**
>    * Manually invalidate the cache of this table to release the physical
> resources. Users are
>    * not required to invoke this method to release physical resource
> unless they want to. The
>    * table caches are cleared when user program exits.
>    *
>    * @note After invalidated, the cache may be re-created if this table is
> used again.
>    */
>  def invalidateCache(): Unit
> }
> 
> In the future, after we introduce automatic caching, the table may also be
> automatically cached.
> 
> In summary the final state we are looking at is following:
> 1. A table could be cached either manually or automatically.
> 2. If cache exists, Flink may or may not use it, depending on whether that
> will accelerate the execution.
> 3. In some rare use cases, an hint of could be used to explicitly ask Flink
> to ignore cache.
> 
> I'll document all the discussions we have had around the API. If there is
> no further concerns over this API, I'll convert it to a FLIP.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Thu, Jan 10, 2019 at 9:08 PM Seth Wiesman <s...@data-artisans.com> wrote:
> 
>> I spoke to Piotr a little bit offline and I wanted to comment with a
>> summary of our discussion and what I believe is most intuitive cache model
>> from a users perspective.
>> 
>> (I am making up some class names here, not looking to bike shed feel free
>> to change the names how ever you see fit).
>> 
>> A cache is by definition an optimization, something used to store
>> intermediate results for faster / more performant downstream computation.
>> Therefore, as a Flink user I would not expect it to change the semantics of
>> my application, I would expect it to be rebuildable, and I do not expect to
>> know how it works under the hood. With there principles in mind I feel the
>> most intuitive api would be as follows:
>> 
>> // Some table
>> Table a = . . .
>> 
>> // Signal that we would like to cache the table
>> // this is lazy and does not force any computation.
>> CachedTable cachedA = a.cache()
>> 
>> // The first operation against the cache.
>> // This count will trigger reading input
>> // data and building the cache.
>> cachedA.count()
>> 
>> // Operates against the cache, no operations
>> // before a.cache are performed.
>> cachedA.sum()
>> 
>> // This does not operate against the cache,
>> // it will trigger reading data from source
>> // and performing a full computation
>> a.min()
>> 
>> // Invalidates the cache, releasing all
>> // underlying resources
>> cachedA.invalidateCache()
>> 
>> // Rebuilds the cache. Since caches are recomputable
>> // this should not be an error, it will simply be a more
>> // expensive operation than if we had not invalidated the cache.
>> cachedA.min()
>> 
>> This model leads to 2 nice properties:
>> 
>> 1) The same cache can be shared across multiple invocations of
>> Table#cache. Because the cache can always be rebuilt one code path
>> invalidating the cache will not break others. Cache’s are simply and
>> optimization and rebuilding the cache is not an error but an expected
>> property, semantics never change.
>> 
>> 2) When automatic caching is implemented it can follow this same model.
>>   a) A single cache is created when the optimizer determines it is
>> necessary.
>>   b) If the user decides to explicitly cache a table which has already
>> been implicitly cached under the hood then calling Table#cache will just
>> return that pre-built cache.
>>   c) If either the user or optimizer decide to invalidate the cache then
>> neither code path will break the other, the cache is simply destroyed and
>> will be rebuilt the next time is needed.
>> 
>> Of course caches are still automatically cleaned up when user sessions are
>> terminated.
>> 
>> Seth
>> 
>> On 2018/12/11 04:10:21, Becket Qin <b...@gmail.com> wrote:
>>> Hi Piotrek,>
>>> 
>>> Thanks for the reply. Thought about it again, I might have
>> misunderstood>
>>> your proposal in earlier emails. Returning a CachedTable might not be a
>> bad>
>>> idea.>
>>> 
>>> I was more concerned about the semantic and its intuitiveness when a>
>>> CachedTable is returned. i..e, if cache() returns CachedTable. What are
>> the>
>>> semantic in the following code:>
>>> {>
>>>  val cachedTable = a.cache()>
>>>  val b = cachedTable.select(...)>
>>>  val c = a.select(...)>
>>> }>
>>> What is the difference between b and c? At the first glance, I see two>
>>> options:>
>>> 
>>> Semantic 1. b uses cachedTable as user demanded so. c uses original DAG
>> as>
>>> user demanded so. In this case, the optimizer has no chance to
>> optimize.>
>>> Semantic 2. b uses cachedTable as user demanded so. c leaves the
>> optimizer>
>>> to choose whether the cache or DAG should be used. In this case, user
>> lose>
>>> the option to NOT use cache.>
>>> 
>>> As you can see, neither of the options seem perfect. However, I guess
>> you>
>>> and Till are proposing the third option:>
>>> 
>>> Semantic 3. b leaves the optimizer to choose whether cache or DAG should
>> be>
>>> used. c always use the DAG.>
>>> 
>>> This does address all the concerns. It is just that from intuitiveness>
>>> perspective, I found that asking user to explicitly use a CachedTable
>> while>
>>> the optimizer might choose to ignore is a little weird. That was why I
>> did>
>>> not think about that semantic. But given there is material benefit, I
>> think>
>>> this semantic is acceptable.>
>>> 
>>> 1. If we want to let optimiser make decisions whether to use cache or
>> not,>
>>>> then why do we need “void cache()” method at all? Would It  “increase”
>> the>
>>>> chance of using the cache? That’s sounds strange. What would be the>
>>>> mechanism of deciding whether to use the cache or not? If we want to>
>>>> introduce such kind  automated optimisations of “plan nodes
>> deduplication”>
>>>> I would turn it on globally, not per table, and let the optimiser do
>> all of>
>>>> the work.>
>>>> 2. We do not have statistics at the moment for any use/not use cache>
>>>> decision.>
>>>> 3. Even if we had, I would be veeerryy sceptical whether such cost
>> based>
>>>> optimisations would work properly and I would still insist first on>
>>>> providing explicit caching mechanism (`CachedTable cache()`)>
>>>>> 
>>> We are absolutely on the same page here. An explicit cache() method is>
>>> necessary not only because optimizer may not be able to make the right>
>>> decision, but also because of the nature of interactive programming.
>> For>
>>> example, if users write the following code in Scala shell:>
>>>  val b = a.select(...)>
>>>  val c = b.select(...)>
>>>  val d = c.select(...).writeToSink(...)>
>>>  tEnv.execute()>
>>> There is no way optimizer will know whether b or c will be used in
>> later>
>>> code, unless users hint explicitly.>
>>> 
>>> At the same time I’m not sure if you have responded to our objections
>> of>
>>>> `void cache()` being implicit/having side effects, which me, Jark,
>> Fabian,>
>>>> Till and I think also Shaoxuan are supporting.>
>>> 
>>> Is there any other side effects if we use semantic 3 mentioned above?>
>>> 
>>> Thanks,>
>>> 
>>> JIangjie (Becket) Qin>
>>> 
>>> 
>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <pi...@data-artisans.com>>
>> 
>>> wrote:>
>>> 
>>>> Hi Becket,>
>>>>> 
>>>> Sorry for not responding long time.>
>>>>> 
>>>> Regarding case1.>
>>>>> 
>>>> There wouldn’t be no “a.unCache()” method, but I would expect only>
>>>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect>
>>>> `cachedTableA2`. Just as in any other database dropping modifying one>
>>>> independent table/materialised view does not affect others.>
>>>>> 
>>>>> What I meant is that assuming there is already a cached table,
>> ideally>
>>>> users need>
>>>>> not to specify whether the next query should read from the cache or
>> use>
>>>> the>
>>>>> original DAG. This should be decided by the optimizer.>
>>>>> 
>>>> 1. If we want to let optimiser make decisions whether to use cache or
>> not,>
>>>> then why do we need “void cache()” method at all? Would It  “increase”
>> the>
>>>> chance of using the cache? That’s sounds strange. What would be the>
>>>> mechanism of deciding whether to use the cache or not? If we want to>
>>>> introduce such kind  automated optimisations of “plan nodes
>> deduplication”>
>>>> I would turn it on globally, not per table, and let the optimiser do
>> all of>
>>>> the work.>
>>>> 2. We do not have statistics at the moment for any use/not use cache>
>>>> decision.>
>>>> 3. Even if we had, I would be veeerryy sceptical whether such cost
>> based>
>>>> optimisations would work properly and I would still insist first on>
>>>> providing explicit caching mechanism (`CachedTable cache()`)>
>>>> 4. As Till wrote, having explicit `CachedTable cache()` doesn’t
>> contradict>
>>>> future work on automated cost based caching.>
>>>>> 
>>>>> 
>>>> At the same time I’m not sure if you have responded to our objections
>> of>
>>>> `void cache()` being implicit/having side effects, which me, Jark,
>> Fabian,>
>>>> Till and I think also Shaoxuan are supporting.>
>>>>> 
>>>> Piotrek>
>>>>> 
>>>>> On 5 Dec 2018, at 12:42, Becket Qin <be...@gmail.com> wrote:>
>>>>>> 
>>>>> Hi Till,>
>>>>>> 
>>>>> It is true that after the first job submission, there will be no>
>>>> ambiguity>
>>>>> in terms of whether a cached table is used or not. That is the same
>> for>
>>>> the>
>>>>> cache() without returning a CachedTable.>
>>>>>> 
>>>>> Conceptually one could think of cache() as introducing a caching
>> operator>
>>>>>> from which you need to consume from if you want to benefit from
>> the>
>>>> caching>
>>>>>> functionality.>
>>>>>> 
>>>>> I am thinking a little differently. I think it is a hint (as you>
>>>> mentioned>
>>>>> later) instead of a new operator. I'd like to be careful about the>
>>>> semantic>
>>>>> of the API. A hint is a property set on an existing operator, but is
>> not>
>>>>> itself an operator as it does not really manipulate the data.>
>>>>>> 
>>>>> I agree, ideally the optimizer makes this kind of decision which>
>>>>>> intermediate result should be cached. But especially when
>> executing>
>>>> ad-hoc>
>>>>>> queries the user might better know which results need to be cached>
>>>> because>
>>>>>> Flink might not see the full DAG. In that sense, I would consider
>> the>
>>>>>> cache() method as a hint for the optimizer. Of course, in the
>> future we>
>>>>>> might add functionality which tries to automatically cache results
>> (e.g.>
>>>>>> caching the latest intermediate results until so and so much space
>> is>
>>>>>> used). But this should hopefully not contradict with `CachedTable>
>>>> cache()`.>
>>>>>> 
>>>>> I agree that cache() method is needed for exactly the reason you>
>>>> mentioned,>
>>>>> i.e. Flink cannot predict what users are going to write later, so
>> users>
>>>>> need to tell Flink explicitly that this table will be used later.
>> What I>
>>>>> meant is that assuming there is already a cached table, ideally
>> users>
>>>> need>
>>>>> not to specify whether the next query should read from the cache or
>> use>
>>>> the>
>>>>> original DAG. This should be decided by the optimizer.>
>>>>>> 
>>>>> To explain the difference between returning / not returning a>
>>>> CachedTable,>
>>>>> I want compare the following two case:>
>>>>>> 
>>>>> *Case 1:  returning a CachedTable*>
>>>>> b = a.map(...)>
>>>>> val cachedTableA1 = a.cache()>
>>>>> val cachedTableA2 = a.cache()>
>>>>> b.print() // Just to make sure a is cached.>
>>>>>> 
>>>>> c = a.filter(...) // User specify that the original DAG is used? Or
>> the>
>>>>> optimizer decides whether DAG or cache should be used?>
>>>>> d = cachedTableA1.filter() // User specify that the cached table is
>> used.>
>>>>>> 
>>>>> a.unCache() // Can cachedTableA still be used afterwards?>
>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?>
>>>>>> 
>>>>> *Case 2: not returning a CachedTable*>
>>>>> b = a.map()>
>>>>> a.cache()>
>>>>> a.cache() // no-op>
>>>>> b.print() // Just to make sure a is cached>
>>>>>> 
>>>>> c = a.filter(...) // Optimizer decides whether the cache or DAG
>> should be>
>>>>> used>
>>>>> d = a.filter(...) // Optimizer decides whether the cache or DAG
>> should be>
>>>>> used>
>>>>>> 
>>>>> a.unCache()>
>>>>> a.unCache() // no-op>
>>>>>> 
>>>>> In case 1, semantic wise, optimizer lose the option to choose
>> between DAG>
>>>>> and cache. And the unCache() call becomes tricky.>
>>>>> In case 2, users do not need to worry about whether cache or DAG is
>> used.>
>>>>> And the unCache() semantic is clear. However, the caveat is that
>> users>
>>>>> cannot explicitly ignore the cache.>
>>>>>> 
>>>>> In order to address the issues mentioned in case 2 and inspired by
>> the>
>>>>> discussion so far, I am thinking about using hint to allow user>
>>>> explicitly>
>>>>> ignore cache. Although we do not have hint yet, but we probably
>> should>
>>>> have>
>>>>> one. So the code becomes:>
>>>>>> 
>>>>> *Case 3: returning this table*>
>>>>> b = a.map()>
>>>>> a.cache()>
>>>>> a.cache() // no-op>
>>>>> b.print() // Just to make sure a is cached>
>>>>>> 
>>>>> c = a.filter(...) // Optimizer decides whether the cache or DAG
>> should be>
>>>>> used>
>>>>> d = a.hint("ignoreCache").filter(...) // DAG will be used instead of
>> the>
>>>>> cache.>
>>>>>> 
>>>>> a.unCache()>
>>>>> a.unCache() // no-op>
>>>>>> 
>>>>> We could also let cache() return this table to allow chained method>
>>>> calls.>
>>>>> Do you think this API addresses the concerns?>
>>>>>> 
>>>>> Thanks,>
>>>>>> 
>>>>> Jiangjie (Becket) Qin>
>>>>>> 
>>>>>> 
>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <im...@gmail.com> wrote:>
>>>>>> 
>>>>>> Hi,>
>>>>>>> 
>>>>>> All the recent discussions are focused on whether there is a
>> problem if>
>>>>>> cache() not return a Table.>
>>>>>> It seems that returning a Table explicitly is more clear (and
>> safe?).>
>>>>>>> 
>>>>>> So whether there are any problems if cache() returns a Table?
>> @Becket>
>>>>>>> 
>>>>>> Best,>
>>>>>> Jark>
>>>>>>> 
>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <tr...@apache.org>>
>>>> wrote:>
>>>>>>> 
>>>>>>> It's true that b, c, d and e will all read from the original DAG
>> that>
>>>>>>> generates a. But all subsequent operators (when running multiple>
>>>> queries)>
>>>>>>> which reference cachedTableA should not need to reproduce `a` but>
>>>>>> directly>
>>>>>>> consume the intermediate result.>
>>>>>>>> 
>>>>>>> Conceptually one could think of cache() as introducing a caching>
>>>> operator>
>>>>>>> from which you need to consume from if you want to benefit from
>> the>
>>>>>> caching>
>>>>>>> functionality.>
>>>>>>>> 
>>>>>>> I agree, ideally the optimizer makes this kind of decision which>
>>>>>>> intermediate result should be cached. But especially when
>> executing>
>>>>>> ad-hoc>
>>>>>>> queries the user might better know which results need to be
>> cached>
>>>>>> because>
>>>>>>> Flink might not see the full DAG. In that sense, I would consider
>> the>
>>>>>>> cache() method as a hint for the optimizer. Of course, in the
>> future we>
>>>>>>> might add functionality which tries to automatically cache
>> results>
>>>> (e.g.>
>>>>>>> caching the latest intermediate results until so and so much space
>> is>
>>>>>>> used). But this should hopefully not contradict with `CachedTable>
>>>>>> cache()`.>
>>>>>>>> 
>>>>>>> Cheers,>
>>>>>>> Till>
>>>>>>>> 
>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <be...@gmail.com>>
>>>> wrote:>
>>>>>>>> 
>>>>>>>> Hi Till,>
>>>>>>>>> 
>>>>>>>> Thanks for the clarification. I am still a little confused.>
>>>>>>>>> 
>>>>>>>> If cache() returns a CachedTable, the example might become:>
>>>>>>>>> 
>>>>>>>> b = a.map(...)>
>>>>>>>> c = a.map(...)>
>>>>>>>>> 
>>>>>>>> cachedTableA = a.cache()>
>>>>>>>> d = cachedTableA.map(...)>
>>>>>>>> e = a.map()>
>>>>>>>>> 
>>>>>>>> In the above case, if cache() is lazily evaluated, b, c, d and e
>> are>
>>>>>> all>
>>>>>>>> going to be reading from the original DAG that generates a. But
>> with a>
>>>>>>>> naive expectation, d should be reading from the cache. This seems
>> not>
>>>>>>>> solving the potential confusion you raised, right?>
>>>>>>>>> 
>>>>>>>> Just to be clear, my understanding are all based on the
>> assumption>
>>>> that>
>>>>>>> the>
>>>>>>>> tables are immutable. Therefore, after a.cache(), a the
>> c*achedTableA*>
>>>>>>> and>
>>>>>>>> original table *a * should be completely interchangeable.>
>>>>>>>>> 
>>>>>>>> That said, I think a valid argument is optimization. There are
>> indeed>
>>>>>>> cases>
>>>>>>>> that reading from the original DAG could be faster than reading
>> from>
>>>>>> the>
>>>>>>>> cache. For example, in the following example:>
>>>>>>>>> 
>>>>>>>> a.filter(f1' > 100)>
>>>>>>>> a.cache()>
>>>>>>>> b = a.filter(f1' < 100)>
>>>>>>>>> 
>>>>>>>> Ideally the optimizer should be intelligent enough to decide
>> which way>
>>>>>> is>
>>>>>>>> faster, without user intervention. In this case, it will identify
>> that>
>>>>>> b>
>>>>>>>> would just be an empty table, thus skip reading from the cache>
>>>>>>> completely.>
>>>>>>>> But I agree that returning a CachedTable would give user the
>> control>
>>>> of>
>>>>>>>> when to use cache, even though I still feel that letting the
>> optimizer>
>>>>>>>> handle this is a better option in long run.>
>>>>>>>>> 
>>>>>>>> Thanks,>
>>>>>>>>> 
>>>>>>>> Jiangjie (Becket) Qin>
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <tr...@apache.org>>
>>>>>>> wrote:>
>>>>>>>>> 
>>>>>>>>> Yes you are right Becket that it still depends on the actual>
>>>>>> execution>
>>>>>>> of>
>>>>>>>>> the job whether a consumer reads from a cached result or not.>
>>>>>>>>>> 
>>>>>>>>> My point was actually about the properties of a (cached vs.>
>>>>>> non-cached)>
>>>>>>>> and>
>>>>>>>>> not about the execution. I would not make cache trigger the
>> execution>
>>>>>>> of>
>>>>>>>>> the job because one loses some flexibility by eagerly triggering
>> the>
>>>>>>>>> execution.>
>>>>>>>>>> 
>>>>>>>>> I tried to argue for an explicit CachedTable which is returned
>> by the>
>>>>>>>>> cache() method like Piotr did in order to make the API more
>> explicit.>
>>>>>>>>>> 
>>>>>>>>> Cheers,>
>>>>>>>>> Till>
>>>>>>>>>> 
>>>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <be...@gmail.com>>
>>>>>>> wrote:>
>>>>>>>>>> 
>>>>>>>>>> Hi Till,>
>>>>>>>>>>> 
>>>>>>>>>> That is a good example. Just a minor correction, in this case,
>> b, c>
>>>>>>>> and d>
>>>>>>>>>> will all consume from a non-cached a. This is because cache
>> will>
>>>>>> only>
>>>>>>>> be>
>>>>>>>>>> created on the very first job submission that generates the
>> table>
>>>>>> to>
>>>>>>> be>
>>>>>>>>>> cached.>
>>>>>>>>>>> 
>>>>>>>>>> If I understand correctly, this is example is about whether>
>>>>>> .cache()>
>>>>>>>>> method>
>>>>>>>>>> should be eagerly evaluated or lazily evaluated. In another
>> word,>
>>>>>> if>
>>>>>>>>>> cache() method actually triggers a job that creates the cache,>
>>>>>> there>
>>>>>>>> will>
>>>>>>>>>> be no such confusion. Is that right?>
>>>>>>>>>>> 
>>>>>>>>>> In the example, although d will not consume from the cached
>> Table>
>>>>>>> while>
>>>>>>>>> it>
>>>>>>>>>> looks supposed to, from correctness perspective the code will
>> still>
>>>>>>>>> return>
>>>>>>>>>> correct result, assuming that tables are immutable.>
>>>>>>>>>>> 
>>>>>>>>>> Personally I feel it is OK because users probably won't really>
>>>>>> worry>
>>>>>>>>> about>
>>>>>>>>>> whether the table is cached or not. And lazy cache could avoid
>> some>
>>>>>>>>>> unnecessary caching if a cached table is never created in the
>> user>
>>>>>>>>>> application. But I am not opposed to do eager evaluation of
>> cache.>
>>>>>>>>>>> 
>>>>>>>>>> Thanks,>
>>>>>>>>>>> 
>>>>>>>>>> Jiangjie (Becket) Qin>
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> On Mon
>> [message truncated...]

Reply via email to