Thank you, Till! Actually, I found that I can access the column names via `Table.getSchema.getFieldNames` in version 1.12.0.
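
For anyone finding this thread later, here is a minimal sketch of how that could look on 1.12.0. It assumes the `tableEnv` and `table` values from the original question; `ColumnNameLookup`, `indexOf` and `featureStream` are hypothetical names introduced only for illustration. The field names are read once on the client, and only the resolved index is captured by the `map` closure. Note that in the Scala API `toRetractStream[Row]` yields `(Boolean, Row)` pairs, so the row is the second tuple element.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object ColumnNameLookup {

  // Hypothetical helper: position of `column` in the schema, failing fast if it is absent.
  def indexOf(fieldNames: Array[String], column: String): Int = {
    val idx = fieldNames.indexOf(column)
    require(idx >= 0, s"Column '$column' not found in: ${fieldNames.mkString(", ")}")
    idx
  }

  // Hypothetical helper: the same downstream logic for any source, given the
  // name of its feature column (e.g. "last_5_clicks" or "last_10min_page_view").
  def featureStream(
      tableEnv: StreamTableEnvironment,
      table: Table,
      featureColumn: String): DataStream[String] = {
    // Column names come from the Table's schema, not from Row (Flink 1.12.0).
    val fieldNames: Array[String] = table.getSchema.getFieldNames
    val idx = indexOf(fieldNames, featureColumn)

    tableEnv
      .toRetractStream[Row](table) // DataStream[(Boolean, Row)]
      .map(e => s"$featureColumn=${e._2.getField(idx)}")
  }
}
```

With this, `ColumnNameLookup.featureStream(tableEnv, table, "last_5_clicks")` and a second source that uses `last_10min_page_view` can share the same downstream code.
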

Best,
Yik San

On Wed, Mar 31, 2021 at 4:26 PM Till Rohrmann <trohrm...@apache.org> wrote:

> You are right Yik San. This feature has only been introduced in the
> upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that
> you have to either use 1.13-SNAPSHOT or wait for the 1.13 release which
> should happen in a couple of weeks if you really need this feature.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19981
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi Till,
>>
>> From the version I am using (1.12.0), getFieldNames is not available in
>> Row ... See
>> https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
>> .
>>
>> Is there any workaround for this in version 1.12.0? Thanks.
>>
>> Best,
>> Yik San
>>
>> On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> There is a method Row.getFieldNames.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <evan.chanyik...@gmail.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I look inside the Row class, it does contain a member `private final
>>>> Object[] fields;` though I wonder how to get column names out of the
>>>> member?
>>>>
>>>> Thanks!
>>>>
>>>> Best,
>>>> Yik San
>>>>
>>>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Yik San,
>>>>>
>>>>> by converting the rows to a Tuple3 you effectively lose the
>>>>> information about the column names. You could also call
>>>>> `toRetractStream[Row]` which will give you a `DataStream[Row]` where you
>>>>> keep the column names.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <
>>>>> evan.chanyik...@gmail.com> wrote:
>>>>>
>>>>>> The question is cross-posted on Stack Overflow
>>>>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>>>>> .
>>>>>>
>>>>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>>>>> convert it back to a DataStream.
>>>>>>
>>>>>> Here is the `SOURCE_DDL`:
>>>>>>
>>>>>> ```
>>>>>> CREATE TABLE kafka_source (
>>>>>>     user_id BIGINT,
>>>>>>     datetime TIMESTAMP(3),
>>>>>>     last_5_clicks STRING
>>>>>> ) WITH (
>>>>>>     'connector' = 'kafka',
>>>>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>>>>     'properties.group.id' = 'test-group',
>>>>>>     'format' = 'json'
>>>>>> )
>>>>>> ```
>>>>>>
>>>>>> With Flink, I execute the DDL.
>>>>>>
>>>>>> ```scala
>>>>>> val settings = EnvironmentSettings.newInstance.build
>>>>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>>>>> tableEnv.executeSql(SOURCE_DDL)
>>>>>> val table = tableEnv.from("kafka_source")
>>>>>> ```
>>>>>>
>>>>>> Then, I convert it into DataStream, and do downstream logic in the
>>>>>> `map(e => ...)` part.
>>>>>>
>>>>>> ```scala
>>>>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
>>>>>> ```
>>>>>>
>>>>>> Inside the `map(e => ...)` part, I would like to access the column
>>>>>> name, in this case, `last_5_clicks`. Why? Because I may have different
>>>>>> sources with different column names (such as `last_10min_page_view`),
>>>>>> but I would like to reuse the code in `map(e => ...)`.
>>>>>>
>>>>>> Is there a way to do this? Thanks.
>>>>>>
>>>>>> Best,
>>>>>> Yik San
>>>>>>
>>>>>