Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Yik San Chan
Thank you, Till!

Actually, I found I can access this via `Table.getSchema.getFieldNames` in
version 1.12.0.
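
For anyone landing on this thread, here is an untested sketch of that 1.12
workaround. It assumes the `tableEnv` and `table` from the original question
below, and that the column of interest is `last_5_clicks`:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// TableSchema#getFieldNames exists in 1.12, so we can build a
// name -> position map once, outside the stream.
val fieldIndex: Map[String, Int] =
  table.getSchema.getFieldNames.zipWithIndex.toMap

tableEnv
  .toRetractStream[Row](table)
  .map { case (_, row) =>
    // Look the column up by name instead of by tuple position,
    // so the same map function works for differently named sources.
    val clicks = row.getField(fieldIndex("last_5_clicks")).asInstanceOf[String]
    // ... reusable downstream logic ...
    clicks
  }
```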

Best,
Yik San



Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Till Rohrmann
You are right, Yik San. This feature is only introduced in the upcoming 1.13
release [1]. Sorry for causing confusion here. If you really need this
feature, I fear you have to either use 1.13-SNAPSHOT or wait for the 1.13
release, which should happen in a couple of weeks.

[1] https://issues.apache.org/jira/browse/FLINK-19981

Cheers,
Till



Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Yik San Chan
Hi Till,

In the version I am using (1.12.0), `getFieldNames` is not available on
`Row`. See
https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
.

Is there any workaround for this in version 1.12.0? Thanks.

Best,
Yik San



Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Till Rohrmann
There is a method Row.getFieldNames.
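
As the follow-up in this thread clarifies, `Row.getFieldNames` only exists
from Flink 1.13 on (FLINK-19981), where `Row` gained name-based field access.
A hedged sketch of what that looks like, assuming a `Row` that was produced
with named fields:

```scala
import org.apache.flink.types.Row

// Flink 1.13+ only: read a field by name instead of position.
// `last_5_clicks` is the column from the original question.
def lastClicks(row: Row): AnyRef =
  row.getField("last_5_clicks")
```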

Cheers,
Till



Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Yik San Chan
Hi Till,

I looked inside the Row class; it does contain a member `private final
Object[] fields;`, but how do I get column names out of it?

Thanks!

Best,
Yik San



Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Till Rohrmann
Hi Yik San,

By converting the rows to a Tuple3 you effectively lose the information
about the column names. You could instead call `toRetractStream[Row]`, which
will give you a `DataStream[Row]` where you keep the column names.
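
As an untested sketch of this suggestion (assuming the `tableEnv` and `table`
from the question), note that in the Scala API the retract stream is actually
a `DataStream[(Boolean, Row)]`, where the Boolean flags accumulate vs.
retract messages:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

tableEnv
  .toRetractStream[Row](table)
  .map { case (isAccumulate, row) =>
    // In 1.12 the fields of a Row are positional; access them with
    // Row#getField(index). Name-based access only arrives in 1.13.
    (isAccumulate, row.getField(2)) // position 2 = last_5_clicks
  }
```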

Cheers,
Till



Flink Table to DataStream: how to access column name?

2021-03-30 Thread Yik San Chan
The question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
.

I want to consume a Kafka topic into a table using Flink SQL, then convert
it back to a DataStream.

Here is the `SOURCE_DDL`:

```sql
CREATE TABLE kafka_source (
user_id BIGINT,
datetime TIMESTAMP(3),
last_5_clicks STRING
) WITH (
'connector' = 'kafka',
'topic' = 'aiinfra.fct.userfeature.0',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json'
)
```

With Flink, I execute the DDL.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
```

Then I convert it into a DataStream and do the downstream logic in the
`map(e => ...)` part.

```scala
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e
=> ...)
```

Inside the `map(e => ...)` part, I would like to access the column name, in
this case `last_5_clicks`. Why? Because I may have different sources with
different column names (such as `last_10min_page_view`), but I would like
to reuse the code in `map(e => ...)`.

Is there a way to do this? Thanks.

Best,
Yik San