Hi Kurt,
Thanks for the comments.
1. How do you identify the CachedTable?
For the current design proposed in FLIP-36, we are using the first
approach you mentioned, where the key of the map is the CachedTable java
object. I think it is fine that another table representing the same DAG
cannot be identified and therefore will not use the cached intermediate
result, because we want to make caching a table explicit. As mentioned in
the FLIP, the cache API returns a Table object, and the user has to use
that returned Table object to make use of the cached table. The rationale
is that if the user builds the same DAG from scratch with the
TableEnvironment instead of using the cached table object, the user
probably doesn't want to use the cache.
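To make this concrete, here is a minimal sketch of the intended usage.
The cache() call is the API proposed in the FLIP; everything else is the
existing Table API, and the data is made up for illustration:

    import org.apache.flink.table.api.*;
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.row;

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
    Table orders = tEnv.fromValues(
            DataTypes.ROW(
                    DataTypes.FIELD("product", DataTypes.STRING()),
                    DataTypes.FIELD("amount", DataTypes.INT())),
            row("apple", 3), row("pear", 5), row("apple", 2));

    Table aggregated = orders
            .groupBy($("product"))
            .select($("product"), $("amount").sum().as("total"));

    // Proposed API: cache() returns a new Table object.
    Table cached = aggregated.cache();
    cached.execute(); // triggers the job and populates the cache

    // Operations on the returned object reuse the cached intermediate result.
    cached.select($("total")).execute();

    // An identical DAG built from scratch is a different java object,
    // so it will NOT hit the cache, by design.
    Table rebuilt = orders
            .groupBy($("product"))
            .select($("product"), $("amount").sum().as("total"));
    rebuilt.select($("total")).execute(); // recomputes from scratch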
2. How does the CachedTable affect the optimizer?
We try to keep the logic dealing with the cached table as orthogonal to
the optimizer as possible. That's why we introduce a special sink when we
are going to cache a table and a special source when we are going to use a
cached table. This way, we can let the optimizer do its work, and the
logic of modifying the job graph can happen in the job graph generator,
where we can recognize the cached node by the special sink and source.
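To illustrate the idea (all class and method names below are made up for
this sketch, not the actual classes in the FLIP), the rewriting in the job
graph generator, after the optimizer has finished, could look roughly like:

    import java.util.*;

    // A cached subtree is marked by a special sink; in a later job the
    // whole subtree is replaced by a special source if the intermediate
    // result already exists.
    abstract class PlanNode { final List<PlanNode> inputs = new ArrayList<>(); }
    class CacheSink extends PlanNode { String cacheId; }   // "cache this subtree"
    class CacheSource extends PlanNode { String cacheId; } // "read the cached result"

    class CacheRewriter {
        private final Set<String> availableCaches; // intermediate results that exist

        CacheRewriter(Set<String> availableCaches) {
            this.availableCaches = availableCaches;
        }

        // Runs after optimization, so the optimizer stays unaware of caching.
        PlanNode rewrite(PlanNode node) {
            if (node instanceof CacheSink
                    && availableCaches.contains(((CacheSink) node).cacheId)) {
                CacheSource source = new CacheSource();
                source.cacheId = ((CacheSink) node).cacheId;
                return source; // the entire subtree under the sink is dropped
            }
            node.inputs.replaceAll(this::rewrite);
            return node;
        }
    }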
3. What's the effect of calling TableEnvironment.close()?
We introduce the close method to prevent leaking cached tables when
the user is done with the table environment. Therefore, it makes more sense
that the table environment, including all of its functionality, should not
be used after closing. Otherwise, we should rename the close method to
clearAllCache or something similar.
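For clarity, the intended lifecycle would be something like the following
sketch (cache() and close() are the APIs proposed in the FLIP; the "Orders"
table is assumed to be registered, and imports are as in the earlier sketch):

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
    Table cached = tEnv.from("Orders").cache(); // proposed API
    cached.execute();  // triggers the job, populates the cache
    tEnv.close();      // releases all cached intermediate results
    // After close(), neither tEnv nor any table derived from it should be used.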
And thanks for pointing out the non-existent APIs used in the given
examples. I have updated the examples in the FLIP accordingly.
Best,
Xuannan
On Jul 16, 2020, 4:15 PM +0800, Kurt Young <[email protected]>, wrote:
Hi Xuannan,
Thanks for the detailed design doc; it describes clearly how the API looks
and how it interacts with the Flink runtime.
However, the part which relates to SQL's optimizer is kind of blurry. To be
more precise, I have the following questions:
1. How do you identify the CachedTable? I can imagine there would be a map
representing the cache; how do you compare the keys of that map? One
approach is comparing them as java objects, which is simple but has limited
scope. For example, if a user creates another table using some interfaces
of TableEnvironment, and the table is exactly the same as the cached one,
you won't be able to identify it. Another choice is calculating the
"signature" or "digest" of the cached table, which involves a string
representation of the whole sub tree represented by the cached table.
I don't think Flink currently provides such a mechanism around Table
though.
2. How does the CachedTable affect the optimizer? Specifically, will you
have a dedicated QueryOperation for it, and will you have dedicated logical
& physical RelNodes for it? I also don't see a description of how it works
with the current optimize phases: from Operation to Calcite rel node, and
then to Flink's logical and physical nodes, which are at last translated to
Flink's exec nodes. There also exist other optimizations inside the
optimizer, such as the deadlock breaker and sub plan reuse, and I'm not
sure whether the logic dealing with cached tables can be orthogonal to all
of these. Hence I expect you could give a more detailed description here.
3. What's the effect of calling TableEnvironment.close()? You already
explained that this would drop all caches this table env has; could you
also explain whether other functionality still works for this table env?
For example, can users still create/drop tables/databases/functions through
this table env? What happens to the catalog and all temporary objects of
this table env?
One minor comment: I noticed you used some non-existent APIs in the
examples you gave, like table.collect(), which is a little misleading.
Best,
Kurt
On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <[email protected]> wrote:
Hi folks,
I'd like to revive the discussion about FLIP-36 Support Interactive
Programming in Flink Table API
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
The FLIP proposes to add support for interactive programming in the Flink
Table API. Specifically, it lets users cache intermediate results (tables)
and use them in later jobs to avoid recomputing them.
I am looking forward to any opinions and suggestions from the community.
Best,
Xuannan
On May 7, 2020, 5:40 PM +0800, Xuannan Su <[email protected]>, wrote:
Hi,
There is some feedback from @Timo and @Kurt in the voting thread for
FLIP-36, and I want to share my thoughts here.
1. How would FLIP-36 look after FLIP-84?
I don't think FLIP-84 will affect FLIP-36 from the public API perspective.
Users can call .cache on a table object, and the cached table will be
generated whenever the table job is triggered to execute, either by
Table#executeInsert or StatementSet#execute. FLIP-36 should be aware of the
changes made by FLIP-84, but it shouldn't be a problem. At the end of the
day, FLIP-36 only requires the ability to add a sink to a node, submit a
table job with multiple sinks, and replace the cached table with a source.
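As a minimal sketch (cache() is the proposed API; the table and sink names
are made up), the cache would be produced as a side effect of triggering
the job:

    // given a TableEnvironment tEnv with an "Orders" table and two sinks registered
    Table cached = tEnv.from("Orders").cache(); // nothing is executed yet
    StatementSet set = tEnv.createStatementSet();
    set.addInsert("Sink1", cached.select($("product")));
    set.addInsert("Sink2", cached.filter($("amount").isGreater(10)));
    set.execute(); // one job with multiple sinks; the intermediate
                   // result is cached when the job actually runs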
2. How can we support cache in a multi-statement SQL file?
The most intuitive way to support cache in a multi-statement SQL file is
by using a view, where the view corresponds to a cached table.
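For example, from the Table API side, one possible mapping (just an
illustration, not something the FLIP prescribes) is to register the cached
table as a temporary view so that later statements can refer to it:

    Table cached = tEnv
            .sqlQuery("SELECT product, SUM(amount) AS total FROM Orders GROUP BY product")
            .cache(); // proposed API
    tEnv.createTemporaryView("cached_orders", cached); // later statements hit the cache
    tEnv.executeSql("SELECT * FROM cached_orders WHERE total > 10").print();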
3. Unifying the cached table and materialized views
It is true that the cached table and the materialized view are similar in
some ways. However, I think the materialized view is a more complex
concept. First, a materialized view requires some kind of refresh mechanism
to synchronize with the table. Second, the life cycle of a materialized
view is longer: it should be accessible even after the application exits
and should be accessible by another application, while the cached table is
only accessible in the application where it is created. The cached table is
introduced to avoid recomputation of an intermediate table in order to
support interactive programming in the Flink Table API. I think the
materialized view needs more discussion and certainly deserves a whole new
FLIP.
Please let me know your thoughts.
Best,
Xuannan
On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[email protected]> wrote:
Hi folks,
FLIP-36 has been updated according to the discussion with Becket. In the
meantime, any comments are very welcome.
If there are no further comments, I would like to start the voting thread
by tomorrow.
Thanks,
Xuannan
On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[email protected]> wrote:
Hi Becket,
You are right. It makes sense to treat the retry of job 2 as an ordinary
job, and the config does introduce some unnecessary confusion. Thank you
for your comment. I will update the FLIP.
Best,
Xuannan
On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <[email protected]> wrote:
Hi Xuannan,
Suppose a user submits Job 1 and it generates a cached intermediate result,
and later on the user submits Job 2, which should ideally use the
intermediate result. In that case, if Job 2 fails due to the missing
intermediate result, Job 2 should be retried with its full DAG. After that,
when Job 2 runs, it will also re-generate the cache. However, once Job 2
has fallen back to the original DAG, shouldn't it just be treated as an
ordinary job that follows the recovery strategy? Having a separate
configuration seems a little confusing. In other words, re-generating the
cache is just a byproduct of running the full DAG of Job 2, not the main
purpose. It is just like when Job 1 runs to generate the cache: it does not
have a separate config of retries to make sure the cache is generated. If
it fails, it just fails like an ordinary job.
What do you think?
Thanks,
Jiangjie (Becket) Qin
On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <[email protected]> wrote:
Hi Becket,
The intermediate result will indeed be automatically re-generated by
resubmitting the original DAG. But that job could fail as well. In that
case, we need to decide whether we should resubmit the original DAG to
re-generate the intermediate result or give up and throw an exception to
the user. The config is to indicate how many resubmissions should happen
before giving up.
Thanks,
Xuannan
On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <[email protected]> wrote:
Hi Xuannan,
I am not entirely sure if I understand the cases you mentioned. The users
can use the cached table object returned by the .cache() method in other
jobs and it should read the intermediate result. The intermediate result
can be gone in the following three cases: 1. the user explicitly calls the
invalidateCache() method; 2. the TableEnvironment is closed; 3. a failure
happens on the TM. When that happens, the intermediate result will not be
available unless it is re-generated.
What confused me was why we need to have a *cache.retries.max* config.
Shouldn't the missing intermediate result always be automatically
re-generated if it is gone?
Thanks,
Jiangjie (Becket) Qin
On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <[email protected]> wrote:
Hi Becket,
Thanks for the comments.
On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <[email protected]> wrote:
Hi Xuannan,
Thanks for picking up the FLIP. It looks good to me overall. Some quick
comments / questions below:
1. Do we also need changes in the Java API?
Yes, the changes to the public interfaces of Table and TableEnvironment
should also be made in the Java API.
2. What are the cases that users may want to retry reading the
intermediate result? It seems that once the intermediate result is gone, it
will not be available later without being generated again, right?
I am not entirely sure if I understand the cases you mentioned. The users
can use the cached table object returned by the .cache() method in other
jobs and it should read the intermediate result. The intermediate result
can be gone in the following three cases: 1. the user explicitly calls the
invalidateCache() method; 2. the TableEnvironment is closed; 3. a failure
happens on the TM. When that happens, the intermediate result will not be
available unless it is re-generated.
3. In the "semantic of cache() method" section, the
description "The
semantic of the *cache() *method is a little
different
depending on
whether
auto caching is enabled or not." seems not explained.
This line is actually outdated and should be removed,
as
we are not
adding
the auto caching functionality in this FLIP. Auto
caching
will be added
in
the future, and the semantic of cache() when auto
caching
is enabled
will
be discussed in detail by a new FLIP. I will remove the
descriptor to
avoid
further confusion.
Thanks,
Jiangjie (Becket) Qin
On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <[email protected]> wrote:
Hi folks,
I'd like to start the discussion about FLIP-36 Support Interactive
Programming in Flink Table API
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
The FLIP proposes to add support for interactive programming in the Flink
Table API. Specifically, it lets users cache intermediate results (tables)
and use them in later jobs.
Even though the FLIP has been discussed in the past[1], it hasn't formally
passed the vote yet, and some of the design and implementation details have
to change to incorporate the cluster partitions proposed in FLIP-67[2].
Looking forward to your feedback.
Thanks,
Xuannan
[1]
https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle