You are right, Yik San. This feature is only being introduced in the
upcoming 1.13 release [1]. Sorry for the confusion. If you really need
it, I am afraid you will have to either use 1.13-SNAPSHOT or wait for
the 1.13 release, which should happen in a couple of weeks.

[1] https://issues.apache.org/jira/browse/FLINK-19981
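
Until then, one possible workaround on 1.12 might be to read the column
names from the table schema once and look fields up by position inside
the map function. The following is a minimal, untested sketch. It assumes
`Table.getSchema` and `Row.getField(int)` as they exist in the 1.12 API,
and it reuses the DDL and setup from the original question; the object
name `ColumnNameWorkaround` and the helper `fieldByName` are hypothetical
names made up for this illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object ColumnNameWorkaround {

  // DDL copied from the original question.
  val SOURCE_DDL: String =
    """
      |CREATE TABLE kafka_source (
      |    user_id BIGINT,
      |    datetime TIMESTAMP(3),
      |    last_5_clicks STRING
      |) WITH (
      |    'connector' = 'kafka',
      |    'topic' = 'aiinfra.fct.userfeature.0',
      |    'properties.bootstrap.servers' = 'localhost:9092',
      |    'properties.group.id' = 'test-group',
      |    'format' = 'json'
      |)
      |""".stripMargin

  // Hypothetical helper: resolve a column name to its position in the
  // schema, then read the field by index (Row has no names in 1.12).
  def fieldByName(fieldNames: Array[String], row: Row, name: String): AnyRef = {
    val idx = fieldNames.indexOf(name)
    require(idx >= 0, s"Unknown column: $name")
    row.getField(idx)
  }

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
    tableEnv.executeSql(SOURCE_DDL)
    val table = tableEnv.from("kafka_source")

    // Column names come from the table schema, not from the Row itself.
    val fieldNames: Array[String] = table.getSchema.getFieldNames

    tableEnv
      .toRetractStream[Row](table) // DataStream[(Boolean, Row)]
      .map(e => String.valueOf(fieldByName(fieldNames, e._2, "last_5_clicks")))
      .print()

    streamEnv.execute("column-name-workaround")
  }
}
```

Since `fieldNames` is a plain `Array[String]`, it can be captured by the
map closure, and the same map logic could be reused for another source by
passing a different column name such as `last_10min_page_view`.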

Cheers,
Till

On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan <evan.chanyik...@gmail.com>
wrote:

> Hi Till,
>
> In the version I am using (1.12.0), getFieldNames is not available in
> Row. See
> https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
> .
>
> Is there any workaround for this in version 1.12.0? Thanks.
>
> Best,
> Yik San
>
> On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> There is a method Row.getFieldNames.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <evan.chanyik...@gmail.com>
>> wrote:
>>
>>> Hi Till,
>>>
>>> I looked inside the Row class. It does contain a member `private final
>>> Object[] fields;`, but how do I get the column names out of that
>>> member?
>>>
>>> Thanks!
>>>
>>> Best,
>>> Yik San
>>>
>>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Yik San,
>>>>
>>>> by converting the rows to a Tuple3 you effectively lose the information
>>>> about the column names. You could also call `toRetractStream[Row]` which
>>>> will give you a `DataStream[Row]` where you keep the column names.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com>
>>>> wrote:
>>>>
>>>>> The question is cross-posted on Stack Overflow
>>>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>>>> .
>>>>>
>>>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>>>> convert it back to a DataStream.
>>>>>
>>>>> Here is the `SOURCE_DDL`:
>>>>>
>>>>> ```
>>>>> CREATE TABLE kafka_source (
>>>>>     user_id BIGINT,
>>>>>     datetime TIMESTAMP(3),
>>>>>     last_5_clicks STRING
>>>>> ) WITH (
>>>>>     'connector' = 'kafka',
>>>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>>>     'properties.group.id' = 'test-group',
>>>>>     'format' = 'json'
>>>>> )
>>>>> ```
>>>>>
>>>>> With Flink, I execute the DDL.
>>>>>
>>>>> ```scala
>>>>> val settings = EnvironmentSettings.newInstance.build
>>>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>>>> tableEnv.executeSql(SOURCE_DDL)
>>>>> val table = tableEnv.from("kafka_source")
>>>>> ```
>>>>>
>>>>> Then, I convert it into DataStream, and do downstream logic in the
>>>>> `map(e => ...)` part.
>>>>>
>>>>> ```scala
>>>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
>>>>> String)](table).map(e => ...)
>>>>> ```
>>>>>
>>>>> Inside the `map(e => ...)` part, I would like to access the column
>>>>> name, in this case `last_5_clicks`. Why? Because I may have different
>>>>> sources with different column names (such as `last_10min_page_view`),
>>>>> but I would like to reuse the code in `map(e => ...)`.
>>>>>
>>>>> Is there a way to do this? Thanks.
>>>>>
>>>>> Best,
>>>>> Yik San
>>>>>
>>>>
