Re: Flink Table to DataStream: how to access column name?
Thank you, Till! Actually I found I can access this via `Table.getSchema.getFieldNames` in version 1.12.0.

Best,
Yik San
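As a sketch of how this 1.12 workaround can be combined with Till's earlier `toRetractStream[Row]` suggestion (the setup mirrors the original question's `SOURCE_DDL` and environment; the name-to-value pairing inside `map` is illustrative, not something stated in the thread):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")

// In 1.12 the column names live on the table schema, not on each Row,
// so capture them once before building the stream.
val fieldNames: Array[String] = table.getSchema.getFieldNames

tableEnv.toRetractStream[Row](table).map { case (_, row) =>
  // Pair each column name with the value at the same position.
  val byName = fieldNames.indices
    .map(i => fieldNames(i) -> row.getField(i))
    .toMap
  // byName("last_5_clicks") now works for any source whose schema
  // names that column, without hard-coding tuple positions.
  byName
}
```

This builds the job graph only; running it still needs the Kafka source from the DDL to be reachable.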
Re: Flink Table to DataStream: how to access column name?
You are right Yik San. This feature has only been introduced in the upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that you have to either use 1.13-SNAPSHOT or wait for the 1.13 release, which should happen in a couple of weeks, if you really need this feature.

[1] https://issues.apache.org/jira/browse/FLINK-19981

Cheers,
Till
Re: Flink Table to DataStream: how to access column name?
Hi Till,

From the version I am using (1.12.0), getFieldNames is not available in Row ... See https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java .

Is there any workaround for this in version 1.12.0? Thanks.

Best,
Yik San
Re: Flink Table to DataStream: how to access column name?
There is a method Row.getFieldNames.

Cheers,
Till
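For context, this method only exists from Flink 1.13 onwards (added by FLINK-19981, linked later in the thread). A rough sketch of how it behaves there, assuming a row built with named positions:

```scala
import org.apache.flink.types.Row

// From 1.13 on, a Row created via withNames() carries its field names.
val row = Row.withNames()
row.setField("user_id", 42L)
row.setField("last_5_clicks", "a,b,c,d,e")

// getFieldNames(true) returns the names of the named positions
// (it returns null for a purely position-based row).
val names: java.util.Set[String] = row.getFieldNames(true)
val clicks = row.getField("last_5_clicks")
```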
Re: Flink Table to DataStream: how to access column name?
Hi Till,

I looked inside the Row class; it does contain a member `private final Object[] fields;`, though I wonder how to get the column names out of that member?

Thanks!

Best,
Yik San
Re: Flink Table to DataStream: how to access column name?
Hi Yik San,

By converting the rows to a Tuple3 you effectively lose the information about the column names. You could also call `toRetractStream[Row]`, which will give you a `DataStream[Row]` where you keep the column names.

Cheers,
Till
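The reuse Yik San is after does not actually depend on Flink: once the column names are available (from the table schema in 1.12, or from the Row itself in 1.13+), the shared `map` logic can be written against a name-to-value view. A minimal, Flink-free sketch of that pattern (`toNamed` and `featureColumn` are made-up names for illustration):

```scala
// Pair schema field names with a record's values positionally,
// so the same downstream logic works for any source/schema.
def toNamed(fieldNames: Seq[String], values: Seq[Any]): Map[String, Any] =
  fieldNames.zip(values).toMap

// Shared logic looks columns up by name instead of tuple position.
def featureColumn(named: Map[String, Any], column: String): Option[Any] =
  named.get(column)

val named = toNamed(
  Seq("user_id", "datetime", "last_5_clicks"),
  Seq(42L, "2021-03-30 15:52:00", "a,b,c,d,e")
)
// featureColumn(named, "last_5_clicks") == Some("a,b,c,d,e")
```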
Flink Table to DataStream: how to access column name?
The question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name

I want to consume a Kafka topic into a table using Flink SQL, then convert it back to a DataStream.

Here is the `SOURCE_DDL`:

```
CREATE TABLE kafka_source (
  user_id BIGINT,
  datetime TIMESTAMP(3),
  last_5_clicks STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'aiinfra.fct.userfeature.0',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test-group',
  'format' = 'json'
)
```

With Flink, I execute the DDL:

```scala
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
```

Then, I convert it into a DataStream, and do the downstream logic in the `map(e => ...)` part:

```scala
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
```

Inside the `map(e => ...)` part, I would like to access the column name, in this case `last_5_clicks`. Why? Because I may have different sources with different column names (such as `last_10min_page_view`), but I would like to reuse the code in `map(e => ...)`.

Is there a way to do this? Thanks.

Best,
Yik San