Hey Flinkers,

I am trying to follow the docs
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api>
to
convert a DataStream to a Table. Specifically, I have a DataStream of Row
and want the columns of the row to become the columns of the resulting
table.

That works but only if I construct the Rows statically. If I construct them
dynamically (in a map) then Flink turns the entire Row into one column of
type "RAW('org.apache.flink.types.Row', '...')".

Does anybody know why this is the case or how to fix it? Take a look at the
simple Flink program below where I construct the DataStream "rows" in two
different ways. I would expect those to be identical (and the sink does
print identical information) but the inferred table schema is different.

Thanks a ton,
Matthias

------------------------------

        StreamExecutionEnvironment flinkEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
        flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

        DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));

//  This alternative way of constructing this data stream produces the
expected table schema
//      DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12",
12), Row.of("Name5", 5));

        StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(flinkEnv);
        Table table = tableEnv.fromDataStream(rows);
        table.printSchema();

        rows.addSink(new PrintSinkFunction<>());

        flinkEnv.execute();

Reply via email to