Hi Danny,

Your proposed signatures sound good to me.

fromDataStream(dataStream, Schema)
toDataStream(table, AbstractDataType<?>)

They address all my concerns. The API would not be symmetric anymore, but this is fine with me. Others raised concerns about deprecating `fromDataStream(dataStream, Expression)`. Are they fine with this as well?

If there are no objections, I would update the FLIP with the methods above. Bu let me briefly summarize my thoughts on this again, so that we are all on the same page:
- The biggest discussion point seems the fromInsertStream/toInsertStream.
- I don’t have a strong opinion on naming, from/toDataStream would be fine for me. But: - It slightly different type mappings and might break existing pipelines silently. This point can be neglected as the differences are only minor. - We need a way of declaring the rowtime attribute but without declaring all columns again. Reduce manual schema work as much as possible. - Both Dawid and I don’t like the current either “position based” or “name based” expression logic that looks like a projection but is not. - Actually name based expressions are not necessary, since we have positions for all new data types. - Schema is not suitable to influence the output type for toDataStream. It should be DataType.

All items are solved by Danny's suggestion.

Regards,
Timo



On 08.09.20 14:04, Danny Chan wrote:
Hi, Timo ~

"It is not about changelog mode compatibility, it is about the type
compatibility.”

For fromDataStream(dataStream, Schema), there should not be compatibility 
problem or data type inconsistency. We know the logical type from Schema and 
physical type from the dataStream itself.

For toDataStream(table, AbstractDataType<?>), we can get the logical type from 
the table itself
and the physical type from the passed data type.

If both behavior are deterministic, what's the problem for type compatibility 
and safety?

My concern is that in most of the cases, people use the "insert stream", they 
do not need to care about
the data stream ChangelogMode, so there is no need to distinguish them from the 
APIs, an optional param is enough. If we introduces 2 new API there, people 
have to choose between them, and can fromChangelogStream()
accept an insert stream ? What is the behavior if fromInsertStream() accepts a 
changelog stream ?


"This means potentially duplicate definition of fields and their data types etc”

I agree, because table already has an underlying schema there.

Best,
Danny Chan
在 2020年9月3日 +0800 PM8:12,Timo Walther <twal...@apache.org>,写道:
Hi Danny,

"if ChangelogMode.INSERT is the default, existing pipelines should be
compatible"

It is not about changelog mode compatibility, it is about the type
compatibility. The renaming to `toInsertStream` is only to have a mean
of dealing with data type inconsistencies that could break existing
pipelines.

As the FLIP describes, the following new behavior should be implemented:

- It does this by translating the TypeInformation to DataType.
- This will happen with a new TypeInfoDataTypeConverter that will no
longer produce LegacyTypeInformationType.
- All types from DataStream API should be supported by this converter.
- TupleTypeInfoBase will be translated into a proper RowType or
StructuredType.
- BigDecimals will be converted to DECIMAL(38,18) by default.
- Composite types (tuples, POJOs, rows) will be flattened by default if
they are used as top-level records (similar to the old behavior).
- The order of POJO field's is determined by the DataTypeExtractor and
must not be defined manually anymore.
- GenericTypeInfo is converted to RawType immediately by considering the
current configuration.
- A DataStream that originated from Table API will keep its DataType
information due to ExternalTypeInfo implementing DataTypeQueryable.

I would feel safer if we do this under a new method name.

"toDataStream(table, schema.bindTo(DataType))"

This is what I meant with "integrate the DataType into the Schema class
itself". Yes, we can do that if everybody is fine with it. But why
should a user specify both a schema and a data type? This means
potentially duplicate definition of fields and their data types etc.

Regards,
Timo


On 03.09.20 11:31, Danny Chan wrote:
"It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently”

I like the idea actually, but if ChangelogMode.INSERT is the default, existing 
pipelines should be compatible. We can see the other kinds of ChangelogMode as 
an extension.

“for `toDataStream` users need to be
able to express whether they would prefer Row, POJO or atomic”

I think most of the cases people do not need to convert the stream to a Row or 
POJO, because the table projection always returns a flatternned internal row, 
if people did want a POJO there, how about we bind the DataType to the existing 
schema, like this

toDataStream(table, schema.bindTo(DataType))

Best,
Danny Chan
在 2020年9月3日 +0800 PM3:18,dev@flink.apache.org,写道:

It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently




Reply via email to