Hi Kurt,

thanks for your feedback.

1. "moving Schema after DataStream": I don't have a strong opinion here. One could argue that the API would look similar to a CREATE TABLE statement: first schema then connector. I updated the FLIP.

2. "will we do some verification?"
Yes, we will definitely do verification. It will happen based on what is available in TypeInformation.

"if T is a Tuple, do we have some rules for setting field names in Schema?"
The rule in this case would be to take TupleTypeInfoBase.getFieldNames(), similar to the logic we currently have.
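
A quick sketch of what that would mean in practice (assuming a StreamExecutionEnvironment `env` and a StreamTableEnvironment `tableEnv` are in scope):

DataStream<Tuple2<Integer, String>> stream = env.fromElements(Tuple2.of(1, "a"));
// no Schema given: column names come from TupleTypeInfoBase.getFieldNames()
Table table = tableEnv.fromDataStream(stream); // columns: f0 INT, f1 STRING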

"Will we do some type coercion?"
For `fromDataStream()`, type coercion between an explicitly specified Schema and the DataStream will not happen (e.g. DataStream<Integer> != Schema.column("f", DataTypes.BIGINT())), because the user specified the desired data type explicitly and expects correctness. For `toDataStream()`, the type coercion semantics are similar to those of a regular table sink (first on a logical level, then on a class level).
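
To illustrate the `fromDataStream()` side with a sketch (the builder-style Schema API is assumed here):

DataStream<Integer> input = env.fromElements(1, 2, 3);
// no coercion from INT to BIGINT: validation fails instead of silently widening
tableEnv.fromDataStream(input, Schema.newBuilder().column("f", DataTypes.BIGINT()).build());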

It is difficult to list all type rules upfront, but the behavior should be similar to all the work done in FLIP-65 and FLIP-95. I would move the discussion about further type handling to the individual PRs. The general goal should be to stay backwards compatible but reduce manual schema work.

3. "How do you derive schema from DataStream<Row>"

We use RowTypeInfo (if the DataStream comes from the DataStream API) or ExternalTypeInfo (if the DataStream comes from the Table API).
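
For example (a sketch; the explicit `returns` call only makes the RowTypeInfo visible):

DataStream<Row> stream = env
    .fromElements(Row.of(1, "a"))
    .returns(new RowTypeInfo(Types.INT, Types.STRING)); // field names default to f0, f1
Table table = tableEnv.fromChangelogStream(stream); // schema is derived from the RowTypeInfo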

4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this method is necessary" Dealing with Row in DataStream API is very inconvenient. With the new data format converters, the behavior would be consistent accross DataStream API and Table functions. The logic is already present and seems to be pretty stable so far. We would break a lot of existing code if we get rid of this method.

5. "How does Row behave like GenericRowData?"

Row can contain StringData or further nested RowData. The data format converters support that. The conversion of such fields would be a no-op in this case. In the end, both Row and GenericRowData just store an Object[].
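
A hedged illustration of what "behaves like GenericRowData" means here:

// a power user can put internal data structures directly into a Row
Row row = Row.of(StringData.fromString("Bob"), TimestampData.fromEpochMillis(0L));
// the data format converters recognize the internal format and pass the fields through as-is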

6. "They would expect that all the fields they didn't set should be NULL."

But this will be the case. The full list of field names and their order is defined by the data type, not by the Row instance. During serialization/conversion we can reorder fields, throw exceptions for unknown field names, and set the remaining fields to NULL.
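
For example (a sketch; the three-column data type is an assumption):

// the data type declares the columns a, b, c
Row row = Row.withNames();
row.setField("a", 42); // b and c are never set
// during conversion, b and c are set to NULL; an unknown name would raise an exception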

If a user uses `new Row(5)` but the serializer is configured by a data type that only supports `Row(3)`, it will also throw an exception at runtime. We cannot guard users from creating invalid rows, but the strongly typed serializers/converters will do the final verification.
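
A sketch of that failure case:

// the configured data type declares only 3 columns
Row tooWide = new Row(5);
// passing this row to the corresponding serializer fails with an exception at runtime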

Regards,
Timo


On 23.09.20 12:08, Kurt Young wrote:
Sorry for being late. I went through the design doc and here are my comments:

1. A minor one, how about moving Schema after DataStream in all affected
APIs? Such as:
StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
StreamTableEnvironment.createTemporaryView(String, Schema, DataStream<T>): Unit
StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>): Table
StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream<Row>

It will look more aligned with APIs which don't have Schema. For example:
StreamTableEnvironment.fromDataStream(DataStream<T>): Table
StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table

2. A question to: StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
How do we convert the types between Schema and T? Will we do some verification? Will we do some type coercion? For example, can we support Schema.LONG with DataStream<Integer>? And if T is a Tuple, do we have some rules for setting field names in Schema?
I can see lots of imagination from this method but the rules are unclear to me.

3. A question to: StreamTableEnvironment.fromChangelogStream(DataStream<Row>): Table
How do you derive schema from DataStream<Row>?

4. A question to: StreamTableEnvironment.toDataStream(AbstractDataType<?>, Table): DataStream<T>
I'm wondering whether this method is necessary. Always getting a DataStream<Row> from the table and then manually applying a map function does not seem cumbersome, and is safer (such intelligent conversion always seems error-prone to me).

5.
The `toChangelogStream(Schema, Table)` exists for completeness to have a symmetric API. It allows for declaring the data type for output similar to DynamicTableSinks. Additionally, internal structures such as StringData, TimestampData can still be used by power users. In that sense, Row can behave like a GenericRowData.

How does Row behave like GenericRowData? I don't think Row can work with RowData for now.

6. Row.withNames() seems dangerous to me. It relies on users setting all the fields they need via `setField(String name, T value)`. It's also highly possible that users would not set certain fields, for example when some fields are NULL. They would expect that all the fields they didn't set should be NULL.
Row.withNames(String[] fieldNames) or Row.withNames(List<String> fieldNames) seems to be a safer choice, as sketched below.
I agree that simplicity is important but making the API safer to use is also important.
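
To make the comparison concrete, a sketch of the proposed safer variant (Row.withNames(List<String>) is the suggestion above, not an existing API):

// declare the complete field list up front
Row row = Row.withNames(Arrays.asList("name", "score", "active"));
row.setField("name", "Bob"); // "score" and "active" stay NULL by construction
// a typo like row.setField("socre", ...) can fail fast against the declared names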

Best,
Kurt


On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <twal...@apache.org> wrote:

Hi Jark,

thanks for your feedback. I removed `withNamesAndPositions` from the public API list and added a comment that this is only an internal API for converters and serializers.

I would start a new vote tomorrow if there are no objections.

What do you think?

Regards,
Timo

On 23.09.20 08:55, Jark Wu wrote:
Hi Timo,

Sorry for the late reply.
I think it would be great if we can make `withNamesAndPositions` only internally visible. This reduces the complexity of the public API.
It's hard to come up with a perfect solution, so let's move forward with this FLIP.
I don't have other concerns.

Best,
Jark

On Fri, 18 Sep 2020 at 22:14, Timo Walther <twal...@apache.org> wrote:

Hi Jark,

the fieldNames map is not intended for users. I would also be fine to
make it a default scope constructor and access it with some internal
utility class next to the Row class. The fieldNames map must only be
used by serializers and converters. A user has no benefit in using it.

For the creation of new rows (without reusing, which only advanced users
usually do), I don't see a benefit of having:

final Row reuse = new Row(Arrays.asList("myField", "myOtherField"));
reuse.setField("myField", 12);
reuse.setField("myOtherField", "This is a test");

The purpose of Row.withNames() is to create a Row easily and readably without declaring 50+ column names or dealing with indices in this range.

Personally, I would like to make Row an interface and have concrete row implementations for different purposes but this would break existing programs too much.

What do you think?

Regards,
Timo


On 18.09.20 11:04, Jark Wu wrote:
Personally I think the fieldNames Map is confusing and not handy.
I just have an idea but not sure what you think.
What about adding a new constructor with List field names? This enables all name-based setters/getters.
Regarding the List -> Map cost for every record, we can suggest users to reuse the Row in the task.

new Row(int arity)
new Row(List<String> fieldNames)

final Row reuse = new Row(Arrays.asList("myField", "myOtherField"));
reuse.setField("myField", 12);
reuse.setField("myOtherField", "This is a test");

My point is that, if we can have a handy constructor for named Rows, we may not need to distinguish the named-only and positionAndNamed modes. This can avoid (or fail fast on) the potential problem of setting an invalid field.

We can also come up with a new class for the field names which will construct the Map and be shared among all Row instances.
What do you think?

Best,
Jark

On Thu, 17 Sep 2020 at 16:48, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

thanks for all the feedback. I updated the FLIP again on Thursday to
integrate the feedback I got from Jingsong and Jark offline. In
particular I updated the `Improve dealing with Row in DataStream API`
section another time. We introduced static methods for Row that should
make the semantics clear to users:

// allows to use index-based setters and getters (equivalent to new Row(int))
// method exists for completeness
public static Row withPositions(int length);

// allows to use name-based setters and getters
public static Row withNames();

// allows to use both name-based and position-based setters and getters
public static Row withNamesAndPositions(Map<String, Integer> fieldNames);
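
A usage sketch of the name-based mode (assuming the methods above return Row and name-based getters exist as proposed):

Row row = Row.withNames();
row.setField("user", "Bob");
row.setField("amount", 12);
String user = (String) row.getField("user"); // getters work by name as well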

In any case, none of the existing methods will be deprecated and only additional functionality will be available through the methods above.

I started a voting thread on Friday. Please feel free to vote.

Regards,
Timo

On 10.09.20 10:21, Danny Chan wrote:
Thanks for driving this Timo, +1 for voting ~

Best,
Danny Chan
On 2020-09-10 at 3:54 PM +0800, Timo Walther <twal...@apache.org> wrote:
Thanks everyone for this healthy discussion. I updated the FLIP with the outcome. I think the result is one of the last core API refactorings and users will be happy to have consistent changelog support. Thanks for all the contributions.

If there are no objections, I would continue with a vote.

What do you think?

Regards,
Timo


On 09.09.20 14:31, Danny Chan wrote:
Thanks, I'm fine with that.


On Wed, Sep 9, 2020 at 7:02 PM, Timo Walther <twal...@apache.org> wrote:

I agree with Jark. It reduces confusion.

The DataStream API doesn't know changelog processing at all. A DataStream of Row can be used with both `fromDataStream` and `fromChangelogStream`. But only the latter API will interpret it as a changelog.

And as I mentioned before, `toChangelogStream` must work with Row, otherwise users are confused by duplicate records with a missing changeflag.

I will update FLIP-136 one last time. I hope we can then continue to a vote.

Regards,
Timo


On 09.09.20 10:50, Danny Chan wrote:
I think a different API name just because the DataStream generic type is different would bring in much confusion. If there are ChangelogModes that only work for Row, can we have a type check there?

Switching to a new API name does not really solve the problem well; people still need to declare the ChangelogMode explicitly, and there are some confusions:

• Should a DataStream of Row type always use #fromChangelogStream?
• Does fromChangelogStream work only for the INSERT ChangelogMode?


Best,
Danny Chan
On 2020-09-09 at 4:21 PM +0800, Timo Walther <twal...@apache.org> wrote:
I had this in the initial design, but Jark had concerns, at least for the `toChangelogStream(ChangelogMode)` (see the earlier discussion).

`fromDataStream(dataStream, schema, changelogMode)` would be possible.

But in this case I would vote for a symmetric API. If we keep toChangelogStream, we should also have a fromChangelogStream.

And if we unify `toChangelogStream` and `toDataStream`, retractions cannot be represented for non-Rows and users will experience duplicate records with a missing changeflag.

Regards,
Timo


On 09.09.20 09:31, Danny Chan wrote:
“But I think the planner needs to know whether the input is insert-only or not.”

Does fromDataStream(dataStream, schema, changelogMode) solve your concerns? People can pass around whatever ChangelogMode they like as an optional param. By default, fromDataStream(dataStream, schema) uses the INSERT ChangelogMode.

Best,
Danny Chan
On 2020-09-09 at 2:53 PM +0800, dev@flink.apache.org wrote:

But I think the planner needs to know whether the input is insert-only or not.