Hi Danny,

"if ChangelogMode.INSERT is the default, existing pipelines should be compatible"

It is not about changelog mode compatibility; it is about type compatibility. The renaming to `toInsertStream` is only to have a means of dealing with data type inconsistencies that could break existing pipelines.

As the FLIP describes, the following new behavior should be implemented:

- It does this by translating the TypeInformation to DataType.
- This will happen with a new TypeInfoDataTypeConverter that will no longer produce LegacyTypeInformationType.
- All types from DataStream API should be supported by this converter.
- TupleTypeInfoBase will be translated into a proper RowType or StructuredType.
- BigDecimals will be converted to DECIMAL(38,18) by default.
- Composite types (tuples, POJOs, rows) will be flattened by default if they are used as top-level records (similar to the old behavior).
- The order of POJO fields is determined by the DataTypeExtractor and must not be defined manually anymore.
- GenericTypeInfo is converted to RawType immediately by considering the current configuration.
- A DataStream that originated from the Table API will keep its DataType information due to ExternalTypeInfo implementing DataTypeQueryable.
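To make the BigDecimal and GenericTypeInfo bullets concrete, here is a minimal, self-contained sketch of such a default mapping. It is illustrative only; the class and `toDataType` method below are hypothetical and not the real TypeInfoDataTypeConverter:

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a default class -> DataType mapping as
// described above; this is NOT the actual TypeInfoDataTypeConverter.
public class TypeMappingSketch {

    private static final Map<Class<?>, String> DEFAULTS = new HashMap<>();
    static {
        // BigDecimals get a fixed default precision/scale, because
        // TypeInformation carries no precision information:
        DEFAULTS.put(BigDecimal.class, "DECIMAL(38, 18)");
        DEFAULTS.put(String.class, "STRING");
        DEFAULTS.put(Integer.class, "INT");
    }

    public static String toDataType(Class<?> clazz) {
        // Unknown classes fall back to a RAW type, analogous to
        // GenericTypeInfo being converted to RawType immediately:
        return DEFAULTS.getOrDefault(clazz, "RAW('" + clazz.getName() + "')");
    }

    public static void main(String[] args) {
        System.out.println(toDataType(BigDecimal.class)); // DECIMAL(38, 18)
        System.out.println(toDataType(java.time.Instant.class)); // RAW('java.time.Instant')
    }
}
```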

I would feel safer if we do this under a new method name.

"toDataStream(table, schema.bindTo(DataType))"

This is what I meant by "integrate the DataType into the Schema class itself". Yes, we can do that if everybody is fine with it. But why should a user specify both a schema and a data type? This means potentially duplicate definitions of fields and their data types, etc.
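To make the duplication concern concrete, here is a small, self-contained illustration. The `Order` class and `fieldsOf` helper are made up for this sketch and are not Flink API:

```java
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the duplication concern: specifying a
// schema AND a separate data type means the field list exists twice.
public class DuplicationSketch {

    public static class Order {
        public long id;
        public BigDecimal amount;
    }

    // If the DataType is integrated into the Schema, the field list can
    // be derived once, from the class itself:
    public static List<String> fieldsOf(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            names.add(f.getName());
        }
        Collections.sort(names); // reflection order is unspecified
        return names;
    }

    public static void main(String[] args) {
        // With a separate schema *and* data type, the user spells out
        // the same fields twice, and nothing keeps the copies in sync:
        List<String> manual = Arrays.asList("amount", "id");
        System.out.println(manual.equals(fieldsOf(Order.class))); // true
    }
}
```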

Regards,
Timo


On 03.09.20 11:31, Danny Chan wrote:
"It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently"

I like the idea actually, but if ChangelogMode.INSERT is the default, existing 
pipelines should be compatible. We can see the other kinds of ChangelogMode as 
an extension.

“for `toDataStream` users need to be
able to express whether they would prefer Row, POJO or atomic”

I think in most cases people do not need to convert the stream to a Row or
POJO, because the table projection always returns a flattened internal row.
If people do want a POJO there, how about we bind the DataType to the existing
schema, like this:

toDataStream(table, schema.bindTo(DataType))

Best,
Danny Chan
On 3 Sep 2020 at 15:18 +0800, dev@flink.apache.org, wrote:

It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently