Hi,

Here's an example that works for me:


"""
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*$*;

import java.util.List;

public class Stream2Table {

    public static void main(String[] args) {

        var streamingEnv = 
StreamExecutionEnvironment.*getExecutionEnvironment*();
        var tableEnv = StreamTableEnvironment.*create*(streamingEnv);

        var userRows = streamingEnv.fromCollection(
                List.*of*(
                        Row.*of*("user1", "al...@mail.org 
<mailto:us...@mail.org>", "Alice"),
                        Row.*of*("user2", "b...@mail.org 
<mailto:us...@mail.org>", "Bob")
                ),
                new RowTypeInfo(Types.*STRING*, Types.*STRING*, 
Types.*STRING*));

        var table = tableEnv
                .fromDataStream(userRows,
                        *$*("user_id"), *$*("handle"), *$*("name"));

        table.execute().print();
    }

}
"""

You can also dig here, you'll probably find better examples
https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table

Cheers,

Svend


On Sun, 11 Apr 2021, at 3:35 PM, vtygoss wrote:
> 
> Hi All,

> 

> there is a scenario where I need to process OGG Log data in kafka using Flink 
> Sql. I can convert the OGG Log Stream to DataStream<RowData> and each event 
> has RowKind, but i have trouble converting DataStream<RowData> to a Table.

> For test, i tried StreamTableEnvironment#fromDataStream and 
> createTemporaryView API, both TableSchema is 

> ```

> root

>  |-- f0: LEGACY('RAW', 'ANY<org.apache.flink.table.data.RowData>')

> ```

> 

> i want to get the schema :

> 

> ```

> root 

>  |— column1: Type,

>  |— column2: Type, 

> ….

> ```

> 

> 

> how to convert DataStream<RowData> with RowKind to Table? 

> 

> 

> Thank you very much for your reply

> 

Reply via email to