Hi Piotr,

Thanks for the proposal and detailed explanation. I like the idea of
returning a new hinted Table without modifying the original table. This
also leaves room for users to benefit from future implicit caching.

Just to make sure I get the full picture. In your proposal, there will also
be a 'void Table#uncache()' method to release the cache, right?
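
Just to illustrate my understanding of the API surface we are discussing, a
rough sketch (the names are placeholders, not a concrete proposal):

```
Table {
  Table cache();   // returns a new Table carrying a "cache" hint; `this` is untouched
  void uncache();  // releases the cache backing this table, if one was materialized
}
```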

Thanks,

Jiangjie (Becket) Qin

On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com>
wrote:

> Hi Becket!
>
> After further thinking I tend to agree that my previous proposal (*Option
> 2*) indeed might not be ideal if we would in the future introduce automatic
> caching. However, I would like to propose a slightly modified version of
> it:
>
> *Option 4*
>
> Adding a `cache()` method with the following signature:
>
> Table Table#cache();
>
> It would have no side effects: the `cache()` call does not modify/change
> the original Table in any way.
> It would return a copy of the original table, with an added hint for the
> optimizer to cache the table, so that future accesses to the returned
> table might be cached or not.
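>
> To make this concrete, a minimal sketch of what I have in mind (purely
> illustrative; `copy()` and the `cacheHint` field are placeholders, not
> existing Flink APIs):
>
> ```
> public Table cache() {
>   Table copy = this.copy();  // a copy of this table's logical plan
>   copy.cacheHint = true;     // the optimizer MAY read this subtree from a cache
>   return copy;               // `this` is left unmodified
> }
> ```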
>
> Assume that we are talking about a setup where we do not have automatic
> caching enabled (a possible future extension).
>
> Example #1:
>
> ```
> Table a = …
> a.foo() // not cached
>
> val cachedA = a.cache();
>
> cachedA.bar() // maybe cached
> a.foo() // same as before - effectively not cached
> ```
>
> Both the first and the second `a.foo()` operations would behave in exactly
> the same way. Again, the `a.cache()` call doesn’t affect `a` itself. If `a`
> was not hinted for caching before `a.cache();`, then both `a.foo()` calls
> wouldn’t use the cache.
>
> The returned `cachedA` would be hinted with the “cache” hint, so probably
> `cachedA.bar()` would go through the cache (unless the optimiser decides
> otherwise).
>
> Example #2
>
> ```
> Table a = …
>
> a.foo() // not cached
>
> val b = a.cache();
>
> a.foo() // same as before - effectively not cached
> b.foo() // maybe cached
>
> val c = b.cache();
>
> a.foo() // same as before - effectively not cached
> b.foo() // same as before - effectively maybe cached
> c.foo() // maybe cached
> ```
>
> Now, assuming that we have some future “automatic caching optimisation”:
>
> Example #3
>
> ```
> env.enableAutomaticCaching()
> Table a = …
>
> a.foo() // might be cached, depending if `a` was selected to automatic
> caching
>
> val b = a.cache();
>
> a.foo() // same as before - might be cached, if `a` was selected to
> automatic caching
> b.foo() // maybe cached
> ```
>
>
> More or less this is the same behaviour as:
>
> Table a = ...
> val b = a.filter(x > 20)
>
> calling `filter` hasn’t changed or altered `a` in any way. If `a` was
> previously filtered:
>
> Table src = …
> val a = src.filter(x > 20)
> val b = a.filter(x > 20)
>
> then yes, `a` and `b` will be the same. But the point is that neither
> `filter` nor `cache` changes the original `a` table.
>
> One thing is that indeed, physically dropping the cache will have side
> effects and will in a way mutate the cached table references. But this is,
> I think, unavoidable in any solution - the same issue as calling
> `.close()`, or calling a destructor in C++.
>
> Piotrek
>
> > On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote:
> >
> > Happy New Year, everybody!
> >
> > I would like to resume this discussion thread. At this point, we have
> > agreed on the first-step goal of interactive programming. The open
> > discussion is the exact API. More specifically, what the *cache()* method
> > should return and what its semantics are. There are three options:
> >
> > *Option 1*
> > *void cache()* OR *Table cache()* which returns the original table for
> > chained calls.
> > *void uncache() *releases the cache.
> > *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
> >
> > - Semantic: a.cache() hints that table 'a' should be cached. Optimizer
> > decides whether the cache will be used or not.
> > - pros: simple and no confusion between CachedTable and original table
> > - cons: A table may be cached/uncached in a method invocation, while the
> > caller does not know about this.
> >
> > *Option 2*
> > *CachedTable cache()*
> > *CachedTable *extends *Table *with an additional *uncache()* method
> >
> > - Semantic: After *val cachedA = a.cache()*, *cachedA.foo()* will always
> > use cache. *a.bar() *will always use original DAG.
> > - pros: No potential side effects in method invocation.
> > - cons: Optimizer has no chance to kick in. Future optimization will
> > become a behavior change and require users to change their code.
> >
> > *Option 3*
> > *CacheHandle cache()*
> > *CacheHandle.release() *to release a cache handle on the table. If all
> > cache handles are released, the cache could be removed.
> > *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
> >
> > - Semantic: *a.cache()* hints that 'a' should be cached. Optimizer
> > decides whether the cache will be used or not. Cache is released either
> > when no handle is on it, or when the user program exits.
> > - pros: No potential side effect in method invocation. No confusion
> > between cached table vs. original table.
> > - cons: An additional CacheHandle exposed to the users.
> >
> >
> > Personally I prefer option 3 for the following reasons:
> > 1. It is simple. The vast majority of the users would just call
> > *a.cache()* followed by *a.foo()*, *a.bar()*, etc.
> > 2. There is no semantic ambiguity and no semantic change if we decide to
> > add implicit caching in the future.
> > 3. There are no side effects in the method calls.
> > 4. Admittedly we need to expose one more CacheHandle class to the users.
> > But it is not that difficult to understand given the similar well-known
> > concept of a ref count (we can name it CacheReference if that is easier
> > to understand). So I think it is fine. A rough sketch is below.
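> >
> > For illustration, a minimal sketch of such a ref-counted handle (purely
> > illustrative; the names and internals are placeholders, not a concrete
> > design):
> >
> > CacheHandle {
> >   // shared by all handles on the same cache (java.util.concurrent.atomic)
> >   private final AtomicInteger refCount;
> >
> >   int release() {
> >     int remaining = refCount.decrementAndGet();
> >     if (remaining == 0) {
> >       physicallyDeleteCache(); // hypothetical cleanup of the materialized data
> >     }
> >     return remaining;
> >   }
> > }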
> >
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com>
> wrote:
> >
> >> Hi Piotrek,
> >>
> >> 1. Regarding optimization.
> >> Sure there are many cases in which the decision is hard to make. But
> >> that does not make it any easier for the users to make those decisions.
> >> I imagine 99% of the users would just naively use cache. I am not saying
> >> we can optimize in all the cases. But as long as we agree that at least
> >> in certain cases (I would argue most cases) the optimizer can do a
> >> little better than an average user, who likely knows little about Flink
> >> internals, we should not push the burden of optimization to users.
> >>
> >> BTW, it seems some of your concerns are related to the implementation. I
> >> did not mention the implementation of the caching service because that
> >> should not affect the API semantics. Not sure if this helps, but imagine
> >> the default implementation has one StorageNode service colocated with
> >> each TM. It could be running within the TM process or in a standalone
> >> process, depending on configuration.
> >>
> >> The StorageNode uses a memory + spill-to-disk mechanism. The cached data
> >> will just be written to the local StorageNode service. If the StorageNode
> >> is running within the TM process, the in-memory cache could just be
> >> objects, so we save some serde cost. A later job referring to the cached
> >> Table will be scheduled in a locality-aware manner, i.e. run in the TM
> >> whose peer StorageNode hosts the data.
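> >>
> >> To make that concrete, a purely hypothetical sketch of such a service
> >> interface (none of these types exist in Flink; this only illustrates the
> >> idea):
> >>
> >> interface StorageNode {
> >>   // write the rows of a cached table, in memory with spill-to-disk
> >>   void put(String tableId, Iterator<Row> rows);
> >>   // read a previously cached table back
> >>   Iterator<Row> get(String tableId);
> >>   // lets the scheduler place consumers on the TM whose peer node has the data
> >>   boolean contains(String tableId);
> >> }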
> >>
> >>
> >> 2. Semantic
> >> I am not sure why introducing a new hintCache() or
> >> env.enableAutomaticCaching() method would avoid the consequence of
> semantic
> >> change.
> >>
> >> If the auto optimization is not enabled by default, users still need to
> >> make code changes to all existing programs in order to get the benefit.
> >> If the auto optimization is enabled by default, advanced users who know
> >> that they really want to use cache will suddenly lose the opportunity
> to do
> >> so, unless they change the code to disable auto optimization.
> >>
> >>
> >> 3. side effect
> >> The CacheHandle is not only about where to put uncache(). It solves the
> >> implicit performance impact by moving the uncache() to the CacheHandle.
> >>
> >>   - If users want to leverage the cache, they can call a.cache(). After
> >>   that, unless a user explicitly releases that CacheHandle, a.foo() will
> >>   always leverage the cache if needed (the optimizer may choose to
> >>   ignore the cache if that helps accelerate the process). Any function
> >>   call will not be able to release the cache because it does not have
> >>   that CacheHandle.
> >>   - If some advanced users do not want to use the cache at all, they
> >>   will call a.hint(ignoreCache).foo(). This will for sure ignore the
> >>   cache and use the original DAG to process.
> >>
> >>
> >>> In vast majority of the cases, users wouldn't really care whether the
> >>> cache is used or not.
> >>> I wouldn’t agree with that, because “caching” (if not purely in memory
> >>> caching) would add additional IO costs. It’s similar as saying that
> users
> >>> would not see a difference between Spark/Flink and MapReduce (MapReduce
> >>> writes data to disks after every map/reduce stage).
> >>
> >> What I wanted to say is that in most cases, after users call cache(),
> >> they don't really care about whether auto optimization has decided to
> >> ignore the cache or not, as long as the program runs faster.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski <
> pi...@data-artisans.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks for the quick answer :)
> >>>
> >>> Re 1.
> >>>
> >>> I generally agree with you, however a couple of points:
> >>>
> >>> a) the problem with using automatic caching is bigger, because you will
> >>> have to decide how to compare IO vs CPU costs, and if you pick wrong,
> >>> the additional IO costs might be enormous or can even crash your
> >>> system. This is a more difficult problem compared to, let's say, join
> >>> reordering, where the only issue is to have good statistics that can
> >>> capture correlations between columns (when you reorder joins, the
> >>> number of IO operations does not change)
> >>> b) your example is completely independent of caching.
> >>>
> >>> Query like this:
> >>>
> >>> src1.filter('f1 > 10).join(src2.filter('f2 < 30), `f1 === `f2).as('f3,
> >>> …).filter('f3 > 30)
> >>>
> >>> Should/could be optimised to an empty result immediately, without the
> >>> need for any cache/materialisation, and that should work even without
> >>> any statistics provided by the connector.
> >>>
> >>> For me a prerequisite to any serious cost-based optimisations would be
> >>> some reasonable benchmark coverage of the code (TPC-H?). Otherwise that
> >>> would be equivalent to adding untested code, since we wouldn’t be able
> >>> to verify our assumptions, like how the writing of 10 000 records to a
> >>> cache/RocksDB/Kafka/CSV file compares to joining/filtering/processing
> >>> of let's say 1 000 000 rows.
> >>>
> >>> Re 2.
> >>>
> >>> I wasn’t proposing to change the semantic later. I was proposing that
> we
> >>> start now:
> >>>
> >>> CachedTable cachedA = a.cache()
> >>> cachedA.foo() // Cache is used
> >>> a.bar() // Original DAG is used
> >>>
> >>> And then later we can think about adding for example
> >>>
> >>> CachedTable cachedA = a.hintCache()
> >>> cachedA.foo() // Cache might be used
> >>> a.bar() // Original DAG is used
> >>>
> >>> Or
> >>>
> >>> env.enableAutomaticCaching()
> >>> a.foo() // Cache might be used
> >>> a.bar() // Cache might be used
> >>>
> >>> Or (I would still not like this option):
> >>>
> >>> a.hintCache()
> >>> a.foo() // Cache might be used
> >>> a.bar() // Cache might be used
> >>>
> >>> Or whatever else will come to our mind. Even if we add some automatic
> >>> caching in the future, keeping explicit (`CachedTable cache()`) caching
> >>> will still be useful, at least in some cases.
> >>>
> >>> Re 3.
> >>>
> >>>> 2. The source tables are immutable during one run of batch processing
> >>> logic.
> >>>> 3. The cache is immutable during one run of batch processing logic.
> >>>
> >>>> I think assumption 2 and 3 are by definition what batch processing
> >>> means,
> >>>> i.e the data must be complete before it is processed and should not
> >>> change
> >>>> when the processing is running.
> >>>
> >>> I agree that this is how batch systems SHOULD be working. However I
> >>> know from my previous experience that it’s not always the case.
> >>> Sometimes users are just working on some non-transactional storage,
> >>> which can be (either constantly or occasionally) modified by some other
> >>> processes for whatever reason (fixing the data, updating, adding new
> >>> data, etc.).
> >>>
> >>> But even if we ignore this point (data immutability), the performance
> >>> side effect issue of your proposal remains. If a user calls `void
> >>> a.cache()` deep inside some private method, it will have implicit side
> >>> effects on other parts of the program that might not be obvious.
> >>>
> >>> Re `CacheHandle`.
> >>>
> >>> If I understand it correctly, it only addresses the issue of where to
> >>> place the `uncache`/`dropCache` method.
> >>>
> >>> Btw,
> >>>
> >>>> In vast majority of the cases, users wouldn't really care whether the
> >>> cache is used or not.
> >>>
> >>> I wouldn’t agree with that, because “caching” (if not purely in memory
> >>> caching) would add additional IO costs. It’s similar as saying that
> users
> >>> would not see a difference between Spark/Flink and MapReduce (MapReduce
> >>> writes data to disks after every map/reduce stage).
> >>>
> >>> Piotrek
> >>>
> >>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com> wrote:
> >>>>
> >>>> Hi Piotrek,
> >>>>
> >>>> Not sure if you noticed, in my last email, I was proposing
> `CacheHandle
> >>>> cache()` to avoid the potential side effect due to function calls.
> >>>>
> >>>> Let's look at the disagreement in your reply one by one.
> >>>>
> >>>>
> >>>> 1. Optimization chances
> >>>>
> >>>> Optimization is never trivial work. This is exactly why we should not
> >>>> let users do it manually. Databases have done a huge amount of work in
> >>>> this area. At Alibaba, we rely heavily on many optimization rules to
> >>>> boost SQL query performance.
> >>>>
> >>>> In your example, if I fill in the filter conditions in a certain way,
> >>>> the optimization would become obvious.
> >>>>
> >>>> Table src1 = … // read from connector 1
> >>>> Table src2 = … // read from connector 2
> >>>>
> >>>> Table a = src1.filter('f1 > 10).join(src2.filter('f2 < 30), `f1 ===
> >>>> `f2).as('f3, ...)
> >>>> a.cache() // write cache to connector 3; when writing the records,
> >>>> remember min and max of `f1
> >>>>
> >>>> a.filter('f3 > 30) // There is no need to read from any connector,
> >>>> because `a` does not contain any record whose 'f3 is greater than 30.
> >>>> env.execute()
> >>>> a.select(…)
> >>>>
> >>>> BTW, it seems to me that adding some basic statistics is fairly
> >>>> straightforward and the cost is pretty marginal, if not negligible. In
> >>>> fact it is not only needed for optimization, but also for cases such
> >>>> as ML, where some algorithms may need to decide their parameters based
> >>>> on the statistics of the data.
> >>>>
> >>>>
> >>>> 2. Same API, one semantic now, another semantic later.
> >>>>
> >>>> I am trying to understand what is the semantic of `CachedTable
> cache()`
> >>> you
> >>>> are proposing. IMO, we should avoid designing an API whose semantic
> >>> will be
> >>>> changed later. If we have a "CachedTable cache()" method, then the
> >>> semantic
> >>>> should be very clearly defined upfront and do not change later. It
> >>> should
> >>>> never be "right now let's go with semantic 1, later we can silently
> >>> change
> >>>> it to semantic 2 or 3". Such a change could result in bad consequences. For
> For
> >>>> example, let's say we decide go with semantic 1:
> >>>>
> >>>> CachedTable cachedA = a.cache()
> >>>> cachedA.foo() // Cache is used
> >>>> a.bar() // Original DAG is used.
> >>>>
> >>>> Now majority of the users would be using cachedA.foo() in their code.
> >>> And
> >>>> some advanced users will use a.bar() to explicitly skip the cache.
> Later
> >>>> on, we added smart optimization and change the semantic to semantic 2:
> >>>>
> >>>> CachedTable cachedA = a.cache()
> >>>> cachedA.foo() // Cache is used
> >>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip cache if
> >>> it is
> >>>> faster.
> >>>>
> >>>> Now most of the users who were writing cachedA.foo() will not benefit
> >>> from
> >>>> this optimization at all, unless they change their code to use a.foo()
> >>>> instead. And those advanced users suddenly lose the option to
> explicitly
> >>>> ignore cache unless they change their code (assuming we care enough to
> >>>> provide something like hint(useCache)). If we don't define the
> semantic
> >>>> carefully, our users will have to change their code again and again
> >>> while
> >>>> they shouldn't have to.
> >>>>
> >>>>
> >>>> 3. side effect.
> >>>>
> >>>> Before we talk about side effect, we have to agree on the assumptions.
> >>> The
> >>>> assumptions I have are following:
> >>>> 1. We are talking about batch processing.
> >>>> 2. The source tables are immutable during one run of batch processing
> >>> logic.
> >>>> 3. The cache is immutable during one run of batch processing logic.
> >>>>
> >>>> I think assumption 2 and 3 are by definition what batch processing
> >>> means,
> >>>> i.e the data must be complete before it is processed and should not
> >>> change
> >>>> when the processing is running.
> >>>>
> >>>> As far as I am aware of, I don't know any batch processing system
> >>> breaking
> >>>> those assumptions. Even for relational database tables, where queries
> >>> can
> >>>> run with concurrent modifications, necessary locking are still
> required
> >>> to
> >>>> ensure the integrity of the query result.
> >>>>
> >>>> Please let me know if you disagree with the above assumptions. If you
> >>> agree
> >>>> with these assumptions, with the `CacheHandle cache()` API in my last
> >>>> email, do you still see side effects?
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jiangjie (Becket) Qin
> >>>>
> >>>>
> >>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski <
> pi...@data-artisans.com
> >>>>
> >>>> wrote:
> >>>>
> >>>>> Hi Becket,
> >>>>>
> >>>>>> Regarding the chance of optimization, it might not be that rare.
> Some
> >>>>> very
> >>>>>> simple statistics could already help in many cases. For example,
> >>> simply
> >>>>>> maintaining max and min of each field can already eliminate some
> >>>>>> unnecessary table scans (potentially scanning the cached table) if
> >>>>>> the result is doomed to be empty. A histogram would give even further
> >>>>>> information. The optimizer could be very careful and only ignores
> >>> cache
> >>>>>> when it is 100% sure doing that is cheaper. e.g. only when a filter
> on
> >>>>> the
> >>>>>> cache will absolutely return nothing.
> >>>>>
> >>>>> I do not see how this might be easy to achieve. It would require tons
> >>> of
> >>>>> effort to make it work and in the end you would still have a problem
> of
> >>>>> comparing/trading CPU cycles vs IO. For example:
> >>>>>
> >>>>> Table src1 = … // read from connector 1
> >>>>> Table src2 = … // read from connector 2
> >>>>>
> >>>>> Table a = src1.filter(…).join(src2.filter(…), …)
> >>>>> a.cache() // write cache to connector 3
> >>>>>
> >>>>> a.filter(…)
> >>>>> env.execute()
> >>>>> a.select(…)
> >>>>>
> >>>>> Decision whether it’s better to:
> >>>>> A) read from connector1/connector2, filter/map and join them twice
> >>>>> B) read from connector1/connector2, filter/map and join them once,
> pay
> >>> the
> >>>>> price of writing to connector 3 and then reading from it
> >>>>>
> >>>>> Is very far from trivial. `a` can end up much larger than `src1` and
> >>>>> `src2`, writes to connector 3 might be extremely slow, reads from
> >>>>> connector 3 can be slower compared to reads from connectors 1 & 2, … .
> >>>>> You really need to have extremely good statistics to correctly assess
> >>>>> the size of the output, and it would still be failing many times
> >>>>> (correlations etc). And keep in mind that at the moment we do not have
> >>>>> ANY statistics at all. More than that, it would require significantly
> >>>>> more testing and setting up some benchmarks to make sure that we do
> >>>>> not break it with some regressions.
> >>>>>
> >>>>> That’s why I’m strongly opposing this idea - at least let’s not start
> >>>>> with this. If we first start with completely manual/explicit caching,
> >>>>> without any magic, it would be a significant improvement for the
> users
> >>> for
> >>>>> a fraction of the development cost. After implementing that, when we
> >>>>> already have all of the working pieces, we can start working on some
> >>>>> optimisations rules. As I wrote before, if we start with
> >>>>>
> >>>>> `CachedTable cache()`
> >>>>>
> >>>>> We can later work on follow-up stories to make it automatic. Despite
> >>>>> the fact that I don’t like this implicit/side-effect approach with a
> >>>>> `void` method, having an explicit `CachedTable cache()` wouldn’t even
> >>>>> prevent us from later adding a `void hintCache()` method, with the
> >>>>> exact semantics that you want.
> >>>>>
> >>>>> On top of that I raise again that having an implicit `void
> >>>>> cache()/hintCache()` has other side effects and problems with
> >>>>> non-immutable data, and is annoying when used secretly inside methods.
> >>>>>
> >>>>> Explicit `CachedTable cache()` just looks like a much less
> >>>>> controversial MVP, and if we decide to go further with this topic,
> >>>>> it’s not a wasted effort, but just lies on a straight path to more
> >>>>> advanced/complicated solutions in the future. Are there any drawbacks
> >>>>> of starting with `CachedTable cache()` that I’m missing?
> >>>>>
> >>>>> Piotrek
> >>>>>
> >>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi Becket,
> >>>>>>
> >>>>>> Introducing CacheHandle seems too complicated. That means users have
> >>>>>> to maintain the handle properly.
> >>>>>>
> >>>>>> And since cache is just a hint for the optimizer, why not just
> >>>>>> return the Table itself from the cache method? This hint info should
> >>>>>> be kept in the Table, I believe.
> >>>>>>
> >>>>>> So how about adding methods cache and uncache to Table, both
> >>>>>> returning Table? Because what cache and uncache do is just add some
> >>>>>> hint info into the Table.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Becket Qin <becket....@gmail.com> wrote on Wed, Dec 12, 2018 at 11:25 AM:
> >>>>>>
> >>>>>>> Hi Till and Piotrek,
> >>>>>>>
> >>>>>>> Thanks for the clarification. That clears up quite a bit of
> >>>>>>> confusion. My understanding of how cache works is the same as what
> >>>>>>> Till describes, i.e. cache() is a hint to Flink, but it is not
> >>>>>>> guaranteed that the cache always exists, and it might be recomputed
> >>>>>>> from its lineage.
> >>>>>>> exist and it might be recomputed from its lineage.
> >>>>>>>
> >>>>>>> Is this the core of our disagreement here? That you would like this
> >>>>>>>> “cache()” to be mostly hint for the optimiser?
> >>>>>>>
> >>>>>>> Semantic wise, yes. That's also why I think materialize() has a
> much
> >>>>> larger
> >>>>>>> scope than cache(), thus it should be a different method.
> >>>>>>>
> >>>>>>> Regarding the chance of optimization, it might not be that rare.
> Some
> >>>>> very
> >>>>>>> simple statistics could already help in many cases. For example,
> >>> simply
> >>>>>>> maintaining max and min of each field can already eliminate some
> >>>>>>> unnecessary table scans (potentially scanning the cached table) if
> >>>>>>> the result is doomed to be empty. A histogram would give even further
> >>>>>>> information. The optimizer could be very careful and only ignores
> >>> cache
> >>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a filter
> >>> on
> >>>>> the
> >>>>>>> cache will absolutely return nothing.
> >>>>>>>
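> >>>>>>> As a toy illustration of the kind of pruning I mean (a hypothetical
> >>>>>>> optimizer-side check, assuming we recorded the min/max of a field
> >>>>>>> while writing the cache):
> >>>>>>>
> >>>>>>> // for a filter like 'f > c, the cache cannot contain matches if max(f) <= c
> >>>>>>> boolean canSkipScan(ColumnStats stats, GreaterThan filter) {
> >>>>>>>   return stats.max(filter.field()) <= filter.constant();
> >>>>>>> }
> >>>>>>>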
> >>>>>>> Given the above clarification on cache, I would like to revisit the
> >>>>>>> original "void cache()" proposal and see if we can improve on top
> of
> >>>>> that.
> >>>>>>>
> >>>>>>> What do you think about the following modified interface?
> >>>>>>>
> >>>>>>> Table {
> >>>>>>>   /**
> >>>>>>>    * This call hints Flink to maintain a cache of this table and
> >>>>>>>    * leverage it for performance optimization if needed. Note that
> >>>>>>>    * Flink may still decide not to use the cache if it is cheaper
> >>>>>>>    * to do so.
> >>>>>>>    *
> >>>>>>>    * A CacheHandle will be returned to allow the user to release the
> >>>>>>>    * cache actively. The cache will be deleted if there are no
> >>>>>>>    * unreleased cache handles to it. When the TableEnvironment is
> >>>>>>>    * closed, the cache will also be deleted and all the cache
> >>>>>>>    * handles will be released.
> >>>>>>>    *
> >>>>>>>    * @return a CacheHandle referring to the cache of this table.
> >>>>>>>    */
> >>>>>>>   CacheHandle cache();
> >>>>>>> }
> >>>>>>>
> >>>>>>> CacheHandle {
> >>>>>>>   /**
> >>>>>>>    * Close the cache handle. This method does not necessarily delete
> >>>>>>>    * the cache. Instead, it simply decrements the reference counter
> >>>>>>>    * of the cache. When there is no handle referring to a cache, the
> >>>>>>>    * cache will be deleted.
> >>>>>>>    *
> >>>>>>>    * @return the number of open handles to the cache after this
> >>>>>>>    * handle has been released.
> >>>>>>>    */
> >>>>>>>   int release();
> >>>>>>> }
> >>>>>>>
> >>>>>>> The rationale behind this interface is the following:
> >>>>>>> In vast majority of the cases, users wouldn't really care whether
> the
> >>>>> cache
> >>>>>>> is used or not. So I think the most intuitive way is letting
> cache()
> >>>>> return
> >>>>>>> nothing. So nobody needs to worry about the difference between
> >>>>> operations
> >>>>>>> on CacheTables and those on the "original" tables. This will make
> >>> maybe
> >>>>>>> 99.9% of the users happy. There were two concerns raised for this
> >>>>> approach:
> >>>>>>> 1. In some rare cases, users may want to ignore cache,
> >>>>>>> 2. A table might be cached/uncached in a third party function while
> >>> the
> >>>>>>> caller does not know.
> >>>>>>>
> >>>>>>> For the first issue, users can use hint("ignoreCache") to
> explicitly
> >>>>> ignore
> >>>>>>> cache.
> >>>>>>> For the second issue, the above proposal lets cache() return a
> >>>>>>> CacheHandle, whose only method is release(). Different CacheHandles
> >>>>>>> will refer to the same cache; if a cache no longer has any cache
> >>>>>>> handle, it will be deleted. This will address the following case:
> >>>>>>> {
> >>>>>>> val handle1 = a.cache()
> >>>>>>> process(a)
> >>>>>>> a.select(...) // cache is still available, handle1 has not been
> >>>>> released.
> >>>>>>> }
> >>>>>>>
> >>>>>>> void process(Table t) {
> >>>>>>> val handle2 = t.cache() // new handle to cache
> >>>>>>> t.select(...) // optimizer decides cache usage
> >>>>>>> t.hint("ignoreCache").select(...) // cache is ignored
> >>>>>>> handle2.release() // release the handle, but the cache may still be
> >>>>>>> available if there are other handles
> >>>>>>> ...
> >>>>>>> }
> >>>>>>>
> >>>>>>> Does the above modified approach look reasonable to you?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Jiangjie (Becket) Qin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <
> trohrm...@apache.org>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Becket,
> >>>>>>>>
> >>>>>>>> I was aiming at semantics similar to 1. I actually thought that
> >>>>> `cache()`
> >>>>>>>> would tell the system to materialize the intermediate result so
> that
> >>>>>>>> subsequent queries don't need to reprocess it. This means that the
> >>>>> usage
> >>>>>>> of
> >>>>>>>> the cached table in this example
> >>>>>>>>
> >>>>>>>> {
> >>>>>>>> val cachedTable = a.cache()
> >>>>>>>> val b1 = cachedTable.select(…)
> >>>>>>>> val b2 = cachedTable.foo().select(…)
> >>>>>>>> val b3 = cachedTable.bar().select(...)
> >>>>>>>> val c1 = a.select(…)
> >>>>>>>> val c2 = a.foo().select(…)
> >>>>>>>> val c3 = a.bar().select(...)
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> strongly depends on interleaved calls which trigger the execution
> of
> >>>>> sub
> >>>>>>>> queries. So for example, if there is only a single env.execute
> call
> >>> at
> >>>>>>> the
> >>>>>>>> end of  block, then b1, b2, b3, c1, c2 and c3 would all be
> computed
> >>> by
> >>>>>>>> reading directly from the sources (given that there is only a
> single
> >>>>>>>> JobGraph). It just happens that the result of `a` will be cached
> >>> such
> >>>>>>> that
> >>>>>>>> we skip the processing of `a` when there are subsequent queries
> >>> reading
> >>>>>>>> from `cachedTable`. If for some reason the system cannot
> materialize
> >>>>> the
> >>>>>>>> table (e.g. running out of disk space, ttl expired), then it could
> >>> also
> >>>>>>>> happen that we need to reprocess `a`. In that sense `cachedTable`
> >>>>> simply
> >>>>>>> is
> >>>>>>>> an identifier for the materialized result of `a` with the lineage
> >>> how
> >>>>> to
> >>>>>>>> reprocess it.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Till
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <
> >>>>> pi...@data-artisans.com
> >>>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Becket,
> >>>>>>>>>
> >>>>>>>>>> {
> >>>>>>>>>> val cachedTable = a.cache()
> >>>>>>>>>> val b = cachedTable.select(...)
> >>>>>>>>>> val c = a.select(...)
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses
> >>> original
> >>>>>>> DAG
> >>>>>>>>> as
> >>>>>>>>>> user demanded so. In this case, the optimizer has no chance to
> >>>>>>>> optimize.
> >>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c leaves the
> >>>>>>>>> optimizer
> >>>>>>>>>> to choose whether the cache or DAG should be used. In this case,
> >>> user
> >>>>>>>>> lose
> >>>>>>>>>> the option to NOT use cache.
> >>>>>>>>>>
> >>>>>>>>>> As you can see, neither of the options seem perfect. However, I
> >>> guess
> >>>>>>>> you
> >>>>>>>>>> and Till are proposing the third option:
> >>>>>>>>>>
> >>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or
> DAG
> >>>>>>>> should
> >>>>>>>>> be
> >>>>>>>>>> used. c always use the DAG.
> >>>>>>>>>
> >>>>>>>>> I am pretty sure that me, Till, Fabian and others were all
> >>> proposing
> >>>>>>> and
> >>>>>>>>> advocating in favour of semantic “1”. No cost based optimiser
> >>>>> decisions
> >>>>>>>> at
> >>>>>>>>> all.
> >>>>>>>>>
> >>>>>>>>> {
> >>>>>>>>> val cachedTable = a.cache()
> >>>>>>>>> val b1 = cachedTable.select(…)
> >>>>>>>>> val b2 = cachedTable.foo().select(…)
> >>>>>>>>> val b3 = cachedTable.bar().select(...)
> >>>>>>>>> val c1 = a.select(…)
> >>>>>>>>> val c2 = a.foo().select(…)
> >>>>>>>>> val c3 = a.bar().select(...)
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> All b1, b2 and b3 are reading from cache, while c1, c2 and c3 are
> >>>>>>>>> re-executing whole plan for “a”.
> >>>>>>>>>
> >>>>>>>>> In the future we could discuss going one step further,
> introducing
> >>>>> some
> >>>>>>>>> global optimisation (that can be manually enabled/disabled):
> >>>>>>> deduplicate
> >>>>>>>>> plan nodes/deduplicate sub queries/re-use sub queries results/or
> >>>>>>> whatever
> >>>>>>>>> we could call it. It could do two things:
> >>>>>>>>>
> >>>>>>>>> 1. Automatically try to deduplicate fragments of the plan and
> share
> >>>>> the
> >>>>>>>>> result using CachedTable - in other words automatically insert
> >>>>>>>> `CachedTable
> >>>>>>>>> cache()` calls.
> >>>>>>>>> 2. Automatically make decision to bypass explicit `CachedTable`
> >>> access
> >>>>>>>>> (this would be the equivalent of what you described as “semantic
> >>> 3”).
> >>>>>>>>>
> >>>>>>>>> However as I wrote previously, I have big doubts if such
> cost-based
> >>>>>>>>> optimisation would work (this applies also to “Semantic 2”). I
> >>> would
> >>>>>>>> expect
> >>>>>>>>> it to do more harm than good in so many cases, that it wouldn’t
> >>> make
> >>>>>>>> sense.
> >>>>>>>>> Even assuming that we calculate statistics perfectly (this ain’t
> >>> gonna
> >>>>>>>>> happen), it’s virtually impossible to correctly estimate correct
> >>>>>>> exchange
> >>>>>>>>> rate of CPU cycles vs IO operations as it is changing so much
> from
> >>>>>>>>> deployment to deployment.
> >>>>>>>>>
> >>>>>>>>> Is this the core of our disagreement here? That you would like
> this
> >>>>>>>>> “cache()” to be mostly hint for the optimiser?
> >>>>>>>>>
> >>>>>>>>> Piotrek
> >>>>>>>>>
> >>>>>>>>>> On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Another potential concern for semantic 3 is that. In the future,
> >>> we
> >>>>>>> may
> >>>>>>>>> add
> >>>>>>>>>> automatic caching to Flink. e.g. cache the intermediate results
> at
> >>>>>>> the
> >>>>>>>>>> shuffle boundary. If our semantic is that reference to the
> >>> original
> >>>>>>>> table
> >>>>>>>>>> means skipping cache, those users may not be able to benefit
> from
> >>> the
> >>>>>>>>>> implicit cache.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <
> becket....@gmail.com
> >>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Piotrek,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the reply. Thought about it again, I might have
> >>>>>>>> misunderstood
> >>>>>>>>>>> your proposal in earlier emails. Returning a CachedTable might
> >>> not
> >>>>>>> be
> >>>>>>>> a
> >>>>>>>>> bad
> >>>>>>>>>>> idea.
> >>>>>>>>>>>
> >>>>>>>>>>> I was more concerned about the semantic and its intuitiveness
> >>> when a
> >>>>>>>>>>> CachedTable is returned. i..e, if cache() returns CachedTable.
> >>> What
> >>>>>>>> are
> >>>>>>>>> the
> >>>>>>>>>>> semantic in the following code:
> >>>>>>>>>>> {
> >>>>>>>>>>> val cachedTable = a.cache()
> >>>>>>>>>>> val b = cachedTable.select(...)
> >>>>>>>>>>> val c = a.select(...)
> >>>>>>>>>>> }
> >>>>>>>>>>> What is the difference between b and c? At the first glance, I
> >>> see
> >>>>>>> two
> >>>>>>>>>>> options:
> >>>>>>>>>>>
> >>>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses
> >>> original
> >>>>>>>> DAG
> >>>>>>>>> as
> >>>>>>>>>>> user demanded so. In this case, the optimizer has no chance to
> >>>>>>>> optimize.
> >>>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c leaves
> the
> >>>>>>>>> optimizer
> >>>>>>>>>>> to choose whether the cache or DAG should be used. In this
> case,
> >>>>>>> user
> >>>>>>>>> lose
> >>>>>>>>>>> the option to NOT use cache.
> >>>>>>>>>>>
> >>>>>>>>>>> As you can see, neither of the options seem perfect. However, I
> >>>>>>> guess
> >>>>>>>>> you
> >>>>>>>>>>> and Till are proposing the third option:
> >>>>>>>>>>>
> >>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or
> DAG
> >>>>>>>> should
> >>>>>>>>>>> be used. c always use the DAG.
> >>>>>>>>>>>
> >>>>>>>>>>> This does address all the concerns. It is just that from
> >>>>>>> intuitiveness
> >>>>>>>>>>> perspective, I found that asking user to explicitly use a
> >>>>>>> CachedTable
> >>>>>>>>> while
> >>>>>>>>>>> the optimizer might choose to ignore is a little weird. That
> was
> >>>>>>> why I
> >>>>>>>>> did
> >>>>>>>>>>> not think about that semantic. But given there is material
> >>> benefit,
> >>>>>>> I
> >>>>>>>>> think
> >>>>>>>>>>> this semantic is acceptable.
> >>>>>>>>>>>
> >>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use
> >>> cache
> >>>>>>> or
> >>>>>>>>> not,
> >>>>>>>>>>>> then why do we need “void cache()” method at all? Would It
> >>>>>>>> “increase”
> >>>>>>>>> the
> >>>>>>>>>>>> chance of using the cache? That’s sounds strange. What would
> be
> >>> the
> >>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we
> >>> want
> >>>>>>> to
> >>>>>>>>>>>> introduce such kind  automated optimisations of “plan nodes
> >>>>>>>>> deduplication”
> >>>>>>>>>>>> I would turn it on globally, not per table, and let the
> >>> optimiser
> >>>>>>> do
> >>>>>>>>> all of
> >>>>>>>>>>>> the work.
> >>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not use
> >>>>>>> cache
> >>>>>>>>>>>> decision.
> >>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such
> >>> cost
> >>>>>>>>> based
> >>>>>>>>>>>> optimisations would work properly and I would still insist
> >>> first on
> >>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`)
> >>>>>>>>>>>>
> >>>>>>>>>>> We are absolutely on the same page here. An explicit cache()
> >>> method
> >>>>>>> is
> >>>>>>>>>>> necessary not only because optimizer may not be able to make
> the
> >>>>>>> right
> >>>>>>>>>>> decision, but also because of the nature of interactive
> >>> programming.
> >>>>>>>> For
> >>>>>>>>>>> example, if users write the following code in Scala shell:
> >>>>>>>>>>> val b = a.select(...)
> >>>>>>>>>>> val c = b.select(...)
> >>>>>>>>>>> val d = c.select(...).writeToSink(...)
> >>>>>>>>>>> tEnv.execute()
> >>>>>>>>>>> There is no way optimizer will know whether b or c will be used
> >>> in
> >>>>>>>> later
> >>>>>>>>>>> code, unless users hint explicitly.
> >>>>>>>>>>>
> >>>>>>>>>>> At the same time I’m not sure if you have responded to our
> >>>>>>> objections
> >>>>>>>> of
> >>>>>>>>>>>> `void cache()` being implicit/having side effects, which me,
> >>> Jark,
> >>>>>>>>> Fabian,
> >>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
> >>>>>>>>>>>
> >>>>>>>>>>> Is there any other side effects if we use semantic 3 mentioned
> >>>>>>> above?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> JIangjie (Becket) Qin
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <
> >>>>>>>> pi...@data-artisans.com
> >>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Becket,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Sorry for not responding long time.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding case1.
> >>>>>>>>>>>>
> >>>>>>>>>>>> There wouldn’t be an “a.unCache()” method, but I would expect
> >>>>>>>>>>>> only `cachedTableA1.dropCache()`. Dropping `cachedTableA1`
> >>>>>>>>>>>> wouldn’t affect `cachedTableA2`. Just as in any other database,
> >>>>>>>>>>>> dropping/modifying one independent table/materialised view does
> >>>>>>>>>>>> not affect others.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> What I meant is that assuming there is already a cached
> table,
> >>>>>>>> ideally
> >>>>>>>>>>>> users need
> >>>>>>>>>>>>> not to specify whether the next query should read from the
> >>> cache
> >>>>>>> or
> >>>>>>>>> use
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> original DAG. This should be decided by the optimizer.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use
> >>> cache
> >>>>>>> or
> >>>>>>>>>>>> not, then why do we need “void cache()” method at all? Would
> It
> >>>>>>>>> “increase”
> >>>>>>>>>>>> the chance of using the cache? That’s sounds strange. What
> >>> would be
> >>>>>>>> the
> >>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we
> >>> want
> >>>>>>> to
> >>>>>>>>>>>> introduce such kind  automated optimisations of “plan nodes
> >>>>>>>>> deduplication”
> >>>>>>>>>>>> I would turn it on globally, not per table, and let the
> >>> optimiser
> >>>>>>> do
> >>>>>>>>> all of
> >>>>>>>>>>>> the work.
> >>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not use
> >>>>>>> cache
> >>>>>>>>>>>> decision.
> >>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such
> >>> cost
> >>>>>>>>> based
> >>>>>>>>>>>> optimisations would work properly and I would still insist
> >>> first on
> >>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`)
> >>>>>>>>>>>> 4. As Till wrote, having explicit `CachedTable cache()`
> doesn’t
> >>>>>>>>>>>> contradict future work on automated cost based caching.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> At the same time I’m not sure if you have responded to our
> >>>>>>> objections
> >>>>>>>>> of
> >>>>>>>>>>>> `void cache()` being implicit/having side effects, which me,
> >>> Jark,
> >>>>>>>>> Fabian,
> >>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Till,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It is true that after the first job submission, there will be
> >>> no
> >>>>>>>>>>>> ambiguity
> >>>>>>>>>>>>> in terms of whether a cached table is used or not. That is
> the
> >>>>>>> same
> >>>>>>>>> for
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> cache() without returning a CachedTable.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Conceptually one could think of cache() as introducing a
> >>>>>>>>>>>>> caching operator from which you need to consume if you want to
> >>>>>>>>>>>>> benefit from the caching functionality.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am thinking a little differently. I think it is a hint (as
> >>> you
> >>>>>>>>>>>> mentioned
> >>>>>>>>>>>>> later) instead of a new operator. I'd like to be careful
> about
> >>> the
> >>>>>>>>>>>> semantic
> >>>>>>>>>>>>> of the API. A hint is a property set on an existing operator,
> >>> but
> >>>>>>> is
> >>>>>>>>> not
> >>>>>>>>>>>>> itself an operator as it does not really manipulate the data.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision
> >>> which
> >>>>>>>>>>>>>> intermediate result should be cached. But especially when
> >>>>>>> executing
> >>>>>>>>>>>> ad-hoc
> >>>>>>>>>>>>>> queries the user might better know which results need to be
> >>>>>>> cached
> >>>>>>>>>>>> because
> >>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would
> >>> consider
> >>>>>>>> the
> >>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in
> the
> >>>>>>>> future
> >>>>>>>>> we
> >>>>>>>>>>>>>> might add functionality which tries to automatically cache
> >>>>>>> results
> >>>>>>>>>>>> (e.g.
> >>>>>>>>>>>>>> caching the latest intermediate results until so and so much
> >>>>>>> space
> >>>>>>>> is
> >>>>>>>>>>>>>> used). But this should hopefully not contradict with
> >>> `CachedTable
> >>>>>>>>>>>> cache()`.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree that cache() method is needed for exactly the reason
> >>> you
> >>>>>>>>>>>> mentioned,
> >>>>>>>>>>>>> i.e. Flink cannot predict what users are going to write
> later,
> >>> so
> >>>>>>>>> users
> >>>>>>>>>>>>> need to tell Flink explicitly that this table will be used
> >>> later.
> >>>>>>>>> What I
> >>>>>>>>>>>>> meant is that assuming there is already a cached table,
> ideally
> >>>>>>>> users
> >>>>>>>>>>>> need
> >>>>>>>>>>>>> not to specify whether the next query should read from the
> >>> cache
> >>>>>>> or
> >>>>>>>>> use
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> original DAG. This should be decided by the optimizer.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> To explain the difference between returning / not returning a
> >>>>>>>>>>>> CachedTable,
> >>>>>>>>>>>>> I want compare the following two case:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *Case 1:  returning a CachedTable*
> >>>>>>>>>>>>> b = a.map(...)
> >>>>>>>>>>>>> val cachedTableA1 = a.cache()
> >>>>>>>>>>>>> val cachedTableA2 = a.cache()
> >>>>>>>>>>>>> b.print() // Just to make sure a is cached.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> c = a.filter(...) // User specify that the original DAG is
> >>> used?
> >>>>>>> Or
> >>>>>>>>> the
> >>>>>>>>>>>>> optimizer decides whether DAG or cache should be used?
> >>>>>>>>>>>>> d = cachedTableA1.filter() // User specify that the cached
> >>> table
> >>>>>>> is
> >>>>>>>>>>>> used.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> a.unCache() // Can cachedTableA still be used afterwards?
> >>>>>>>>>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *Case 2: not returning a CachedTable*
> >>>>>>>>>>>>> b = a.map()
> >>>>>>>>>>>>> a.cache()
> >>>>>>>>>>>>> a.cache() // no-op
> >>>>>>>>>>>>> b.print() // Just to make sure a is cached
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or
> DAG
> >>>>>>>> should
> >>>>>>>>>>>> be
> >>>>>>>>>>>>> used
> >>>>>>>>>>>>> d = a.filter(...) // Optimizer decides whether the cache or
> DAG
> >>>>>>>> should
> >>>>>>>>>>>> be
> >>>>>>>>>>>>> used
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> a.unCache()
> >>>>>>>>>>>>> a.unCache() // no-op
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In case 1, semantic wise, optimizer lose the option to choose
> >>>>>>>> between
> >>>>>>>>>>>> DAG
> >>>>>>>>>>>>> and cache. And the unCache() call becomes tricky.
> >>>>>>>>>>>>> In case 2, users do not need to worry about whether cache or
> >>> DAG
> >>>>>>> is
> >>>>>>>>>>>> used.
> >>>>>>>>>>>>> And the unCache() semantic is clear. However, the caveat is
> >>> that
> >>>>>>>> users
> >>>>>>>>>>>>> cannot explicitly ignore the cache.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In order to address the issues mentioned in case 2 and
> >>> inspired by
> >>>>>>>> the
> >>>>>>>>>>>>> discussion so far, I am thinking about using hint to allow
> user
> >>>>>>>>>>>> explicitly
> >>>>>>>>>>>>> ignore cache. Although we do not have hint yet, but we
> probably
> >>>>>>>> should
> >>>>>>>>>>>> have
> >>>>>>>>>>>>> one. So the code becomes:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *Case 3: returning this table*
> >>>>>>>>>>>>> b = a.map()
> >>>>>>>>>>>>> a.cache()
> >>>>>>>>>>>>> a.cache() // no-op
> >>>>>>>>>>>>> b.print() // Just to make sure a is cached
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or
> DAG
> >>>>>>>> should
> >>>>>>>>>>>> be
> >>>>>>>>>>>>> used
> >>>>>>>>>>>>> d = a.hint("ignoreCache").filter(...) // DAG will be used
> >>> instead
> >>>>>>> of
> >>>>>>>>> the
> >>>>>>>>>>>>> cache.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> a.unCache()
> >>>>>>>>>>>>> a.unCache() // no-op
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We could also let cache() return this table to allow chained
> >>>>>>> method
> >>>>>>>>>>>> calls.
> >>>>>>>>>>>>> Do you think this API addresses the concerns?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imj...@gmail.com>
> >>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> All the recent discussions are focused on whether there is a
> >>>>>>>> problem
> >>>>>>>>> if
> >>>>>>>>>>>>>> cache() not return a Table.
> >>>>>>>>>>>>>> It seems that returning a Table explicitly is more clear
> (and
> >>>>>>>> safe?).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So whether there are any problems if cache() returns a
> Table?
> >>>>>>>>> @Becket
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <
> >>> trohrm...@apache.org
> >>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It's true that b, c, d and e will all read from the
> original
> >>> DAG
> >>>>>>>>> that
> >>>>>>>>>>>>>>> generates a. But all subsequent operators (when running
> >>> multiple
> >>>>>>>>>>>> queries)
> >>>>>>>>>>>>>>> which reference cachedTableA should not need to reproduce
> `a`
> >>>>>>> but
> >>>>>>>>>>>>>> directly
> >>>>>>>>>>>>>>> consume the intermediate result.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a
> >>>>>>>>>>>>>>> caching operator from which you need to consume if you want
> >>>>>>>>>>>>>>> to benefit from the caching functionality.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision
> >>> which
> >>>>>>>>>>>>>>> intermediate result should be cached. But especially when
> >>>>>>>> executing
> >>>>>>>>>>>>>> ad-hoc
> >>>>>>>>>>>>>>> queries the user might better know which results need to be
> >>>>>>> cached
> >>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would
> >>>>>>> consider
> >>>>>>>>> the
> >>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in
> the
> >>>>>>>> future
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>> might add functionality which tries to automatically cache
> >>>>>>> results
> >>>>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>> caching the latest intermediate results until so and so
> much
> >>>>>>> space
> >>>>>>>>> is
> >>>>>>>>>>>>>>> used). But this should hopefully not contradict with
> >>>>>>> `CachedTable
> >>>>>>>>>>>>>> cache()`.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <
> >>> becket....@gmail.com
> >>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Till,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the clarification. I am still a little
> confused.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If cache() returns a CachedTable, the example might
> become:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> b = a.map(...)
> >>>>>>>>>>>>>>>> c = a.map(...)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> cachedTableA = a.cache()
> >>>>>>>>>>>>>>>> d = cachedTableA.map(...)
> >>>>>>>>>>>>>>>> e = a.map()
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In the above case, if cache() is lazily evaluated, b, c, d
> >>> and
> >>>>>>> e
> >>>>>>>>> are
> >>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>> going to be reading from the original DAG that generates
> a.
> >>> But
> >>>>>>>>> with
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> naive expectation, d should be reading from the cache.
> This
> >>>>>>> seems
> >>>>>>>>> not
> >>>>>>>>>>>>>>>> solving the potential confusion you raised, right?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Just to be clear, my understanding are all based on the
> >>>>>>>> assumption
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> tables are immutable. Therefore, after a.cache(), a the
> >>>>>>>>>>>> c*achedTableA*
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> original table *a * should be completely interchangeable.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> That said, I think a valid argument is optimization. There
> >>> are
> >>>>>>>>> indeed
> >>>>>>>>>>>>>>> cases
> >>>>>>>>>>>>>>>> that reading from the original DAG could be faster than
> >>> reading
> >>>>>>>>> from
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> cache. For example, in the following example:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> a.filter('f1 > 100)
> >>>>>>>>>>>>>>>> a.cache()
> >>>>>>>>>>>>>>>> b = a.filter('f1 < 100)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Ideally the optimizer should be intelligent enough to
> decide
> >>>>>>>> which
> >>>>>>>>>>>> way
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> faster, without user intervention. In this case, it will
> >>>>>>> identify
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>> b
> >>>>>>>>>>>>>>>> would just be an empty table, thus skip reading from the
> >>> cache
> >>>>>>>>>>>>>>> completely.
> >>>>>>>>>>>>>>>> But I agree that returning a CachedTable would give user
> the
> >>>>>>>>> control
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> when to use cache, even though I still feel that letting
> the
> >>>>>>>>>>>> optimizer
> >>>>>>>>>>>>>>>> handle this is a better option in long run.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <
> >>>>>>>> trohrm...@apache.org
> >>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Yes you are right Becket that it still depends on the
> >>> actual
> >>>>>>>>>>>>>> execution
> >>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> the job whether a consumer reads from a cached result or
> >>> not.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> My point was actually about the properties of a (cached
> vs.
> >>>>>>>>>>>>>> non-cached)
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> not about the execution. I would not make cache trigger
> the
> >>>>>>>>>>>> execution
> >>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> the job because one loses some flexibility by eagerly
> >>>>>>> triggering
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> execution.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I tried to argue for an explicit CachedTable which is
> >>> returned
> >>>>>>>> by
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> cache() method like Piotr did in order to make the API
> more
> >>>>>>>>>>>> explicit.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <
> >>>>>>> becket....@gmail.com
> >>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Till,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> That is a good example. Just a minor correction, in this
> >>>>>>> case,
> >>>>>>>>> b, c
> >>>>>>>>>>>>>>>> and d
> >>>>>>>>>>>>>>>>>> will all consume from a non-cached a. This is because
> >>> cache
> >>>>>>>> will
> >>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> created on the very first job submission that generates
> >>> the
> >>>>>>>> table
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> cached.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If I understand correctly, this is example is about
> >>> whether
> >>>>>>>>>>>>>> .cache()
> >>>>>>>>>>>>>>>>> method
> >>>>>>>>>>>>>>>>>> should be eagerly evaluated or lazily evaluated. In
> >>> another
> >>>>>>>> word,
> >>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>> cache() method actually triggers a job that creates the
> >>>>>>> cache,
> >>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> be no such confusion. Is that right?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> In the example, although d will not consume from the
> >>> cached
> >>>>>>>> Table
> >>>>>>>>>>>>>>> while
> >>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>> looks supposed to, from correctness perspective the code
> >>> will
> >>>>>>>>> still
> >>>>>>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>> correct result, assuming that tables are immutable.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Personally I feel it is OK because users probably won't
> >>>>>>> really
> >>>>>>>>>>>>>> worry
> >>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>> whether the table is cached or not. And lazy cache could
> >>>>>>> avoid
> >>>>>>>>> some
> >>>>>>>>>>>>>>>>>> unnecessary caching if a cached table is never created
> in
> >>> the
> >>>>>>>>> user
> >>>>>>>>>>>>>>>>>> application. But I am not opposed to do eager evaluation
> >>> of
> >>>>>>>>> cache.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <
> >>>>>>>>>>>>>> trohrm...@apache.org>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Another argument for Piotr's point is that lazily
> >>> changing
> >>>>>>>>>>>>>>> properties
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> node affects all down stream consumers but does not
> >>>>>>>> necessarily
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> happen before these consumers are defined. From a
> user's
> >>>>>>>>>>>>>>> perspective
> >>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> can be quite confusing:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> b = a.map(...)
> >>>>>>>>>>>>>>>>>>> c = a.map(...)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> a.cache()
> >>>>>>>>>>>>>>>>>>> d = a.map(...)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> now b, c and d will consume from a cached operator. In
> >>> this
> >>>>>>>>> case,
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>>> would most likely expect that only d reads from a
> cached
> >>>>>>>> result.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <
> >>>>>>>>>>>>>>>>> pi...@data-artisans.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hey Shaoxuan and Becket,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Can you explain a bit more on what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Not only that. There are also performance implications, and those are another implicit side effect of using `void cache()`. As I wrote before, reading from the cache might not always be desirable, thus it can cause performance degradation, and I'm fine with that - it is the user's or optimiser's choice. What I do not like is that this implicit side effect can manifest in a completely different part of the code, one that wasn't touched by the user while he was adding the `void cache()` call somewhere else. And even if caching improves performance, it's still a side effect of `void cache()`. Almost by definition, `void` methods have only side effects. As I wrote before, there are a couple of scenarios where this might be undesirable and/or unexpected, for example:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>> Table b = …;
> >>>>>>>>>>>>>>>>>>>> b.cache()
> >>>>>>>>>>>>>>>>>>>> x = b.join(…)
> >>>>>>>>>>>>>>>>>>>> y = b.count()
> >>>>>>>>>>>>>>>>>>>> // ...
> >>>>>>>>>>>>>>>>>>>> // 100
> >>>>>>>>>>>>>>>>>>>> // hundred
> >>>>>>>>>>>>>>>>>>>> // lines
> >>>>>>>>>>>>>>>>>>>> // of
> >>>>>>>>>>>>>>>>>>>> // code
> >>>>>>>>>>>>>>>>>>>> // later
> >>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…) // this might even be hidden in a different method/file/package/dependency
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Table b = ...
> >>>>>>>>>>>>>>>>>>>> if (some_condition) {
> >>>>>>>>>>>>>>>>>>>>   foo(b)
> >>>>>>>>>>>>>>>>>>>> } else {
> >>>>>>>>>>>>>>>>>>>>   bar(b)
> >>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> void foo(Table b) {
> >>>>>>>>>>>>>>>>>>>>   b.cache()
> >>>>>>>>>>>>>>>>>>>>   // do something with b
> >>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> In both examples above, `b.cache()` will implicitly affect `z = b.filter(…).groupBy(…)` - both the semantics of the program (in case of mutable sources) and its performance - which might be far from obvious.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On top of that, there is still my earlier argument that having a `MaterializedTable` or `CachedTable` handle is more flexible for us in the future and for the user (as a manual option to bypass cache reads).
> >>>>>>>>>>>>>>>>>>>>
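A sketch of the bypass option mentioned above, assuming a hypothetical `CachedTable` handle; keeping both references lets the user decide per access:

```
Table b = ...;                      // the original table
CachedTable cachedB = b.cache();    // hypothetical explicit handle to the cache
Table fast = cachedB.filter(...);   // served from the cached data
Table fresh = b.filter(...);        // bypasses the cache, recomputes from the source
```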
> >>>>>>>>>>>>>>>>>>>>> But Jiangjie is correct, the source table in batching should be immutable. It is the user's responsibility to ensure it, otherwise even a regular failover may lead to inconsistent results.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Yes, I agree that is what a perfect world / good deployment should look like. But it often isn't, and while I'm not trying to fix this (since the proper fix is to support transactions), I'm just trying to minimise confusion for the users that are not fully aware of what's going on and operate in a less than perfect setup. And if something bites them after adding a `b.cache()` call, I want to make sure that they at least know all of the places that adding this line can affect.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks, Piotrek
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Piotrek,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks again for the clarification. Some more replies follow.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> But keep in mind that `.cache()` will/might be used not only in interactive programming and not only in batching.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> That is true. Actually, in stream processing cache() has the same semantics as in batch processing, namely: for a table created via a series of computations, save that table for later reference, to avoid re-running the computation logic to regenerate the table. Once the application exits, drop all the caches. These semantics are the same for both batch and stream processing. The difference is that stream applications will only run once, as they are long running, while batch applications may be run multiple times, hence the cache may be created and dropped each time the application runs. Admittedly, there will probably be some resource management requirements for a cached streaming table, such as time-based / size-based retention, to address the infinite-data issue. But such requirements do not change the semantics. You are right that interactive programming is just one use case of cache(). It is not the only use case.
> >>>>>>>>>>>>>>>>>>>>>
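The shared semantics could be sketched as follows (hypothetical `cache()` API; the code would be identical for batch and streaming, only the application lifecycle differs):

```
Table t = tEnv.scan("src").groupBy("k").select("k, v.sum as total");
Table cached = t.cache();               // save the computed table for later reference
Table r = cached.filter("total > 10");  // reuses the cache instead of recomputing t
// On application exit all caches are dropped: a batch program would rebuild
// them on every run, a long-running streaming program only once.
```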
> >>>>>>>>>>>>>>>>>>>>>> For me the more important issue is not having the `void cache()` with side effects.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> This is indeed the key point. The argument around whether cache() should return something already indicates that cache() and materialize() address different issues. Can you explain a bit more on what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I don't know, probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that a user can not write to views or materialised views in SQL, or that a user currently can not write to a Table.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I don't think anyone should insert something into a cache. By definition, the cache should only be updated when the corresponding original table is updated. What I am wondering about is that, given the following two facts:
> >>>>>>>>>>>>>>>>>>>>> 1. If and only if a table is mutable (with something like insert()), a CachedTable may have implicit behavior.
> >>>>>>>>>>>>>>>>>>>>> 2. A CachedTable extends a Table.
> >>>>>>>>>>>>>>>>>>>>> we can come to the conclusion that a CachedTable is mutable and users can insert into the CachedTable directly. This is where I thought it was confusing.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>>>
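Becket's two facts can be made concrete with a small type sketch (hypothetical interfaces, not Flink's actual classes):

```
// Hypothetical type sketch of the inheritance leak described above.
interface Table {
    Table filter(String predicate);
    void insert(Object row);  // fact 1: a mutable Table exposes a mutating method
}

// Fact 2: CachedTable extends Table, so it inherits insert() - nothing in the
// type system stops a user from writing directly "into the cache".
interface CachedTable extends Table { }

// A read-only handle avoids the leak by not extending the writable surface.
interface ReadOnlyCachedTable {
    Table filter(String predicate);
}
```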
> >>>>>>>>>>>>>>>>>>>>> On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regarding naming `cache()` vs `materialize()`: one more explanation of why `materialize()` is more natural to me is that I think of all "Table"s in the Table API as views. They behave the same way as SQL views; the only difference for me is that their life scope is short - the current session - limited by a different execution model. That's why "caching" a view is, for me, just materialising it.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> However, I see and understand your point of view. Coming from DataSet/DataStream and, generally speaking, the non-SQL world, `cache()` is more natural. But keep in mind that `.cache()` will/might be used not only in interactive programming and not only in batching. Naming is one issue though, and not that critical to me, especially since, once we implement proper materialised views, we can always deprecate/rename `cache()` if we deem so.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For me the more important issue is not having the `void cache()` with side effects, exactly for the reasons that you have mentioned. True: results might be non-deterministic if the underlying source tables are changing. The problem is that `void cache()` implicitly changes the semantics of subsequent uses of the cached/materialized Table. It can cause a "wtf" moment for a user if he inserts a "b.cache()" call in some place in his code and suddenly some other random places behave differently. If `materialize()` or `cache()` returns a Table handle, we force the user to explicitly use the cache, which removes the "random" part from the "suddenly some other random places are behaving differently".
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> This argument and the others that I've raised (greater flexibility / allowing the user to explicitly bypass the cache) are independent of the `cache()` vs `materialize()` discussion.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Does that mean one can also insert into the CachedTable? This sounds pretty confusing.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I don't know, probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that a user can not write to views or materialised views in SQL, or that a user currently can not write to a Table.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I agree with @Becket that `cache()` and `materialize()` should be considered as two different methods, where the latter is more sophisticated.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> According to my understanding, the initial idea is just to introduce a simple cache or persist mechanism, but as the Table API is a high-level API, it's natural for us to think in a SQL way.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Maybe we can add the `cache()` method to the DataSet API and force users to translate a Table to a DataSet before caching it. Then the users should manually register the cached dataset as a table again (we may need some table replacement mechanism for datasets with an identical schema but different contents here). After all, it's the dataset rather than the dynamic table that needs to be cached, right?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>> Xingcan
> >>>>>>>>>>>>>>>>>>>>>>>
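Xingcan's suggestion could look roughly like this; `DataSet#cache()` is hypothetical, while the Table/DataSet conversions mirror the existing BatchTableEnvironment methods:

```
DataSet<Row> ds = tEnv.toDataSet(t, Row.class);  // translate the Table to a DataSet
DataSet<Row> cachedDs = ds.cache();              // hypothetical DataSet-level cache
tEnv.registerTable("tCached", tEnv.fromDataSet(cachedDs));  // register it back
```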
> >>>>>>>>>>>>>>>>>>>>>>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek and Jark,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback and explanation. Those are good arguments. But I think those arguments are mostly about materialized views. Let me try to explain why I believe cache() and materialize() are different.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I think cache() and materialize() have quite different implications. An analogy I can think of is save()/publish(). When users call cache(), it is just like they are saving an intermediate result as a draft of their work; this intermediate result may not have any realistic meaning. Calling cache() does not mean users want to publish the cached table in any manner. But when users call materialize(), that means "I have something meaningful to be reused by others"; now users need to think about the validation, update & versioning, lifecycle of the result, etc.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Piotrek's suggestions on variations of the materialize() method are very useful. It would be great if Flink had them. The concept of a materialized view is actually a pretty big feature, not to mention the related stuff like the triggers/hooks you mentioned earlier. I think the materialized view itself should be discussed in a more thorough and systematic manner. And I find that discussion is kind of orthogonal to, and way beyond, the interactive programming experience.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> The example you gave was interesting. I still have some questions, though.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Table source = … // some source that scans files from a directory “/foo/bar/“
> >>>>>>>>>>>>>>>>>>>>>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> >>>>>>>>>>>>>>>>>>>>>>>>> Table t2 = t1.materialize() // (or `cache()`)
> >>>>>>>>>>>>>>>>>>>>>>>>> t2.count() // initialise cache (if it’s lazily initialised)
> >>>>>>>>>>>>>>>>>>>>>>>>> int a1 = t1.count()
> >>>>>>>>>>>>>>>>>>>>>>>>> int b1 = t2.count()
> >>>>>>>>>>>>>>>>>>>>>>>>> // something in the background (or we trigger it) writes new files to /foo/bar
> >>>>>>>>>>>>>>>>>>>>>>>>> int a2 = t1.count()
> >>>>>>>>>>>>>>>>>>>>>>>>> int b2 = t2.count()
> >>>>>>>>>>>>>>>>>>>>>>>>> t2.refresh() // possible future extension, not to be implemented in the initial version
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> What if someone else added some more files to /foo/bar at this point? In that case, a3 won't equal b3, and the result becomes non-deterministic, right?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> int a3 = t1.count()
> >>>>>>>>>>>>>>>>>>>>>>>>> int b3 = t2.count()
> >>>>>>>>>>>>>>>>>>>>>>>>> t2.drop() // another possible future extension, manual “cache” dropping
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> When we talk about interactive programming, in most cases we are talking about batch applications. A fundamental assumption of such cases is that the source data is complete before the data processing begins, and the data will not change during the processing. IMO, if additional rows need to be added to some source during the processing, it should be done in ways like unioning the source with another table containing the rows to be added, as sketched below.
> >>>>>>>>>>>>>>>>>>>>>>>>
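For instance, with the existing union operation on Table (the names here are illustrative):

```
// Extend the input for one run by unioning in the extra rows, instead of
// mutating the source underneath a cache.
Table fullInput = source.union(newRows);  // newRows: a Table with the same schema
Table result = fullInput.groupBy("k").select("k, v.sum as total");
```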
> >>>>>>>>>>>>>>>>>>>>>>>> There are a few cases where computations are executed repeatedly on a changing data source.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> For example, people may run an ML training job every hour with the samples newly added in the past hour. In that case, the source data between runs will indeed change. But still, the data remains unchanged within one run. And usually in that case the result will need versioning, i.e. for a given result, it tells us that the result was derived from the source data as of a certain timestamp.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Another example is something like a data warehouse. In this case, there are a few sources of original/raw data. On top of those sources, many materialized views / queries / reports / dashboards can be created to generate derived data. That derived data needs to be updated when the underlying original data changes. In that case, the processing logic that derives the data needs to be executed repeatedly to update those reports/views. Again, all those derived data also need to ha