Hi all,

A comment from my side on the topic of the current, weird
renaming/naming/reordering when registering a DataStream. It might be
just me, but I find it extremely confusing and I would be really, really
happy if we could simplify it.

I really don't like that the actual behaviour of this method depends on
the input type and on the set of operations used.

See some examples:

    public static class TestPojo {
        public int a;
        public String b;
        public long c;
    }

        DataStreamSource<TestPojo> ds = env.fromElements(new TestPojo());

        // reordering of the fields
        Table table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c"));
        table.printSchema();

        // reordering with renaming
        table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c").as("d"));
        table.printSchema();

        // projecting out the 1st field
        table = tableEnv.fromDataStream(ds, $("b"), $("c"));
        table.printSchema();

        DataStreamSource<Tuple3<Integer, String, Long>> ds1 =
            env.fromElements(Tuple3.of(1, "a", 1L));

        // RENAMING without reordering!!! even though the arguments are
        // exactly the same as in the 1st example
        table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c"));
        table.printSchema();

        // projecting out the 3rd field, even though the arguments are
        // exactly the same as in the 3rd example
        table = tableEnv.fromDataStream(ds1, $("b"), $("c"));
        table.printSchema();

        // illegal renaming, an exception is thrown
        table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c").as("d"));
        table.printSchema();

Why can't we use established operations such as projections that always
behave the same, where a field reference is always a field reference (in
the current solution it is either a reference or an alias), as described
in the FLIP?

If it is such a must to be able to rename the fields without referring
to their original names (I agree it is useful for tuples), I would very
much prefer to see:

tableEnv.fromDataStream(ds, "b", "a", "c"); <- always rename based on
the index, and then you can further apply projections.
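
For example (just a sketch of the semantics I have in mind, assuming
rename-by-position followed by ordinary projections):

    DataStreamSource<Tuple3<Integer, String, Long>> ds1 =
        env.fromElements(Tuple3.of(1, "a", 1L));

    // rename purely by position: f0 -> b, f1 -> a, f2 -> c
    Table table = tableEnv.fromDataStream(ds1, "b", "a", "c");

    // then reorder, rename or drop fields with the established
    // projection semantics of select()
    table = table.select($("a"), $("c").as("d"));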

Again, maybe I am the only one who finds it extremely confusing.

Best,

Dawid

On 02/09/2020 11:47, Jark Wu wrote:
> Hi Timo,
>
> 1. "fromDataStream VS fromInsertStream"
> In terms of naming, personally, I prefer `fromDataStream`,
> `fromChangelogStream`, `toDataStream`, `toChangelogStream` than
> `fromInsertStream`, `toInsertStream`.
>
> 2.  "fromDataStream(DataStream, Expression...) VS
> fromInsertStream(DataStream).select()"
> "fromDataStream" supports reference input fields by position, and fields
> are simply renamed.
> I think this is handy, however it is not supported in
> "fromInsertStream(DataStream).select()".
> Is it possible to keep using `fromDataStream(DataStream, Expression...)`
> but deprecate the support of `.rowtime()` and `.proctime()`?
> Instead, users should call `system_rowtime()` and `system_proctime()` if
> they want to derive the time attribute, e.g.
>
> DataStream<Tuple2<String, Long>> stream = ...
> Table table = tableEnv.fromDataStream(stream,
>    $("a"), // rename the first field to 'a'
>    $("b"), // rename the second field to 'b'
>    system_rowtime().as("rowtime"), // extract the internally attached
>                                    // timestamp into an event-time attribute
>    system_proctime().as("proctime"));
>
> I think this will be more fluent, easier to validate, and it makes it
> possible to keep using the existing API. What do you think?
>
> 3. "name-based setters should always be based on fieldNames"
> +1 to have a constant fieldName->index mapping. It will be more
> straightforward and avoid confusion.
> We can still introduce the dynamic field index mapping in the future if
> needed.
>
> Best,
> Jark
>
> On Wed, 2 Sep 2020 at 16:19, Timo Walther <twal...@apache.org> wrote:
>
>> Hi everyone,
>>
>> thanks for your feedback. It's a lot of content that needs to be
>> digested. I will update the FLIP shortly to incorporate some of the
>> feedback already. But let me respond to some topics first:
>>
>> "not deprecate these API", "the API of the table layer is changing too
>> fast"
>>
>> I agree that deprecating API is definitely not great for users, but in
>> this case I think it is for the greater good: it makes the API more
>> understandable and focuses on common use cases for the future. I would
>> rather say that the API is about to settle, because there are only a
>> couple of shortcomings left and the bigger picture is clearer than ever.
>> IMO the proposed changes are one of the last bigger API changes on the
>> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
>> methods just because we changed so much in the last releases should not
>> be a reason to keep confusing API. Users are happy to upgrade if they
>> also get more features by upgrading (e.g. fromChangelogStream).
>>
>> 1. "fromDataStream VS fromInsertStream"
>>
>> The main reason to change this API is to have the possibility to update
>> the type mapping without breaking backwards compatibility. The name
>> `fromInsertStream` makes it possible to have new semantics and makes
>> concepts more explicit by naming.
>>
>> 2. "toAppendStream VS toInsertStream"
>>
>> "Append" is common in the Flink community but the outside world uses
>> "insert". Actually, the term "append-only table" is wrong because SQL
>> tables have bag semantics without any order. So "appending" is more of
>> an "insertion". This is also represented in FLIP-95's `RowKind` where we
>> call the concepts INSERT and `ChangelogKind.insertOnly`.
>>
>> 3. "`.rowtime()` and `.proctime()`"
>>
>> "API is also widely used, even in our test code"
>>
>> The test code is already quite outdated and uses a lot of deprecated
>> API. We need to deal with that with a better testing infrastructure. But
>> this can be future work.
>>
>> "users have already accepted it"
>>
>> I'm not sure if users have already accepted it. I think we get at least
>> one question around this topic every week because users would like to
>> call `.rowtime` on arbitrary timestamps in the middle of the pipeline.
>> And specifying all fields just to give the StreamRecord timestamp a name
>> should be made easier. This is necessary in 80% of all use cases.
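>>
>> To illustrate the pain point (a sketch against the current
>> expression-based API; field names are made up):
>>
>> // just to declare the record timestamp as a rowtime attribute, the
>> // user has to repeat every other field of the input type:
>> tableEnv.fromDataStream(stream, $("a"), $("b"), $("rowtime").rowtime());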
>>
>> 4. "toAppendStream(Table table, Class<T>/TypeInformation)"
>>
>> The DataType system is way easier than the TypeInformation system
>> because it provides a consistent look and feel with a lot of utilities.
>> E.g. many users didn't know that they could just pass `Row.class` in
>> the past. Actually, extracting types from `Row.class` is not supported
>> by the TypeExtractor (we recently even printed a warning to the logs),
>> but we hacked some logic into the method. With AbstractDataType, users can
>> still use classes via `DataTypes.of`; for example
>> `toInsertStream(DataTypes.of(MyPojo.class))`.
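>>
>> A rough sketch of how that would look (the `toInsertStream` method is
>> the one proposed in the FLIP, so the exact signature may still change):
>>
>> // instead of toAppendStream(table, MyPojo.class):
>> DataStream<MyPojo> stream =
>>     tableEnv.toInsertStream(table, DataTypes.of(MyPojo.class));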
>>
>> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>>
>> Similar to `TableEnvironment.execute()`, we made some mistakes during
>> the big refactorings. IMHO tEnv#createTemporaryView was one mistake
>> because we introduced it too quickly. In general this method is correct,
>> but now we cannot change the underlying semantics again without breaking
>> existing pipelines. We could keep this method and just change the type
>> system under the hood; in most cases the pipeline should still work, but
>> we cannot guarantee this due to slight differences.
>>
>> 6. "could it be "StreamTableEnvironment.fromDataStream(DataStream<T>,
>> ChangelogMode)"
>>
>> No, this is not possible, because T records have no changeflag. Without
>> a changeflag, a ChangelogMode does not make much sense. That's why
>> `from/toChangelogStream` supports only `Row`, whereas
>> `from/toInsertStream` accepts arbitrary type classes.
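>>
>> To make the changeflag argument concrete (assuming the RowKind support
>> that FLIP-95 introduced):
>>
>> // a Row can carry its change kind explicitly ...
>> Row row = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 42);
>> // ... while an arbitrary T (e.g. a POJO) has no such flag, so a
>> // changelog interpretation of DataStream<MyPojo> would be undefined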
>>
>> 7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"
>>
>> I also thought about this method and using `Schema` there. However, with
>> a schema you cannot specify the data type of the top-level record
>> itself. We would need to offer fromDataStream(dataStream, Schema,
>> DataType) or integrate the DataType into the Schema class itself, which
>> would mix up the concepts.
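>>
>> In other words, we would need something like the following (a purely
>> hypothetical signature, only to illustrate the problem):
>>
>> // Schema can describe the fields, but not the top-level record type:
>> tableEnv.fromDataStream(dataStream, schema, DataTypes.of(MyPojo.class));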
>>
>> 8. "name-based setters should always be based on fieldNames"
>>
>> I'm fine with throwing an exception if my proposed semantics are too
>> confusing.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 02.09.20 07:25, Jingsong Li wrote:
>>>> a Row has two modes represented by an internal boolean flag
>>>> `hasFieldOrder`
>>>
>>> +1 to Dawid's confusion about what the result is when index-based
>>> setters and name-based setters are mixed.
>>> And name-based setters look like append instead of set.
>>>
>>> It reminds me of Avro's `GenericRecord`. We should support truly
>>> random-access name-based setters instead of append semantics.
>>>
>>> So, what I think is, name-based setters should always be based
>>> on fieldNames just like name-based getters. Otherwise, throw an
>>> exception.
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yuzhao....@gmail.com> wrote:
>>>
>>>> Timo, Thanks for the discussion
>>>>
>>>> I have only read the "Conversion of DataStream to Table" part, so I
>>>> would only put some objections there ~
>>>>
>>>>> StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
>>>> At first glance, from the perspective of a user, I'm confused by why we
>>>> must distinguish on the API level what kind of data stream it is, e.g.
>>>> an insert stream or whatever other kind of stream.
>>>>
>>>> As a user, one does not expect to have to distinguish between several
>>>> DataStream options. The framework should have the ability to infer the
>>>> ChangelogMode of the stream, but sadly we cannot at the moment, because
>>>> we do not have metadata to describe the ChangelogMode, which is what
>>>> the framework actually needs.
>>>>
>>>> And could it be:
>>>>
>>>> StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode),
>>>> where the ChangelogMode is optional because 90% of the data streams
>>>> are insert-only for now.
>>>>
>>>> or:
>>>>
>>>> DataStream.withChangelogMode(ChangelogMode), so that the DataStream can
>>>> describe itself as to what kind of stream it is (again, if not
>>>> specified, the default is INSERT).
>>>>
>>>>> tEnv
>>>>>   .fromInsertStream(DataStream<T>)
>>>>>   .select('*, system_rowtime().as("rowtime"),
>>>>>     system_proctime().as("proctime"))
>>>>
>>>> In order to declare the time attributes on a DataStream, I must say I
>>>> prefer tEnv.fromDataStream(dataStream, Schema), for these reasons (a
>>>> sketch follows the list):
>>>>
>>>> - Schema is the uniform interface for declaring the metadata of a table
>>>> in the Table/SQL API, with an imperative coding style; in the Descriptor
>>>> API we also use it for the time-attribute purpose.
>>>> - Using a projection for time attributes is not a good idea, because on
>>>> the SQL side we declare them as metadata that is part of the table
>>>> schema when we define the DDL. Although we may explain the DDL
>>>> internally using computed columns, that does not mean we must do that
>>>> explicitly in the DataStream API. In the SQL world, no projection
>>>> function outputs a time-attribute type; we had better keep the time
>>>> attributes in the scope of the table metadata.
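>>>>
>>>> A sketch of what I have in mind (borrowing the descriptor-style Schema
>>>> and Rowtime classes; the exact methods are only illustrative):
>>>>
>>>> tEnv.fromDataStream(dataStream,
>>>>     new Schema()
>>>>         .field("a", DataTypes.STRING())
>>>>         .field("rowtime", DataTypes.TIMESTAMP(3))
>>>>         .rowtime(new Rowtime().timestampsFromSource()));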
>>>>
>>>> Best,
>>>> Danny Chan
>>>> On 2020-08-19 at 4:22 PM +0800, Timo Walther <twal...@apache.org> wrote:
>>>>> Hi everyone,
>>>>>
>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>> shortcomings in the Table API:
>>>>>
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>> The Table API has received many new features over the last year. It
>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>>> soon new TableDescriptors (FLIP-129).
>>>>>
>>>>> However, the interfaces from and to DataStream API have not been
>>>>> touched during the introduction of these new features and are kind of
>>>>> outdated.
>>>>> The interfaces lack important functionality that is available in Table
>>>>> API but not exposed to DataStream API users. DataStream API is still
>>>>> our most important API, which is why good interoperability is crucial.
>>>>>
>>>>> This FLIP is a mixture of different topics that improve the
>>>>> interoperability between DataStream and Table API in terms of:
>>>>>
>>>>> - DataStream <-> Table conversion
>>>>> - translation of type systems TypeInformation <-> DataType
>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>> - changelog handling
>>>>> - row handling in DataStream API
>>>>>
>>>>> I'm looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>
>>
