Jingsong raised a good point: we need to be more careful when deprecating APIs. For example, tEnv#createTemporaryView was only introduced in release-1.10; users became familiar with this API in the previous release, and now we want to deprecate it in the next one.
I also have some concerns about deprecating `.rowtime()` and `.proctime()`. I agree it's a misunderstanding to apply expressions on non-existent fields. However, these APIs have existed since the early days of the Table API and are heavily used, so I think the misunderstanding shouldn't be a big problem: users have already accepted it, and `.rowtime()`/`.proctime()` make for a more fluent API. Dropping a heavily used API and forcing users to learn a new one will hurt them a lot. Could we keep the old API and introduce the new one (the advanced one) alongside it? In short, I'm +1 for keeping `fromDataStream`, which is more straightforward than `fromInsertStream` for batch users and most streaming users. Besides, if we want a corresponding method on the sink side, maybe we can add `toDataStream` and deprecate `toAppendStream`.

Best,
Jark

On Wed, 2 Sep 2020 at 11:55, Jingsong Li <jingsongl...@gmail.com> wrote:
> Thanks Timo for driving.
>
> My first impression is: can we avoid deprecating these APIs?
> - StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> - StreamTableEnvironment.fromDataStream(DataStream<T>, Expression...): Table
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>, Expression...): Unit
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>): Unit
> - StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz): DataStream<T>
> - StreamTableEnvironment.toAppendStream(Table table, TypeInformation<T> typeInfo): DataStream<T>
>
> I think these may also be commonly used APIs. My intuitive feeling is that the API of the table layer is changing too fast, with a lot of changes in each version. Even if an API is only "deprecated", it will be removed one day. We should avoid the change unless there's a strong reason.
>
> 1. fromDataStream vs. fromInsertStream:
> - In big data systems, and in our previous designs and APIs (including DDL), the default is pure insert.
> Subsequent CDC, upsert, and delete are all supplementary extension capabilities. Therefore the default is insert, which is known and familiar to users. So I think "fromDataStream" can stay as it is.
>
> 2. toAppendStream vs. toInsertStream:
> - What is the difference between append and insert? I don't think there is a clear distinction between them in our daily discussions. For me, "append" is OK.
>
> 3. Calling `.rowtime()` and `.proctime()` on fields that don't exist causes further misunderstandings:
> - This API is also widely used, even in our test code. Although we have introduced DDL, our test code has not been switched over.
> - Regarding the misunderstanding: can we remove it by modifying the behavior instead? For example, by not allowing duplicate names. As far as I know, introducing a new column name is the most common usage.
>
> 4. toAppendStream(Table table, Class<T>/TypeInformation):
> - I know an AbstractDataType is more powerful, but I think a simple Class or TypeInformation is easier for DataStream users to accept; it's simpler, and they don't have to care about DataType.
>
> I don't have a strong opinion on these, but I feel it's best not to impact non-CDC users.
>
> Best,
> Jingsong
>
> On Tue, Sep 1, 2020 at 9:10 PM Timo Walther <twal...@apache.org> wrote:
> > Thanks for the healthy discussion Jark and Dawid.
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > Yes, I'm concerned about the per-record performance. A converter or serializer should prepare an immutable Map instance beforehand (stored in a member variable) that is simply passed to every new Row instance.
> >
> > 7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`."
> >
> > The accumulator code in the FLIP is just an example; sure, in this example we could use a POJO.
> > But in general it should also be easy for DataStream API users to quickly create a Row and use names instead of indices for code readability.
> >
> > I think we should not add too much validation to the setters, to keep the runtime overhead low.
> >
> > Users should not mix position-based and string-based setters if they construct rows themselves. If they do, the result depends on the calling order. IMHO this should be straightforward once the concept is clear:
> >
> > Row row = new Row(2);
> > row.setField(0, "ABC"); // always position 0
> > row.setField(1, "ABC"); // always position 1
> > row.setField("f1", "ABC"); // position 0 because first usage of "f1"
> > row.setField("f0", "ABC"); // position 1 because first usage of "f0"
> > row.setField("f1", "ABC"); // position 0 because second usage of "f1"
> >
> > Row row = new Row(2);
> > row.setField("f0", "ABC"); // position 0 because first usage of "f0"
> > row.setField(0, "ABC"); // always position 0
> >
> > Row row = new Row(2, fieldNames);
> > row.setField(0, "ABC"); // always position 0
> > row.setField("f1", "ABC"); // position defined by fieldNames
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 14:51, Jark Wu wrote:
> > > Hi Timo,
> > >
> > > Thanks for the quick response.
> > >
> > > 5. "StreamStatementSet#attachToStream()"
> > > Joining or using connect() with a different DataStream is a good case.
> > > cc @Godfrey, what do you think about the `attachToStream()` API?
> > >
> > > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >> We need a Map for constant-time mapping of field name to index.
> > > But we can easily build the Map from the List<String> fieldNames in the Row constructor.
> > > IMO, manually building the Map and mapping names to indices is verbose and error-prone.
> > > Are you concerned about the per-record performance?
> > >
> > > 7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`."
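[Editor's note: the setter semantics discussed in this thread can be modeled with a small standalone sketch. This is an illustrative stand-in, not Flink's actual `Row` class; the class name `SketchRow` and all details are hypothetical, reconstructed only from Timo's examples above.]

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the discussed semantics; NOT Flink's actual Row class.
class SketchRow {
    private final Object[] fields;
    private final Map<String, Integer> nameToIndex;
    private final boolean hasFieldOrder; // true when a fixed name -> position map was given

    // Mode (b): user-created row; a name claims the next free position on first use.
    SketchRow(int arity) {
        this.fields = new Object[arity];
        this.nameToIndex = new HashMap<>();
        this.hasFieldOrder = false;
    }

    // Mode (a): row with a fixed name -> position mapping (e.g. from the runtime).
    SketchRow(int arity, Map<String, Integer> fieldNames) {
        this.fields = new Object[arity];
        this.nameToIndex = new HashMap<>(fieldNames);
        this.hasFieldOrder = true;
    }

    void setField(int pos, Object value) {
        fields[pos] = value; // position-based setter: always the given position
    }

    void setField(String name, Object value) {
        Integer pos = nameToIndex.get(name);
        if (pos == null) {
            if (hasFieldOrder) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            pos = nameToIndex.size(); // first usage of a name assigns the next position
            nameToIndex.put(name, pos);
        }
        fields[pos] = value;
    }

    Object getField(int pos) {
        return fields[pos];
    }
}

public class RowSketchDemo {
    public static void main(String[] args) {
        SketchRow row = new SketchRow(2);
        row.setField("f1", "A"); // first usage of "f1" -> position 0
        row.setField("f0", "B"); // first usage of "f0" -> position 1
        System.out.println(row.getField(0) + "," + row.getField(1)); // prints A,B
    }
}
```

Under these assumed semantics, mixing positional and named setters is well-defined but order-dependent, which is exactly the confusion Dawid and Jark raise below.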
> > > Thanks for the explanation.
> > > Regarding case (b), I share Dawid's confusion about what the result is when index-based and name-based setters are mixed (esp. in foreach loops and if branches).
> > > TBH, I don't see a strong need for named setters. Using a Row as the UDAF accumulator is not as good as a POJO in terms of performance and ease of use.
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > >> Hi all,
> > >>
> > >> I really like the ideas of this FLIP. I think it improves the user experience quite a bit. I wanted to add just two comments:
> > >>
> > >> 1. As for the StatementSet, I like the approach described in the FLIP for its simplicity. Moreover, the way I see it is that if a user wants to work with DataStream, then he/she wants to end up in the DataStream API, or in other words call StreamExecutionEnvironment#execute.
> > >>
> > >> 2. @Timo What is the interaction between Row setters from the different modes? What happens if the user calls both in a different order? E.g.
> > >>
> > >> row.setField(0, "ABC");
> > >> row.setField("f0", "ABC"); // is this a valid call?
> > >>
> > >> or
> > >>
> > >> row.setField("f0", "ABC");
> > >> row.setField(0, "ABC"); // is this a valid call?
> > >>
> > >> or
> > >>
> > >> row.setFieldNames(...);
> > >> row.setField(0, "ABC"); // is this a valid call?
> > >>
> > >> Best,
> > >>
> > >> Dawid
> > >>
> > >> On 01/09/2020 11:49, Timo Walther wrote:
> > >>> Hi Jark,
> > >>>
> > >>> thanks for the detailed review. Let me answer your concerns:
> > >>>
> > >>> ## Conversion of DataStream to Table
> > >>>
> > >>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of a QueryOperation tree in the validation phase."
> > >>> I'm fine with allowing `system_proctime` everywhere in the query.
> > >>> Also for SQL, I think we should have done that earlier already, to give users the chance to have time-based operations also at later stages.
> > >>>
> > >>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be assigned implicitly."
> > >>> Yes, we just use the DataStream API watermark. `system_rowtime()` will just introduce a time attribute; the watermark travels to the Table API and into the DataStream API without further code changes.
> > >>>
> > >>> ## Conversion of Table to DataStream
> > >>>
> > >>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): DataStream<Row>"
> > >>> 4. "Table.execute(ChangelogMode)"
> > >>> Filtering UPDATE_BEFORE is already quite important as it reduces the amount of data by a factor of 2. But I also understand your concerns regarding confusing users. I have also gotten the request for a `Table.getChangelogMode()` a couple of times in the past, because users would like to get information about the kind of query that is executed. However, in this case `toChangelogStream(Table)` is equivalent to calling `toChangelogStream(Table.getChangelogMode(), Table)`, so we don't need `Table.getChangelogMode()` in the current FLIP design. But this can be future work. Let's start with `toChangelogStream(Table)` and wait for more feedback about this new feature. What do others think?
> > >>>
> > >>> ## Conversion of StatementSet to DataStream API
> > >>>
> > >>> 5. "StreamStatementSet#attachToStream()"
> > >>>
> > >>> I think Godfrey's proposal is too complex for regular users. Instead of continuing with fluent programming, we would force users to define a DataStream pipeline in a lambda.
> > >>>
> > >>> Furthermore, joining or using connect() with a different DataStream source would not be possible in this design.
> > >>>
> > >>> The `execute()` method of `StatementSet` should not execute the DataStream API subprogram. It mixes the concepts, because we would tell users: "If you use toDataStream, you need to use `StreamExecutionEnvironment.execute()`."
> > >>>
> > >>> We don't solve every potential use case with the current FLIP design, but we do solve the most important one, where a pipeline just uses an INSERT INTO but also uses the Table API for connectors and preprocessing and does the main logic in the DataStream API:
> > >>>
> > >>> T1 -> T2, T3 -> DataStream, T4 -> DataStream
> > >>>
> > >>> I would consider `StatementSet.addDataStream(Table, ...)` future work for now, as it is only an optimization for reusing parts of the StreamGraph. We could even perform this optimization when calling `toInsertStream` or `toChangelogStream`.
> > >>>
> > >>> ## Improve dealing with Row in DataStream API
> > >>>
> > >>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>
> > >>> We need a Map for constant-time mapping of field name to index.
> > >>>
> > >>> We accept a nullable `fieldNames` because names are not mandatory; one can also work with indices as before.
> > >>>
> > >>> But you are right that the fieldNames member variable can be immutable. I just wanted to avoid too many overloaded constructors. I'm fine with having one full constructor for RowKind, arity, and field names (or null).
> > >>>
> > >>> 7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`."
> > >>> Maybe I leaked too many implementation details there, which rather confuse readers than help them. Internally, we need to distinguish between two kinds of rows. A user should not be bothered by this.
> > >>>
> > >>> a) Row comes from the Table API runtime: hasFieldOrder = true
> > >>> Map("myAge" -> 0, "myName" -> 1)
> > >>>
> > >>> row.getField("myName") == row.getField(1)
> > >>> row.getField("myAge") == row.getField(0)
> > >>>
> > >>> b) Row comes from the user: hasFieldOrder = false
> > >>> Row row = new Row(2);
> > >>> row.setField("myName", "Alice");
> > >>> row.setField("myAge", 32);
> > >>>
> > >>> Map("myAge" -> 1, "myName" -> 0)
> > >>>
> > >>> But the type information will decide the order of the fields later and reorder them accordingly during serialization or RowData conversion:
> > >>>
> > >>> ["myName", "myAge"] vs. ["myAge", "myName"]
> > >>>
> > >>> The user doesn't need to care about this, as it always feels natural to deal with the rows.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>> On 01.09.20 06:19, Jark Wu wrote:
> > >>>> Hi Timo,
> > >>>>
> > >>>> Thanks a lot for the great proposal, and sorry for the late reply. This is an important improvement for DataStream and Table API users.
> > >>>>
> > >>>> I have listed my thoughts and questions below ;-)
> > >>>>
> > >>>> ## Conversion of DataStream to Table
> > >>>>
> > >>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of a QueryOperation tree in the validation phase."
> > >>>> IIUC, that means `system_rowtime()` can only be used in the first `select()` after `fromXxxStream()`, right?
> > >>>> However, I think `system_proctime()` shouldn't have this limitation, because it doesn't rely on the underlying timestamp of the StreamRecord and can be generated at any stage of the query.
> > >>>>
> > >>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be assigned implicitly."
> > >>>> What watermark will be used here? Is it the pre-assigned watermark in the DataStream (a so-called `system_watermark()`)?
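[Editor's note: the reordering Timo describes, from the user's insertion order to the order declared by the type information, can be sketched as a plain projection step. All names here are hypothetical; this is not Flink's converter or serializer code.]

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: project a name -> value map into the field order that
// the type information declares. Not Flink's actual serialization code.
public final class FieldReorder {
    static Object[] toPositional(Map<String, Object> byName, String[] declaredOrder) {
        Object[] out = new Object[declaredOrder.length];
        for (int i = 0; i < declaredOrder.length; i++) {
            out[i] = byName.get(declaredOrder[i]); // position comes from the declared order
        }
        return out;
    }

    public static void main(String[] args) {
        // The user filled the row in "myName", "myAge" order ...
        Map<String, Object> byName = new LinkedHashMap<>();
        byName.put("myName", "Alice");
        byName.put("myAge", 32);
        // ... but the type information declares ["myAge", "myName"].
        Object[] positional = toPositional(byName, new String[] {"myAge", "myName"});
        System.out.println(Arrays.toString(positional)); // prints [32, Alice]
    }
}
```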
> > >>>>
> > >>>> ## Conversion of Table to DataStream
> > >>>>
> > >>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): DataStream<Row>"
> > >>>> I'm not sure whether this method is useful for users. Currently, `DynamicTableSink#getChangelogMode(changelogMode)` is only used for filtering out UPDATE_BEFORE if possible.
> > >>>> However, if we expose this method to users, it may be confusing. Users may try to use it to convert a changelog stream into an insert-only stream by applying ChangelogMode.insertOnly(). This might be misleading.
> > >>>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have to know the ChangelogMode of the current Table first and then remove UPDATE_BEFORE from that ChangelogMode.
> > >>>> Does that mean we have to support `Table.getChangelogMode()` first? But `ChangelogMode` derivation requires a full optimization pass over the Table, which seems impossible now.
> > >>>> Therefore, IMHO, we can introduce this interface in the future if users indeed need it. For most users, I think `toChangelogStream(Table)` is enough.
> > >>>>
> > >>>> 4. "Table.execute(ChangelogMode)"
> > >>>> Ditto.
> > >>>>
> > >>>> ## Conversion of StatementSet to DataStream API
> > >>>>
> > >>>> 5. "StreamStatementSet#attachToStream()"
> > >>>> I think the potential drawback is that it can't support multi-sink optimization, i.e. sharing the pipeline.
> > >>>> For example, say we have a Table `t1` (a heavy view using joins and aggregations), and we want to sink it to "mysql" using SQL while continuing to process it with DataStream in the same job.
> > >>>> It's a huge waste of resources if we re-compute `t1`. It would be nice if we could come up with a solution to share the pipeline.
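[Editor's note: to make the UPDATE_BEFORE point concrete — in a changelog, each update is encoded as an UPDATE_BEFORE/UPDATE_AFTER pair, so dropping the BEFORE rows roughly halves an update-heavy stream. A minimal standalone sketch; the enum mirrors Flink's RowKind values, but the code itself is illustrative, not Flink's.]

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;

// Illustrative sketch of filtering UPDATE_BEFORE from a changelog; not Flink code.
public final class ChangelogFilter {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static List<Entry<RowKind, String>> dropUpdateBefore(List<Entry<RowKind, String>> changelog) {
        return changelog.stream()
                .filter(e -> e.getKey() != RowKind.UPDATE_BEFORE) // retraction rows dropped
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // One insert followed by one update (encoded as a BEFORE/AFTER pair).
        List<Entry<RowKind, String>> changelog = List.of(
                new SimpleEntry<>(RowKind.INSERT, "a=1"),
                new SimpleEntry<>(RowKind.UPDATE_BEFORE, "a=1"),
                new SimpleEntry<>(RowKind.UPDATE_AFTER, "a=2"));
        System.out.println(dropUpdateBefore(changelog).size()); // prints 2
    }
}
```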
> > >>>>
> > >>>> I borrowed Godfrey's idea in FLINK-18840 and added some modifications. What do you think about the following proposal?
> > >>>>
> > >>>> interface StatementSet {
> > >>>>   StatementSet addDataStream(Table table, TableDataStreamTransform transform);
> > >>>> }
> > >>>>
> > >>>> interface TableDataStreamTransform {
> > >>>>   void transform(Context context);
> > >>>>
> > >>>>   interface Context {
> > >>>>     Table getTable();
> > >>>>     DataStream<Row> toInsertStream(Table);
> > >>>>     DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> > >>>>     DataStream<Row> toChangelogStream(Table);
> > >>>>   }
> > >>>> }
> > >>>>
> > >>>> tEnv
> > >>>>   .createStatementSet()
> > >>>>   .addInsert("mysql", table1)
> > >>>>   .addDataStream(table1, ctx -> {
> > >>>>     ctx.toInsertStream(ctx.getTable())
> > >>>>       .flatMap(..)
> > >>>>       .keyBy(..)
> > >>>>       .process(..)
> > >>>>       .addSink(...);
> > >>>>   })
> > >>>>
> > >>>> ## Improve dealing with Row in DataStream API
> > >>>>
> > >>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>> - Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is enough and more handy than a Map?
> > >>>> - Currently, the fieldNames member variable is mutable; is that on purpose? Can we make it immutable? For example, only accept it from the constructor.
> > >>>> - Why do we accept a nullable `fieldNames`?
> > >>>>
> > >>>> 7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`."
> > >>>> Sorry, I don't fully understand what `hasFieldOrder` means and what it is used for. Could you explain a bit more?
> > >>>>
> > >>>> Best,
> > >>>> Jark
> > >>>>
> > >>>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote:
> > >>>>> Hi David,
> > >>>>>
> > >>>>> thanks for your feedback. Feedback from someone who interacts with many users is very valuable.
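[Editor's note: the shape of the callback-style `addDataStream` proposal above — a continuation captured as a lambda and replayed at execute time — can be sketched generically. Every name below is hypothetical; nothing here is a Flink API.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of callback-style registration: the user's pipeline
// continuation is stored and only run when execute() is called, which is what
// would let a planner jointly optimize all registered statements first.
public final class StatementSetSketch {
    private final List<Runnable> pending = new ArrayList<>();

    StatementSetSketch addDataStream(String table, Consumer<String> transform) {
        pending.add(() -> transform.accept(table)); // defer the user's lambda
        return this; // chaining still works, but the logic lives inside the lambda
    }

    int execute() {
        pending.forEach(Runnable::run); // replay all continuations at execute time
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        int n = new StatementSetSketch()
                .addDataStream("t1", t -> log.add("transform " + t))
                .execute();
        System.out.println(n + " " + log); // prints 1 [transform t1]
    }
}
```

This also illustrates Timo's objection below: the DataStream logic has to live inside the lambda rather than continuing fluently.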
> > >>>>> I added an explanation for StatementSets to the FLIP.
> > >>>>>
> > >>>>> Regarding watermarks and fromInsertStream: actually, the
> > >>>>>
> > >>>>> `Schema.watermark("ts", system_watermark())`
> > >>>>>
> > >>>>> is not really necessary in `fromChangelogStream`. It is added to satisfy the Schema interface and to be similar to SQL DDL.
> > >>>>>
> > >>>>> We could already extract the watermark strategy if we see `system_rowtime()`, because in most cases we will simply use the DataStream API watermarks.
> > >>>>>
> > >>>>> But maybe some users want to generate watermarks after preprocessing in the DataStream API. In such cases, users want to define a computed watermark expression.
> > >>>>>
> > >>>>> So, for simplicity, in the Simple API we introduce:
> > >>>>>
> > >>>>> tEnv
> > >>>>>   .fromInsertStream(DataStream<T>)
> > >>>>>   .select('*, system_rowtime().as("rowtime"), system_proctime().as("proctime"))
> > >>>>>
> > >>>>> and just rely on the watermarks that already travel through the DataStream API. I added another comment to the FLIP.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Timo
> > >>>>>
> > >>>>> On 19.08.20 10:53, David Anderson wrote:
> > >>>>>> Timo, nice to see this.
> > >>>>>>
> > >>>>>> As someone who expects to use these interfaces, but who doesn't fully understand the existing Table API, I like what I see. Just a couple of comments:
> > >>>>>>
> > >>>>>> The way that watermarks fit into the fromChangelogStream case makes sense to me, and I'm wondering why watermarks don't come up in the previous section about fromInsertStream.
> > >>>>>>
> > >>>>>> I wasn't familiar with StatementSets, and I couldn't find an explanation in the docs.
> > >>>>>> I eventually found this short paragraph in an email from Fabian Hueske, which clarified everything in that section for me:
> > >>>>>>
> > >>>>>>     FLIP-84 [1] added the concept of a "statement set" to group multiple INSERT INTO statements (SQL or Table API) together. The statements in a statement set are jointly optimized and executed as a single Flink job.
> > >>>>>>
> > >>>>>> Maybe if you add this to the FLIP it will help other readers as well.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> David
> > >>>>>>
> > >>>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org> wrote:
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> I would like to propose a FLIP that aims to resolve the remaining shortcomings in the Table API:
> > >>>>>>>
> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> > >>>>>>>
> > >>>>>>> The Table API has received many new features over the last year. It supports a new type system (FLIP-37), connectors support changelogs (FLIP-95), we have well-defined internal data structures (FLIP-95), support for result retrieval in an interactive fashion (FLIP-84), and soon new TableDescriptors (FLIP-129).
> > >>>>>>>
> > >>>>>>> However, the interfaces from and to the DataStream API have not been touched during the introduction of these new features and are somewhat outdated. The interfaces lack important functionality that is available in the Table API but not exposed to DataStream API users. The DataStream API is still our most important API, which is why good interoperability is crucial.
> > >>>>>>>
> > >>>>>>> This FLIP is a mixture of different topics that improve the interoperability between DataStream and Table API in terms of:
> > >>>>>>>
> > >>>>>>> - DataStream <-> Table conversion
> > >>>>>>> - translation of type systems TypeInformation <-> DataType
> > >>>>>>> - schema definition (incl. rowtime, watermarks, primary key)
> > >>>>>>> - changelog handling
> > >>>>>>> - row handling in DataStream API
> > >>>>>>>
> > >>>>>>> I'm looking forward to your feedback.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
>
> --
> Best, Jingsong Lee