Thank you, Till!

Actually, I found that I can access the column names via
`Table.getSchema.getFieldNames` in version 1.12.0.
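
For reference, here is a minimal sketch of what worked for me on 1.12.0 (it
reuses the SOURCE_DDL and setup from my earlier mail below, and the map at the
end just pairs each column name with its value):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Same setup as in my earlier mail; SOURCE_DDL is the CREATE TABLE
// kafka_source statement quoted below.
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)

val table = tableEnv.from("kafka_source")

// Column names in schema order, e.g. Array("user_id", "datetime", "last_5_clicks")
val fieldNames: Array[String] = table.getSchema.getFieldNames

tableEnv
  .toRetractStream[Row](table)
  .map { case (_, row) =>
    // Pair each column name with its value so the same downstream code can be
    // reused for sources with different column names (e.g. last_10min_page_view).
    fieldNames.zipWithIndex.map { case (name, i) => name -> row.getField(i) }.toMap
  }
```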

Best,
Yik San

On Wed, Mar 31, 2021 at 4:26 PM Till Rohrmann <trohrm...@apache.org> wrote:

> You are right, Yik San. This feature has only been introduced in the
> upcoming 1.13 release [1]. Sorry for causing confusion here. If you really
> need this feature, I fear that you will have to either use 1.13-SNAPSHOT or
> wait for the 1.13 release, which should happen in a couple of weeks.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19981
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi Till,
>>
>> In the version I am using (1.12.0), getFieldNames is not available on
>> Row. See
>> https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
>> .
>>
>> Is there any workaround for this in version 1.12.0? Thanks.
>>
>> Best,
>> Yik San
>>
>> On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> There is a method Row.getFieldNames.
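>>>
>>> Roughly like this (a sketch of the name-based Row mode added by
>>> FLINK-19981; the exact signatures may differ slightly, so please check
>>> the 1.13 API):
>>>
>>> ```scala
>>> import org.apache.flink.types.Row
>>>
>>> // A Row created in name-based mode carries its field names with it.
>>> val row = Row.withNames()
>>> row.setField("user_id", 42L)
>>> row.setField("last_5_clicks", "a,b,c,d,e")
>>>
>>> // Names of the fields that have been set, e.g. [user_id, last_5_clicks]
>>> val names: java.util.Set[String] = row.getFieldNames(true)
>>> ```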
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <evan.chanyik...@gmail.com>
>>> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I looked inside the Row class; it does contain a member `private final
>>>> Object[] fields;`, but I wonder how to get the column names out of that
>>>> member?
>>>>
>>>> Thanks!
>>>>
>>>> Best,
>>>> Yik San
>>>>
>>>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Yik San,
>>>>>
>>>>> by converting the rows to a Tuple3 you effectively lose the
>>>>> information about the column names. You could also call
>>>>> `toRetractStream[Row]` which will give you a `DataStream[Row]` where you
>>>>> keep the column names.
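>>>>>
>>>>> Something like this (a sketch assuming the kafka_source table from your
>>>>> DDL; note that the elements of the retract stream are (Boolean, Row)
>>>>> tuples, where the Boolean is the accumulate/retract flag):
>>>>>
>>>>> ```scala
>>>>> import org.apache.flink.streaming.api.scala._
>>>>> import org.apache.flink.table.api.bridge.scala._
>>>>> import org.apache.flink.types.Row
>>>>>
>>>>> tableEnv
>>>>>   .toRetractStream[Row](table)
>>>>>   .map { case (_, row) =>
>>>>>     // Fields keep the order of the table schema:
>>>>>     // user_id, datetime, last_5_clicks
>>>>>     row.getField(2).asInstanceOf[String]
>>>>>   }
>>>>> ```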
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <
>>>>> evan.chanyik...@gmail.com> wrote:
>>>>>
>>>>>> The question is cross-posted on Stack Overflow
>>>>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>>>>> .
>>>>>>
>>>>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>>>>> convert it back to a DataStream.
>>>>>>
>>>>>> Here is the `SOURCE_DDL`:
>>>>>>
>>>>>> ```
>>>>>> CREATE TABLE kafka_source (
>>>>>>     user_id BIGINT,
>>>>>>     datetime TIMESTAMP(3),
>>>>>>     last_5_clicks STRING
>>>>>> ) WITH (
>>>>>>     'connector' = 'kafka',
>>>>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>>>>     'properties.group.id' = 'test-group',
>>>>>>     'format' = 'json'
>>>>>> )
>>>>>> ```
>>>>>>
>>>>>> With Flink, I execute the DDL.
>>>>>>
>>>>>> ```scala
>>>>>> val settings = EnvironmentSettings.newInstance.build
>>>>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>>>>> tableEnv.executeSql(SOURCE_DDL)
>>>>>> val table = tableEnv.from("kafka_source")
>>>>>> ```
>>>>>>
>>>>>> Then, I convert it into a DataStream and do the downstream logic in the
>>>>>> `map(e => ...)` part.
>>>>>>
>>>>>> ```scala
>>>>>> tableEnv
>>>>>>   .toRetractStream[(Long, java.sql.Timestamp, String)](table)
>>>>>>   .map(e => ...)
>>>>>> ```
>>>>>>
>>>>>> Inside the `map(e => ...)` part, I would like to access the column
>>>>>> name, in this case, `last_5_clicks`. Why? Because I may have different
>>>>>> sources with different column names (such as `last_10min_page_view`),
>>>>>> but I would like to reuse the code in `map(e => ...)`.
>>>>>>
>>>>>> Is there a way to do this? Thanks.
>>>>>>
>>>>>> Best,
>>>>>> Yik San
>>>>>>
>>>>>
