Jingsong raised a good point. We need to be more careful when deprecating
APIs.
For example, tEnv#createTemporaryView was introduced in release-1.10, users
became familiar with this API in the previous release, but now we want to
deprecate it in the next release.

I also have some concerns about deprecating `.rowtime()`, `.proctime()`. I
agree it's a misunderstanding to apply expressions on non-exist fields.
However, these APIs have been introduced since the early time of Table API
and have been highly used.
So I think the misunderstanding shouldn't be a big problem, users have
already accepted it, and the `.rowtime()` and `.proctime()` is a more
fluent API.
Dropping a highly used API and educating users to learn a new one will hurt
users a lot. Could we keep the old API and introduce the new one (which is
the advanced one)?

In a word, I'm +1 to keep the `fromDataStream` which is more
straightforward than `fromInsertStream` for batch users and most streaming
users.
Besides, if we want to have a corresponding thing in the sink side, maybe
we can have `toDataStream` and deprecate `toAppendStream`.

Best,
Jark

On Wed, 2 Sep 2020 at 11:55, Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks Timo for driving.
>
> My first impression is, can we not deprecate these API?
> - StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> - StreamTableEnvironment.fromDataStream(DataStream<T>, Expression...):
> Table
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>,
> Expression...): Unit
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>): Unit
> - StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz):
> DataStream<T>
> - StreamTableEnvironment.toAppendStream(Table table, TypeInformation<T>
> typeInfo): DataStream<T>
>
> I think they may also be commonly used APIs. My intuitive feeling is that
> the API of the table layer is changing too fast, and there are a lot of
> changes in each version. Even if there is a "deprecated", they will be
> removed one day. We can avoid the change unless there's a strong reason.
>
> 1.fromDataStream VS fromInsertStream:
> - In big data systems, or in our previous designs, APIs, including DDL, the
> default is pure insert. Subsequent CDC, upsert, and delete are all
> supplementary extension capabilities. Therefore, by default, it is insert,
> which is known and familiar to users. So I think, "fromDataStream" can be
> as it is.
>
> 2.toAppendStream VS toInsertStream:
> - What is the difference between append and insert? I don't think there is
> a clear distinction between them in our daily discussions. For me, Append
> is OK.
>
> 3.Calling `.rowtime()` and `.proctime()` on fields that don't exist caused
> further misunderstandings
> - This API is also widely used, even in our test code. Although we have
> introduced DDL, our test code has not been switched.
> - exist caused further misunderstandings: Can we remove this
> misunderstanding by modifying behavior? For example, duplicate names are
> not allowed. As far as I know, a new column name is the most widely used.
>
> 4.toAppendStream(Table table, Class<T>/TypeInformation)
> - I know a AbstractDataType is more powerful, but I think a simple class
> or TypeInformation is easier to be accepted by DataStream users, simpler,
> they had a chance to not take care of datatype.
>
> I don't have a strong opinion on these, but I feel it's best not to have an
> impact on non-CDC users.
>
> Best,
> Jingsong
>
> On Tue, Sep 1, 2020 at 9:10 PM Timo Walther <twal...@apache.org> wrote:
>
> > Thanks for the healthy discussion Jark and Dawid.
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > Yes, I'm concerned about about the per-record performance. A converter
> > or serializer should prepare an immutable Map instance before (stored in
> > a member variable) that is simply passed to every new Row instance.
> >
> > 7. "a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`."
> >
> > The accumulator code in the FLIP is just an example, sure in this
> > example we could use a POJO. But in general it should also be easy for
> > DataStream API users to quickly create a Row and use names instead of
> > indices for code readability.
> >
> > I think we should not add to much validation to the setters to keep the
> > runtime overhead low.
> >
> > Users should not mix position-based and string-based setters if they
> > construct rows themselves. If they do, the result depends on the calling
> > order. IMHO this should be straight forward once the concept is clear.
> >
> > Row row = new Row(2);
> > row.setField(0, "ABC"); // always position 0
> > row.setField(1, "ABC"); // always position 1
> > row.setField("f1", "ABC"); // position 0 because first usage of "f1"
> > row.setField("f0", "ABC"); // position 1 because first usage of "f0"
> > row.setField("f1", "ABC"); // position 0 because second usage of "f1"
> >
> > Row row = new Row(2);
> > row.setField("f0", "ABC"); // position 0 because first usage of "f0"
> > row.setField(0, "ABC");    // always position 0
> >
> > Row row = new Row(2, fieldNames);
> > row.setField(0, "ABC"); // always position 0
> > row.setField("f1", "ABC"); // position defined by fieldNames
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 14:51, Jark Wu wrote:
> > > Hi Timo,
> > >
> > > Thanks for the quick response.
> > >
> > > 5. "StreamStatementSet#attachToStream()"
> > > Joining or using connect() with a different DataStream is a good case.
> > > cc @Godfrey , what do you think about the `attachToStream()` API?
> > >
> > > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >> We need a Map for constant time of mapping field name to index.
> > > But we can easily build the Map from the List<String> fieldNames in Row
> > > constructor.
> > > IMO, manually building the Map and mapping names to indices is verbose
> > and
> > > error-prone.
> > > Are you concerned about the per-record performance?
> > >
> > > 7. "a Row has two modes represented by an internal boolean flag
> > > `hasFieldOrder`."
> > > Thanks for the explanation.
> > > Regarding the case (b), I have the same confusion with Dawid that
> what's
> > > the result when index-based setters and name-based setters are mixed
> used
> > > (esp. in foreach and if branches).
> > > TBH, I don't see a strong need for named setters. Using it as the UDAF
> > > accumulator is not as good as POJO in terms of performance and ease of
> > use.
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dwysakow...@apache.org>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I really like the ideas of this FLIP. I think it improves user
> > >> experience quite a bit. I wanted to add just two comments:
> > >>
> > >> 1. As for the StatementSet I like the approach described in the FLIP
> for
> > >> its simplicity. Moreover the way I see it is that if a user wants to
> > >> work with DataStream, then he/she wants to end up in the DataStream
> API,
> > >> or in other words call the StreamExecutionEnvironment#execute.
> > >>
> > >> 2. @Timo What is the interaction between Row setters from the
> different
> > >> modes? What happens if the user calls both in different order. E.g.
> > >>
> > >> row.setField(0, "ABC");
> > >>
> > >> row.setField("f0", "ABC"); // is this a valid call ?
> > >>
> > >> or
> > >>
> > >> row.setField("f0", "ABC");
> > >>
> > >> row.setField(0, "ABC"); // is this a valid call ?
> > >>
> > >> or
> > >>
> > >> row.setFieldNames(...);
> > >>
> > >> row.setField(0, "ABC"); // is this a valid call ?
> > >>
> > >> Best,
> > >>
> > >> Dawid
> > >>
> > >> On 01/09/2020 11:49, Timo Walther wrote:
> > >>> Hi Jark,
> > >>>
> > >>> thanks for the detailed review. Let me answer your concerns:
> > >>>
> > >>> ## Conversion of DataStream to Table
> > >>>
> > >>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > >>> leaf of a QueryOperation tree in the validation phase."
> > >>> I'm fine with allowing `system_proctime` everywhere in the query.
> Also
> > >>> for SQL, I think we should have done that earlier already to give
> > >>> users the chance to have time based operations also at later stages.
> > >>>
> > >>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > >>> assigned implicitly. "
> > >>> Yes, we just use the DataStream API watermark. `system_rowtime()`
> will
> > >>> just introduce a time attribute, the watermark travels to the Table
> > >>> API and into DataStream API without further code changes.
> > >>>
> > >>> ## Conversion of Table to DataStream
> > >>>
> > >>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > >>> DataStream<Row>"
> > >>> 4. "Table.execute(ChangelogMode)"
> > >>> Filtering UPDATE_BEFORE is already quite important as it reduces the
> > >>> amount of data by factor 2. But I also understand your concerns
> > >>> regarding confusing users. I also got the request for a
> > >>> `Table.getChangelogMode()` a couple of times in the past, because
> > >>> users would like to get information about the kind of query that is
> > >>> executed. However, in this case `toChangelogStream(Table)` is
> > >>> equivalent to call ``toChangelogStream(Table.getChangelogMode(),
> > >>> Table)` so we don't need `Table.getChangelogMode()` in the current
> > >>> FLIP design. But this can be future work. Let's start with
> > >>> `toChangelogStream(Table)` and wait for more feedback about this new
> > >>> feature. What do others think?
> > >>>
> > >>> ## Conversion of StatementSet to DataStream API
> > >>>
> > >>> 5. "StreamStatementSet#attachToStream()"
> > >>>
> > >>> I think Godfrey's proposal is too complex for regular users. Instead
> > >>> of continuing with the fluent programming, we would force users to
> > >>> define a DataStream pipeline in a lambda.
> > >>>
> > >>> Furthermore, joining or using connect() with a different DataStream
> > >>> source would not be possible in this design.
> > >>>
> > >>> The `execute()` method of `StatementSet` should not execute the
> > >>> DataStream API subprogram. It mixes the concepts because we tell
> > >>> users: "If you use toDataStream" you need to use
> > >>> `StreamExecutionEnvironment.execute()`.
> > >>>
> > >>> We don't solve every potential use case with the current FLIP design
> > >>> but the most important one where a pipeline just uses an INSERT INTO
> > >>> but also uses Table API for connectors and preprocessing and does the
> > >>> main logic in DataStream API:
> > >>>
> > >>> T1 -> T2, T3 -> DataStream, T4 -> DataStream
> > >>>
> > >>> I would consider `StatementSet.addDataStream(Table, ...)` future work
> > >>> for now as it is only an opimization for reusing parts of the
> > >>> StreamGraph. We could even perform this optimization when calling
> > >>> `toInsertStream` or `toChangelogStream`.
> > >>>
> > >>> ## Improve dealing with Row in DataStream API
> > >>>
> > >>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>
> > >>> We need a Map for constant time of mapping field name to index.
> > >>>
> > >>> We accept a nullable `fieldNames` because names are not mandatory,
> one
> > >>> can also work with indices as before.
> > >>>
> > >>> But you are right that the fieldNames member variable can be
> > >>> immutable. I just wanted to avoid too many overloaded constructors.
> > >>> I'm fine with having one full constructor for RowKind, arity and
> field
> > >>> names (or null).
> > >>>
> > >>> 7. "a Row has two modes represented by an internal boolean flag
> > >>> `hasFieldOrder`."
> > >>> Maybe I leaked to many implementation details there that rather
> > >>> confuse readers than help. Internally, we need to distinguish between
> > >>> two kinds of rows. A user should not be bothered by this.
> > >>>
> > >>> a) Row comes from Table API runtime: hasFieldOrder = true
> > >>> Map("myAge" -> 0, "myName" -> 1)
> > >>>
> > >>> row.getField("myName") == row.getField(1)
> > >>> row.getField("myAge") == row.getField(0)
> > >>>
> > >>> b) Row comes from user: hasFieldOrder = false
> > >>> Row row = new Row(2);
> > >>> row.setField("myName", "Alice");
> > >>> row.setField("myAge", 32);
> > >>>
> > >>> Map("myAge" -> 1, "myName" -> 0)
> > >>>
> > >>> But the type information will decide about the order of the fields
> > >>> later and reorder them accordingly during serialization or RowData
> > >>> conversion:
> > >>>
> > >>> ["myName", "myAge"] vs. ["myAge", "myName"]
> > >>>
> > >>> The user must not care about this as it always feels naturally to
> deal
> > >>> with the rows.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>>
> > >>> On 01.09.20 06:19, Jark Wu wrote:
> > >>>> Hi Timo,
> > >>>>
> > >>>> Thanks a lot for the great proposal and sorry for the late reply.
> > >>>> This is
> > >>>> an important improvement for DataStream and Table API users.
> > >>>>
> > >>>> I have listed my thoughts and questions below ;-)
> > >>>>
> > >>>> ## Conversion of DataStream to Table
> > >>>>
> > >>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > >>>> leaf of
> > >>>> a QueryOperation tree in the validation phase."
> > >>>> IIUC, that means `system_rowtime()` can only be used in the first
> > >>>> `select()` after `fromXxxStream()`, right?
> > >>>> However, I think `system_proctime()` shouldn't have this limitation,
> > >>>> because it doesn't rely on the underlying timestamp of StreamRecord
> > and
> > >>>>    can be generated in any stage of the query.
> > >>>>
> > >>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > >>>> assigned implicitly. "
> > >>>> What watermark will be used here? Is the pre-assigned watermark in
> the
> > >>>> DataStream (so called `system_watermak()`)?
> > >>>>
> > >>>> ## Conversion of Table to DataStream
> > >>>>
> > >>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > >>>> DataStream<Row>"
> > >>>> I'm not sure whether this method is useful for users. Currently, the
> > >>>> `DynamicTableSinks#getChagnelogMode(changelogMode)` is only used for
> > >>>> filtering UPDATE_BEFORE if possible.
> > >>>> However, if we expose this method to users, it may be confusing.
> > >>>> Users may
> > >>>> try to use this method to convert a changelog stream to an
> insert-only
> > >>>> stream by applying ChangelogMode.insertOnly(). This might be
> > misleading.
> > >>>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
> > >>>> have
> > >>>> to know the ChangelogMode of the current Table first, and remove
> > >>>> UPDATE_BEFORE from the ChagnelogMode.
> > >>>> That means we have to support `Table.getChangelogMode()` first? But
> > >>>> `ChangelogMode` derivation requires a full optimization path on the
> > >>>> Table,
> > >>>> which seems impossible now.
> > >>>> Therefore, IMHO, we can introduce this interface in the future if
> > users
> > >>>> indeed need this. For most users, I think `toChangelogStream(Table)`
> > is
> > >>>> enough.
> > >>>>
> > >>>> 4. "Table.execute(ChangelogMode)"
> > >>>> Ditto.
> > >>>>
> > >>>> ## Conversion of StatementSet to DataStream API
> > >>>>
> > >>>> 5. "StreamStatementSet#attachToStream()"
> > >>>> I think the potential drawback is that it can't support multi-sink
> > >>>> optimization, i.e. share pipeline.
> > >>>> For example, if we have a Table `t1` (a heavy view uses join,
> > >>>> aggregate),
> > >>>> and want to sink to "mysql" using SQL and want to continue
> processing
> > >>>> using
> > >>>> DataStream in a job.
> > >>>> It's a huge waste of resources if we re-compute `t1`. It would be
> > >>>> nice if
> > >>>> we can come up with a solution to share the pipeline.
> > >>>>
> > >>>> I borrowed Godfrey's idea in FLINK-18840 and added some
> > >>>> modifications. What
> > >>>> do you think about the following proposal?
> > >>>>
> > >>>> interface StatementSet {
> > >>>>      StatementSet addDataStream(Table table,
> TableDataStreamTransform
> > >>>> transform);
> > >>>> }
> > >>>>
> > >>>> interface TableDataStreamTransform {
> > >>>>      void transform(Context);
> > >>>>
> > >>>>      interface Context {
> > >>>>          Table getTable();
> > >>>>          DataStream<Row> toInsertStream(Table);
> > >>>>          DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> > >>>>          DataStream<Row> toChangelogStream(Table);
> > >>>>      }
> > >>>> }
> > >>>>
> > >>>> tEnv
> > >>>>     .createStatementSet()
> > >>>>     .addInsert("mysql", table1)
> > >>>>     .addDataStream(table1, ctx -> {
> > >>>>         ctx.toInsertStream(ctx.getTable())
> > >>>>           .flatmap(..)
> > >>>>           .keyBy(..)
> > >>>>           .process(..)
> > >>>>           .addSink(...);
> > >>>>     })
> > >>>>
> > >>>>
> > >>>> ## Improve dealing with Row in DataStream API
> > >>>>
> > >>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter
> > is
> > >>>> enough and more handy than Map ?
> > >>>> - Currently, the fieldNames member variable is mutable, is it on
> > >>>> purpose?
> > >>>> Can we make it immutable? For example, only accept from the
> > constructor.
> > >>>> - Why do we accept a nullable `fieldNames`?
> > >>>>
> > >>>> 7. "a Row has two modes represented by an internal boolean flag
> > >>>> `hasFieldOrder`."
> > >>>> Sorry, I don't fully understand what does the `hasFieldOrder` mean
> > >>>> and is
> > >>>> used for. Could you explain a bit more for this?
> > >>>>
> > >>>> Best,
> > >>>> Jark
> > >>>>
> > >>>>
> > >>>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org>
> > wrote:
> > >>>>
> > >>>>> Hi David,
> > >>>>>
> > >>>>> thanks for your feedback. Feedback from someone who interacts with
> > many
> > >>>>> users is very valuable. I added an explanation for StatementSets to
> > the
> > >>>>> FLIP.
> > >>>>>
> > >>>>> Regarding watermarks and fromInsertStream, actually the
> > >>>>>
> > >>>>> `Schema.watermark("ts", system_watermark())`
> > >>>>>
> > >>>>> is not really necessary in the `fromChangelogStream`. It is added
> to
> > >>>>> satify the Schema interface and be similar to SQL DDL.
> > >>>>>
> > >>>>> We could already extract the watermark strategy if we see
> > >>>>> `system_rowtime()` because in most of the cases we will simply use
> > the
> > >>>>> DataStream API watermarks.
> > >>>>>
> > >>>>> But maybe some users want to generate watermarks after
> preprocessing
> > in
> > >>>>> DataStream API. In this cases users what to define a computed
> > watermark
> > >>>>> expression.
> > >>>>>
> > >>>>> So for simplicity in the Simple API we introduce:
> > >>>>>
> > >>>>> tEnv
> > >>>>>      .fromInsertStream(DataStream<T>)
> > >>>>>      .select('*, system_rowtime().as("rowtime"),
> > >>>>> system_proctime().as("proctime"))
> > >>>>>
> > >>>>> and just rely on the watermarks that travel through DataStream API
> > >>>>> already. I added another comment to the FLIP.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Timo
> > >>>>>
> > >>>>>
> > >>>>> On 19.08.20 10:53, David Anderson wrote:
> > >>>>>> Timo, nice to see this.
> > >>>>>>
> > >>>>>> As someone who expects to use these interfaces, but who doesn't
> > fully
> > >>>>>> understand the existing Table API, I like what I see. Just a
> couple
> > of
> > >>>>>> comments:
> > >>>>>>
> > >>>>>> The way that watermarks fit into the fromChangelogStream case
> makes
> > >>>>>> sense
> > >>>>>> to me, and I'm wondering why watermarks don't come up in the
> > previous
> > >>>>>> section about fromInsertStream.
> > >>>>>>
> > >>>>>> I wasn't familiar with StatementSets, and I couldn't find an
> > >>>>>> explanation
> > >>>>> in
> > >>>>>> the docs. I eventually found this short paragraph in an email from
> > >>>>>> Fabian
> > >>>>>> Hueske, which clarified everything in that section for me:
> > >>>>>>
> > >>>>>>        FLIP-84 [1] added the concept of a "statement set" to group
> > >>>>>> multiple
> > >>>>>> INSERT
> > >>>>>>        INTO statements (SQL or Table API) together. The statements
> > in a
> > >>>>>> statement
> > >>>>>>        set are jointly optimized and executed as a single Flink
> job.
> > >>>>>>
> > >>>>>> Maybe if you add this to the FLIP it will help other readers as
> > well.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> David
> > >>>>>>
> > >>>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org
> >
> > >>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> I would like to propose a FLIP that aims to resolve the remaining
> > >>>>>>> shortcomings in the Table API:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> > >>>>>
> > >>>>>>>
> > >>>>>>> The Table API has received many new features over the last year.
> It
> > >>>>>>> supports a new type system (FLIP-37), connectors support
> changelogs
> > >>>>>>> (FLIP-95), we have well defined internal data structures
> (FLIP-95),
> > >>>>>>> support for result retrieval in an interactive fashion (FLIP-84),
> > and
> > >>>>>>> soon new TableDescriptors (FLIP-129).
> > >>>>>>>
> > >>>>>>> However, the interfaces from and to DataStream API have not been
> > >>>>>>> touched
> > >>>>>>> during the introduction of these new features and are kind of
> > >>>>>>> outdated.
> > >>>>>>> The interfaces lack important functionality that is available in
> > >>>>>>> Table
> > >>>>>>> API but not exposed to DataStream API users. DataStream API is
> > >>>>>>> still our
> > >>>>>>> most important API which is why a good interoperability is
> crucial.
> > >>>>>>>
> > >>>>>>> This FLIP is a mixture of different topics that improve the
> > >>>>>>> interoperability between DataStream and Table API in terms of:
> > >>>>>>>
> > >>>>>>> - DataStream <-> Table conversion
> > >>>>>>> - translation of type systems TypeInformation <-> DataType
> > >>>>>>> - schema definition (incl. rowtime, watermarks, primary key)
> > >>>>>>> - changelog handling
> > >>>>>>> - row handling in DataStream API
> > >>>>>>>
> > >>>>>>> I'm looking forward to your feedback.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
> --
> Best, Jingsong Lee
>

Reply via email to