Hi Xuannan,

sorry for joining the discussion so late. I agree that this is a very nice and useful feature. However, the impact it has to many components in the stack requires more discussion in my opinion.

1) Separation of concerns:
The current design seems to mix different layers. We should make sure that all layers do what they are supposed to do:

1a) The FLIP states: "The cache() method returns a new Table object with a flag set."

The `Table` object is just a thin API class that wraps a `QueryOperation`. Other than that the `Table` object should not contain futher state. The tree of `QueryOperation` should be an immutable, independent data structure that can be passed around and will eventually be passed to the `Planner`.

The mentioned `CacheSink` should be added by the optimizer. It is not the responsibility of the API do perform optimizer-like tasks. A call to `t1.cache()` should simply wrap the original operation into something like `CacheOperation(t1.getQueryOperation)`. A `CacheOperation` could still extend from `QueryOperation` and assign a unique string identifier already. A specialized `StreamTransformation` would be necessary during translation.

1b) The FLIP states: "The default table service stores the metadata in the client (e.g. TableEnvironment)"

`TableEnvironment` is not a client. Similar to `Table`, it is an API class that just delegates to other session components. Currently, the table environment has (again) to many responsibilities that should better be split into other components. The table's `Executor` is the client that performs the cluster communication. But in general it also just delegates to `org.apache.flink.core.execution.PipelineExecutor`. IMO the `PipelineExecutor` is a better fit for a back-and-forth communication to determine existing cluster partitions and modify the job graph. Or even further down the stack, because as far as I know, `PipelineExecutor` works with `StreamGraph`.

`flink-table-api-java` has no dependency to `flink-runtime`. This has been done on purpose.

2) API
I still see the rejected option 2 a good fit to expose this feature.

A `Table.cache(): CachedTable` with `CachedTable.invalidate(): void` and maybe `CachedTable.getId(): String` makes the feature and its operations very explicit. It also avoids following up questions such as:

Will `invalidateCache()` be transitively propagated in `t1.cache().join(t2.cache()).invalidateCache()`?

Or as the FLIP states:

`Table t3 = t1.select(...) // cache will NOT be used.`
but
`t1.invalidateCache() // cache will be released`

This sounds a bit contradicting to me. Because sometimes the `t1.cache()` has implications on t1 and sometimes not.

3) Big picture

After reading the FLIP, I still don't understand how a user can configure or control the table service. Will we offer options through `TableConfig` or `TableEnvironment` or is this configuration done via ConfigOptions for lower layers?

4) SQL integration

As I mentioned earlier, we should think about a SQL integration as well. Otherwise we need to redesign the Java API to align it with SQL later. SQL has also a bigger user base than Table API. Let's assume we introduce a new keyword and combine the caching with regular CREATE VIEW syntax such as:
`CREATE CACHED TEMPORARY VIEW MyTable AS SELECT *`
This would align well with:
`tEnv.registerTemporaryView("MyTable", table.cache())`

What do you think?

4) TableEnvironment.close()

Will `TableEnvironment` implement `AutoCloseable`?


In summary, I think the FLIP should go into more details how this effort affects each layer. Because a lot of the interfaces are `@Public` or `@PublicEvolving`. And the FLIP still leaves a lot of questions how this high level concept ends up in JobGraph.

Regards,
Timo



On 30.07.20 09:00, Xuannan Su wrote:
Hi folks,

It seems that all the raised concerns so far have been resolved. I plan to 
start a voting thread for FLIP-36 early next week if there are no comments.

Thanks,
Xuannan
On Jul 28, 2020, 7:42 PM +0800, Xuannan Su <suxuanna...@gmail.com>, wrote:
Hi Kurt,

Thanks for the comments.

You are right that the FLIP lacks a proper discussion about the impact of the 
optimizer. I have added the section to talk about how the cache table works 
with the optimizer. I hope this could resolve your concern. Please let me know 
if you have any further comments.

Best,
Xuannan
On Jul 22, 2020, 4:36 PM +0800, Kurt Young <ykt...@gmail.com>, wrote:
Thanks for the reply, I have one more comment about the optimizer
affection. Even if you are
trying to make the cached table be as orthogonal to the optimizer as
possible by introducing
a special sink, it is still not clear why this approach is safe. Maybe you
can add some process
introduction from API to JobGraph, otherwise I can't make sure everyone
reviewing the design
doc will have the same imagination about this. And I'm also quite sure some
of the existing
mechanism will be affected by this special sink, e.g. multi sink
optimization.

Best,
Kurt


On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Kurt,

Thanks for the comments.

