Hi haishui,
An enum type cannot be mapped directly to a Flink table type.
I think the easiest way is to convert the enum to a string first:
    DataStreamSource<Tuple2<String, String>> source = env.fromElements(
        new Tuple2<>("1", TestEnum.A.name()),
        new Tuple2<>("2", TestEnum.B.name())
    );
Or keep the POJO source and add a map transformation that converts the enum:
    DataStream<Tuple2<String, String>> source1 =
        env.fromElements(
                new TestData("1", TestEnum.A),
                new TestData("2", TestEnum.B))
            .map(t -> new Tuple2<>(t.s, t.en.name()))
            .returns(new TypeHint<Tuple2<String, String>>() {});
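
For reference, the conversion done inside the map step does not depend on Flink: it is just Enum.name(), and Enum.valueOf() reverses it if the enum is needed again later. Below is a minimal plain-Java sketch of that round trip. The class name EnumToStringSketch and the helpers toStringPair/fromStringPair are hypothetical names for illustration, TestData is re-declared without Lombok so the snippet compiles on its own, and the null handling is an assumption about how one might want to treat a missing enum value.

```java
public class EnumToStringSketch {

    public enum TestEnum { A, B, C }

    // Standalone stand-in for the Lombok-annotated POJO in the question.
    public static final class TestData {
        public final String s;
        public final TestEnum en;

        public TestData(String s, TestEnum en) {
            this.s = s;
            this.en = en;
        }
    }

    // Hypothetical helper: flattens a record into two plain strings,
    // which is what the map(...) step does before the stream becomes a table.
    public static String[] toStringPair(TestData t) {
        String enumName = (t.en == null) ? null : t.en.name();
        return new String[] {t.s, enumName};
    }

    // Hypothetical helper: rebuilds the record from the two strings.
    // Enum.valueOf throws IllegalArgumentException for an unknown name.
    public static TestData fromStringPair(String[] pair) {
        TestEnum en = (pair[1] == null) ? null : TestEnum.valueOf(pair[1]);
        return new TestData(pair[0], en);
    }

    public static void main(String[] args) {
        String[] pair = toStringPair(new TestData("1", TestEnum.A));
        System.out.println(pair[0] + "," + pair[1]);

        TestData back = fromStringPair(pair);
        System.out.println(back.s + "," + back.en);
    }
}
```

Using name() rather than toString() keeps the stored string stable even if the enum overrides toString().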
Hope it helps.
Best,
Jiabao
On 2023/08/02 06:43:30 haishui wrote:
> I want to convert a DataStream to a Table. The type of the DataStream is a POJO, which
> contains an enum field.
>
>
> 1. The enum field is RAW('classname', '...') in the table. When I execute `SELECT
> * FROM t_test` and print the result, it throws an EOFException.
> 2. If I declare the field as STRING in the schema, it throws: cannot cast
> "TestEnum" to "java.lang.String"
>
>
> Is there any way to define the enum field as STRING in table?
>
>
> My code is as follows:
> Flink 1.17.1
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> DataStreamSource<TestData> source = env.fromElements(
>     new TestData("1", TestEnum.A),
>     new TestData("2", TestEnum.B)
> );
> Schema schema = Schema
>     .newBuilder()
>     .column("s", DataTypes.STRING())
>     .column("en", DataTypes.STRING())
>     .build();
> Table table = tableEnv.fromDataStream(source);
> tableEnv.createTemporaryView("t_test", table);
> tableEnv.executeSql("DESC t_test").print();
> tableEnv.executeSql("select * from t_test").print();
>
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public static class TestData {
>     private String s;
>     private TestEnum en;
> }
>
> public enum TestEnum {
>     A, B, C
> }
> +----+--------------------------------+--------------------------------+
> | op |                              s |                             en |
> +----+--------------------------------+--------------------------------+
> | +I |                              1 |                 SqlRawValue{?} |
> Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: java.io.EOFException
>     at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:66)
>     at GeneratedCastExecutor$1.cast(Unknown Source)
>     at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.lambda$init$0(RowDataToStringConverterImpl.java:74)
>     at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.convert(RowDataToStringConverterImpl.java:87)
>     at org.apache.flink.table.utils.print.TableauStyle.rowFieldsToString(TableauStyle.java:167)
>     at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:148)
>     at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> Caused by: java.io.EOFException
>     at java.base/java.io.DataInputStream.readFully(DataInputStream.java:202)
>     at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170)
>     at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:96)
>     at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:36)
>     at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:505)
>     at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>     ... 7 more