Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2022-01-07 Thread Francesco Guardiani
+1 with a separate repo and +1 with the flink-storage name

On Fri, Jan 7, 2022 at 8:40 AM Jingsong Li  wrote:

> Hi everyone,
>
> The vote thread for creating a separate sub-project for FLIP-188 is here:
> https://lists.apache.org/thread/wzzhr27cvrh6w107bn464m1m1ycfll1z
>
> Best,
> Jingsong
>
>
> On Fri, Jan 7, 2022 at 3:30 PM Jingsong Li  wrote:
> >
> > Hi Timo,
> >
> > I think we can consider exposing it to DataStream users in the future,
> > once the API definition is clear.
> > I am fine with `flink-table-store` too.
> > But I tend to prefer a shorter and clearer name:
> > `flink-store`.
> >
> > I think I can create a separate thread to vote.
> >
> > Looking forward to your thoughts!
> >
> > Best,
> > Jingsong
> >
> >
> > On Thu, Dec 30, 2021 at 9:48 PM Timo Walther  wrote:
> > >
> > > +1 for a separate repository. And also +1 for finding a good name.
> > >
> > > `flink-warehouse` would be definitely a good marketing name but I agree
> > > that we should not start marketing for code bases. Are we planning to
> > > make this storage also available to DataStream API users? If not, I
> > > would also vote for `flink-managed-table` or better:
> `flink-table-store`
> > >
> > > Thanks,
> > > Timo
> > >
> > >
> > >
> > > On 29.12.21 07:58, Jingsong Li wrote:
> > > > Thanks Till for your suggestions.
> > > >
> > > > Personally, I like flink-warehouse, this is what we want to convey to
> > > > the user, but it indicates a bit too much scope.
> > > >
> > > > How about just calling it flink-store?
> > > > Simply to convey an impression: this is flink's store project,
> > > > providing a built-in store for the flink compute engine, which can be
> > > > used by flink-table as well as flink-datastream.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Tue, Dec 28, 2021 at 5:15 PM Till Rohrmann 
> wrote:
> > > >>
> > > >> Hi Jingsong,
> > > >>
> > > >> I think that developing flink-dynamic-storage as a separate sub
> project is
> > > >> a very good idea since it allows us to move a lot faster and
> decouple
> > > >> releases from Flink. Hence big +1.
> > > >>
> > > >> Do we want to name it flink-dynamic-storage or shall we use a more
> > > >> descriptive name? dynamic-storage sounds a bit generic to me and I
> wouldn't
> > > >> know that this has something to do with letting Flink manage your
> tables
> > > >> and their storage. I don't have a very good idea but maybe we can
> call it
> > > >> flink-managed-tables, flink-warehouse, flink-olap or so.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser <
> mart...@ververica.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Jingsong,
> > > >>>
> > > >>> That sounds promising! +1 from my side to continue development
> under
> > > >>> flink-dynamic-storage as a Flink subproject. I think having a more
> in-depth
> > > >>> interface will benefit everyone.
> > > >>>
> > > >>> Best regards,
> > > >>>
> > > >>> Martijn
> > > >>>
> > > >>> On Tue, 28 Dec 2021 at 04:23, Jingsong Li 
> wrote:
> > > >>>
> > >  Hi all,
> > > 
> > >  After some experimentation, we felt no problem putting the dynamic
> > >  storage outside of flink, and it also allowed us to design the
> > >  interface in more depth.
> > > 
> > >  What do you think? If there is no problem, I am asking for PMC's
> help
> > >  here: we want to propose flink-dynamic-storage as a flink
> subproject,
> > >  and we want to build the project under apache.
> > > 
> > >  Best,
> > >  Jingsong
> > > 
> > > 
> > >  On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li <
> jingsongl...@gmail.com>
> > >  wrote:
> > > >
> > > > Hi Stephan,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > Data never expires automatically.
> > > >
> > > > If there is a need for data retention, the user can choose one
> of the
> > > > following options:
> > > > - In the SQL for querying the managed table, users filter the
> data by
> > >  themselves
> > > > - Define the time partition, and users can delete the expired
> > > > partition by themselves. (DROP PARTITION ...)
> > > > - In the future version, we will support the "DELETE FROM"
> statement,
> > > > users can delete the expired data according to the conditions.
> > > >
> > > > So to answer your question:
> > > >
> > > >> Will the VMQ send retractions so that the data will be removed
> from
> > >  the table (via compactions)?
> > > >
> > > > The current implementation is not sending retraction, which I
> think
> > > > theoretically should be sent, currently the user can filter by
> > > > subsequent conditions.
> > > > And yes, the subscriber would not see strictly a correct result.
> I
> > > > think this is something we can improve for Flink SQL.
> > > >
> > > >> Do we want time retention semantics handled by the compaction?
> > > >
> > > > Currently, no, Data never 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2022-01-06 Thread Jingsong Li
Hi everyone,

The vote thread for creating a separate sub-project for FLIP-188 is here:
https://lists.apache.org/thread/wzzhr27cvrh6w107bn464m1m1ycfll1z

Best,
Jingsong


On Fri, Jan 7, 2022 at 3:30 PM Jingsong Li  wrote:
>
> Hi Timo,
>
> I think we can consider exposing it to DataStream users in the future,
> once the API definition is clear.
> I am fine with `flink-table-store` too.
> But I tend to prefer a shorter and clearer name:
> `flink-store`.
>
> I think I can create a separate thread to vote.
>
> Looking forward to your thoughts!
>
> Best,
> Jingsong
>
>
> On Thu, Dec 30, 2021 at 9:48 PM Timo Walther  wrote:
> >
> > +1 for a separate repository. And also +1 for finding a good name.
> >
> > `flink-warehouse` would be definitely a good marketing name but I agree
> > that we should not start marketing for code bases. Are we planning to
> > make this storage also available to DataStream API users? If not, I
> > would also vote for `flink-managed-table` or better: `flink-table-store`
> >
> > Thanks,
> > Timo
> >
> >
> >
> > On 29.12.21 07:58, Jingsong Li wrote:
> > > Thanks Till for your suggestions.
> > >
> > > Personally, I like flink-warehouse, this is what we want to convey to
> > > the user, but it indicates a bit too much scope.
> > >
> > > How about just calling it flink-store?
> > > Simply to convey an impression: this is flink's store project,
> > > providing a built-in store for the flink compute engine, which can be
> > > used by flink-table as well as flink-datastream.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Dec 28, 2021 at 5:15 PM Till Rohrmann  
> > > wrote:
> > >>
> > >> Hi Jingsong,
> > >>
> > >> I think that developing flink-dynamic-storage as a separate sub project 
> > >> is
> > >> a very good idea since it allows us to move a lot faster and decouple
> > >> releases from Flink. Hence big +1.
> > >>
> > >> Do we want to name it flink-dynamic-storage or shall we use a more
> > >> descriptive name? dynamic-storage sounds a bit generic to me and I 
> > >> wouldn't
> > >> know that this has something to do with letting Flink manage your tables
> > >> and their storage. I don't have a very good idea but maybe we can call it
> > >> flink-managed-tables, flink-warehouse, flink-olap or so.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
> > >> wrote:
> > >>
> > >>> Hi Jingsong,
> > >>>
> > >>> That sounds promising! +1 from my side to continue development under
> > >>> flink-dynamic-storage as a Flink subproject. I think having a more 
> > >>> in-depth
> > >>> interface will benefit everyone.
> > >>>
> > >>> Best regards,
> > >>>
> > >>> Martijn
> > >>>
> > >>> On Tue, 28 Dec 2021 at 04:23, Jingsong Li  
> > >>> wrote:
> > >>>
> >  Hi all,
> > 
> >  After some experimentation, we felt no problem putting the dynamic
> >  storage outside of flink, and it also allowed us to design the
> >  interface in more depth.
> > 
> >  What do you think? If there is no problem, I am asking for PMC's help
> >  here: we want to propose flink-dynamic-storage as a flink subproject,
> >  and we want to build the project under apache.
> > 
> >  Best,
> >  Jingsong
> > 
> > 
> >  On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
> >  wrote:
> > >
> > > Hi Stephan,
> > >
> > > Thanks for your reply.
> > >
> > > Data never expires automatically.
> > >
> > > If there is a need for data retention, the user can choose one of the
> > > following options:
> > > - In the SQL for querying the managed table, users filter the data by
> >  themselves
> > > - Define the time partition, and users can delete the expired
> > > partition by themselves. (DROP PARTITION ...)
> > > - In the future version, we will support the "DELETE FROM" statement,
> > > users can delete the expired data according to the conditions.
> > >
> > > So to answer your question:
> > >
> > >> Will the VMQ send retractions so that the data will be removed from
> >  the table (via compactions)?
> > >
> > > The current implementation is not sending retraction, which I think
> > > theoretically should be sent, currently the user can filter by
> > > subsequent conditions.
> > > And yes, the subscriber would not see strictly a correct result. I
> > > think this is something we can improve for Flink SQL.
> > >
> > >> Do we want time retention semantics handled by the compaction?
> > >
> > > Currently, no, Data never expires automatically.
> > >
> > >> Do we want to declare those types of queries "out of scope" 
> > >> initially?
> > >
> > > I think we want users to be able to use three options above to
> > > accomplish their requirements.
> > >
> > > I will update FLIP to make the definition clearer and more explicit.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Wed, Nov 24, 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2022-01-06 Thread Jingsong Li
Hi Timo,

I think we can consider exposing it to DataStream users in the future,
once the API definition is clear.
I am fine with `flink-table-store` too.
But I tend to prefer a shorter and clearer name:
`flink-store`.

I think I can create a separate thread to vote.

Looking forward to your thoughts!

Best,
Jingsong


On Thu, Dec 30, 2021 at 9:48 PM Timo Walther  wrote:
>
> +1 for a separate repository. And also +1 for finding a good name.
>
> `flink-warehouse` would be definitely a good marketing name but I agree
> that we should not start marketing for code bases. Are we planning to
> make this storage also available to DataStream API users? If not, I
> would also vote for `flink-managed-table` or better: `flink-table-store`
>
> Thanks,
> Timo
>
>
>
> On 29.12.21 07:58, Jingsong Li wrote:
> > Thanks Till for your suggestions.
> >
> > Personally, I like flink-warehouse, this is what we want to convey to
> > the user, but it indicates a bit too much scope.
> >
> > How about just calling it flink-store?
> > Simply to convey an impression: this is flink's store project,
> > providing a built-in store for the flink compute engine, which can be
> > used by flink-table as well as flink-datastream.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Dec 28, 2021 at 5:15 PM Till Rohrmann  wrote:
> >>
> >> Hi Jingsong,
> >>
> >> I think that developing flink-dynamic-storage as a separate sub project is
> >> a very good idea since it allows us to move a lot faster and decouple
> >> releases from Flink. Hence big +1.
> >>
> >> Do we want to name it flink-dynamic-storage or shall we use a more
> >> descriptive name? dynamic-storage sounds a bit generic to me and I wouldn't
> >> know that this has something to do with letting Flink manage your tables
> >> and their storage. I don't have a very good idea but maybe we can call it
> >> flink-managed-tables, flink-warehouse, flink-olap or so.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
> >> wrote:
> >>
> >>> Hi Jingsong,
> >>>
> >>> That sounds promising! +1 from my side to continue development under
> >>> flink-dynamic-storage as a Flink subproject. I think having a more 
> >>> in-depth
> >>> interface will benefit everyone.
> >>>
> >>> Best regards,
> >>>
> >>> Martijn
> >>>
> >>> On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:
> >>>
>  Hi all,
> 
>  After some experimentation, we felt no problem putting the dynamic
>  storage outside of flink, and it also allowed us to design the
>  interface in more depth.
> 
>  What do you think? If there is no problem, I am asking for PMC's help
>  here: we want to propose flink-dynamic-storage as a flink subproject,
>  and we want to build the project under apache.
> 
>  Best,
>  Jingsong
> 
> 
>  On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
>  wrote:
> >
> > Hi Stephan,
> >
> > Thanks for your reply.
> >
> > Data never expires automatically.
> >
> > If there is a need for data retention, the user can choose one of the
> > following options:
> > - In the SQL for querying the managed table, users filter the data by
>  themselves
> > - Define the time partition, and users can delete the expired
> > partition by themselves. (DROP PARTITION ...)
> > - In the future version, we will support the "DELETE FROM" statement,
> > users can delete the expired data according to the conditions.
> >
> > So to answer your question:
> >
> >> Will the VMQ send retractions so that the data will be removed from
>  the table (via compactions)?
> >
> > The current implementation is not sending retraction, which I think
> > theoretically should be sent, currently the user can filter by
> > subsequent conditions.
> > And yes, the subscriber would not see strictly a correct result. I
> > think this is something we can improve for Flink SQL.
> >
> >> Do we want time retention semantics handled by the compaction?
> >
> > Currently, no, Data never expires automatically.
> >
> >> Do we want to declare those types of queries "out of scope" initially?
> >
> > I think we want users to be able to use three options above to
> > accomplish their requirements.
> >
> > I will update FLIP to make the definition clearer and more explicit.
> >
> > Best,
> > Jingsong
> >
> > On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 
>  wrote:
> >>
> >> Thanks for digging into this.
> >> Regarding this query:
> >>
> >> INSERT INTO the_table
> >>SELECT window_end, COUNT(*)
> >>  FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5'
>  MINUTES))
> >> GROUP BY window_end
> >>HAVING now() - window_end <= INTERVAL '14' DAYS;
> >>
> >> I am not sure I understand what the conclusion is on the data
>  retention question, where the continuous streaming SQL 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-30 Thread Timo Walther

+1 for a separate repository. And also +1 for finding a good name.

`flink-warehouse` would be definitely a good marketing name but I agree 
that we should not start marketing for code bases. Are we planning to 
make this storage also available to DataStream API users? If not, I 
would also vote for `flink-managed-table` or better: `flink-table-store`


Thanks,
Timo



On 29.12.21 07:58, Jingsong Li wrote:

Thanks Till for your suggestions.

Personally, I like flink-warehouse, this is what we want to convey to
the user, but it indicates a bit too much scope.

How about just calling it flink-store?
Simply to convey an impression: this is flink's store project,
providing a built-in store for the flink compute engine, which can be
used by flink-table as well as flink-datastream.

Best,
Jingsong

On Tue, Dec 28, 2021 at 5:15 PM Till Rohrmann  wrote:


Hi Jingsong,

I think that developing flink-dynamic-storage as a separate sub project is
a very good idea since it allows us to move a lot faster and decouple
releases from Flink. Hence big +1.

Do we want to name it flink-dynamic-storage or shall we use a more
descriptive name? dynamic-storage sounds a bit generic to me and I wouldn't
know that this has something to do with letting Flink manage your tables
and their storage. I don't have a very good idea but maybe we can call it
flink-managed-tables, flink-warehouse, flink-olap or so.

Cheers,
Till

On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
wrote:


Hi Jingsong,

That sounds promising! +1 from my side to continue development under
flink-dynamic-storage as a Flink subproject. I think having a more in-depth
interface will benefit everyone.

Best regards,

Martijn

On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:


Hi all,

After some experimentation, we felt no problem putting the dynamic
storage outside of flink, and it also allowed us to design the
interface in more depth.

What do you think? If there is no problem, I am asking for PMC's help
here: we want to propose flink-dynamic-storage as a flink subproject,
and we want to build the project under apache.

Best,
Jingsong


On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
wrote:


Hi Stephan,

Thanks for your reply.

Data never expires automatically.

If there is a need for data retention, the user can choose one of the
following options:
- In the SQL for querying the managed table, users filter the data by themselves

- Define the time partition, and users can delete the expired
partition by themselves. (DROP PARTITION ...)
- In the future version, we will support the "DELETE FROM" statement,
users can delete the expired data according to the conditions.

So to answer your question:


Will the VMQ send retractions so that the data will be removed from the table (via compactions)?


The current implementation is not sending retraction, which I think
theoretically should be sent, currently the user can filter by
subsequent conditions.
And yes, the subscriber would not see strictly a correct result. I
think this is something we can improve for Flink SQL.


Do we want time retention semantics handled by the compaction?


Currently, no, Data never expires automatically.


Do we want to declare those types of queries "out of scope" initially?


I think we want users to be able to use three options above to
accomplish their requirements.

I will update FLIP to make the definition clearer and more explicit.

Best,
Jingsong

On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen  wrote:


Thanks for digging into this.
Regarding this query:

INSERT INTO the_table
  SELECT window_end, COUNT(*)
FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
GROUP BY window_end
  HAVING now() - window_end <= INTERVAL '14' DAYS;

I am not sure I understand what the conclusion is on the data retention
question, where the continuous streaming SQL query has retention semantics.
I think we would need to answer the following questions (I will call the
query that computed the managed table the "view materializer query" - VMQ).


(1) I guess the VMQ will send no updates for windows beyond the "retention
period" is over (14 days), as you said. That makes sense.


(2) Will the VMQ send retractions so that the data will be removed from the
table (via compactions)?
   - if yes, this seems semantically better for users, but it will be
expensive to keep the timers for retractions.
   - if not, we can still solve this by adding filters to queries against
the managed table, as long as these queries are in Flink.
   - any subscriber to the changelog stream would not see strictly a correct
result if we are not doing the retractions


(3) Do we want time retention semantics handled by the compaction?
   - if we say that we lazily apply the deletes in the queries that read the
managed tables, then we could also age out the old data during compaction.
   - that is cheap, but it might be too much of a special case to be very
relevant here.


(4) Do we want to declare 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-28 Thread Jingsong Li
Thanks Till for your suggestions.

Personally, I like flink-warehouse, this is what we want to convey to
the user, but it indicates a bit too much scope.

How about just calling it flink-store?
Simply to convey an impression: this is flink's store project,
providing a built-in store for the flink compute engine, which can be
used by flink-table as well as flink-datastream.

Best,
Jingsong

On Tue, Dec 28, 2021 at 5:15 PM Till Rohrmann  wrote:
>
> Hi Jingsong,
>
> I think that developing flink-dynamic-storage as a separate sub project is
> a very good idea since it allows us to move a lot faster and decouple
> releases from Flink. Hence big +1.
>
> Do we want to name it flink-dynamic-storage or shall we use a more
> descriptive name? dynamic-storage sounds a bit generic to me and I wouldn't
> know that this has something to do with letting Flink manage your tables
> and their storage. I don't have a very good idea but maybe we can call it
> flink-managed-tables, flink-warehouse, flink-olap or so.
>
> Cheers,
> Till
>
> On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
> wrote:
>
> > Hi Jingsong,
> >
> > That sounds promising! +1 from my side to continue development under
> > flink-dynamic-storage as a Flink subproject. I think having a more in-depth
> > interface will benefit everyone.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:
> >
> >> Hi all,
> >>
> >> After some experimentation, we felt no problem putting the dynamic
> >> storage outside of flink, and it also allowed us to design the
> >> interface in more depth.
> >>
> >> What do you think? If there is no problem, I am asking for PMC's help
> >> here: we want to propose flink-dynamic-storage as a flink subproject,
> >> and we want to build the project under apache.
> >>
> >> Best,
> >> Jingsong
> >>
> >>
> >> On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
> >> wrote:
> >> >
> >> > Hi Stephan,
> >> >
> >> > Thanks for your reply.
> >> >
> >> > Data never expires automatically.
> >> >
> >> > If there is a need for data retention, the user can choose one of the
> >> > following options:
> >> > - In the SQL for querying the managed table, users filter the data by
> >> themselves
> >> > - Define the time partition, and users can delete the expired
> >> > partition by themselves. (DROP PARTITION ...)
> >> > - In the future version, we will support the "DELETE FROM" statement,
> >> > users can delete the expired data according to the conditions.
> >> >
> >> > So to answer your question:
> >> >
> >> > > Will the VMQ send retractions so that the data will be removed from
> >> the table (via compactions)?
> >> >
> >> > The current implementation is not sending retraction, which I think
> >> > theoretically should be sent, currently the user can filter by
> >> > subsequent conditions.
> >> > And yes, the subscriber would not see strictly a correct result. I
> >> > think this is something we can improve for Flink SQL.
> >> >
> >> > > Do we want time retention semantics handled by the compaction?
> >> >
> >> > Currently, no, Data never expires automatically.
> >> >
> >> > > Do we want to declare those types of queries "out of scope" initially?
> >> >
> >> > I think we want users to be able to use three options above to
> >> > accomplish their requirements.
> >> >
> >> > I will update FLIP to make the definition clearer and more explicit.
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 
> >> wrote:
> >> > >
> >> > > Thanks for digging into this.
> >> > > Regarding this query:
> >> > >
> >> > > INSERT INTO the_table
> >> > >   SELECT window_end, COUNT(*)
> >> > > FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5'
> >> MINUTES))
> >> > > GROUP BY window_end
> >> > >   HAVING now() - window_end <= INTERVAL '14' DAYS;
> >> > >
> >> > > I am not sure I understand what the conclusion is on the data
> >> retention question, where the continuous streaming SQL query has retention
> >> semantics. I think we would need to answer the following questions (I will
> >> call the query that computed the managed table the "view materializer
> >> query" - VMQ).
> >> > >
> >> > > (1) I guess the VMQ will send no updates for windows beyond the
> >> "retention period" is over (14 days), as you said. That makes sense.
> >> > >
> >> > > (2) Will the VMQ send retractions so that the data will be removed
> >> from the table (via compactions)?
> >> > >   - if yes, this seems semantically better for users, but it will be
> >> expensive to keep the timers for retractions.
> >> > >   - if not, we can still solve this by adding filters to queries
> >> against the managed table, as long as these queries are in Flink.
> >> > >   - any subscriber to the changelog stream would not see strictly a
> >> correct result if we are not doing the retractions
> >> > >
> >> > > (3) Do we want time retention semantics handled by the compaction?
> >> > >   - if we say that we lazily apply the deletes in 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-28 Thread Till Rohrmann
Hi Jingsong,

I think that developing flink-dynamic-storage as a separate sub project is
a very good idea since it allows us to move a lot faster and decouple
releases from Flink. Hence big +1.

Do we want to name it flink-dynamic-storage or shall we use a more
descriptive name? dynamic-storage sounds a bit generic to me and I wouldn't
know that this has something to do with letting Flink manage your tables
and their storage. I don't have a very good idea but maybe we can call it
flink-managed-tables, flink-warehouse, flink-olap or so.

Cheers,
Till

On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
wrote:

> Hi Jingsong,
>
> That sounds promising! +1 from my side to continue development under
> flink-dynamic-storage as a Flink subproject. I think having a more in-depth
> interface will benefit everyone.
>
> Best regards,
>
> Martijn
>
> On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:
>
>> Hi all,
>>
>> After some experimentation, we felt no problem putting the dynamic
>> storage outside of flink, and it also allowed us to design the
>> interface in more depth.
>>
>> What do you think? If there is no problem, I am asking for PMC's help
>> here: we want to propose flink-dynamic-storage as a flink subproject,
>> and we want to build the project under apache.
>>
>> Best,
>> Jingsong
>>
>>
>> On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
>> wrote:
>> >
>> > Hi Stephan,
>> >
>> > Thanks for your reply.
>> >
>> > Data never expires automatically.
>> >
>> > If there is a need for data retention, the user can choose one of the
>> > following options:
>> > - In the SQL for querying the managed table, users filter the data by
>> themselves
>> > - Define the time partition, and users can delete the expired
>> > partition by themselves. (DROP PARTITION ...)
>> > - In the future version, we will support the "DELETE FROM" statement,
>> > users can delete the expired data according to the conditions.
>> >
>> > So to answer your question:
>> >
>> > > Will the VMQ send retractions so that the data will be removed from
>> the table (via compactions)?
>> >
>> > The current implementation is not sending retraction, which I think
>> > theoretically should be sent, currently the user can filter by
>> > subsequent conditions.
>> > And yes, the subscriber would not see strictly a correct result. I
>> > think this is something we can improve for Flink SQL.
>> >
>> > > Do we want time retention semantics handled by the compaction?
>> >
>> > Currently, no, Data never expires automatically.
>> >
>> > > Do we want to declare those types of queries "out of scope" initially?
>> >
>> > I think we want users to be able to use three options above to
>> > accomplish their requirements.
>> >
>> > I will update FLIP to make the definition clearer and more explicit.
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 
>> wrote:
>> > >
>> > > Thanks for digging into this.
>> > > Regarding this query:
>> > >
>> > > INSERT INTO the_table
>> > >   SELECT window_end, COUNT(*)
>> > > FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5'
>> MINUTES))
>> > > GROUP BY window_end
>> > >   HAVING now() - window_end <= INTERVAL '14' DAYS;
>> > >
>> > > I am not sure I understand what the conclusion is on the data
>> retention question, where the continuous streaming SQL query has retention
>> semantics. I think we would need to answer the following questions (I will
>> call the query that computed the managed table the "view materializer
>> query" - VMQ).
>> > >
>> > > (1) I guess the VMQ will send no updates for windows beyond the
>> "retention period" is over (14 days), as you said. That makes sense.
>> > >
>> > > (2) Will the VMQ send retractions so that the data will be removed
>> from the table (via compactions)?
>> > >   - if yes, this seems semantically better for users, but it will be
>> expensive to keep the timers for retractions.
>> > >   - if not, we can still solve this by adding filters to queries
>> against the managed table, as long as these queries are in Flink.
>> > >   - any subscriber to the changelog stream would not see strictly a
>> correct result if we are not doing the retractions
>> > >
>> > > (3) Do we want time retention semantics handled by the compaction?
>> > >   - if we say that we lazily apply the deletes in the queries that
>> read the managed tables, then we could also age out the old data during
>> compaction.
>> > >   - that is cheap, but it might be too much of a special case to be
>> very relevant here.
>> > >
>> > > (4) Do we want to declare those types of queries "out of scope"
>> initially?
>> > >   - if yes, how many users are we affecting? (I guess probably not
>> many, but would be good to hear some thoughts from others on this)
>> > >   - should we simply reject such queries in the optimizer as "not
>> possible to support in managed tables"? I would suggest that, always better
>> to tell users exactly what works and what not, rather than letting them be
>> surprised 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-28 Thread Martijn Visser
Hi Jingsong,

That sounds promising! +1 from my side to continue development under
flink-dynamic-storage as a Flink subproject. I think having a more in-depth
interface will benefit everyone.

Best regards,

Martijn

On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:

> Hi all,
>
> After some experimentation, we felt no problem putting the dynamic
> storage outside of flink, and it also allowed us to design the
> interface in more depth.
>
> What do you think? If there is no problem, I am asking for PMC's help
> here: we want to propose flink-dynamic-storage as a flink subproject,
> and we want to build the project under apache.
>
> Best,
> Jingsong
>
>
> On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
> wrote:
> >
> > Hi Stephan,
> >
> > Thanks for your reply.
> >
> > Data never expires automatically.
> >
> > If there is a need for data retention, the user can choose one of the
> > following options:
> > - In the SQL for querying the managed table, users filter the data by
> themselves
> > - Define the time partition, and users can delete the expired
> > partition by themselves. (DROP PARTITION ...)
> > - In the future version, we will support the "DELETE FROM" statement,
> > users can delete the expired data according to the conditions.
> >
> > So to answer your question:
> >
> > > Will the VMQ send retractions so that the data will be removed from
> the table (via compactions)?
> >
> > The current implementation is not sending retraction, which I think
> > theoretically should be sent, currently the user can filter by
> > subsequent conditions.
> > And yes, the subscriber would not see strictly a correct result. I
> > think this is something we can improve for Flink SQL.
> >
> > > Do we want time retention semantics handled by the compaction?
> >
> > Currently, no, Data never expires automatically.
> >
> > > Do we want to declare those types of queries "out of scope" initially?
> >
> > I think we want users to be able to use three options above to
> > accomplish their requirements.
> >
> > I will update FLIP to make the definition clearer and more explicit.
> >
> > Best,
> > Jingsong
> >
> > On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 
> wrote:
> > >
> > > Thanks for digging into this.
> > > Regarding this query:
> > >
> > > INSERT INTO the_table
> > >   SELECT window_end, COUNT(*)
> > > FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5'
> MINUTES))
> > > GROUP BY window_end
> > >   HAVING now() - window_end <= INTERVAL '14' DAYS;
> > >
> > > I am not sure I understand what the conclusion is on the data
> retention question, where the continuous streaming SQL query has retention
> semantics. I think we would need to answer the following questions (I will
> call the query that computed the managed table the "view materializer
> query" - VMQ).
> > >
> > > (1) I guess the VMQ will send no updates for windows beyond the
> "retention period" is over (14 days), as you said. That makes sense.
> > >
> > > (2) Will the VMQ send retractions so that the data will be removed
> from the table (via compactions)?
> > >   - if yes, this seems semantically better for users, but it will be
> expensive to keep the timers for retractions.
> > >   - if not, we can still solve this by adding filters to queries
> against the managed table, as long as these queries are in Flink.
> > >   - any subscriber to the changelog stream would not see strictly a
> correct result if we are not doing the retractions
> > >
> > > (3) Do we want time retention semantics handled by the compaction?
> > >   - if we say that we lazily apply the deletes in the queries that
> read the managed tables, then we could also age out the old data during
> compaction.
> > >   - that is cheap, but it might be too much of a special case to be
> very relevant here.
> > >
> > > (4) Do we want to declare those types of queries "out of scope"
> initially?
> > >   - if yes, how many users are we affecting? (I guess probably not
> many, but would be good to hear some thoughts from others on this)
> > >   - should we simply reject such queries in the optimizer as "not
> possible to support in managed tables"? I would suggest that, always better
> to tell users exactly what works and what not, rather than letting them be
> surprised in the end. Users can still remove the HAVING clause if they want
> the query to run, and that would be better than if the VMQ just silently
> ignores those semantics.
> > >
> > > Thanks,
> > > Stephan
> > >
> >
> >
> > --
> > Best, Jingsong Lee
>
>
>
> --
> Best, Jingsong Lee
>


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-27 Thread Jingsong Li
Hi all,

After some experimentation, we felt no problem putting the dynamic
storage outside of flink, and it also allowed us to design the
interface in more depth.

What do you think? If there is no problem, I am asking for PMC's help
here: we want to propose flink-dynamic-storage as a flink subproject,
and we want to build the project under apache.

Best,
Jingsong


On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li  wrote:
>
> Hi Stephan,
>
> Thanks for your reply.
>
> Data never expires automatically.
>
> If there is a need for data retention, the user can choose one of the
> following options:
> - In the SQL for querying the managed table, users filter the data by 
> themselves
> - Define the time partition, and users can delete the expired
> partition by themselves. (DROP PARTITION ...)
> - In the future version, we will support the "DELETE FROM" statement,
> users can delete the expired data according to the conditions.
>
> So to answer your question:
>
> > Will the VMQ send retractions so that the data will be removed from the 
> > table (via compactions)?
>
> The current implementation is not sending retraction, which I think
> theoretically should be sent, currently the user can filter by
> subsequent conditions.
> And yes, the subscriber would not see strictly a correct result. I
> think this is something we can improve for Flink SQL.
>
> > Do we want time retention semantics handled by the compaction?
>
> Currently, no, Data never expires automatically.
>
> > Do we want to declare those types of queries "out of scope" initially?
>
> I think we want users to be able to use three options above to
> accomplish their requirements.
>
> I will update FLIP to make the definition clearer and more explicit.
>
> Best,
> Jingsong
>
> On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen  wrote:
> >
> > Thanks for digging into this.
> > Regarding this query:
> >
> > INSERT INTO the_table
> >   SELECT window_end, COUNT(*)
> > FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
> > GROUP BY window_end
> >   HAVING now() - window_end <= INTERVAL '14' DAYS;
> >
> > I am not sure I understand what the conclusion is on the data retention 
> > question, where the continuous streaming SQL query has retention semantics. 
> > I think we would need to answer the following questions (I will call the 
> > query that computed the managed table the "view materializer query" - VMQ).
> >
> > (1) I guess the VMQ will send no updates for windows beyond the "retention 
> > period" is over (14 days), as you said. That makes sense.
> >
> > (2) Will the VMQ send retractions so that the data will be removed from the 
> > table (via compactions)?
> >   - if yes, this seems semantically better for users, but it will be 
> > expensive to keep the timers for retractions.
> >   - if not, we can still solve this by adding filters to queries against 
> > the managed table, as long as these queries are in Flink.
> >   - any subscriber to the changelog stream would not see strictly a correct 
> > result if we are not doing the retractions
> >
> > (3) Do we want time retention semantics handled by the compaction?
> >   - if we say that we lazily apply the deletes in the queries that read the 
> > managed tables, then we could also age out the old data during compaction.
> >   - that is cheap, but it might be too much of a special case to be very 
> > relevant here.
> >
> > (4) Do we want to declare those types of queries "out of scope" initially?
> >   - if yes, how many users are we affecting? (I guess probably not many, 
> > but would be good to hear some thoughts from others on this)
> >   - should we simply reject such queries in the optimizer as "not possible 
> > to support in managed tables"? I would suggest that, always better to tell 
> > users exactly what works and what not, rather than letting them be 
> > surprised in the end. Users can still remove the HAVING clause if they want 
> > the query to run, and that would be better than if the VMQ just silently 
> > ignores those semantics.
> >
> > Thanks,
> > Stephan
> >
>
>
> --
> Best, Jingsong Lee



--
Best, Jingsong Lee


[jira] [Created] (FLINK-25152) [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-02 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-25152:


 Summary: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table 
Storage
 Key: FLINK-25152
 URL: https://issues.apache.org/jira/browse/FLINK-25152
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Ecosystem
Reporter: Jingsong Lee
 Fix For: 1.15.0


Introduce built-in storage support for dynamic tables: a truly unified changelog
& table representation from Flink SQL's perspective. The storage will improve
the usability a lot.

For more detail, see:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-24 Thread Jingsong Li
Hi Stephan,

Thanks for your reply.

Data never expires automatically.

If there is a need for data retention, the user can choose one of the
following options:
- In the SQL for querying the managed table, users filter the data by themselves
- Define the time partition, and users can delete the expired
partition by themselves. (DROP PARTITION ...)
- In the future version, we will support the "DELETE FROM" statement,
users can delete the expired data according to the conditions.
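
For illustration, these three options could look roughly like the sketch
below, for a hypothetical managed table `the_table` partitioned by a `dt`
column (table, column and partition values are made up; the exact syntax
is only a sketch, not a final proposal):

-- Option 1: filter expired data in the reading query
SELECT * FROM the_table WHERE dt >= '2021-11-10';

-- Option 2: drop an expired time partition manually
ALTER TABLE the_table DROP PARTITION (dt = '2021-11-01');

-- Option 3 (future version): delete expired rows by condition
DELETE FROM the_table WHERE dt < '2021-11-01';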

So to answer your question:

> Will the VMQ send retractions so that the data will be removed from the table 
> (via compactions)?

The current implementation is not sending retraction, which I think
theoretically should be sent, currently the user can filter by
subsequent conditions.
And yes, the subscriber would not see strictly a correct result. I
think this is something we can improve for Flink SQL.

> Do we want time retention semantics handled by the compaction?

Currently, no, Data never expires automatically.

> Do we want to declare those types of queries "out of scope" initially?

I think we want users to be able to use three options above to
accomplish their requirements.

I will update FLIP to make the definition clearer and more explicit.

Best,
Jingsong

On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen  wrote:
>
> Thanks for digging into this.
> Regarding this query:
>
> INSERT INTO the_table
>   SELECT window_end, COUNT(*)
> FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
> GROUP BY window_end
>   HAVING now() - window_end <= INTERVAL '14' DAYS;
>
> I am not sure I understand what the conclusion is on the data retention 
> question, where the continuous streaming SQL query has retention semantics. I 
> think we would need to answer the following questions (I will call the query 
> that computed the managed table the "view materializer query" - VMQ).
>
> (1) I guess the VMQ will send no updates for windows beyond the "retention 
> period" is over (14 days), as you said. That makes sense.
>
> (2) Will the VMQ send retractions so that the data will be removed from the 
> table (via compactions)?
>   - if yes, this seems semantically better for users, but it will be 
> expensive to keep the timers for retractions.
>   - if not, we can still solve this by adding filters to queries against the 
> managed table, as long as these queries are in Flink.
>   - any subscriber to the changelog stream would not see strictly a correct 
> result if we are not doing the retractions
>
> (3) Do we want time retention semantics handled by the compaction?
>   - if we say that we lazily apply the deletes in the queries that read the 
> managed tables, then we could also age out the old data during compaction.
>   - that is cheap, but it might be too much of a special case to be very 
> relevant here.
>
> (4) Do we want to declare those types of queries "out of scope" initially?
>   - if yes, how many users are we affecting? (I guess probably not many, but 
> would be good to hear some thoughts from others on this)
>   - should we simply reject such queries in the optimizer as "not possible to 
> support in managed tables"? I would suggest that, always better to tell users 
> exactly what works and what not, rather than letting them be surprised in the 
> end. Users can still remove the HAVING clause if they want the query to run, 
> and that would be better than if the VMQ just silently ignores those 
> semantics.
>
> Thanks,
> Stephan
>


-- 
Best, Jingsong Lee


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-23 Thread Stephan Ewen
Thanks for digging into this.
Regarding this query:

INSERT INTO the_table
  SELECT window_end, COUNT(*)
FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
GROUP BY window_end
  HAVING now() - window_end <= INTERVAL '14' DAYS;

I am not sure I understand what the conclusion is on the data retention
question, where the continuous streaming SQL query has retention semantics.
I think we would need to answer the following questions (I will call the
query that computed the managed table the "view materializer query" - VMQ).

(1) I guess the VMQ will send no updates for windows beyond the "retention
period" is over (14 days), as you said. That makes sense.

(2) Will the VMQ send retractions so that the data will be removed from the
table (via compactions)?
  - if yes, this seems semantically better for users, but it will be
expensive to keep the timers for retractions.
  - if not, we can still solve this by adding filters to queries against
the managed table, as long as these queries are in Flink.
  - any subscriber to the changelog stream would not see strictly a correct
result if we are not doing the retractions

(3) Do we want time retention semantics handled by the compaction?
  - if we say that we lazily apply the deletes in the queries that read the
managed tables, then we could also age out the old data during compaction.
  - that is cheap, but it might be too much of a special case to be very
relevant here.

(4) Do we want to declare those types of queries "out of scope" initially?
  - if yes, how many users are we affecting? (I guess probably not many,
but would be good to hear some thoughts from others on this)
  - should we simply reject such queries in the optimizer as "not possible
to support in managed tables"? I would suggest that, always better to tell
users exactly what works and what not, rather than letting them be
surprised in the end. Users can still remove the HAVING clause if they want
the query to run, and that would be better than if the VMQ just silently
ignores those semantics.

Thanks,
Stephan


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-19 Thread Jingsong Li
Hi Stephan,

The discussion with you made me think more deeply about some things.

(3) Log Scan Startup mode.

> As I understand it, that means, though, that any managed table that uses 
> another managed table as a source will automatically always use the "full 
> scan" mode (snapshot + log subscription).

You are right, yes.

(4) Data retention

INSERT INTO the_table
  SELECT window_end, COUNT(*)
FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
GROUP BY window_end
  HAVING now() - window_end <= INTERVAL '14' DAYS;

> When a user does "Select *" from that table, they don't want to see old data, 
> it would make the query reading from the managed table incorrect. Who would 
> then filter or would prune the old data? The query maintaining the managed 
> table? The query reading from the managed table?

This is an interesting query. Under one interpretation of batch execution,
the non-deterministic function is evaluated once before the job is
submitted, so there is no dynamic randomness problem.

But streaming jobs are indeed different. In the case of this query, I
prefer to leave the life cycle of the data to the user and let the
reading query determine the filtering of the data. The writing query
only determines the updating of the data (results from 14 days ago are
no longer updated).

Because overall I think this is a query-independent table and
query-independent storage: it has its own definition, and once data is
written into it, the storage interprets that data on its own terms,
independent of the query that produced it. If the query semantics do
not want to see historical data, the query should generate enough
retractions.

If it is a materialized view, I prefer that the old data should be retracted.
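
As a sketch of that, a reading query could take over the filtering that
the HAVING clause above expresses (reusing `the_table` and `window_end`
from the query above; only an illustration):

-- only read results that are still within the 14-day window
SELECT * FROM the_table
WHERE now() - window_end <= INTERVAL '14' DAYS;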

(6, 7) PK mode and consistency

> If I understand correctly, "transactional" with PK would actually not be 
> correct, it would still contain some duplicates.

Yes, unless the store itself has the ability to query and produce the
correct changelog, which is something we thought about and we can
consider it later. (queries are more costly and we need to consider
them carefully)

> What would be the additional overhead of creating a separate config switch 
> "log consistency" with values "transactional", "eventual".

The transactional mode will bring some benefits:
- unified semantics.
- with a PK, the log can produce complete changes (including
UPDATE_BEFORE) too. In scenarios where updates are trustworthy (e.g.,
writing aggregated results), we can provide such a mode to avoid
downstream de-duplication. (Not the default mode.)

I am OK to add "log consistency".
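
For illustration, such a switch could simply be another table option; the
option name and values below are only a sketch of this idea, not final
syntax:

-- a managed table with an explicit log consistency setting
CREATE TABLE managed_table (
  user_id BIGINT,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  -- 'transactional' as the default; 'eventual' could trade consistency
  -- for lower latency when a PK is defined
  'log.consistency' = 'transactional'
);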

On Wed, Nov 17, 2021 at 1:15 AM Stephan Ewen  wrote:
>
> Hi Jingsong!
>
> Thank you for all the explanations. To follow up on the points:
>
>
> (1) Log implementation
>
> Good to hear you are looking to make this extensible.
>
>
> (2) change tracking
>
> Understood, makes sense (used for re-processing).
>
>
> (3) Log Scan Startup mode.
>
> Your explanation makes sense.
>
> As I understand it, that means, though, that any managed table that uses 
> another managed table as a source will automatically always use the "full 
> scan" mode (snapshot + log subscription).
>
>
> (4) Data retention
>
> > - The data of the snapshot will never expire, and the user needs to
> > delete the partition by themselves if needed.
> > - The expiration time of the log is unlimited by default, but it can
> > be configured. In fact, we only need the latest log by default,
> > because we have saved the previous snapshot.
>
> Are you referring to cleanup here in the sense of garbage collection, or also 
> the deletion of data that makes the Managed Table semantically wrong?
>
> Let's assume I define a managed table with such a query below, where the 
> "interactions" table is derived from a stream of Kafka records.
>
>  INSERT INTO the_table
>   SELECT window_end, COUNT(*)
> FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
> GROUP BY window_end
>   HAVING now() - window_end <= INTERVAL '14' DAYS;
>
> When a user does "Select *" from that table, they don't want to see old data, 
> it would make the query reading from the managed table incorrect. Who would 
> then filter or would prune the old data? The query maintaining the managed 
> table? The query reading from the managed table?
>
> Maybe this isn't something you want to solve in the first version, but it 
> would be good to have a definite answer what the plan in case data retention 
> is part of the query semantics.
>
> If the assumption is that managed tables can never have retention defined in 
> their query semantics (and the assumption is that all filtering is done by 
> the query that reads the managed table), then I think this is a supercritical 
> design property that we need to make very explicit for users to understand.
>
>
> (5) Unified format in the log
>
> Makes sense.
>
>
> (6, 7) PK mode and consistency
>
> I get the technical reason that with PKs there is a case for duplicate 
> filtering, but I am skeptical of 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-16 Thread Stephan Ewen
Hi Jingsong!

Thank you for all the explanations. To follow up on the points:


(1) Log implementation

Good to hear you are looking to make this extensible.


(2) change tracking

Understood, makes sense (used for re-processing).


(3) Log Scan Startup mode.

Your explanation makes sense.

As I understand it, that means, though, that any managed table that uses
another managed table as a source will automatically always use the "full
scan" mode (snapshot + log subscription).


(4) Data retention

> - The data of the snapshot will never expire, and the user needs to
> delete the partition by themselves if needed.
> - The expiration time of the log is unlimited by default, but it can
> be configured. In fact, we only need the latest log by default,
> because we have saved the previous snapshot.

Are you referring to cleanup here in the sense of garbage collection, or
also the deletion of data that makes the Managed Table semantically wrong?

Let's assume I define a managed table with such a query below, where the
"interactions" table is derived from a stream of Kafka records.

 INSERT INTO the_table
  SELECT window_end, COUNT(*)
FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
GROUP BY window_end
  HAVING now() - window_end <= INTERVAL '14' DAYS;

When a user does "Select *" from that table, they don't want to see old
data, it would make the query reading from the managed table incorrect. Who
would then filter or would prune the old data? The query maintaining the
managed table? The query reading from the managed table?

Maybe this isn't something you want to solve in the first version, but it
would be good to have a definite answer what the plan in case data
retention is part of the query semantics.

If the assumption is that managed tables can never have retention defined
in their query semantics (and the assumption is that all filtering is done
by the query that reads the managed table), then I think this is a
supercritical design property that we need to make very explicit for users
to understand.


(5) Unified format in the log

Makes sense.


(6, 7) PK mode and consistency

I get the technical reason that with PKs there is a case for duplicate
filtering, but I am skeptical of the user experience if this detail
implicitly changes the consistency users see. This may not be relevant for
Flink queries that read from the changelog, but it is relevant for external
programs subscribing to the changelog.

What would be the additional overhead of creating a separate config switch
"log consistency" with values "transactional", "eventual".
By default, this would be transactional regardless of whether the result
has a PK or not. If there is a PK, then users can optimize the latency if
they want by using the "eventual" setting. That way users are not
surprised by the fact that changing the schema of the table
(adding a PK) suddenly changes the consistency semantics.

If I understand correctly, "transactional" with PK would actually not be
correct, it would still contain some duplicates.
But that means we cannot expose a correct changelog to users (external
subscribers) in all cases?

The PK behavior seems like something that needs some general improvement in
the SQL engine.


(8) Optimistic locking.

Thanks for clarifying, that makes sense. I think that would be good to add
to the FLIP. The reason why something is done is as important as what
exactly is being done.


Thanks a lot!
Stephan

On Mon, Nov 15, 2021 at 10:41 AM Jingsong Li  wrote:

> Hi Timo,
>
> > It would be great if we can add not only `Receive any type of changelog`
> but also `Receive any type of datatype`.
>
> Nice, I think we can.
>
> > Please clarify whether the compact DDL is a synchronous or asynchronous
> operation in the API? So far all DDL was synchronous, and only DML
> asynchronous.
>
> It should be a synchronous operation.
>
> > I find this 'change-tracking' = 'false' a bit confusing. Even in batch
> scenarios we have a changelog, only with insert-only changes. Can you
> elaborate? Wouldn't 'exclude-from-log-store' or 'exclude-log-store' or
> 'log.disabled' be more accurate?
>
> Change tracking is from Oracle and snowflake [1][2][3]. It matches the
> "emit-changes" syntax. It means that after closing, the downstream
> consumption cannot obtain the corresponding changes.
>
> > DESCRIBE DETAIL TABLE
>
> +1 to `DESCRIBE TABLE EXTENDED`.
>
> > Set checkpoint interval to 1 min if checkpoint is not enabled
> when the planner detects a sink to built-in dynamic table.
> This sounds like too much magic to me.
>
> You are right. And one minute may not be enough for all situations. +1
> to throw detailed exception to alert user.
>
> > GenericCatalog to `Catalog#supportsTableStorage`
>
> I originally thought about completely distinguishing it from external
> catalog, but it is also possible to add a new method.
>
> > CatalogBaseTable.TableKind
>
> Yes, we can create a new TableKind for this table. 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-15 Thread Jingsong Li
Hi Timo,

> It would be great if we can add not only `Receive any type of changelog` but 
> also `Receive any type of datatype`.

Nice, I think we can.

> Please clarify whether the compact DDL is a synchronous or asynchronous
> operation in the API? So far all DDL was synchronous, and only DML
> asynchronous.

It should be a synchronous operation.
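
For example, a compact statement could look roughly like the sketch below
(the statement and partition spec are illustrative, not final syntax); the
call would only return after the compaction has finished:

-- synchronously compact one partition of a managed table
ALTER TABLE managed_table PARTITION (dt = '2021-11-15') COMPACT;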

> I find this 'change-tracking' = 'false' a bit confusing. Even in batch 
> scenarios we have a changelog, only with insert-only changes. Can you 
> elaborate? Wouldn't 'exclude-from-log-store' or 'exclude-log-store' or 
> 'log.disabled' be more accurate?

Change tracking comes from Oracle and Snowflake [1][2][3]. It matches the
"emit-changes" syntax. It means that once change tracking is disabled,
downstream consumers cannot obtain the corresponding changes.
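
For illustration, disabling it could look like the sketch below (the table
name is made up and the syntax is only a sketch built around the
'change-tracking' option discussed here):

-- create a managed table with change tracking disabled
CREATE TABLE managed_table (
  user_id BIGINT,
  cnt BIGINT
) WITH (
  'change-tracking' = 'false'
);

-- or turn it off later
ALTER TABLE managed_table SET ('change-tracking' = 'false');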

> DESCRIBE DETAIL TABLE

+1 to `DESCRIBE TABLE EXTENDED`.

> Set checkpoint interval to 1 min if checkpoint is not enabled
when the planner detects a sink to built-in dynamic table.
This sounds like too much magic to me.

You are right. And one minute may not be enough for all situations. +1
to throwing a detailed exception to alert the user.

> GenericCatalog to `Catalog#supportsTableStorage`

I originally thought about completely distinguishing it from external
catalog, but it is also possible to add a new method.

> CatalogBaseTable.TableKind

Yes, we can create a new TableKind for this table. Catalog can easily
distinguish them.

> enrichOptions(Context context)
> Why is this method returning a Map? Shouldn't the caller
> assume that all options enriched via `CatalogTable.copy` should have
> been applied by `enrichOptions`?

Yes, the planner adds some options to the table before creating it.

>  Partitioning and Event-time:
> Have you considered supporting semantics similar to
> `sink.partition-commit.trigger` based on `partition-time`? It could be
> beneficial to have the partitions committed by watermarks as well. My
> biggest concern is how we can enable watermarking end-to-end using a
> file store (I think for log store this should not be a problem?).

Yes, we can also write the watermark to storage, which has several advantages:
- Users can see the progress of a partition.
- Partition commit can trigger optimizations for that partition, such as
compaction.
- For lookup joins, the watermark can be used for alignment (see the sketch
below). Many users complain that a lookup join cannot find data when the
lookup table is not ready; it is difficult to align the main stream with
the lookup table.

But this is not the current blocker. We can improve this in the future.
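For context, a sketch of the lookup join pattern the last point refers to
(table and column names are placeholders); a watermark stored with the table
could be used to delay the lookup until the dimension table has caught up:

SELECT o.order_id, d.dim_name
FROM orders AS o
JOIN dim_table FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.dim_id = d.id;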

[1] https://docs.snowflake.com/en/user-guide/streams.html
[2] 
https://docs.oracle.com/database/121/ADMQS/GUID-3BAA0D48-CA35-4CD7-810E-50C703DC6FEB.htm
[3] 
https://docs.oracle.com/database/121/DWHSG/advmv.htm#DWHSG-GUID-F7394DFE-7CF6-401C-A312-C36603BEB01B

Best,
Jingsong


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-15 Thread Jingsong Li
Hi Stephan,

###

about PK:

> (6) Different consistency guarantees with PK and without PK
> (7) Eventual Consistency Mode vs. Faster Checkpoints

Let's clarify one thing first:
For a sink with a PK, Flink SQL cannot generate complete changes (it lacks
UPDATE_BEFORE).
- For databases like MySQL, MySQL itself generates the UPDATE_BEFORE: it
looks up the old value for each record's PK before writing the binlog. The
MySQL upsert sink does not rely on checkpoints; on failure, MySQL reads the
binlog, recovers the database, and reaches eventual consistency.
- For Upsert-Kafka, regardless of the consistency guarantee, the resulting
log may not contain a complete/correct UPDATE_BEFORE, so a normalize node
is needed downstream to remove the duplication (see the sketch below).
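As a concrete sketch of the Upsert-Kafka case (topic and server values are
placeholders), reading such a log forces the planner to add a stateful
normalize step, because the source only emits upserts without UPDATE_BEFORE:

CREATE TABLE user_scores (
  user_id BIGINT,
  score INT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_scores',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Consuming this as a changelog requires a normalize operator downstream
-- to reconstruct the missing UPDATE_BEFORE records.
SELECT score, COUNT(*) FROM user_scores GROUP BY score;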

So for us, with either option 1 or option 2, we do not have the ability to
query per PK at the moment, so we choose option 2, which requires
deduplication downstream.
If we enable log transactions for cases with a PK, the only advantage is
transactional consistency over eventual consistency. I am not sure that
price is reasonable. We are moving towards fast checkpoints, which is the
right direction, but it will not arrive soon. For storage, it also requires
a lot of work to adapt to this mode (this is not easy, and efficiency has
to be considered). It would mean that the current default mode is "slow",
and we would need to explain to users in which scenarios they can add an
option to speed up the pipeline.

> (5) Different formats for cases with PK and without PK

Jing Zhang also mentioned that we may need to expose the log format.
So I am +1 to unifying the log format with and without PK.
We can introduce an option that lets users choose debezium-json or
debezium-avro-confluent in the future. Both are public formats.
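A hypothetical sketch of such an option (the key 'log.format' is my
assumption and not part of the FLIP yet; both format names are existing
public Flink formats):

CREATE TABLE managed_t (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'log.format' = 'debezium-json'  -- or 'debezium-avro-confluent'
);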

###

about Concurrent Write:

The story is that the streaming pipeline is running all the time, and
it is difficult to control and modify it.
However, in many cases we need to recalculate historical data in the data
warehouse. For example, if there is a data problem in a partition from 10
days ago, I need to recalculate the whole pipeline. In this case we need a
batch pipeline instead of correcting the root table of the streaming
pipeline, because the streaming job cannot recalculate data from 10 days ago:
- The streaming job state may have expired.
- The computation is not as efficient as a batch job.

Conceptually, all changes generate log entries, but in this case we need to
turn off log generation; that is what "Change Tracking" controls.
So yes, it simply skips writing to the change log (see the sketch below).
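A hedged sketch of this recalculation scenario, reusing today's dynamic
table option syntax and the 'change-tracking' key from this discussion
(table names are placeholders):

-- Batch job rewrites the bad data without emitting change log entries,
-- while the streaming pipeline keeps running against the same table.
INSERT OVERWRITE managed_t /*+ OPTIONS('change-tracking' = 'false') */
SELECT id, name, dt FROM source_t WHERE dt = '2021-11-05';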

Meanwhile, the streaming job is still running, so two jobs are
committing snapshots at the same time.

The lock is just for multiple snapshot commits. For example, insert
statements and compaction both produce new snapshots.

> What exactly is the optimistic locking behavior

The optimism is reflected in file deletion: if a file to be deleted turns
out to be missing at commit time, the commit fails, instead of taking a
lock at the beginning.

> what is the lock scoped to? a single query (via jobID)?

Lock scope is database and table name.

> what happens if a system that holds (like an entry in the Hive Metastore 
> Database) a lock crashes and is manually restarted as a new job (from the 
> latest checkpoint)? does that need manual admin intervention to release the 
> lock (manually update the HMS Database)?

If the client crashes, the lock it holds will also expire with the
heartbeat timeout.

Best,
Jingsong


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-14 Thread Jingsong Li
Hi Stephan and Timo, thanks very much for your replies. I will try to
reply to you one by one, split across multiple emails.

First, according to my understanding, from the perspective of a database
(MySQL) and its storage, consider what streaming and batch are:
- Batch is the snapshot; it contains the full set of records.
- Streaming is the binlog; the log contains "events" that describe
changes to table data.

The binlog helps us do two things:
1. As a write-ahead log, it helps recover a snapshot in case of
failure, which can reduce the latency of transaction commits.
2. Downstream businesses can use the binlog for database synchronization
(or active/standby synchronization), or consume it directly for
further computation.

So back to your comments:

> (1) Log Implementation

I agree with you about log.system and kafka.bootstrap.servers, but I
think log.retention is a common concept and we should hide kafka here.

> (4) Data retention: how and when data is ever cleaned up

- The data of the snapshot never expires; the user needs to delete
partitions themselves if needed.
- The retention time of the log is unlimited by default, but it can
be configured. In fact, we only need the latest log by default,
because we have saved the previous snapshot.

> (3) "table-storage.log.scan.startup.mode"

I agree with you. By default, of course, we read the snapshot first and
then the log changes. However, there are many scenarios that require the
two other options, for example:
- In a monitoring scenario, I only need to compute on the latest data;
the old data is outdated and useless to me.
- The data before 8 o'clock has already been computed today. I don't need
the earlier data, so I just start consuming from 8 o'clock and add a
filter condition later.

This seems to add some complexity, but a large number of users actually
run into the two scenarios above.

Actually, even if the user chooses "snapshot first and then log changes",
most scenarios still read incremental data; the increment is just defined
by the user. For example, the user can stream-read with the following SQL:
SELECT * FROM T WHERE dt >= '2021-11-15';
Here "dt" is the partition field of the user-defined table, so the job
only consumes data starting from today. The storage helps with reading,
partition pruning and switching to the log.

> Also, this seems to be a per-query setting, more than a global setting, so 
> should this be part of the config with which the query is submitted that 
> reads from the table-storage?

The global settings only affect the CREATE TABLE DDL. After the DDL, the
table options are stored in the catalog, and users can use dynamic table
options to configure reading, as sketched below.
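For example, a per-query override could look like the following sketch (the
option key is not final, and dynamic table options must be enabled via
table.dynamic-table-options.enabled):

SELECT * FROM T /*+ OPTIONS('log.scan.startup.mode' = 'latest') */
WHERE dt >= '2021-11-15';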

> The names could also be improved a bit

+1

Best,
Jingsong


On Sat, Nov 13, 2021 at 1:31 AM Timo Walther  wrote:
>
> Hi everyone,
>
> sorry for the delay in joining this thread. I went through the FLIP and
> have some comments (maybe overlapping with Stephan's comments, which I
> haven't read yet):
>
> a. > More importantly, in order to solve the cognitive bar...
>
> It would be great if we can add not only `Receive any type of changelog`
> but also `Receive any type of datatype`.
>
>
> b. > COMPACT [...] Compact table for high performance query. Launch a
> job to rewrite files.
>
> Please clarify whether this is a synchronous or asynchronous operation
> in the API? So far all DDL was synchronous. And only DML asynchronous.
>
> c. > 'change-tracking' = 'false'
>
> I find this option a bit confusing. Even in batch scenarios we have a
> changelog, only with insert-only changes. Can you elaborate? Wouldn't
> 'exclude-from-log-store' or 'exclude-log-store' or 'log.disabled' be
> more accurate?
>
> d. > DESCRIBE DETAIL TABLE
>
> This seems very uncommon for SQL. How about `DESCRIBE TABLE EXTENDED`?
>
> e. > Set checkpoint interval to 1 min if checkpoint is not enabled when
> the planner detects a sink to built-in dynamic table.
>
> This sounds like too much magic to me. It will be super hard to debug
> why suddenly checkpointing is enabled. If a user has not configured the
> checkpointing yet, it could lead to unintended behavior without a proper
> checkpoint storage. It would be better to either throw an exception or
> just have weaker consistency guarantees in this case.
>
> f. > GenericCatalog
>
> Why do we need an additional marker interface without any methods in
> there? It only further complicates the catalog interfaces. Isn't a
> `Catalog#supportesTableStorage` enough? Also, we just introduced
> `CatalogBaseTable.TableKind` for exactly such new features. We can add a
> new table kind. A managed table can set this for a `CatalogTable`.
>
> g. > enrichOptions(Context context)
>
> Why is this method returning a Map? Shouldn't the caller
> assume that all options enriched via `CatalogTable.copy` should have
> been applied by `enrichOptions`?
>
>
> h. Partitioning and Event-time:
>
> Have you considered to support semantics similar to
> 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-12 Thread Timo Walther

Hi everyone,

sorry for the delay in joining this thread. I went through the FLIP and 
have some comments (maybe overlapping with Stephan's comments, which I 
haven't read yet):


a. > More importantly, in order to solve the cognitive bar...

It would be great if we can add not only `Receive any type of changelog` 
but also `Receive any type of datatype`.



b. > COMPACT [...] Compact table for high performance query. Launch a 
job to rewrite files.


Please clarify whether this is a synchronous or asynchronous operation
in the API? So far all DDL was synchronous. And only DML asynchronous.


c. > 'change-tracking' = 'false'

I find this option a bit confusing. Even in batch scenarios we have a 
changelog, only with insert-only changes. Can you elaborate? Wouldn't 
'exclude-from-log-store' or 'exclude-log-store' or 'log.disabled' be 
more accurate?


d. > DESCRIBE DETAIL TABLE

This seems very uncommon for SQL. How about `DESCRIBE TABLE EXTENDED`?

e. > Set checkpoint interval to 1 min if checkpoint is not enabled when 
the planner detects a sink to built-in dynamic table.


This sounds like too much magic to me. It will be super hard to debug 
why suddenly checkpointing is enabled. If a user has not configured the 
checkpointing yet, it could lead to unintended behavior without a proper
checkpoint storage. It would be better to either throw an exception or 
just have weaker consistency guarantees in this case.


f. > GenericCatalog

Why do we need an additional marker interface without any methods in 
there? It only further complicates the catalog interfaces. Isn't a 
`Catalog#supportesTableStorage` enough? Also, we just introduced 
`CatalogBaseTable.TableKind` for exactly such new features. We can add a 
new table kind. A managed table can set this for a `CatalogTable`.


g. > enrichOptions(Context context)

Why is this method returning a Map? Shouldn't the caller 
assume that all options enriched via `CatalogTable.copy` should have 
been applied by `enrichOptions`?



h. Partitioning and Event-time:

Have you considered supporting semantics similar to
`sink.partition-commit.trigger` based on `partition-time`? It could be
beneficial to have the partitions committed by watermarks as well. My
biggest concern is how we can enable watermarking end-to-end using a
file store (I think for log store this should not be a problem?).
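For reference, the existing filesystem connector expresses this roughly as
follows (path, schema and values are placeholders):

CREATE TABLE fs_sink (
  id BIGINT,
  ts TIMESTAMP(3),
  dt STRING,
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/fs_sink',
  'format' = 'parquet',
  -- commit a partition once the watermark passes partition time + delay
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
  'sink.partition-commit.policy.kind' = 'success-file'
);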


Looking forward to your feedback.

Regards,
Timo


On 12.11.21 16:35, Stephan Ewen wrote:

Hi all!

Thank you for the writeup of this feature. I like the general direction a
lot.

There are some open questions and confusing details still, which I think we
need to clarify first to make this feature really good.
Below are questions/suggestions on the FLIP:

Best,
Stephan

===

*(1) Log Implementation*

I agree with Eron that we should not design this hardwired to Kafka. Let's
have the internal interfaces in place to make this open to other streaming
storage systems as well.
The config options seem to be designed in a way that is Kafka-exclusive.
Can we change this, for example to something like
   - storage.log.system=kafka
   - storage.log.kafka.properties.bootstrap.servers
   - storage.log.kafka.retention

*(2) Change Tracking*

I am not sure I understand this fully. When a batch query inserts without
change tracking what happens then?
   - does it skip writing to the change log?
   - does it simply overwrite the managed table with the new result?
   - something different?

*(3) "table-storage.log.scan.startup.mode"*

Somehow the presence of this flag seems to break the abstraction of managed
tables.
Let's say someone creates a managed table that is computed via a query over
another managed table. It would need all the data from the previous table,
or it would be inconsistent.

What is the reason to have this setting? Support cases where one doesn't
need all past data (let's say only data from the previous month)? Exposing
this again somewhat destroys the nice "transparent out of the box"
behavior, because now users need to think again about the incremental
building of the tables. I think that case shows that we miss a bit better
handling of data retention (see next point).

Also, this seems to be a per-query setting, more than a global setting, so
should this be part of the config with which the query is submitted that
reads from the table-storage?

The names could also be improved a bit, I think, for example we could call
it just  "table-storage.log.scan" with values "full", "latest",
"from-timestamp".

*(4) Data retention*

I am wondering how and when data is ever cleaned up.
For example, when the table definition has a time attribute and predicate
so that the managed table should only contain the data from the previous
month. How does old data get cleaned up? Only through deletes coming from
timers in the Flink SQL layer?

I think if we want this to be really good and efficient, we need to look at
dropping data during the compaction. The compaction should know 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-12 Thread Stephan Ewen
Hi all!

Thank you for the writeup of this feature. I like the general direction a
lot.

There are some open questions and confusing details still, which I think we
need to clarify first to make this feature really good.
Below are questions/suggestions on the FLIP:

Best,
Stephan

===

*(1) Log Implementation*

I agree with Eron that we should not design this hardwired to Kafka. Let's
have the internal interfaces in place to make this open to other streaming
storage systems as well.
The config options seem to be designed in a way that is Kafka-exclusive.
Can we change this, for example to something like
  - storage.log.system=kafka
  - storage.log.kafka.properties.bootstrap.servers
  - storage.log.kafka.retention
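For illustration, these could be set as session-level configuration in the
SQL client, roughly like this (a sketch of the proposed naming only, not
final option keys; the server address is a placeholder):

SET 'storage.log.system' = 'kafka';
SET 'storage.log.kafka.properties.bootstrap.servers' = 'kafka:9092';
SET 'storage.log.kafka.retention' = '7 d';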

*(2) Change Tracking*

I am not sure I understand this fully. When a batch query inserts without
change tracking what happens then?
  - does it skip writing to the change log?
  - does it simply overwrite the managed table with the new result?
  - something different?

*(3) "table-storage.log.scan.startup.mode"*

Somehow the presence of this flag seems to break the abstraction of managed
tables.
Let's say someone creates a managed table that is computed via a query over
another managed table. It would need all the data from the previous table,
or it would be inconsistent.

What is the reason to have this setting? Support cases where one doesn't
need all past data (let's say only data from the previous month)? Exposing
this again somewhat destroys the nice "transparent out of the box"
behavior, because now users need to think again about the incremental
building of the tables. I think that case shows that we miss a bit better
handling of data retention (see next point).

Also, this seems to be a per-query setting, more than a global setting, so
should this be part of the config with which the query is submitted that
reads from the table-storage?

The names could also be improved a bit, I think, for example we could call
it just  "table-storage.log.scan" with values "full", "latest",
"from-timestamp".

*(4) Data retention*

I am wondering how and when data is ever cleaned up.
For example, when the table definition has a time attribute and predicate
so that the managed table should only contain the data from the previous
month. How does old data get cleaned up? Only through deletes coming from
timers in the Flink SQL layer?

I think if we want this to be really good and efficient, we need to look at
dropping data during the compaction. The compaction should know it needs to
retain only data from WaterMark - 1 month or so. That is somewhat similar
to the optimization I proposed also for SQL in general, to get rid of
timers and only use TTL (and compaction filters) for data expiration. I
think for managed tables, this is even more crucial for performance.

But it would mean that we need to have a better model for inferring
required data retention based on predicates over the time columns, and not
simply just have fixed retention based on the watermark.


*(5) Different formats for cases with PK and without PK*

The FLIP proposes Debezium-Avro for cases without a PK and just Avro for
cases with a PK.

Do we expect that some users directly subscribe to the Table Changelog,
meaning directly read via a Kafka Consumer from the topic?
  - I would expect that this will happen, because users want to avoid
writing the log twice (one for Flink managed table queries, one for
external subscribers).
  - If this is publicly exposed, then the fact that it uses different
formats in different cases (PK or no PK) seems really confusing and not
intuitive for users.
  - Can the format be just Debezium-JSON in all cases?

*(6) Different consistency guarantees with PK and without PK*

Is this purely an internal implementation detail, or will users see a
difference? My understanding is that users see a difference.
Having that difference implicitly happen when users add a PK reference
seems very confusing to me. What about cases where the table has a PK
(because users want the data in Kafka that way) but want transactional
consistency?

If we need the "low-latency eventual consistency" mode with PKs, I would
suggest making this a separate mode that users can choose to activate if
they want.
We can restrict it to cases that have a PK, but not automatically change
the behavior when a PK is declared.

*(7) Eventual Consistency Mode vs. Faster Checkpoints*

The eventual consistency mode with PK seems mainly there to get lower
latency for the changelog. What latencies are we looking for here?
There is also the work on generalized incremental checkpoints, which should
get the latency down to a few seconds, would that be good enough?

The current Upsert Kafka Source (which would be used with PK eventual
consistency mode) has a big inefficiency in the way it needs to retain all
state to convert the records to changelog records. That is also a high
price to pay for that mode.

*(8) Concurrent Write / Locking*

I don't 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-10 Thread Jingsong Li
Hi Eron,

There is a POC LogStore abstraction: [1].

However, our current focus is not on abstracting the log store, because
it is a very complex system. We cannot clarify all requirements and
abstractions at the beginning, such as whether to use the log store as the
WAL of the file store. The file store and the log store may collaborate
more closely, so we cannot yet put forward an interface design with a
compatibility commitment.

I believe that when the MVP comes out, it will be much clearer. Then
we will consider the extensibility of the log store.

On the other hand, I think we can also keep communicating during the
implementation process, and try to use Pulsar as a log store too.

[1] 
https://github.com/JingsongLi/flink/blob/table_storage/flink-table/flink-table-storage/src/main/java/org/apache/flink/table/storage/logstore/LogStoreFactory.java

Best,
Jingsong

On Thu, Nov 11, 2021 at 12:57 AM Eron Wright
 wrote:
>
> Jingsong, regarding the LogStore abstraction, I understand that you want to
> retain some flexibility as the implementation evolves.  It makes sense that
> the abstract interfaces would be @Internal for now.  Would you kindly
> ensure the minimal extensibility is in place, so that the Pulsar dev
> community may hack on a prototype implementation?
>
> I believe this is important for maintaining the perception that Flink
> doesn't unduly favor Kafka.
>
> -Eron
>
> On Tue, Nov 9, 2021 at 6:53 PM Jingsong Li  wrote:
>
> > Hi all,
> >
> > I have started the voting thread [1]. Please cast your vote there or
> > ask additional questions here.
> >
> > [1] https://lists.apache.org/thread/v3fzx0p6n2jogn86sptzr30kr3yw37sq
> >
> > Best,
> > Jingsong
> >
> > On Mon, Nov 1, 2021 at 5:41 PM Jingsong Li  wrote:
> > >
> > > Hi Till,
> > >
> > > Thanks for your suggestion.
> > >
> > > At present, we do not want users to use other storage implementations,
> > > which will undoubtedly require us to propose interfaces and APIs with
> > > compatibility guarantee, which will complicate our design. And some
> > > designs are constantly changing, we will constantly adjust according
> > > to the needs of end users.
> > >
> > > However, this does not prevent us from proposing some internal
> > > interfaces, such as ManagedTableStorageProvider you said, which can
> > > make our code more robust and testable. However, these interfaces will
> > > not be public, which means that we have no compatibility burden.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, Nov 1, 2021 at 3:57 PM Till Rohrmann 
> > wrote:
> > > >
> > > > Hi Kurt,
> > > >
> > > > Thanks a lot for the detailed explanation. I do see that implementing
> > this
> > > > feature outside of Flink will be a bigger effort because we probably
> > have
> > > > to think more about the exact interfaces and contracts. On the other
> > hand,
> > > > I can also imagine that users might want to use different storage
> > > > implementations (e.g. Pulsar instead of Kafka for the changelog
> > storage) at
> > > > some point.
> > > >
> > > > Starting with a feature branch and keeping this question in mind is
> > > > probably a good compromise. Getting this feature off the ground in
> > order to
> > > > evaluate it with users is likely more important than thinking of all
> > > > possible storage implementations and how to arrange the code. In case
> > we
> > > > should split it, maybe we need something like a
> > ManagedTableStorageProvider
> > > > that encapsulates the logic where to store the managed tables.
> > > >
> > > > Looking forward to this feature and the improvements it will add to
> > Flink's
> > > > SQL usability :-)
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Nov 1, 2021 at 2:46 AM Kurt Young  wrote:
> > > >
> > > > > Hi Till,
> > > > >
> > > > > We have discussed the possibility of putting this FLIP into another
> > > > > repository offline
> > > > > with Stephan and Timo. This looks similar with another under going
> > effort
> > > > > which trying
> > > > > to put all connectors outside the Flink core repository.
> > > > >
> > > > > From the motivation and scope of this FLIP, it's quite different from
> > > > > current connectors in
> > > > > some aspects. What we are trying to offer is some kind of built-in
> > storage,
> > > > > or we can call it
> > > > > internal/managed tables, compared with current connectors, they kind
> > of
> > > > > express external
> > > > > tables of Flink SQL. Functionality-wise, this managed table would
> > have more
> > > > > ability than
> > > > > all these connectors, since we controlled the implementation of such
> > > > > storage. Thus this table
> > > > > storage will interact with lots SQL components, like metadata
> > handling,
> > > > > optimization, execution,
> > > > > etc.
> > > > >
> > > > > However we do see some potential benefits if we choose to put it
> > outside
> > > > > Flink:
> > > > > - We may achieve more rapid development speed and maybe more frequent
> > > > > release.
> > > > > - Force us to think really 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-10 Thread Eron Wright
Jingsong, regarding the LogStore abstraction, I understand that you want to
retain some flexibility as the implementation evolves.  It makes sense that
the abstract interfaces would be @Internal for now.  Would you kindly
ensure the minimal extensibility is in place, so that the Pulsar dev
community may hack on a prototype implementation?

I believe this is important for maintaining the perception that Flink
doesn't unduly favor Kafka.

-Eron

On Tue, Nov 9, 2021 at 6:53 PM Jingsong Li  wrote:

> Hi all,
>
> I have started the voting thread [1]. Please cast your vote there or
> ask additional questions here.
>
> [1] https://lists.apache.org/thread/v3fzx0p6n2jogn86sptzr30kr3yw37sq
>
> Best,
> Jingsong
>
> On Mon, Nov 1, 2021 at 5:41 PM Jingsong Li  wrote:
> >
> > Hi Till,
> >
> > Thanks for your suggestion.
> >
> > At present, we do not want users to use other storage implementations,
> > which will undoubtedly require us to propose interfaces and APIs with
> > compatibility guarantee, which will complicate our design. And some
> > designs are constantly changing, we will constantly adjust according
> > to the needs of end users.
> >
> > However, this does not prevent us from proposing some internal
> > interfaces, such as ManagedTableStorageProvider you said, which can
> > make our code more robust and testable. However, these interfaces will
> > not be public, which means that we have no compatibility burden.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Nov 1, 2021 at 3:57 PM Till Rohrmann 
> wrote:
> > >
> > > Hi Kurt,
> > >
> > > Thanks a lot for the detailed explanation. I do see that implementing
> this
> > > feature outside of Flink will be a bigger effort because we probably
> have
> > > to think more about the exact interfaces and contracts. On the other
> hand,
> > > I can also imagine that users might want to use different storage
> > > implementations (e.g. Pulsar instead of Kafka for the changelog
> storage) at
> > > some point.
> > >
> > > Starting with a feature branch and keeping this question in mind is
> > > probably a good compromise. Getting this feature off the ground in
> order to
> > > evaluate it with users is likely more important than thinking of all
> > > possible storage implementations and how to arrange the code. In case
> we
> > > should split it, maybe we need something like a
> ManagedTableStorageProvider
> > > that encapsulates the logic where to store the managed tables.
> > >
> > > Looking forward to this feature and the improvements it will add to
> Flink's
> > > SQL usability :-)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Nov 1, 2021 at 2:46 AM Kurt Young  wrote:
> > >
> > > > Hi Till,
> > > >
> > > > We have discussed the possibility of putting this FLIP into another
> > > > repository offline
> > > > with Stephan and Timo. This looks similar with another under going
> effort
> > > > which trying
> > > > to put all connectors outside the Flink core repository.
> > > >
> > > > From the motivation and scope of this FLIP, it's quite different from
> > > > current connectors in
> > > > some aspects. What we are trying to offer is some kind of built-in
> storage,
> > > > or we can call it
> > > > internal/managed tables, compared with current connectors, they kind
> of
> > > > express external
> > > > tables of Flink SQL. Functionality-wise, this managed table would
> have more
> > > > ability than
> > > > all these connectors, since we controlled the implementation of such
> > > > storage. Thus this table
> > > > storage will interact with lots SQL components, like metadata
> handling,
> > > > optimization, execution,
> > > > etc.
> > > >
> > > > However we do see some potential benefits if we choose to put it
> outside
> > > > Flink:
> > > > - We may achieve more rapid development speed and maybe more frequent
> > > > release.
> > > > - Force us to think really clearly about the interfaces it should be,
> > > > because we don't have
> > > > the shortcut to modify core & connector codes all at the same time.
> > > >
> > > > But we also can't ignore the overhead:
> > > > - We almost need everything that is discussed in the splitting
> connectors
> > > > thread.
> > > > - We have to create lots more interface than TableSource/TableSink
> to make
> > > > it just work at the first
> > > > place, e.g. interfaces to express such tables should be managed by
> Flink,
> > > > interfaces to express the
> > > > physical capability of the storage then it can be bridged to SQL
> optimizer
> > > > and executor.
> > > > - If we create lots of interfaces with only one implementation, that
> sounds
> > > > overengineering to me.
> > > >
> > > > Combining the pros and cons above, what we are trying to do is
> firstly
> > > > implement it in a feature branch,
> > > > and also keep good engineering and design in mind. At some point we
> > > > re-evaluate the decision whether
> > > > to put it inside or outside the Flink core. What do you think?
> > > >
> > > > Best,
> > > > Kurt
> > > 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-09 Thread Jingsong Li
Hi all,

I have started the voting thread [1]. Please cast your vote there or
ask additional questions here.

[1] https://lists.apache.org/thread/v3fzx0p6n2jogn86sptzr30kr3yw37sq

Best,
Jingsong

On Mon, Nov 1, 2021 at 5:41 PM Jingsong Li  wrote:
>
> Hi Till,
>
> Thanks for your suggestion.
>
> At present, we do not want users to use other storage implementations,
> which will undoubtedly require us to propose interfaces and APIs with
> compatibility guarantee, which will complicate our design. And some
> designs are constantly changing, we will constantly adjust according
> to the needs of end users.
>
> However, this does not prevent us from proposing some internal
> interfaces, such as ManagedTableStorageProvider you said, which can
> make our code more robust and testable. However, these interfaces will
> not be public, which means that we have no compatibility burden.
>
> Best,
> Jingsong
>
> On Mon, Nov 1, 2021 at 3:57 PM Till Rohrmann  wrote:
> >
> > Hi Kurt,
> >
> > Thanks a lot for the detailed explanation. I do see that implementing this
> > feature outside of Flink will be a bigger effort because we probably have
> > to think more about the exact interfaces and contracts. On the other hand,
> > I can also imagine that users might want to use different storage
> > implementations (e.g. Pulsar instead of Kafka for the changelog storage) at
> > some point.
> >
> > Starting with a feature branch and keeping this question in mind is
> > probably a good compromise. Getting this feature off the ground in order to
> > evaluate it with users is likely more important than thinking of all
> > possible storage implementations and how to arrange the code. In case we
> > should split it, maybe we need something like a ManagedTableStorageProvider
> > that encapsulates the logic where to store the managed tables.
> >
> > Looking forward to this feature and the improvements it will add to Flink's
> > SQL usability :-)
> >
> > Cheers,
> > Till
> >
> > On Mon, Nov 1, 2021 at 2:46 AM Kurt Young  wrote:
> >
> > > Hi Till,
> > >
> > > We have discussed the possibility of putting this FLIP into another
> > > repository offline
> > > with Stephan and Timo. This looks similar with another under going effort
> > > which trying
> > > to put all connectors outside the Flink core repository.
> > >
> > > From the motivation and scope of this FLIP, it's quite different from
> > > current connectors in
> > > some aspects. What we are trying to offer is some kind of built-in 
> > > storage,
> > > or we can call it
> > > internal/managed tables, compared with current connectors, they kind of
> > > express external
> > > tables of Flink SQL. Functionality-wise, this managed table would have 
> > > more
> > > ability than
> > > all these connectors, since we controlled the implementation of such
> > > storage. Thus this table
> > > storage will interact with lots SQL components, like metadata handling,
> > > optimization, execution,
> > > etc.
> > >
> > > However we do see some potential benefits if we choose to put it outside
> > > Flink:
> > > - We may achieve more rapid development speed and maybe more frequent
> > > release.
> > > - Force us to think really clearly about the interfaces it should be,
> > > because we don't have
> > > the shortcut to modify core & connector codes all at the same time.
> > >
> > > But we also can't ignore the overhead:
> > > - We almost need everything that is discussed in the splitting connectors
> > > thread.
> > > - We have to create lots more interface than TableSource/TableSink to make
> > > it just work at the first
> > > place, e.g. interfaces to express such tables should be managed by Flink,
> > > interfaces to express the
> > > physical capability of the storage then it can be bridged to SQL optimizer
> > > and executor.
> > > - If we create lots of interfaces with only one implementation, that 
> > > sounds
> > > overengineering to me.
> > >
> > > Combining the pros and cons above, what we are trying to do is firstly
> > > implement it in a feature branch,
> > > and also keep good engineering and design in mind. At some point we
> > > re-evaluate the decision whether
> > > to put it inside or outside the Flink core. What do you think?
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Fri, Oct 29, 2021 at 11:53 PM Till Rohrmann 
> > > wrote:
> > >
> > > > Hi Jingsong,
> > > >
> > > > Thanks for creating this FLIP. I don't have a lot to add because I am 
> > > > not
> > > > very familiar with the SQL components. While reading the FLIP I was
> > > > wondering what would we need in Flink to build something like the BDT
> > > > feature outside of Flink as a kind of extension? Would something like
> > > this
> > > > be possible? Maybe the answer is a quick no ;-)
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li 
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I updated FLIP based on your feedback:
> > > > >
> > > > > 1. Introduce 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-01 Thread Jingsong Li
Hi Till,

Thanks for your suggestion.

At present, we do not want users to plug in other storage implementations,
as that would require us to propose interfaces and APIs with compatibility
guarantees, which would complicate our design. Also, some designs are still
changing, and we will keep adjusting them according to the needs of end
users.

However, this does not prevent us from proposing some internal
interfaces, such as the ManagedTableStorageProvider you mentioned, which
can make our code more robust and testable. These interfaces will not be
public, which means that we have no compatibility burden.

Best,
Jingsong

On Mon, Nov 1, 2021 at 3:57 PM Till Rohrmann  wrote:
>
> Hi Kurt,
>
> Thanks a lot for the detailed explanation. I do see that implementing this
> feature outside of Flink will be a bigger effort because we probably have
> to think more about the exact interfaces and contracts. On the other hand,
> I can also imagine that users might want to use different storage
> implementations (e.g. Pulsar instead of Kafka for the changelog storage) at
> some point.
>
> Starting with a feature branch and keeping this question in mind is
> probably a good compromise. Getting this feature off the ground in order to
> evaluate it with users is likely more important than thinking of all
> possible storage implementations and how to arrange the code. In case we
> should split it, maybe we need something like a ManagedTableStorageProvider
> that encapsulates the logic where to store the managed tables.
>
> Looking forward to this feature and the improvements it will add to Flink's
> SQL usability :-)
>
> Cheers,
> Till
>
> On Mon, Nov 1, 2021 at 2:46 AM Kurt Young  wrote:
>
> > Hi Till,
> >
> > We have discussed the possibility of putting this FLIP into another
> > repository offline
> > with Stephan and Timo. This looks similar with another under going effort
> > which trying
> > to put all connectors outside the Flink core repository.
> >
> > From the motivation and scope of this FLIP, it's quite different from
> > current connectors in
> > some aspects. What we are trying to offer is some kind of built-in storage,
> > or we can call it
> > internal/managed tables, compared with current connectors, they kind of
> > express external
> > tables of Flink SQL. Functionality-wise, this managed table would have more
> > ability than
> > all these connectors, since we controlled the implementation of such
> > storage. Thus this table
> > storage will interact with lots SQL components, like metadata handling,
> > optimization, execution,
> > etc.
> >
> > However we do see some potential benefits if we choose to put it outside
> > Flink:
> > - We may achieve more rapid development speed and maybe more frequent
> > release.
> > - Force us to think really clearly about the interfaces it should be,
> > because we don't have
> > the shortcut to modify core & connector codes all at the same time.
> >
> > But we also can't ignore the overhead:
> > - We almost need everything that is discussed in the splitting connectors
> > thread.
> > - We have to create lots more interface than TableSource/TableSink to make
> > it just work at the first
> > place, e.g. interfaces to express such tables should be managed by Flink,
> > interfaces to express the
> > physical capability of the storage then it can be bridged to SQL optimizer
> > and executor.
> > - If we create lots of interfaces with only one implementation, that sounds
> > overengineering to me.
> >
> > Combining the pros and cons above, what we are trying to do is firstly
> > implement it in a feature branch,
> > and also keep good engineering and design in mind. At some point we
> > re-evaluate the decision whether
> > to put it inside or outside the Flink core. What do you think?
> >
> > Best,
> > Kurt
> >
> >
> > On Fri, Oct 29, 2021 at 11:53 PM Till Rohrmann 
> > wrote:
> >
> > > Hi Jingsong,
> > >
> > > Thanks for creating this FLIP. I don't have a lot to add because I am not
> > > very familiar with the SQL components. While reading the FLIP I was
> > > wondering what would we need in Flink to build something like the BDT
> > > feature outside of Flink as a kind of extension? Would something like
> > this
> > > be possible? Maybe the answer is a quick no ;-)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I updated FLIP based on your feedback:
> > > >
> > > > 1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
> > > > TableDescriptor.forManaged
> > > >
> > > > 2. Introduce log.scan.startup.mode (default initial) to Hybrid source.
> > > >
> > > > 3. Add description to miss dropped table.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li 
> > > > wrote:
> > > > >
> > > > > Hi Ingo,
> > > > >
> > > > > Really appreciate your feedback.
> > > > >
> > > > > #1. The reason why we insist on using no "connector" option is 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-01 Thread Till Rohrmann
Hi Kurt,

Thanks a lot for the detailed explanation. I do see that implementing this
feature outside of Flink will be a bigger effort because we probably have
to think more about the exact interfaces and contracts. On the other hand,
I can also imagine that users might want to use different storage
implementations (e.g. Pulsar instead of Kafka for the changelog storage) at
some point.

Starting with a feature branch and keeping this question in mind is
probably a good compromise. Getting this feature off the ground in order to
evaluate it with users is likely more important than thinking of all
possible storage implementations and how to arrange the code. In case we
should split it, maybe we need something like a ManagedTableStorageProvider
that encapsulates the logic where to store the managed tables.

Looking forward to this feature and the improvements it will add to Flink's
SQL usability :-)

Cheers,
Till

On Mon, Nov 1, 2021 at 2:46 AM Kurt Young  wrote:

> Hi Till,
>
> We have discussed the possibility of putting this FLIP into another
> repository offline
> with Stephan and Timo. This looks similar with another under going effort
> which trying
> to put all connectors outside the Flink core repository.
>
> From the motivation and scope of this FLIP, it's quite different from
> current connectors in
> some aspects. What we are trying to offer is some kind of built-in storage,
> or we can call it
> internal/managed tables, compared with current connectors, they kind of
> express external
> tables of Flink SQL. Functionality-wise, this managed table would have more
> ability than
> all these connectors, since we controlled the implementation of such
> storage. Thus this table
> storage will interact with lots SQL components, like metadata handling,
> optimization, execution,
> etc.
>
> However we do see some potential benefits if we choose to put it outside
> Flink:
> - We may achieve more rapid development speed and maybe more frequent
> release.
> - Force us to think really clearly about the interfaces it should be,
> because we don't have
> the shortcut to modify core & connector codes all at the same time.
>
> But we also can't ignore the overhead:
> - We almost need everything that is discussed in the splitting connectors
> thread.
> - We have to create lots more interface than TableSource/TableSink to make
> it just work at the first
> place, e.g. interfaces to express such tables should be managed by Flink,
> interfaces to express the
> physical capability of the storage then it can be bridged to SQL optimizer
> and executor.
> - If we create lots of interfaces with only one implementation, that sounds
> overengineering to me.
>
> Combining the pros and cons above, what we are trying to do is firstly
> implement it in a feature branch,
> and also keep good engineering and design in mind. At some point we
> re-evaluate the decision whether
> to put it inside or outside the Flink core. What do you think?
>
> Best,
> Kurt
>
>
> On Fri, Oct 29, 2021 at 11:53 PM Till Rohrmann 
> wrote:
>
> > Hi Jingsong,
> >
> > Thanks for creating this FLIP. I don't have a lot to add because I am not
> > very familiar with the SQL components. While reading the FLIP I was
> > wondering what would we need in Flink to build something like the BDT
> > feature outside of Flink as a kind of extension? Would something like
> this
> > be possible? Maybe the answer is a quick no ;-)
> >
> > Cheers,
> > Till
> >
> > On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li 
> > wrote:
> >
> > > Hi all,
> > >
> > > I updated FLIP based on your feedback:
> > >
> > > 1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
> > > TableDescriptor.forManaged
> > >
> > > 2. Introduce log.scan.startup.mode (default initial) to Hybrid source.
> > >
> > > 3. Add description to miss dropped table.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li 
> > > wrote:
> > > >
> > > > Hi Ingo,
> > > >
> > > > Really appreciate your feedback.
> > > >
> > > > #1. The reason why we insist on using no "connector" option is that
> we
> > > > want to bring the following design to users:
> > > > - With the "connector" option, it is a mapping, unmanaged table.
> > > > - Without the "connector" option, it is a managed table. It may be an
> > > > Iceberg managed table, or may be a JDBC managed table, or may be a
> > > > Flink managed table.
> > > >
> > > > #2. About:
> > > > CREATE TABLE T (f0 INT);
> > > > ALTER TABLE T SET ('connector' = '…');
> > > >
> > > > I think it is dangerous, even for a generic table. The managed table
> > > > should prohibit it.
> > > >
> > > > #3. DDL and Table API
> > > >
> > > > You are right, Table Api should be a superset of SQL. There is no
> > > > doubt that it should support BDT.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk 
> wrote:
> > > > >
> > > > > Hi Jingsong,
> > > > >
> > > > > thanks again for the answers. I think requiring 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-31 Thread Kurt Young
Hi Till,

We have discussed offline with Stephan and Timo the possibility of putting
this FLIP into another repository. This looks similar to another ongoing
effort which is trying to put all connectors outside the Flink core
repository.

From the motivation and scope of this FLIP, it's quite different from
current connectors in some aspects. What we are trying to offer is some
kind of built-in storage, or we can call it internal/managed tables;
current connectors, by comparison, express external tables of Flink SQL.
Functionality-wise, this managed table would have more abilities than all
these connectors, since we control the implementation of the storage. Thus
this table storage will interact with lots of SQL components, like
metadata handling, optimization, execution, etc.

However, we do see some potential benefits if we choose to put it outside
Flink:
- We may achieve more rapid development and maybe more frequent releases.
- It forces us to think really clearly about what the interfaces should
be, because we don't have the shortcut of modifying core and connector
code at the same time.

But we also can't ignore the overhead:
- We need almost everything that is discussed in the connector-splitting
thread.
- We have to create many more interfaces than TableSource/TableSink just
to make it work in the first place, e.g. interfaces to express that such
tables should be managed by Flink, and interfaces to express the physical
capabilities of the storage so that it can be bridged to the SQL optimizer
and executor.
- If we create lots of interfaces with only one implementation, that
sounds like overengineering to me.

Weighing the pros and cons above, what we are trying to do is first
implement it in a feature branch while keeping good engineering and design
in mind. At some point we will re-evaluate whether to put it inside or
outside the Flink core. What do you think?

Best,
Kurt


On Fri, Oct 29, 2021 at 11:53 PM Till Rohrmann  wrote:

> Hi Jingsong,
>
> Thanks for creating this FLIP. I don't have a lot to add because I am not
> very familiar with the SQL components. While reading the FLIP I was
> wondering what would we need in Flink to build something like the BDT
> feature outside of Flink as a kind of extension? Would something like this
> be possible? Maybe the answer is a quick no ;-)
>
> Cheers,
> Till
>
> On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li 
> wrote:
>
> > Hi all,
> >
> > I updated FLIP based on your feedback:
> >
> > 1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
> > TableDescriptor.forManaged
> >
> > 2. Introduce log.scan.startup.mode (default initial) to Hybrid source.
> >
> > 3. Add description to miss dropped table.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li 
> > wrote:
> > >
> > > Hi Ingo,
> > >
> > > Really appreciate your feedback.
> > >
> > > #1. The reason why we insist on using no "connector" option is that we
> > > want to bring the following design to users:
> > > - With the "connector" option, it is a mapping, unmanaged table.
> > > - Without the "connector" option, it is a managed table. It may be an
> > > Iceberg managed table, or may be a JDBC managed table, or may be a
> > > Flink managed table.
> > >
> > > #2. About:
> > > CREATE TABLE T (f0 INT);
> > > ALTER TABLE T SET ('connector' = '…');
> > >
> > > I think it is dangerous, even for a generic table. The managed table
> > > should prohibit it.
> > >
> > > #3. DDL and Table API
> > >
> > > You are right, Table Api should be a superset of SQL. There is no
> > > doubt that it should support BDT.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk  wrote:
> > > >
> > > > Hi Jingsong,
> > > >
> > > > thanks again for the answers. I think requiring catalogs to implement
> > an
> > > > interface to support BDTs is something we'll need (though personally
> I
> > > > still prefer explicit DDL here over the "no connector option"
> > approach).
> > > >
> > > > What about more edge cases like
> > > >
> > > > CREATE TABLE T (f0 INT);
> > > > ALTER TABLE T SET ('connector' = '…');
> > > >
> > > > This would have to first create the physical storage and then delete
> it
> > > > again, right?
> > > >
> > > > On a separate note, he FLIP currently only discusses SQL DDL, and you
> > have
> > > > also mentioned
> > > >
> > > > > BDT only can be dropped by Flink SQL DDL now.
> > > >
> > > > Something Flink suffers from a lot is inconsistencies across APIs. I
> > think
> > > > it is important that we support features on all major APIs, i.e.
> > including
> > > > Table API.
> > > > For example for creating a BDT this would mean e.g. adding something
> > like
> > > > #forManaged(…) to TableDescriptor.
> > > >
> > > >
> > > > Best
> > > > Ingo
> > > >
> > > > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li 
> > wrote:
> > > >
> > > > > Hi Ingo,
> > > > >
> > > > > I thought again.
> > > > >
> > > > > I'll try to sort out the current catalog behaviors.
> > > 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-29 Thread Till Rohrmann
Hi Jingsong,

Thanks for creating this FLIP. I don't have a lot to add because I am not
very familiar with the SQL components. While reading the FLIP I was
wondering what we would need in Flink to build something like the BDT
feature outside of Flink, as a kind of extension. Would something like this
be possible? Maybe the answer is a quick no ;-)

Cheers,
Till

On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li  wrote:

> Hi all,
>
> I updated FLIP based on your feedback:
>
> 1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
> TableDescriptor.forManaged
>
> 2. Introduce log.scan.startup.mode (default initial) to Hybrid source.
>
> 3. Add description to miss dropped table.
>
> Best,
> Jingsong
>
> On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li 
> wrote:
> >
> > Hi Ingo,
> >
> > Really appreciate your feedback.
> >
> > #1. The reason why we insist on using no "connector" option is that we
> > want to bring the following design to users:
> > - With the "connector" option, it is a mapping, unmanaged table.
> > - Without the "connector" option, it is a managed table. It may be an
> > Iceberg managed table, or may be a JDBC managed table, or may be a
> > Flink managed table.
> >
> > #2. About:
> > CREATE TABLE T (f0 INT);
> > ALTER TABLE T SET ('connector' = '…');
> >
> > I think it is dangerous, even for a generic table. The managed table
> > should prohibit it.
> >
> > #3. DDL and Table API
> >
> > You are right, Table Api should be a superset of SQL. There is no
> > doubt that it should support BDT.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk  wrote:
> > >
> > > Hi Jingsong,
> > >
> > > thanks again for the answers. I think requiring catalogs to implement
> an
> > > interface to support BDTs is something we'll need (though personally I
> > > still prefer explicit DDL here over the "no connector option"
> approach).
> > >
> > > What about more edge cases like
> > >
> > > CREATE TABLE T (f0 INT);
> > > ALTER TABLE T SET ('connector' = '…');
> > >
> > > This would have to first create the physical storage and then delete it
> > > again, right?
> > >
> > > On a separate note, he FLIP currently only discusses SQL DDL, and you
> have
> > > also mentioned
> > >
> > > > BDT only can be dropped by Flink SQL DDL now.
> > >
> > > Something Flink suffers from a lot is inconsistencies across APIs. I
> think
> > > it is important that we support features on all major APIs, i.e.
> including
> > > Table API.
> > > For example for creating a BDT this would mean e.g. adding something
> like
> > > #forManaged(…) to TableDescriptor.
> > >
> > >
> > > Best
> > > Ingo
> > >
> > > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li 
> wrote:
> > >
> > > > Hi Ingo,
> > > >
> > > > I thought again.
> > > >
> > > > I'll try to sort out the current catalog behaviors.
> > > > Actually, we can divide catalogs into three categories:
> > > >
> > > > 1. ExternalCatalog: it can only read or create a single table kind
> > > > which connects to external storage. TableFactory is provided by
> > > > Catalog, which can have nothing to do with Flink's Factory discovery
> > > > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
> > > > Catalog manages the life cycle of its **managed** tables, which means
> > > > that creation and drop will affect the real physical storage. The DDL
> > > > has no "connector" option.
> > > >
> > > > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
> > > > factories are created through Flink's factory discovery mechanism. At
> > > > this time, the catalog is actually only a storage medium for saving
> > > > schema and options, such as GenericInMemoryCatalog. Catalog only
> saves
> > > > meta information and does not manage the underlying physical storage
> > > > of tables. These tables are **unmanaged**. The DDL must have a
> > > > "connector" option.
> > > >
> > > > 3. HybridCatalog: It can save both its own **managed** table and
> > > > generic Flink **unmanaged** table, such as HiveCatalog.
> > > >
> > > > We want to use the "connector" option to distinguish whether it is
> > > > managed or not.
> > > >
> > > > Now, consider the Flink managed table in this FLIP.
> > > > a. ExternalCatalog can not support Flink managed tables.
> > > > b. GenericCatalog can support Flink managed tables without the
> > > > "connector" option.
> > > > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog
> to
> > > > support Flink managed tables:
> > > > - with "connector" option in Flink dialect is unmanaged tables
> > > > - Hive DDL in Hive dialect is Hive managed tables, the parser will
> add
> > > > "connector = hive" automatically. At present, there are many
> > > > differences between Flink DDL and Hive DDL, and even their features
> > > > have many differences.
> > > > - without "connector" option in Flink dialect is Flink managed
> tables.
> > > >
> > > > In this way, we can support Flink managed tables while maintaining
> > > > compatibility.
> > 

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-28 Thread Jingsong Li
Hi all,

I updated FLIP based on your feedback:

1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
TableDescriptor.forManaged

2. Introduce log.scan.startup.mode (default initial) to Hybrid source.

3. Add description to miss dropped table.

Best,
Jingsong

On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li  wrote:
>
> Hi Ingo,
>
> Really appreciate your feedback.
>
> #1. The reason why we insist on using no "connector" option is that we
> want to bring the following design to users:
> - With the "connector" option, it is a mapping, unmanaged table.
> - Without the "connector" option, it is a managed table. It may be an
> Iceberg managed table, or may be a JDBC managed table, or may be a
> Flink managed table.
>
> #2. About:
> CREATE TABLE T (f0 INT);
> ALTER TABLE T SET ('connector' = '…');
>
> I think it is dangerous, even for a generic table. The managed table
> should prohibit it.
>
> #3. DDL and Table API
>
> You are right, the Table API should be a superset of SQL. There is no
> doubt that it should support BDT.
>
> Best,
> Jingsong
>
> On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk  wrote:
> >
> > Hi Jingsong,
> >
> > thanks again for the answers. I think requiring catalogs to implement an
> > interface to support BDTs is something we'll need (though personally I
> > still prefer explicit DDL here over the "no connector option" approach).
> >
> > What about more edge cases like
> >
> > CREATE TABLE T (f0 INT);
> > ALTER TABLE T SET ('connector' = '…');
> >
> > This would have to first create the physical storage and then delete it
> > again, right?
> >
> > On a separate note, the FLIP currently only discusses SQL DDL, and you have
> > also mentioned
> >
> > > BDT only can be dropped by Flink SQL DDL now.
> >
> > Something Flink suffers from a lot is inconsistencies across APIs. I think
> > it is important that we support features on all major APIs, i.e. including
> > Table API.
> > For example for creating a BDT this would mean e.g. adding something like
> > #forManaged(…) to TableDescriptor.
> >
> >
> > Best
> > Ingo
> >
> > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li  wrote:
> >
> > > Hi Ingo,
> > >
> > > I thought again.
> > >
> > > I'll try to sort out the current catalog behaviors.
> > > Actually, we can divide catalogs into three categories:
> > >
> > > 1. ExternalCatalog: it can only read or create a single table kind
> > > which connects to external storage. TableFactory is provided by
> > > Catalog, which can have nothing to do with Flink's Factory discovery
> > > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
> > > Catalog manages the life cycle of its **managed** tables, which means
> > > that creation and drop will affect the real physical storage. The DDL
> > > has no "connector" option.
> > >
> > > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
> > > factories are created through Flink's factory discovery mechanism. At
> > > this time, the catalog is actually only a storage medium for saving
> > > schema and options, such as GenericInMemoryCatalog. Catalog only saves
> > > meta information and does not manage the underlying physical storage
> > > of tables. These tables are **unmanaged**. The DDL must have a
> > > "connector" option.
> > >
> > > 3. HybridCatalog: It can save both its own **managed** table and
> > > generic Flink **unmanaged** table, such as HiveCatalog.
> > >
> > > We want to use the "connector" option to distinguish whether it is
> > > managed or not.
> > >
> > > Now, consider the Flink managed table in this FLIP.
> > > a. ExternalCatalog can not support Flink managed tables.
> > > b. GenericCatalog can support Flink managed tables without the
> > > "connector" option.
> > > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
> > > support Flink managed tables:
> > > - with "connector" option in Flink dialect is unmanaged tables
> > > - Hive DDL in Hive dialect is Hive managed tables, the parser will add
> > > "connector = hive" automatically. At present, there are many
> > > differences between Flink DDL and Hive DDL, and even their features
> > > have many differences.
> > > - without "connector" option in Flink dialect is Flink managed tables.
> > >
> > > In this way, we can support Flink managed tables while maintaining
> > > compatibility.
> > >
> > > Anyway, we need to introduce a "SupportsFlinkManagedTable" to catalog.
> > >
> > > ## Back to your question #
> > >
> > > > but we should make it clear that this is a limitation and probably
> > > document how users can clean up the underlying physical storage manually 
> > > in
> > > this case
> > >
> > > Yes, it's strange that the catalog should manage tables, but some
> > > catalogs don't have this ability.
> > > - For PersistentCatalog, the meta will continue until the underlying
> > > physical storage is deleted.
> > > - For InMemoryCatalog, yes, we should document it for the underlying
> > > physical storage of Flink managed tables.

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-25 Thread Jingsong Li
Hi Ingo,

Really appreciate your feedback.

#1. The reason why we insist on using no "connector" option is that we
want to bring the following design to users:
- With the "connector" option, it is a mapping, unmanaged table.
- Without the "connector" option, it is a managed table. It may be an
Iceberg managed table, or may be a JDBC managed table, or may be a
Flink managed table.
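
To make #1 concrete, a small sketch of the two DDL flavors as executed
through the Table API. The table names, columns and connector options are
made up, and the no-connector semantics are what this FLIP proposes, not
current behavior:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ManagedVsUnmanagedDdl {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // With 'connector': an unmanaged mapping to external storage.
        // DROP TABLE only removes the catalog entry, never the data.
        tEnv.executeSql(
                "CREATE TABLE kafka_orders (order_id BIGINT, amount DOUBLE) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'orders',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'format' = 'json')");

        // Without 'connector': a managed table. Under this FLIP, creating it
        // also creates the physical storage, and dropping it deletes the data.
        tEnv.executeSql(
                "CREATE TABLE managed_orders (order_id BIGINT, amount DOUBLE)");
    }
}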

#2. About:
CREATE TABLE T (f0 INT);
ALTER TABLE T SET ('connector' = '…');

I think it is dangerous, even for a generic table. The managed table
should prohibit it.

#3. DDL and Table API

You are right, the Table API should be a superset of SQL. There is no
doubt that it should support BDT.

Best,
Jingsong

On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk  wrote:
>
> Hi Jingsong,
>
> thanks again for the answers. I think requiring catalogs to implement an
> interface to support BDTs is something we'll need (though personally I
> still prefer explicit DDL here over the "no connector option" approach).
>
> What about more edge cases like
>
> CREATE TABLE T (f0 INT);
> ALTER TABLE T SET ('connector' = '…');
>
> This would have to first create the physical storage and then delete it
> again, right?
>
> On a separate note, the FLIP currently only discusses SQL DDL, and you have
> also mentioned
>
> > BDT only can be dropped by Flink SQL DDL now.
>
> Something Flink suffers from a lot is inconsistencies across APIs. I think
> it is important that we support features on all major APIs, i.e. including
> Table API.
> For example for creating a BDT this would mean e.g. adding something like
> #forManaged(…) to TableDescriptor.
>
>
> Best
> Ingo
>
> On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li  wrote:
>
> > Hi Ingo,
> >
> > I thought again.
> >
> > I'll try to sort out the current catalog behaviors.
> > Actually, we can divide catalogs into three categories:
> >
> > 1. ExternalCatalog: it can only read or create a single table kind
> > which connects to external storage. TableFactory is provided by
> > Catalog, which can have nothing to do with Flink's Factory discovery
> > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
> > Catalog manages the life cycle of its **managed** tables, which means
> > that creation and drop will affect the real physical storage. The DDL
> > has no "connector" option.
> >
> > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
> > factories are created through Flink's factory discovery mechanism. At
> > this time, the catalog is actually only a storage medium for saving
> > schema and options, such as GenericInMemoryCatalog. Catalog only saves
> > meta information and does not manage the underlying physical storage
> > of tables. These tables are **unmanaged**. The DDL must have a
> > "connector" option.
> >
> > 3. HybridCatalog: It can save both its own **managed** table and
> > generic Flink **unmanaged** table, such as HiveCatalog.
> >
> > We want to use the "connector" option to distinguish whether it is
> > managed or not.
> >
> > Now, consider the Flink managed table in this FLIP.
> > a. ExternalCatalog can not support Flink managed tables.
> > b. GenericCatalog can support Flink managed tables without the
> > "connector" option.
> > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
> > support Flink managed tables:
> > - with "connector" option in Flink dialect is unmanaged tables
> > - Hive DDL in Hive dialect is Hive managed tables, the parser will add
> > "connector = hive" automatically. At present, there are many
> > differences between Flink DDL and Hive DDL, and even their features
> > have many differences.
> > - without "connector" option in Flink dialect is Flink managed tables.
> >
> > In this way, we can support Flink managed tables while maintaining
> > compatibility.
> >
> > Anyway, we need to introduce a "SupportsFlinkManagedTable" to catalog.
> >
> > ## Back to your question #
> >
> > > but we should make it clear that this is a limitation and probably
> > document how users can clean up the underlying physical storage manually in
> > this case
> >
> > Yes, it's strange that the catalog should manage tables, but some
> > catalogs don't have this ability.
> > - For PersistentCatalog, the meta will continue until the underlying
> > physical storage is deleted.
> > - For InMemoryCatalog, yes, we should document it for the underlying
> > physical storage of Flink managed tables.
> >
> > > the HiveCatalog doesn't list a 'connector' option for its tables.
> >
> > Actually, it can be divided into two steps: create and save:
> > - When creating a table, the table seen by HiveCatalog must have
> > "connector = hive", which is the hive table (Hive managed table). You
> > can see the "HiveCatalog.isHiveTable".
> > - When saving the table, it will remove the connector of the hive
> > table. We can do this: with "connector" option is Flink generic table,
> > without "connector" option is Hive table, with "flink-managed = true"
> > is Flink managed table.

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-25 Thread Ingo Bürk
Hi Jingsong,

thanks again for the answers. I think requiring catalogs to implement an
interface to support BDTs is something we'll need (though personally I
still prefer explicit DDL here over the "no connector option" approach).

What about more edge cases like

CREATE TABLE T (f0 INT);
ALTER TABLE T SET ('connector' = '…');

This would have to first create the physical storage and then delete it
again, right?

On a separate note, the FLIP currently only discusses SQL DDL, and you have
also mentioned

> BDT only can be dropped by Flink SQL DDL now.

Something Flink suffers from a lot is inconsistencies across APIs. I think
it is important that we support features on all major APIs, i.e. including
Table API.
For example for creating a BDT this would mean e.g. adding something like
#forManaged(…) to TableDescriptor.
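
A quick sketch of what I have in mind; #forManaged() does not exist today,
so the method name, schema and table name below are purely illustrative:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class ForManagedSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical: a descriptor without any connector identifier,
        // mirroring CREATE TABLE without a 'connector' option.
        TableDescriptor descriptor = TableDescriptor.forManaged()
                .schema(Schema.newBuilder()
                        .column("user_id", DataTypes.BIGINT())
                        .column("cnt", DataTypes.BIGINT())
                        .build())
                .build();

        tEnv.createTable("managed_counts", descriptor);
    }
}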


Best
Ingo

On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li  wrote:

> Hi Ingo,
>
> I thought again.
>
> I'll try to sort out the current catalog behaviors.
> Actually, we can divide catalogs into three categories:
>
> 1. ExternalCatalog: it can only read or create a single table kind
> which connects to external storage. TableFactory is provided by
> Catalog, which can have nothing to do with Flink's Factory discovery
> mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
> Catalog manages the life cycle of its **managed** tables, which means
> that creation and drop will affect the real physical storage. The DDL
> has no "connector" option.
>
> 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
> factories are created through Flink's factory discovery mechanism. At
> this time, the catalog is actually only a storage medium for saving
> schema and options, such as GenericInMemoryCatalog. Catalog only saves
> meta information and does not manage the underlying physical storage
> of tables. These tables are **unmanaged**. The DDL must have a
> "connector" option.
>
> 3. HybridCatalog: It can save both its own **managed** table and
> generic Flink **unmanaged** table, such as HiveCatalog.
>
> We want to use the "connector" option to distinguish whether it is
> managed or not.
>
> Now, consider the Flink managed table in this FLIP.
> a. ExternalCatalog can not support Flink managed tables.
> b. GenericCatalog can support Flink managed tables without the
> "connector" option.
> c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
> support Flink managed tables:
> - with "connector" option in Flink dialect is unmanaged tables
> - Hive DDL in Hive dialect is Hive managed tables, the parser will add
> "connector = hive" automatically. At present, there are many
> differences between Flink DDL and Hive DDL, and even their features
> have many differences.
> - without "connector" option in Flink dialect is Flink managed tables.
>
> In this way, we can support Flink managed tables while maintaining
> compatibility.
>
> Anyway, we need to introduce a "SupportsFlinkManagedTable" to catalog.
>
> ## Back to your question #
>
> > but we should make it clear that this is a limitation and probably
> document how users can clean up the underlying physical storage manually in
> this case
>
> Yes, it's strange that the catalog should manage tables, but some
> catalogs don't have this ability.
> - For PersistentCatalog, the meta will continue until the underlying
> physical storage is deleted.
> - For InMemoryCatalog, yes, we should document it for the underlying
> physical storage of Flink managed tables.
>
> > the HiveCatalog doesn't list a 'connector' option for its tables.
>
> Actually, it can be divided into two steps: create and save:
> - When creating a table, the table seen by HiveCatalog must have
> "connector = hive", which is the hive table (Hive managed table). You
> can see the "HiveCatalog.isHiveTable".
> - When saving the table, it will remove the connector of the hive
> table. We can do this: with "connector" option is Flink generic table,
> without "connector" option is Hive table, with "flink-managed = true"
> is Flink managed table.
>
> Best,
> Jingsong Lee
>
> On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk  wrote:
> >
> > Hi JingSong,
> >
> > thank you for the answers!
> >
> > > BDT only can be dropped by Flink SQL DDL now.
> >
> > Maybe I'm misunderstanding, but that's only true from the Flink side.
> What
> > I meant is that a table could disappear from a catalog entirely outside
> of
> > Flink. As a simple example, consider a catalog which represents an IMAP
> > mail server and each folder as a table. If a folder is deleted from the
> > mail account, the table would disappear, but Flink would have no way of
> > knowing that. I don't see a way around this problem, to be honest, but we
> > should make it clear that this is a limitation and probably document how
> > users can clean up the underlying physical storage manually in this case?
> >
> > > - Option 1: Create table without the connector option, the table will
> > > be forcibly translated to BDT.

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-24 Thread Jingsong Li
Hi Ingo,

I thought again.

I'll try to sort out the current catalog behaviors.
Actually, we can divide catalogs into three categories:

1. ExternalCatalog: it can only read or create a single table kind
which connects to external storage. TableFactory is provided by
Catalog, which can have nothing to do with Flink's Factory discovery
mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
Catalog manages the life cycle of its **managed** tables, which means
that creation and drop will affect the real physical storage. The DDL
has no "connector" option.

2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
factories are created through Flink's factory discovery mechanism. At
this time, the catalog is actually only a storage medium for saving
schema and options, such as GenericInMemoryCatalog. Catalog only saves
meta information and does not manage the underlying physical storage
of tables. These tables are **unmanaged**. The DDL must have a
"connector" option.

3. HybridCatalog: It can save both its own **managed** table and
generic Flink **unmanaged** table, such as HiveCatalog.

We want to use the "connector" option to distinguish whether it is
managed or not.

Now, consider the Flink managed table in this FLIP.
a. ExternalCatalog can not support Flink managed tables.
b. GenericCatalog can support Flink managed tables without the
"connector" option.
c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
support Flink managed tables:
- with "connector" option in Flink dialect is unmanaged tables
- Hive DDL in Hive dialect is Hive managed tables, the parser will add
"connector = hive" automatically. At present, there are many
differences between Flink DDL and Hive DDL, and even their features
have many differences.
- without "connector" option in Flink dialect is Flink managed tables.

In this way, we can support Flink managed tables while maintaining
compatibility.

Anyway, we need to introduce a "SupportsFlinkManagedTable" to catalog.
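
A minimal sketch of what such a marker interface could look like, together
with the "no connector option means managed" check. All names here are
placeholders; the FLIP may define this differently:

import java.util.Map;

public final class SupportsFlinkManagedTableSketch {

    /** Hypothetical marker: catalogs implementing this accept Flink managed tables. */
    public interface SupportsFlinkManagedTable {
        // Intentionally empty. It only signals the capability; the catalog
        // still persists managed tables as plain schema + options, without
        // a 'connector' key.
    }

    /** The rule discussed in this thread: no 'connector' option means managed. */
    public static boolean isManagedTable(Map<String, String> options) {
        return !options.containsKey("connector");
    }

    public static void main(String[] args) {
        System.out.println(isManagedTable(Map.of()));                     // true
        System.out.println(isManagedTable(Map.of("connector", "kafka"))); // false
    }
}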

## Back to your question #

> but we should make it clear that this is a limitation and probably document 
> how users can clean up the underlying physical storage manually in this case

Yes, it's strange that the catalog should manage tables, but some
catalogs don't have this ability.
- For PersistentCatalog, the meta will continue until the underlying
physical storage is deleted.
- For InMemoryCatalog, yes, we should document it for the underlying
physical storage of Flink managed tables.

> the HiveCatalog doesn't list a 'connector' option for its tables.

Actually, it can be divided into two steps: create and save:
- When creating a table, the table seen by HiveCatalog must have
"connector = hive", which is the hive table (Hive managed table). You
can see the "HiveCatalog.isHiveTable".
- When saving the table, it will remove the connector of the hive
table. We can do this: with "connector" option is Flink generic table,
without "connector" option is Hive table, with "flink-managed = true"
is Flink managed table.
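
Restating that create/save rule as a small, purely illustrative snippet
(not HiveCatalog's actual code; "flink-managed" is just the key name used
above):

import java.util.Map;

public final class HiveCatalogTableKindSketch {

    enum Kind { FLINK_GENERIC, HIVE, FLINK_MANAGED }

    /** Classify a table by the options the catalog persisted for it. */
    static Kind classify(Map<String, String> options) {
        if ("true".equalsIgnoreCase(options.get("flink-managed"))) {
            return Kind.FLINK_MANAGED; // with "flink-managed = true"
        }
        if (options.containsKey("connector")) {
            return Kind.FLINK_GENERIC; // with "connector" option
        }
        return Kind.HIVE;              // without "connector" option
    }

    public static void main(String[] args) {
        System.out.println(classify(Map.of("connector", "kafka")));    // FLINK_GENERIC
        System.out.println(classify(Map.of()));                        // HIVE
        System.out.println(classify(Map.of("flink-managed", "true"))); // FLINK_MANAGED
    }
}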

Best,
Jingsong Lee

On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk  wrote:
>
> Hi JingSong,
>
> thank you for the answers!
>
> > BDT only can be dropped by Flink SQL DDL now.
>
> Maybe I'm misunderstanding, but that's only true from the Flink side. What
> I meant is that a table could disappear from a catalog entirely outside of
> Flink. As a simple example, consider a catalog which represents an IMAP
> mail server and each folder as a table. If a folder is deleted from the
> mail account, the table would disappear, but Flink would have no way of
> knowing that. I don't see a way around this problem, to be honest, but we
> should make it clear that this is a limitation and probably document how
> users can clean up the underlying physical storage manually in this case?
>
> > - Option 1: Create table without the connector option, the table will
> > be forcibly translated to BDT.
>
> This would be a breaking change, right? If I remember correctly (but I
> might not :-)), even the HiveCatalog doesn't list a 'connector' option for
> its tables.
>
> This approach is also very implicit, and creating physical storage isn't
> exactly "free", so I personally would favor one of the other approaches.
> Option (2) would be explicit for the end user, while Option (3) is again
> implicit for the user and only explicit for the catalog implementor, so I
> kind of favor Option (2) because I feel that users should be aware of
> creating a Flink-managed table.
>
> We also need to consider the upgrade path here: if a catalog exposes tables
> without 'connector' options today, we need to make sure that once this FLIP
> is implemented no errors are thrown because codepaths assume that physical
> storage must exist for such tables (since they were created before the
> FLIP).
>
>
> Best
> Ingo
>
> On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li  wrote:
>
> > Hi Ingo and wenlong,
> >
> > Thanks for your feedback. Very good questions!

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-21 Thread Ingo Bürk
Hi JingSong,

thank you for the answers!

> BDT only can be dropped by Flink SQL DDL now.

Maybe I'm misunderstanding, but that's only true from the Flink side. What
I meant is that a table could disappear from a catalog entirely outside of
Flink. As a simple example, consider a catalog which represents an IMAP
mail server and each folder as a table. If a folder is deleted from the
mail account, the table would disappear, but Flink would have no way of
knowing that. I don't see a way around this problem, to be honest, but we
should make it clear that this is a limitation and probably document how
users can clean up the underlying physical storage manually in this case?

> - Option 1: Create table without the connector option, the table will
> be forcibly translated to BDT.

This would be a breaking change, right? If I remember correctly (but I
might not :-)), even the HiveCatalog doesn't list a 'connector' option for
its tables.

This approach is also very implicit, and creating physical storage isn't
exactly "free", so I personally would favor one of the other approaches.
Option (2) would be explicit for the end user, while Option (3) is again
implicit for the user and only explicit for the catalog implementor, so I
kind of favor Option (2) because I feel that users should be aware of
creating a Flink-managed table.

We also need to consider the upgrade path here: if a catalog exposes tables
without 'connector' options today, we need to make sure that once this FLIP
is implemented no errors are thrown because codepaths assume that physical
storage must exist for such tables (since they were created before the
FLIP).


Best
Ingo

On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li  wrote:

> Hi Ingo and wenlong,
>
> Thanks for your feedback. Very good questions!
>
> (Built-in Dynamic Table is simplified as BDT)
>
> First, let's look at the following questions:
>
> 1. Does BDT want a separate catalog or can it be placed in all
> catalogs (such as InMemoryCatalog and HiveCatalog)?
>  - BDT wants the latter. Because in iceberg, we have seen that a
> separate catalog undoubtedly needs to recreate a set of catalogs. We
> often don't know whether it is Flink's HiveCatalog or iceberg's
> HiveCatalog. This brings not only duplication of work, but also
> confusion.
>  - How does catalog persist BDT? As a general Flink table, persist the
> schema and options of the table.
>
> 2. Is Flink's DDL mapping or real physical storage?
> - Mapping: creating and dropping tables only change the mapping
> relationship,
> - Physical storage: creating and dropping tables will actually delete
> the underlying storage
> - Status quo: the general connectors are all mapping, while the self
> managed tables of Catalog are real storage.
> - BDT wants real physical storage, because it can provide database
> level experience, and BDT wants to be orthogonal to catalog.
> Therefore, BDT is bound to break the current situation and become a
> new concept.
>
> Based on the above conclusion, let's look at your question.
>
> To Ingo:
>
> > if tables are dropped externally rather than through Flink SQL DDL, how
> would Flink be able to remove the physical storage for it.
>
> BDT only can be dropped by Flink SQL DDL now.
>
> To wenlong:
>
> > How the built-in table would be persisted in Catalog?
>
> Just like a general Flink table, persist the schema and options of the
> table.
>
> > Is it possible to read historical data from the file store first and
> then fetch new changes from the log store? something like a hybrid source,
> but I think we need a mechanism to get exactly-once semantic.
>
> This can be implemented, but we need to save the Kafka offset of the
> current checkpoint in the snapshot, so that we can accurately switch
> between file and log. But this is not in MVP.
>
> To Ingo and wenlong:
>
> > Currently a catalog can provide a default table factory and would be
> used as the top priority factory, what would happen after the default
> factory was introduced.
>
> - Option 1: Create table without the connector option, the table will
> be forcibly translated to BDT.
> - Option 2: Introduce new grammar, for example, "CREATE MANAGED
> TABLE...", this will separate from the default table of catalog.
> Catalog can define its own managed tables.
> - Option 3: Create table without the connector option, but introduce
> interface to Catalog, for example, "SupportsFlinkManagedTable". The
> catalog that can support BDT can implement
> it.(InMemoryCatalog,HiveCatalog). Catalogs that do not support BDT can
> implement their own managed tables.(IcebergCatalog, these catalogs do
> not even support other flink tables)
>
> Best,
> Jingsong
>
> On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl 
> wrote:
> >
> > Hi Jingsong, thanks for the proposal, providing a built-in storage
> solution
> > for users will make Flink SQL much easier to use in production.
> >
> > I have some questions which may be missed in the FLIP, but may be
> important
> > IMO:
> > 1. Is it possible to read historical data from the file store first and then fetch new changes from the log store?

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-21 Thread Jingsong Li
Hi Ingo and wenlong,

Thanks for your feedback. Very good questions!

(Built-in Dynamic Table is simplified as BDT)

First, let's look at the following questions:

1. Does BDT want a separate catalog or can it be placed in all
catalogs (such as InMemoryCatalog and HiveCatalog)?
 - BDT wants the latter. Because in iceberg, we have seen that a
separate catalog undoubtedly needs to recreate a set of catalogs. We
often don't know whether it is Flink's HiveCatalog or iceberg's
HiveCatalog. This brings not only duplication of work, but also
confusion.
 - How does catalog persist BDT? As a general Flink table, persist the
schema and options of the table.

2. Is Flink's DDL mapping or real physical storage?
- Mapping: creating and dropping tables only change the mapping relationship,
- Physical storage: creating and dropping tables will actually delete
the underlying storage
- Status quo: the general connectors are all mapping, while the self
managed tables of Catalog are real storage.
- BDT wants real physical storage, because it can provide database
level experience, and BDT wants to be orthogonal to catalog.
Therefore, BDT is bound to break the current situation and become a
new concept.

Based on the above conclusion, let's look at your question.

To Ingo:

> if tables are dropped externally rather than through Flink SQL DDL, how would 
> Flink be able to remove the physical storage for it.

BDT only can be dropped by Flink SQL DDL now.

To wenlong:

> How the built-in table would be persisted in Catalog?

Just like a general Flink table, persist the schema and options of the table.

> Is it possible to read historical data from the file store first and then 
> fetch new changes from the log store? something like a hybrid source, but I 
> think we need a mechanism to get exactly-once semantic.

This can be implemented, but we need to save the Kafka offset of the
current checkpoint in the snapshot, so that we can accurately switch
between file and log. But this is not in MVP.
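
A deliberately simplified, non-Flink sketch of that switching idea: the
snapshot records the log offset that was current when it was committed, so
a reader can finish the bounded file scan and then continue from exactly
that offset. Every type below is invented for illustration only:

import java.util.List;

public final class HybridReadSketch {

    /** A committed snapshot: data files plus the log offset valid at commit time. */
    static final class Snapshot {
        final List<String> dataFiles;
        final long logOffsetAtCommit;

        Snapshot(List<String> dataFiles, long logOffsetAtCommit) {
            this.dataFiles = dataFiles;
            this.logOffsetAtCommit = logOffsetAtCommit;
        }
    }

    interface LogStore {
        void consumeFrom(long offset); // e.g. start a Kafka consumer here
    }

    /** Bounded file phase first, then switch to the log without gaps or duplicates. */
    static void read(Snapshot snapshot, LogStore log) {
        for (String file : snapshot.dataFiles) {
            System.out.println("scanning historical file: " + file);
        }
        log.consumeFrom(snapshot.logOffsetAtCommit);
    }

    public static void main(String[] args) {
        Snapshot s = new Snapshot(List.of("part-0.orc", "part-1.orc"), 42L);
        read(s, offset -> System.out.println("tailing log from offset " + offset));
    }
}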

To Ingo and wenlong:

> Currently a catalog can provide a default table factory and would be used as 
> the top priority factory, what would happen after the default factory was 
> introduced.

- Option 1: Create table without the connector option, the table will
be forcibly translated to BDT.
- Option 2: Introduce new grammar, for example, "CREATE MANAGED
TABLE...", this will separate from the default table of catalog.
Catalog can define its own managed tables.
- Option 3: Create table without the connector option, but introduce
interface to Catalog, for example, "SupportsFlinkManagedTable". The
catalog that can support BDT can implement
it (InMemoryCatalog, HiveCatalog). Catalogs that do not support BDT can
implement their own managed tables (IcebergCatalog; these catalogs do
not even support other Flink tables).
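
To illustrate Option 1 versus Option 2 (Option 3 keeps Option 1's DDL but
requires the catalog to opt in), here is a hedged sketch; the MANAGED
keyword is only the grammar proposed in Option 2 and does not parse today:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ManagedTableDdlOptions {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Option 1 / Option 3: plain DDL without a 'connector' option is
        // interpreted as a Flink managed table (Option 3 additionally gates
        // this behind a catalog-side SupportsFlinkManagedTable interface).
        tEnv.executeSql("CREATE TABLE word_count (word STRING, cnt BIGINT)");

        // Option 2 (proposed grammar only, would not parse today):
        // tEnv.executeSql("CREATE MANAGED TABLE word_count2 (word STRING, cnt BIGINT)");
    }
}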

Best,
Jingsong

On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl  wrote:
>
> Hi Jingsong, thanks for the proposal, providing a built-in storage solution
> for users will make Flink SQL much easier to use in production.
>
> I have some questions which may be missed in the FLIP, but may be important
> IMO:
> 1. Is it possible to read historical data from the file store first and
> then fetch new changes from the log store? something like a hybrid source,
> but I think we need a mechanism to get exactly-once semantic.
> 2. How the built-in table would be persisted in Catalog?
> 3. Currently a catalog can provide a default table factory and would be
> used as the top priority factory, what would happen after the default
> factory was introduced.
>
> On Wed, 20 Oct 2021 at 19:35, Ingo Bürk  wrote:
>
> > Hi Jingsong,
> >
> > thank you for writing up the proposal. The benefits such a mechanism will
> > bring will be very valuable! I haven't yet looked into this in detail, but
> > one question came to my mind immediately:
> >
> > The DDL for these tables seems to rely on there not being a 'connector'
> > option. However, catalogs can provide a custom factory, and thus tables
> > don't necessarily need to contain such an option already today. How will
> > this interact / work with catalogs? I think there are more points regarding
> > interaction with catalogs, e.g. if tables are dropped externally rather
> > than through Flink SQL DDL, how would Flink be able to remove the physical
> > storage for it.
> >
> >
> > Best
> > Ingo
> >
> > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li 
> > wrote:
> >
> > > Hi all,
> > >
> > > Kurt and I propose to introduce built-in storage support for dynamic
> > > table, a truly unified changelog & table representation, from Flink
> > > SQL’s perspective. We believe this kind of storage will improve the
> > > usability a lot.
> > >
> > > We want to highlight some characteristics about this storage:
> > >
> > > - It’s a built-in storage for Flink SQL
> > > ** Improve usability issues
> > > ** Flink DDL is no longer just a mapping, but a real creation for these
> > > tables
> > > ** Masks & abstracts the underlying technical details, no annoying options

Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-20 Thread wenlong.lwl
Hi Jingsong, thanks for the proposal; providing a built-in storage solution
for users will make Flink SQL much easier to use in production.

I have some questions which may have been missed in the FLIP, but may be
important IMO:
1. Is it possible to read historical data from the file store first and
then fetch new changes from the log store? something like a hybrid source,
but I think we need a mechanism to get exactly-once semantic.
2. How the built-in table would be persisted in Catalog?
3. Currently a catalog can provide a default table factory and would be
used as the top priority factory, what would happen after the default
factory was introduced.

On Wed, 20 Oct 2021 at 19:35, Ingo Bürk  wrote:

> Hi Jingsong,
>
> thank you for writing up the proposal. The benefits such a mechanism will
> bring will be very valuable! I haven't yet looked into this in detail, but
> one question came to my mind immediately:
>
> The DDL for these tables seems to rely on there not being a 'connector'
> option. However, catalogs can provide a custom factory, and thus tables
> don't necessarily need to contain such an option already today. How will
> this interact / work with catalogs? I think there are more points regarding
> interaction with catalogs, e.g. if tables are dropped externally rather
> than through Flink SQL DDL, how would Flink be able to remove the physical
> storage for it.
>
>
> Best
> Ingo
>
> On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li 
> wrote:
>
> > Hi all,
> >
> > Kurt and I propose to introduce built-in storage support for dynamic
> > table, a truly unified changelog & table representation, from Flink
> > SQL’s perspective. We believe this kind of storage will improve the
> > usability a lot.
> >
> > We want to highlight some characteristics about this storage:
> >
> > - It’s a built-in storage for Flink SQL
> > ** Improve usability issues
> > ** Flink DDL is no longer just a mapping, but a real creation for these
> > tables
> > ** Masks & abstracts the underlying technical details, no annoying
> options
> >
> > - Supports subsecond streaming write & consumption
> > ** It could be backed by a service-oriented message queue (Like Kafka)
> > ** High throughput scan capability
> > ** Filesystem with columnar formats would be an ideal choice just like
> > iceberg/hudi does.
> >
> > - More importantly, in order to solve the cognitive barrier, storage needs
> > to automatically address various Insert/Update/Delete inputs and table
> > definitions
> > ** Receive any type of changelog
> > ** Table can have primary key or no primary key
> >
> > Looking forward to your feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> > Best,
> > Jingsong Lee
> >
>


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-20 Thread Ingo Bürk
Hi Jingsong,

thank you for writing up the proposal. The benefits such a mechanism will
bring will be very valuable! I haven't yet looked into this in detail, but
one question came to my mind immediately:

The DDL for these tables seems to rely on there not being a 'connector'
option. However, catalogs can provide a custom factory, and thus tables
don't necessarily need to contain such an option already today. How will
this interact / work with catalogs? I think there are more points regarding
interaction with catalogs, e.g. if tables are dropped externally rather
than through Flink SQL DDL, how would Flink be able to remove the physical
storage for it.


Best
Ingo

On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li  wrote:

> Hi all,
>
> Kurt and I propose to introduce built-in storage support for dynamic
> table, a truly unified changelog & table representation, from Flink
> SQL’s perspective. We believe this kind of storage will improve the
> usability a lot.
>
> We want to highlight some characteristics about this storage:
>
> - It’s a built-in storage for Flink SQL
> ** Improve usability issues
> ** Flink DDL is no longer just a mapping, but a real creation for these
> tables
> ** Masks & abstracts the underlying technical details, no annoying options
>
> - Supports subsecond streaming write & consumption
> ** It could be backed by a service-oriented message queue (Like Kafka)
> ** High throughput scan capability
> ** Filesystem with columnar formats would be an ideal choice just like
> iceberg/hudi does.
>
> - More importantly, in order to solve the cognitive barrier, storage needs
> to automatically address various Insert/Update/Delete inputs and table
> definitions
> ** Receive any type of changelog
> ** Table can have primary key or no primary key
>
> Looking forward to your feedback.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
>
> Best,
> Jingsong Lee
>


[DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-20 Thread Jingsong Li
Hi all,

Kurt and I propose to introduce built-in storage support for dynamic
table, a truly unified changelog & table representation, from Flink
SQL’s perspective. We believe this kind of storage will improve the
usability a lot.

We want to highlight some characteristics about this storage:

- It’s a built-in storage for Flink SQL
** Improve usability issues
** Flink DDL is no longer just a mapping, but a real creation for these tables
** Masks & abstracts the underlying technical details, no annoying options

- Supports subsecond streaming write & consumption
** It could be backed by a service-oriented message queue (Like Kafka)
** High throughput scan capability
** Filesystem with columnar formats would be an ideal choice just like
iceberg/hudi does.

- More importantly, in order to solve the cognitive barrier, storage needs
to automatically address various Insert/Update/Delete inputs and table
definitions
** Receive any type of changelog
** Table can have primary key or no primary key
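
To make the last two points concrete, a hedged sketch of the intended
experience; the table and column names are invented, and "clicks" is
assumed to be some existing source table:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BuiltInDynamicTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // No 'connector' option: the DDL really creates storage instead of
        // mapping it. The primary key is optional; the storage also accepts
        // tables without one.
        tEnv.executeSql(
                "CREATE TABLE user_counts ("
                        + " user_id BIGINT,"
                        + " cnt BIGINT,"
                        + " PRIMARY KEY (user_id) NOT ENFORCED)");

        // Any changelog can be written to it, e.g. the updates produced by
        // a streaming aggregation over the assumed 'clicks' table.
        tEnv.executeSql(
                "INSERT INTO user_counts "
                        + "SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id");
    }
}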

Looking forward to your feedback.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage

Best,
Jingsong Lee