1. How do you identify the CachedTable?
For the current design proposed in FLIP-36, we are using the first
approach you mentioned, where the key of the map is the Cached Table java
object. I think it is fine not to be able to identify another table
representing the same DAG and not using the cached intermediate result
because we want to make the caching table explicit. As mentioned in the
FLIP, the cache API will return a Table object. And the user has to use the
returned Table object to make use of the cached table. The rationale is
that if the user builds the same DAG from scratch with some
TableEnvironment instead of using the cached table object, the user
probably doesn't want to use the cache.

2. How does the CachedTable affect the optimizer?
We try to make the logic dealing with the cached table be as orthogonal to
the optimizer as possible. That's why we introduce a special sink when we
are going to cache a table and a special source when we are going to use a
cached table. This way, we can let the optimizer does it works, and the
logic of modifying the job graph can happen in the job graph generator. We
can recognize the cached node with the special sink and source.

3. What's the effect of calling TableEnvironment.close()?
We introduce the close method to prevent leaking of the cached table when
the user is done with the table environment. Therefore, it makes more sense
that the table environment, including all of its functionality, should not
be used after closing. Otherwise, we should rename the close method to
clearAllCache or something similar.

And thanks for pointing out the use of not existing API used in the given
examples. I have updated the examples in the FLIP accordingly.

Best,
Xuannan
On Jul 16, 2020, 4:15 PM +0800, Kurt Young <ykt...@gmail.com>, wrote:
Hi Xuanna,

Thanks for the detailed design doc, it described clearly how the API
looks
and how to interact with Flink runtime.
However, the part which relates to SQL's optimizer is kind of blurry. To
be
more precise, I have following questions:

1. How do you identify the CachedTable? I can imagine there would be map
representing the cache, how do you
compare the keys of the map? One approach is they will be compared by
java
objects, which is simple but has
limited scope. For example, users created another table using some
interfaces of TableEnvironment, and the table
is exactly the same as the cached one, you won't be able to identify it.
Another choice is calculating the "signature" or
"diest" of the cached table, which involves string representation of the
whole sub tree represented by the cached table.
I don't think Flink currently provides such a mechanism around Table
though.

2. How does the CachedTable affect the optimizer? Specifically, will you
have a dedicated QueryOperation for it, will you have
a dedicated logical & physical RelNode for it? And I also don't see a
description about how to work with current optimize phases,
from Operation to Calcite rel node, and then to Flink's logical and
physical node, which will be at last translated to Flink's exec node.
There also exists other optimizations such as dead lock breaker, as well
as
sub plan reuse inside the optimizer, I'm not sure whether
the logic dealing with cached tables can be orthogonal to all of these.
Hence I expect you could have a more detailed description here.

3. What's the effect of calling TableEnvironment.close()? You already
explained this would drop all caches this table env has,
could you also explain where other functionality still works for this
table
env? Like can use still create/drop tables/databases/function
through this table env? What happens to the catalog and all temporary
objects of this table env?

One minor comment: I noticed you used some not existing API in the
examples
you gave, like table.collect(), which is a little
misleading.

Best,
Kurt


On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi folks,

I'd like to revive the discussion about FLIP-36 Support Interactive
Programming in Flink Table API


https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

The FLIP proposes to add support for interactive programming in Flink
Table API. Specifically, it let users cache the intermediate
results(tables) and use them in the later jobs to avoid recomputing the
intermediate result(tables).

I am looking forward to any opinions and suggestions from the
community.

Best,
Xuannan
On May 7, 2020, 5:40 PM +0800, Xuannan Su <suxuanna...@gmail.com>,
wrote:
Hi,

There are some feedbacks from @Timo and @Kurt in the voting thread
for
FLIP-36 and I want to share my thoughts here.

1. How would the FLIP-36 look like after FLIP-84?
I don't think FLIP-84 will affect FLIP-36 from the public API
perspective. Users can call .cache on a table object and the cached
table
will be generated whenever the table job is triggered to execute,
either by
Table#executeInsert or StatementSet#execute. I think that FLIP-36
should
aware of the changes made by FLIP-84, but it shouldn't be a problem.
At the
end of the day, FLIP-36 only requires the ability to add a sink to a
node,
submit a table job with multiple sinks, and replace the cached table
with a
source.

2. How can we support cache in a multi-statement SQL file?
The most intuitive way to support cache in a multi-statement SQL
file is
by using a view, where the view is corresponding to a cached table.

3. Unifying the cached table and materialized views
It is true that the cached table and the materialized view are
similar
in some way. However, I think the materialized view is a more complex
concept. First, a materialized view requires some kind of a refresh
mechanism to synchronize with the table. Secondly, the life cycle of a
materialized view is longer. The materialized view should be accessible
even after the application exits and should be accessible by another
application, while the cached table is only accessible in the
application
where it is created. The cached table is introduced to avoid
recomputation
of an intermediate table to support interactive programming in Flink
Table
API. And I think the materialized view needs more discussion and
certainly
deserves a whole new FLIP.

Please let me know your thought.

Best,
Xuannan

On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <suxuanna...@gmail.com>
wrote:
Hi folks,

The FLIP-36 is updated according to the discussion with Becket. In
the
meantime, any comments are very welcome.

If there are no further comments, I would like to start the voting
thread by tomorrow.

Thanks,
Xuannan


On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <suxuanna...@gmail.com>
wrote:
Hi Becket,

You are right. It makes sense to treat retry of job 2 as an
ordinary
job. And the config does introduce some unnecessary confusion. Thank
you
for you comment. I will update the FLIP.

Best,
Xuannan

On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <
becket....@gmail.com>
wrote:
Hi Xuannan,

If user submits Job 1 and generated a cached intermediate
result. And later
on, user submitted job 2 which should ideally use the
intermediate result.
In that case, if job 2 failed due to missing the intermediate
result, Job 2
should be retried with its full DAG. After that when Job 2
runs,
it will
also re-generate the cache. However, once job 2 has fell
back to
the
original DAG, should it just be treated as an ordinary job
that
follow the
recovery strategy? Having a separate configuration seems a
little
confusing. In another word, re-generating the cache is just a
byproduct of
running the full DAG of job 2, but is not the main purpose.
It
is just like
when job 1 runs to generate cache, it does not have a
separate
config of
retry to make sure the cache is generated. If it fails, it
just
fail like
an ordinary job.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
suxuanna...@gmail.com> wrote:

Hi Becket,

The intermediate result will indeed be automatically
re-generated by
resubmitting the original DAG. And that job could fail as
well. In that
case, we need to decide if we should resubmit the original
DAG
to
re-generate the intermediate result or give up and throw an
exception to
the user. And the config is to indicate how many resubmit
should happen
before giving up.

Thanks,
Xuannan

On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <
becket....@gmail.com> wrote:

Hi Xuannan,

I am not entirely sure if I understand the cases you
mentioned. The
users
can use the cached table object returned by the
.cache()
method in
other
job and it should read the intermediate result. The
intermediate result
can
gone in the following three cases: 1. the user
explicitly
call the
invalidateCache() method 2. the TableEnvironment is
closed
3. failure
happens on the TM. When that happens, the intermeidate
result will not
be
available unless it is re-generated.


What confused me was that why do we need to have a
*cache.retries.max
*config?
Shouldn't the missing intermediate result always be
automatically
re-generated if it is gone?

Thanks,

Jiangjie (Becket) Qin


On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
suxuanna...@gmail.com>
wrote:

Hi Becket,

Thanks for the comments.

On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <
becket....@gmail.com>
wrote:

Hi Xuannan,

Thanks for picking up the FLIP. It looks good to me
overall. Some
quick
comments / questions below:

1. Do we also need changes in the Java API?


Yes, the public interface of Table and TableEnvironment
should be made
in
the Java API.


2. What are the cases that users may want to retry
reading the
intermediate
result? It seems that once the intermediate result
has
gone, it will
not
be
available later without being generated again, right?


I am not entirely sure if I understand the cases you
mentioned. The
users
can use the cached table object returned by the
.cache()
method in
other
job and it should read the intermediate result. The
intermediate result
can
gone in the following three cases: 1. the user
explicitly
call the
invalidateCache() method 2. the TableEnvironment is
closed
3. failure
happens on the TM. When that happens, the intermeidate
result will not
be
available unless it is re-generated.

3. In the "semantic of cache() method" section, the
description "The
semantic of the *cache() *method is a little
different
depending on
whether
auto caching is enabled or not." seems not explained.


This line is actually outdated and should be removed,
as
we are not
adding
the auto caching functionality in this FLIP. Auto
caching
will be added
in
the future, and the semantic of cache() when auto
caching
is enabled
will
be discussed in detail by a new FLIP. I will remove the
descriptor to
avoid
further confusion.


Thanks,

Jiangjie (Becket) Qin



On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
suxuanna...@gmail.com>
wrote:

Hi folks,

I'd like to start the discussion about FLIP-36
Support
Interactive
Programming in Flink Table API







https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

The FLIP proposes to add support for interactive
programming in
Flink
Table
API. Specifically, it let users cache the
intermediate
results(tables)
and
use them in the later jobs.

Even though the FLIP has been discussed in the
past[1], the FLIP
hasn't
formally passed the vote yet. And some of the
design
and
implementation
detail have to change to incorporates the cluster
partition
proposed
in
FLIP-67[2].

Looking forward to your feedback.

Thanks,
Xuannan

[1]







https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
[2]







https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E









Reply via email to