Re: 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
感谢大佬!!!

> 2023年10月13日 10:44,tanjialiang  写道:
> 
> Hi, 
> 这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922
> 
> 
> best wishes,
> tanjialiang.
> 
> 
>  回复的原邮件 
> | 发件人 | jinzhuguang |
> | 发送日期 | 2023年10月13日 10:39 |
> | 收件人 | user-zh |
> | 主题 | 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。 |
> 首先,我的Flink版本为1.16.0
> 为了方便理解,我以Kafka作为案例来描述:
> 我有以下两个表:
> CREATE TABLE orders(
> user_id BIGINT,
> name STRING,
> timestamp TIMESTAMP(3) METADATA VIRTUAL
> )WITH(
> 'connector'='kafka',
> 'topic'='orders',
> 'properties.group.id' = 'test_join_tempral',
> 'scan.startup.mode'='earliest-offset',
> 'properties.bootstrap.servers'='localhost:9092',
> 'format'='json',
> 'json.ignore-parse-errors' = 'true'
> );
> CREATE TABLE kafka_sink(
> user_id BIGINT,
> name STRING,
> timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
> )WITH(
> 'connector'='kafka',
> 'topic'='kafka_sink',
> 'properties.group.id' = 'test_join_tempral',
> 'scan.startup.mode'='earliest-offset',
> 'properties.bootstrap.servers'='localhost:9092',
> 'format'='json',
> 'json.ignore-parse-errors' = 'true'
> );
> 
> 正常情况:
> Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] SQL update statement has been successfully submitted to the cluster:
> Job ID: e419ae9d2cad4c3c2a2c1150c1a86653
> 
> 
> 异常情况:
> Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
> user_id,name,`timestamp` from orders;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
> 'timestamp'
> 很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
> Flink SQL> describe kafka_sink;
> +---+--+--+-+---+---+
> |  name | type | null | key |extras | 
> watermark |
> +---+--+--+-+---+---+
> |   user_id |   BIGINT | TRUE | |   | 
>   |
> |  name |   STRING | TRUE | |   | 
>   |
> | timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' | 
>   |
> +---+--+--+-+---+---+
> 
> 
> 
> 恳请解答!



回复:关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 tanjialiang
Hi, 
这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922


best wishes,
tanjialiang.


 回复的原邮件 
| 发件人 | jinzhuguang |
| 发送日期 | 2023年10月13日 10:39 |
| 收件人 | user-zh |
| 主题 | 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。 |
首先,我的Flink版本为1.16.0
为了方便理解,我以Kafka作为案例来描述:
我有以下两个表:
CREATE TABLE orders(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA VIRTUAL
)WITH(
'connector'='kafka',
'topic'='orders',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE kafka_sink(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
)WITH(
'connector'='kafka',
'topic'='kafka_sink',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);

正常情况:
Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e419ae9d2cad4c3c2a2c1150c1a86653


异常情况:
Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
user_id,name,`timestamp` from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
'timestamp'
很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
Flink SQL> describe kafka_sink;
+---+--+--+-+---+---+
|  name | type | null | key |extras | watermark 
|
+---+--+--+-+---+---+
|   user_id |   BIGINT | TRUE | |   |   
|
|  name |   STRING | TRUE | |   |   
|
| timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' |   
|
+---+--+--+-+---+---+



恳请解答!

关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
首先,我的Flink版本为1.16.0
为了方便理解,我以Kafka作为案例来描述:
我有以下两个表:
CREATE TABLE orders(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA VIRTUAL
)WITH(
'connector'='kafka',
'topic'='orders',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE kafka_sink(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
)WITH(
'connector'='kafka',
'topic'='kafka_sink',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);

正常情况: 
Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e419ae9d2cad4c3c2a2c1150c1a86653


异常情况:
Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
user_id,name,`timestamp` from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
'timestamp'
很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
Flink SQL> describe kafka_sink;
+---+--+--+-+---+---+
|  name | type | null | key |extras | watermark 
|
+---+--+--+-+---+---+
|   user_id |   BIGINT | TRUE | |   |   
|
|  name |   STRING | TRUE | |   |   
|
| timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' |   
|
+---+--+--+-+---+---+



恳请解答!