Hi all,

A comment from my side on the topic of the current, weird renaming/naming/reordering when registering a DataStream. It might be just me, but I find it extremely confusing, and I would be really, really happy if we could simplify it.
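The rule behind the examples below can be made explicit with a small, self-contained model of how the `$("x")` references appear to be resolved today, placed next to the index-based renaming proposed further down. This is a hypothetical sketch, not Flink's code. `FieldResolutionModel`, `resolveCurrent`, `renameByIndex` and `project` are invented names. The condition for positional mode (a tuple-like input whose field names contain none of the requested names) is an assumption reconstructed from the examples in this thread. Only plain field references are covered (no `.as(...)` aliases, no time attributes), and each result entry reads `outputName=inputField`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, NOT Flink's implementation. Only plain field references are
// modelled (no aliases, no rowtime/proctime). Each entry reads "outputName=inputField".
final class FieldResolutionModel {

    private FieldResolutionModel() {}

    // Current behaviour as modelled on the examples in this thread: a tuple-like input
    // whose field names contain none of the requested names is resolved by POSITION
    // (pure renaming, trailing fields dropped); everything else is resolved by NAME
    // (reordering and projection).
    static List<String> resolveCurrent(boolean isTuple, List<String> inputFields, List<String> requested) {
        boolean byPosition = isTuple && requested.stream().noneMatch(inputFields::contains);
        List<String> result = new ArrayList<>();
        if (byPosition) {
            if (requested.size() > inputFields.size()) {
                throw new IllegalArgumentException("More names than input fields");
            }
            for (int i = 0; i < requested.size(); i++) {
                result.add(requested.get(i) + "=" + inputFields.get(i));
            }
            return result;
        }
        for (String name : requested) {
            if (!inputFields.contains(name)) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            result.add(name + "=" + name);
        }
        return result;
    }

    // Proposed behaviour as modelled here: names always rename by index, whatever the
    // input type. Assumption: exactly one new name per input field.
    static List<String> renameByIndex(List<String> inputFields, List<String> newNames) {
        if (newNames.size() != inputFields.size()) {
            throw new IllegalArgumentException("Expected " + inputFields.size() + " names");
        }
        List<String> result = new ArrayList<>();
        for (int i = 0; i < newNames.size(); i++) {
            result.add(newNames.get(i) + "=" + inputFields.get(i));
        }
        return result;
    }

    // An ordinary projection on top of a mapping: keep the requested output names in order.
    static List<String> project(List<String> mapping, List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            String match = null;
            for (String entry : mapping) {
                if (entry.startsWith(name + "=")) {
                    match = entry;
                }
            }
            if (match == null) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            result.add(match);
        }
        return result;
    }
}
```

Take `Tuple3<Integer, String, Long>` with fields `f0`, `f1`, `f2`: `resolveCurrent(true, List.of("f0", "f1", "f2"), List.of("b", "a", "c"))` yields `[b=f0, a=f1, c=f2]`, while the same names on the POJO yield `[b=b, a=a, c=c]`. With `renameByIndex` followed by `project`, the same call shape has the same meaning for both inputs.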
I really don't like that the actual behaviour of this method depends on the input type and the set of operations used. See some examples:

    // Setup assumed by the snippet (Flink 1.11+ Java API, inside some enclosing class):
    import static org.apache.flink.table.api.Expressions.$;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    public static class TestPojo {
        public int a;
        public String b;
        public long c;
    }

    DataStreamSource<TestPojo> ds = env.fromElements(new TestPojo());

    Table table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c")); // reordering of the fields
    table.printSchema();

    table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c").as("d")); // reordering with renaming
    table.printSchema();

    table = tableEnv.fromDataStream(ds, $("b"), $("c")); // projecting out the 1st field
    table.printSchema();

    DataStreamSource<Tuple3<Integer, String, Long>> ds1 = env.fromElements(Tuple3.of(1, "a", 1L));

    table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c")); // RENAMING without reordering!!! even though the arguments are exactly the same as in the 1st example
    table.printSchema();

    table = tableEnv.fromDataStream(ds1, $("b"), $("c")); // projecting out the 3rd field, even though the arguments are exactly the same as in the 3rd example
    table.printSchema();

    table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c").as("d")); // illegal renaming, an exception is thrown
    table.printSchema();

Why can't we use established operations such as projections, which always behave the same, so that a field reference is always a field reference (in the current solution it is either a reference or an alias), as described in the FLIP? If it is such a must to be able to rename the fields without their original names (I agree it is useful for tuples), I would very much prefer to see:

    tableEnv.fromDataStream(ds, "b", "a", "c"); // <- always rename based on the index

and then you can further apply projections.

Again, maybe I am the only one who finds it extremely confusing.

Best,
Dawid

On 02/09/2020 11:47, Jark Wu wrote:
> Hi Timo,
>
> 1. "fromDataStream VS fromInsertStream"
> In terms of naming, personally, I prefer `fromDataStream`, `fromChangelogStream`, `toDataStream`, `toChangelogStream` over `fromInsertStream`, `toInsertStream`.
>
> 2. "fromDataStream(DataStream, Expression...) VS fromInsertStream(DataStream).select()"
> "fromDataStream" supports referencing input fields by position, and fields are simply renamed. I think this is handy; however, it is not supported in "fromInsertStream(DataStream).select()". Is it possible to keep using `fromDataStream(DataStream, Expression...)` but deprecate the support of `.rowtime()` and `.proctime()`? Instead, users should call `system_rowtime()` and `system_proctime()` if they want to derive the time attribute, e.g.
>
>     DataStream<Tuple2<String, Long>> stream = ...
>     Table table = tableEnv.fromDataStream(stream,
>         $("a"), // rename the first field to 'a'
>         $("b"), // rename the second field to 'b'
>         system_rowtime().as("rowtime"), // extract the internally attached timestamp into an event-time attribute
>         system_proctime().as("proctime"));
>
> I think this will be more fluent, easy to validate, and make it possible to use the existing API. What do you think?
>
> 3. "name-based setters should always be based on fieldNames"
> +1 to have a constant fieldName->index mapping. It will be more straightforward and avoid confusion. We can still introduce the dynamic field index mapping in the future if needed.
>
> Best,
> Jark
>
> On Wed, 2 Sep 2020 at 16:19, Timo Walther <twal...@apache.org> wrote:
>
>> Hi everyone,
>>
>> thanks for your feedback. It's a lot of content that needs to be digested. I will update the FLIP shortly to incorporate some of the feedback already.
>> But let me respond to some topics first:
>>
>> "not deprecate these API", "the API of the table layer is changing too fast"
>>
>> I agree that deprecating API is definitely not great for users, but in this case I think it is for the greater good: it makes the API more understandable and focuses on common use cases for the future. I would rather say that the API is about to settle, because there are only a couple of shortcomings left and the bigger picture is clearer than ever. IMO the proposed changes are one of the last bigger API changes on the roadmap. I cannot see other bigger refactorings in the mid-term. Having changed so much in the last releases should not be a reason to keep a confusing API. Users are happy to upgrade if they also get more features by upgrading (e.g. fromChangelogStream).
>>
>> 1. "fromDataStream VS fromInsertStream"
>>
>> The main reason to change this API is to have the possibility to update the type mapping without breaking backwards compatibility. The name `fromInsertStream` makes it possible to have new semantics and makes concepts more explicit by naming.
>>
>> 2. "toAppendStream VS toInsertStream"
>>
>> "Append" is common in the Flink community, but the outside world uses "insert". Actually, the term "append-only table" is wrong because SQL tables have bag semantics without any order. So "appending" is more of an "insertion". This is also represented in FLIP-95's `RowKind`, where we call the concepts INSERT and `ChangelogKind.insertOnly`.
>>
>> 3. "`.rowtime()` and `.proctime()`"
>>
>> "API is also widely used, even in our test code"
>>
>> The test code is already quite outdated and uses a lot of deprecated API. We need to deal with that with a better testing infrastructure. But this can be future work.
>>
>> "users have already accepted it"
>>
>> I'm not sure if users have already accepted it.
>> I think we get at least one question around this topic every week, because users would like to call `.rowtime` on arbitrary timestamps in the middle of the pipeline. And specifying all fields just to give the StreamRecord timestamp a name should be made easier. This is necessary in 80% of all use cases.
>>
>> 4. "toAppendStream(Table table, Class<T>/TypeInformation)"
>>
>> The DataType system is way easier than the TypeInformation system because it provides a consistent look and feel with a lot of utilities. E.g. many users didn't know that they could just pass `Row.class` in the past. Actually, extracting types from `Row.class` is not supported by the TypeExtractor (we recently even printed a warning to the logs), but we hacked some logic into the method. With AbstractDataType, users can still use classes via `DataTypes.of`; for example `toInsertStream(DataTypes.of(MyPojo.class))`.
>>
>> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>>
>> Similar to `TableEnvironment.execute()`, we made some mistakes during the big refactorings. IMHO tEnv#createTemporaryView was one mistake because we introduced it too quickly. In general this method is correct, but now we cannot change the underlying semantics again without breaking existing pipelines. We could keep this method and just change the type system under the hood; in most cases the pipeline should still work, but we cannot guarantee this due to slight differences.
>>
>> 6. "could it be "StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode)"
>>
>> No, this is not possible, because T records have no change flag. Without a change flag, a ChangelogMode does not make much sense. That's why `from/toChangelogStream` supports only `Row`, whereas `from/toInsertStream` accepts arbitrary type classes.
>>
>> 7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"
>>
>> I also thought about this method and using `Schema` there.
>> However, with a schema you cannot specify the data type of the top-level record itself. We would need to offer fromDataStream(dataStream, Schema, DataType) or integrate the DataType into the Schema class itself, which would mix up the concepts.
>>
>> 8. "name-based setters should always be based on fieldNames"
>>
>> I'm fine with throwing an exception if my mentioned semantics are too confusing.
>>
>> Regards,
>> Timo
>>
>>
>> On 02.09.20 07:25, Jingsong Li wrote:
>>>> a Row has two modes represented by an internal boolean flag `hasFieldOrder`
>>>
>>> +1, I share Dawid's confusion about what the result is when index-based setters and name-based setters are mixed. And name-based setters look like an append instead of a set.
>>>
>>> It reminds me of Avro's `GenericRecord`. We should support real random-access name-based setters instead of appending.
>>>
>>> So, what I think is that name-based setters should always be based on fieldNames, just like name-based getters. Otherwise, throw an exception.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yuzhao....@gmail.com> wrote:
>>>
>>>> Timo, thanks for the discussion.
>>>>
>>>> I have only read the "Conversion of DataStream to Table" part, so I would only put some objections there ~
>>>>
>>>>> StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
>>>>
>>>> At first glance, from the perspective of a user, I'm confused about why we must distinguish on the API level what a data stream is, e.g. an insert stream or whatever other kind of stream.
>>>>
>>>> A user does not expect to have to distinguish between several DataStream options. The framework should have the ability to infer the ChangelogMode of the stream, but sadly we cannot at the moment, because we do not have the metadata the framework would actually need to describe the ChangelogMode.
>>>>
>>>> And could it be:
>>>>
>>>> StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode), where the ChangelogMode is optional because 90% of the datastreams are insert-only for now,
>>>>
>>>> or:
>>>>
>>>> DataStream.withChangelogMode(ChangelogMode), so that a DataStream can be self-describing about what kind of stream it is (again, if not specified, the default is INSERT).
>>>>
>>>>> tEnv
>>>>>   .fromInsertStream(DataStream<T>)
>>>>>   .select('*, system_rowtime().as("rowtime"), system_proctime().as("proctime"))
>>>>
>>>> In order to declare the time attributes on a datastream, I must say I prefer tEnv.fromDataStream(dataStream, Schema) for these reasons:
>>>>
>>>> - Schema is the uniform interface for declaring table metadata in the Table/SQL API; in the imperative-style Descriptor API we also use it to declare time attributes.
>>>> - Using a projection for time attributes is not a good idea, because on the SQL side we declare them as metadata, i.e. as part of the table schema, when we define the DDL. Although we may explain the DDL internally using a computed column, that does not mean we must do that explicitly in the DataStream API. In the SQL world, no projection function outputs a time-attribute type, so we had better keep the time attributes in the scope of the table metadata.
>>>>
>>>> Best,
>>>> Danny Chan
>>>> On Aug 19, 2020, 4:22 PM +0800, Timo Walther <twal...@apache.org> wrote:
>>>>> Hi everyone,
>>>>>
>>>>> I would like to propose a FLIP that aims to resolve the remaining shortcomings in the Table API:
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>
>>>>> The Table API has received many new features over the last year.
>>>>> It supports a new type system (FLIP-37), connectors support changelogs (FLIP-95), we have well-defined internal data structures (FLIP-95), support for result retrieval in an interactive fashion (FLIP-84), and soon new TableDescriptors (FLIP-129).
>>>>>
>>>>> However, the interfaces from and to DataStream API have not been touched during the introduction of these new features and are kind of outdated. The interfaces lack important functionality that is available in Table API but not exposed to DataStream API users. DataStream API is still our most important API, which is why good interoperability is crucial.
>>>>>
>>>>> This FLIP is a mixture of different topics that improve the interoperability between DataStream and Table API in terms of:
>>>>>
>>>>> - DataStream <-> Table conversion
>>>>> - translation of type systems TypeInformation <-> DataType
>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>> - changelog handling
>>>>> - row handling in DataStream API
>>>>>
>>>>> I'm looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>
>>
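Timo's answer to point 6 in the thread above (no ChangelogMode for `DataStream<T>` because plain `T` records carry no change flag) can be illustrated with a small hypothetical model. In it, a changelog mode is the set of change kinds a stream may contain. Such a mode can only be checked when every record carries its kind, as a `Row` with a `RowKind` does. A plain list of `T` has no flag, so the only interpretation left is insert-only. `ChangelogSketch`, `Kind`, `Change`, `conformsTo` and `fromInsertOnly` are invented names. The four kinds mirror the ones FLIP-95 introduced for `RowKind`, but this is not Flink's API.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not Flink's API: all names here are invented for illustration.
// The four kinds mirror the ones FLIP-95 introduced for RowKind.
final class ChangelogSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // A record that carries its change flag, comparable to a Row with a RowKind.
    static final class Change<T> {
        final Kind kind;
        final T value;

        Change(Kind kind, T value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // A changelog mode is modelled as the set of kinds a stream may contain.
    static final Set<Kind> INSERT_ONLY = EnumSet.of(Kind.INSERT);
    static final Set<Kind> ALL_KINDS = EnumSet.allOf(Kind.class);

    private ChangelogSketch() {}

    // Checking a stream against a mode only works because every element exposes its kind.
    static <T> boolean conformsTo(List<Change<T>> changes, Set<Kind> mode) {
        for (Change<T> change : changes) {
            if (!mode.contains(change.kind)) {
                return false;
            }
        }
        return true;
    }

    // Plain records of type T carry no flag, so the only sensible reading is insert-only.
    static <T> List<Change<T>> fromInsertOnly(List<T> items) {
        List<Change<T>> result = new ArrayList<>();
        for (T item : items) {
            result.add(new Change<>(Kind.INSERT, item));
        }
        return result;
    }
}
```

In this model, a mode-aware conversion has nothing to inspect on a plain `T`. The only option is the implicit `fromInsertOnly` wrapping, which is the distinction Timo draws between `from/toInsertStream` and `from/toChangelogStream`.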
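Jark and Jingsong ask for a constant fieldName->index mapping for name-based setters, and Timo is fine with throwing otherwise. That proposal can be sketched as a hypothetical row class whose names are fixed at construction time. Name-based setters resolve through the same mapping as name-based getters, so mixing them with index-based setters is unambiguous, and unknown names throw instead of appending a field. `NamedRow` is an invented class for illustration, not Flink's `Row`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink's Row: the field-name -> index mapping is fixed when
// the row is created, and name-based access always goes through that mapping.
final class NamedRow {

    private final Map<String, Integer> positions = new HashMap<>();
    private final Object[] values;

    NamedRow(List<String> fieldNames) {
        values = new Object[fieldNames.size()];
        for (int i = 0; i < fieldNames.size(); i++) {
            if (positions.put(fieldNames.get(i), i) != null) {
                throw new IllegalArgumentException("Duplicate field: " + fieldNames.get(i));
            }
        }
    }

    // Unknown names are rejected instead of being appended as new fields.
    private int indexOf(String name) {
        Integer pos = positions.get(name);
        if (pos == null) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return pos;
    }

    void setField(int pos, Object value) {
        values[pos] = value;
    }

    // Same slot as setField(indexOf(name), value), so mixing both styles is unambiguous.
    void setField(String name, Object value) {
        values[indexOf(name)] = value;
    }

    Object getField(int pos) {
        return values[pos];
    }

    Object getField(String name) {
        return values[indexOf(name)];
    }

    @Override
    public String toString() {
        return Arrays.toString(values);
    }
}
```

For a row created with `List.of("a", "b", "c")`, the mixed calls `setField("c", 3L)`, `setField(0, 1)` and `setField("b", "x")` leave it as `[1, x, 3]` regardless of call order, and `setField("d", 4)` throws.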