This question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
I want to consume a Kafka topic into a table using Flink SQL, then convert it back to a DataStream.

Here is the `SOURCE_DDL`:

```sql
CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)
```

With Flink, I execute the DDL:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)

// Register the Kafka-backed table, then obtain a Table handle to it.
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
```

Then I convert it into a DataStream and apply the downstream logic in the `map(e => ...)` part:

```scala
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
```

Inside the `map(e => ...)` part, I would like to access the column name, in this case `last_5_clicks`. Why? Because I may have different sources with different column names (such as `last_10min_page_view`), but I would like to reuse the code in `map(e => ...)`.

Is there a way to do this? Thanks.

Best,
Yik San