You are right Yik San. This feature has only been introduced in the upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that you have to either use 1.13-SNAPSHOT or wait for the 1.13 release which should happen in a couple of weeks if you really need this feature.
[1] https://issues.apache.org/jira/browse/FLINK-19981 Cheers, Till On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan <evan.chanyik...@gmail.com> wrote: > Hi Till, > > From the version I am using (1.12.0), getFieldNames is not available in > Row ... See > https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java > . > > Is there any workaround for this in version 1.12.0? Thanks. > > Best, > Yik San > > On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann <trohrm...@apache.org> > wrote: > >> There is a method Row.getFieldNames. >> >> Cheers, >> Till >> >> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <evan.chanyik...@gmail.com> >> wrote: >> >>> Hi Till, >>> >>> I look inside the Row class, it does contain a member `private final >>> Object[] fields;` though I wonder how to get column names out of the >>> member? >>> >>> Thanks! >>> >>> Best, >>> Yik San >>> >>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org> >>> wrote: >>> >>>> Hi Yik San, >>>> >>>> by converting the rows to a Tuple3 you effectively lose the information >>>> about the column names. You could also call `toRetractStream[Row]` which >>>> will give you a `DataStream[Row]` where you keep the column names. >>>> >>>> Cheers, >>>> Till >>>> >>>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com> >>>> wrote: >>>> >>>>> The question is cross-posted on Stack Overflow >>>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name >>>>> . >>>>> >>>>> I want to consume a Kafka topic into a table using Flink SQL, then >>>>> convert it back to a DataStream. >>>>> >>>>> Here is the `SOURCE_DDL`: >>>>> >>>>> ``` >>>>> CREATE TABLE kafka_source ( >>>>> user_id BIGINT, >>>>> datetime TIMESTAMP(3), >>>>> last_5_clicks STRING >>>>> ) WITH ( >>>>> 'connector' = 'kafka', >>>>> 'topic' = 'aiinfra.fct.userfeature.0', >>>>> 'properties.bootstrap.servers' = 'localhost:9092', >>>>> 'properties.group.id' = 'test-group', >>>>> 'format' = 'json' >>>>> ) >>>>> ``` >>>>> >>>>> With Flink, I execute the DDL. >>>>> >>>>> ```scala >>>>> val settings = EnvironmentSettings.newInstance.build >>>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment >>>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings) >>>>> tableEnv.executeSql(SOURCE_DDL) >>>>> val table = tableEnv.from("kafka_source") >>>>> ``` >>>>> >>>>> Then, I convert it into DataStream, and do downstream logic in the >>>>> `map(e => ...)` part. >>>>> >>>>> ```scala >>>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp, >>>>> String)](table).map(e => ...) >>>>> ``` >>>>> >>>>> Inside the `map(e => ...)` part, I would like to access the column >>>>> name, in this case, `last_5_clicks`. Why? Because I may have different >>>>> sources with different columns names (such as `last_10min_page_view`), but >>>>> I would like to reuse the code in `map(e => ...)`. >>>>> >>>>> Is there a way to do this? Thanks. >>>>> >>>>> Best, >>>>> Yik San >>>>> >>>>