[
https://issues.apache.org/jira/browse/FLINK-36847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-36847:
-----------------------------------
Labels: pull-request-available (was: )
> Table API toDataStream() cannot be converted to an enum type field
> -------------------------------------------------------------------
>
> Key: FLINK-36847
> URL: https://issues.apache.org/jira/browse/FLINK-36847
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.1
> Environment: jdk: 1.8
> flink: 1.16.1
>
> Reporter: LuJiang
> Priority: Not a Priority
> Labels: pull-request-available
>
> toDataStream(table, clazz) fails when the POJO class contains enum
> fields. The exception occurs while searching for the class constructor in:
> org.apache.flink.table.types.extraction.DataTypeExtractor#extractStructuredType
> {code:java}
> private DataType extractStructuredType(
> DataTypeTemplate template, List<Type> typeHierarchy, Type type) {
> final Class<?> clazz = toClass(type);
> if (clazz == null) {
> throw extractionError("Not a class type.");
> } validateStructuredClass(clazz);
> validateStructuredSelfReference(type, typeHierarchy); final
> List<Field> fields = collectStructuredFields(clazz); if
> (fields.isEmpty()) {
> throw extractionError("Class '%s' has no fields.",
> clazz.getName());
> } // if not all fields are mutable, a default constructor is
> not enough
> final boolean allFieldsMutable =
> fields.stream()
> .allMatch(
> f -> {
> validateStructuredFieldReadability(clazz,
> f);
> return isStructuredFieldMutable(clazz, f);
> }); final
> ExtractionUtils.AssigningConstructor constructor =
> extractAssigningConstructor(clazz, fields);
> if (!allFieldsMutable && constructor == null) {
> throw extractionError(
> "Class '%s' has immutable fields and thus requires a
> constructor that is publicly "
> + "accessible and assigns all fields: %s",
> clazz.getName(),
>
> fields.stream().map(Field::getName).collect(Collectors.joining(", ")));
> }
> // check for a default constructor otherwise
> else if (constructor == null && !hasInvokableConstructor(clazz)) {
> throw extractionError(
> "Class '%s' has neither a constructor that assigns all
> fields nor a default constructor.",
> clazz.getName());
> } final Map<String, DataType> fieldDataTypes =
> extractStructuredTypeFields(template, typeHierarchy, type,
> fields); final DataTypes.Field[] attributes =
> createStructuredTypeAttributes(constructor, fieldDataTypes);
> return DataTypes.STRUCTURED(clazz, attributes);
> }{code}
> extractAssigningConstructor cannot find any available constructor.
>
> It is easy to reproduce as follows:
>
>
> {code:java}
> @Test
> public void testTable2Pojo() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> String sql = readFromClasspath("table2pojo.sql");
> tEnv.executeSql(sql);
> Table table = tEnv.sqlQuery("select * from my_user");
> DataStream<MyUser> dataStream = tEnv.toDataStream(table, MyUser.class);
> dataStream.process(new ProcessFunction<MyUser, String>() {
> @Override
> public void processElement(MyUser value, ProcessFunction<MyUser,
> String>.Context ctx, Collector<String> out) throws Exception {
> out.collect(JSON.toJSONString(value));
> }
> }).addSink(new PrintSinkFunction<>());
> env.execute(getClass().getSimpleName());
> }
> @Data
> @Accessors(chain = true)
> public class MyUser {
> private String username;
> private Integer age;
> private MyExtInfo info;
> }
> @Data
> @Accessors(chain = true)
> public class MyExtInfo {
> private String family;
> private String myHobby;
> private MyJobType job;
> }
> public enum MyJobType {
> Teacher,
> Artist,
> Scientist;
> }
> // table2pojo.sql
> create table my_user
> (
> username STRING,
> age INT,
> info ROW <
> my_hobby STRING,
> family STRING,
> job STRING >
> )
> with
> ( 'connector' = 'kafka',
> 'value.format' ='json',
> 'topic'='test',
> 'properties.group.id'='test001',
> 'properties.auto.offset.reset'= 'latest',
> 'properties.bootstrap.servers'='localhost:50000',
> 'value.fields-include' ='ALL')
> {code}
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)