Re: Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter
@Timo Walther Thanks for the great reply.

1. TIMESTAMP_WITH_TIME_ZONE issue: https://issues.apache.org/jira/browse/FLINK-23145 will track it.
2. Unsupported types of other systems: I basically agree with providing modules to load specific types on demand, but I have two concerns:
2.1 Each data source has its own conversion logic (JDBC in org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter#createInternalConverter, Postgres in org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter#createInternalConverter).
2.2 There is no dynamic register mechanism for SQL extension types (e.g. CREATE TYPE user_enum AS ENUM ('enum1', 'enum2', 'enum3'); CREATE TABLE user_enum_tb (enum_column user_enum);).

In summary, Flink's DataType gives users a way to define the custom types they want, but it lacks an extension point for mapping data source types to Flink data types, and the reverse. I suggest we provide the basic type conversions for the common cases, let each connector load the conversions it needs, and at the same time provide an extension mechanism to register custom types or override an existing type conversion.

--
From: Timo Walther
Sent: Thursday, June 24, 2021 21:06
To: dev
Subject: Re: Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter
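The extension mechanism suggested above could look roughly like the following sketch. The `TypeConverterRegistry` name and its methods are hypothetical illustrations, not an existing Flink API: connectors would register a default converter per external type name, and users could override one or add converters for extension types such as Postgres CITEXT.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry mapping external type names to Flink-side converters. */
public class TypeConverterRegistry {
    private final Map<String, Function<Object, Object>> converters = new HashMap<>();

    /** Registers a converter; a later call for the same type name overrides the earlier one. */
    public void register(String externalTypeName, Function<Object, Object> converter) {
        converters.put(externalTypeName.toLowerCase(Locale.ROOT), converter);
    }

    /** Converts a value, failing for unregistered types (today's hard-coded behavior). */
    public Object convert(String externalTypeName, Object value) {
        Function<Object, Object> converter =
                converters.get(externalTypeName.toLowerCase(Locale.ROOT));
        if (converter == null) {
            throw new IllegalArgumentException("Unsupported type: " + externalTypeName);
        }
        return converter.apply(value);
    }
}
```

A Postgres connector could then ship defaults for TEXT, BYTEA, etc., while a user registers, say, `registry.register("citext", Object::toString)` for the CITEXT extension type without touching connector code.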
Re: Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter
Hi Jack,

thanks for sharing your proposal with us. I totally understand the issues that you are trying to solve. Having more flexible type support in the connectors is definitely a problem that we would like to address in the mid term. It is already considered in our internal roadmap planning.

I haven't taken a deeper look at your current proposal but will do so soon. Until then, let me give you some general feedback. I see a couple of orthogonal issues that we need to solve:

1) The TIMESTAMP_WITH_TIME_ZONE problem: this is one of the easier issues that we simply need to fix on the runtime side. We are planning to support this type because it is one of the core data structures that you need in basically every pipeline.

2) Unsupported types of other systems: As Jark said, we offer support for RAW types and also user-defined structured types. Since most of the prerequisite work has been done for user-defined types (e.g. a central type registry), I could imagine that we will be able to extend Flink's type system soon. My idea would be to provide modules via Flink's module system to load Postgres- or MySQL-specific types that could then be used in all regular locations such as DDL or functions.

3) Add connector-specific type information in DDL: We should allow enriching the automatic schema conversion step when translating DDL into other systems' types. This is where your proposal might make sense.

Regards,
Timo

On 24.06.21 14:19, 云华 wrote:
Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter
@Jark Wu thanks for the reply. However, there are several cases I want to cover:

1. Unknown type CITEXT: Flink SQL cannot execute "CREATE TABLE string_table (pk SERIAL, vc VARCHAR(2), vcv CHARACTER VARYING(2), ch CHARACTER(4), c CHAR(3), t TEXT, b BYTEA, bnn BYTEA NOT NULL, ct CITEXT, PRIMARY KEY(pk));". This is because org.apache.flink.connector.jdbc.catalog.PostgresCatalog#fromJDBCType cannot support CITEXT.
2. TIMESTAMP_WITH_TIME_ZONE unsupported: org.apache.flink.table.runtime.typeutils.InternalSerializers#createInternal cannot support TIMESTAMP_WITH_TIME_ZONE.
3. Unsupported types (MySQL): org.apache.flink.connector.jdbc.dialect.MySQLDialect#unsupportedTypes lists the unsupported types.
4. Unsupported types (Postgres): org.apache.flink.connector.jdbc.dialect.PostgresDialect#unsupportedTypes lists the unsupported types.
5. (Postgres) Parts of the type implementations reference the Postgres documentation: https://www.postgresql.org/docs/12/datatype.html.
6. (MySQL) Parts of the type implementations reference the MySQL documentation: https://dev.mysql.com/doc/refman/8.0/en/data-types.html.

Please let me know if you have any suggestions.

--
From: Jark Wu
Sent: Wednesday, June 23, 2021 23:13
To: dev; 云华
Subject: Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter
Re: [DISCUSS] [FLINK-23122] Provide the Dynamic register converter
Hi,

`TIMESTAMP_WITH_TIME_ZONE` is not supported in the Flink SQL engine, even though it is listed in the type API.

I think what you are looking for is the RawValueType, which can be used as a user-defined type. You can use `DataTypes.RAW(TypeInformation)` to define a RAW type with the given TypeInformation, which includes the serializer and deserializer.

Best,
Jark

On Wed, 23 Jun 2021 at 21:09, 云华 wrote:
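Jark's RAW suggestion boils down to pairing a value with an explicit serializer and deserializer, so the engine can carry otherwise unsupported values as opaque bytes. The sketch below imitates that pattern in plain Java; `RawType` here is an illustrative stand-in, not Flink's actual `org.apache.flink.table.types.logical.RawType`, and in real Flink code the equivalent is `DataTypes.RAW(TypeInformation.of(MyType.class))`.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Illustrative stand-in for a RAW type: the engine treats the value as opaque bytes. */
public class RawType<T> {
    private final Function<T, byte[]> serializer;
    private final Function<byte[], T> deserializer;

    public RawType(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    /** Serializes the value before it enters the engine. */
    public byte[] toBytes(T value) {
        return serializer.apply(value);
    }

    /** Deserializes the bytes when the value is handed back to user code. */
    public T fromBytes(byte[] bytes) {
        return deserializer.apply(bytes);
    }
}
```

With this pattern, an unsupported value such as a CITEXT string round-trips through the pipeline untouched, because only the user-supplied serializer and deserializer interpret its bytes.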
[DISCUSS] [FLINK-23122] Provide the Dynamic register converter
Hi everyone,

I want to rework the type conversion system in the connector and flink-table modules to be reusable and scalable. In the Postgres system, the type '_citext' is not supported in org.apache.flink.connector.jdbc.catalog.PostgresCatalog#fromJDBCType. What's more, org.apache.flink.table.runtime.typeutils.InternalSerializers#createInternal cannot support TIMESTAMP_WITH_TIME_ZONE.

For more background and API design, see https://issues.apache.org/jira/browse/FLINK-23122. Please let me know if this matches your thoughts.

Regards,
Jack
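The fromJDBCType gap described above is essentially a closed switch over known type names. The sketch below is a deliberate simplification of that pattern, not the real PostgresCatalog code, just to make the failure mode concrete: any type name outside the hard-coded list, such as citext, has no mapping.

```java
import java.util.Locale;

/** Simplified model of a catalog's JDBC-to-Flink type mapping with a closed type list. */
public class JdbcTypeMapper {
    /** Returns a Flink SQL type name, or null when the JDBC type is unknown (e.g. citext). */
    public static String fromJdbcType(String pgTypeName) {
        switch (pgTypeName.toLowerCase(Locale.ROOT)) {
            case "text":
            case "varchar":
                return "STRING";
            case "bytea":
                return "BYTES";
            case "int4":
                return "INT";
            default:
                return null; // unknown extension types such as citext fall through here
        }
    }
}
```

A dynamic register mechanism would replace the fixed switch with a lookup table that connectors and users can extend at runtime.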