This question is cross-posted on Stack Overflow:
https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name

I want to consume a Kafka topic into a table using Flink SQL, then convert
it back to a DataStream.

Here is the `SOURCE_DDL`:

```sql
CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)
```

With Flink, I execute the DDL.

```scala
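import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL) // registers the kafka_source table
val table = tableEnv.from("kafka_source")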
```

Then I convert it into a DataStream and do the downstream logic in the
`map(e => ...)` part.

```scala
tableEnv
  .toRetractStream[(Long, java.sql.Timestamp, String)](table)
  .map(e => ...)
```

Inside the `map(e => ...)` part, I would like to access the column name, in
this case `last_5_clicks`. Why? Because I may have different sources with
different column names (such as `last_10min_page_view`), but I would like
to reuse the code in `map(e => ...)`.
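
To make the goal concrete, here is a minimal sketch of the kind of reuse
meant above, written in plain Scala with no Flink dependency. The helper
names `toNamedRecord` and `describeFeature` are hypothetical, and the sketch
assumes the column names can be collected once outside the `map` function
(for instance from the table's schema, such as `table.getSchema.getFieldNames`
in the Flink 1.12-era API, which is an assumption about the version in use)
and then captured by the closure.

```scala
// Hypothetical helper: pair column names with the values of a tuple-like record.
// `fieldNames` is assumed to be collected once, outside the map function,
// and captured by the closure.
def toNamedRecord(fieldNames: Seq[String], record: Product): Map[String, Any] = {
  require(fieldNames.length == record.productArity,
    s"expected ${record.productArity} column names, got ${fieldNames.length}")
  fieldNames.zip(record.productIterator.toSeq).toMap
}

// Hypothetical reusable logic: it only needs to know which column holds the feature.
def describeFeature(named: Map[String, Any], featureColumn: String): String =
  s"$featureColumn=${named(featureColumn)}"

// Two sources with different column names share the same downstream code.
val clicksColumns = Seq("user_id", "datetime", "last_5_clicks")
val clicksRecord = (1L, java.sql.Timestamp.valueOf("2021-03-30 12:00:00"), "a,b,c")

val viewsColumns = Seq("user_id", "datetime", "last_10min_page_view")
val viewsRecord = (2L, java.sql.Timestamp.valueOf("2021-03-30 12:05:00"), "home")

val clicksOut = describeFeature(toNamedRecord(clicksColumns, clicksRecord), clicksColumns.last)
val viewsOut = describeFeature(toNamedRecord(viewsColumns, viewsRecord), viewsColumns.last)
```

In the real job, the body of `map(e => ...)` would play the role of
`describeFeature`, with the tuple `e` and the captured column names as input.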

Is there a way to do this? Thanks.

Best,
Yik San
