Re: Flink SQL cannot specify explicit column names in INSERT INTO for a table with metadata columns
Thanks a lot!

> On October 13, 2023, at 10:44, tanjialiang wrote:
>
> Hi,
> This was fixed in 1.17.0; see https://issues.apache.org/jira/browse/FLINK-30922 for details.
>
> best wishes,
> tanjialiang.
>
> ---- Original message ----
> | From | jinzhuguang |
> | Date | October 13, 2023, 10:39 |
> | To | user-zh |
> | Subject | Flink SQL cannot specify explicit column names in INSERT INTO for a table with metadata columns |
Reply: Flink SQL cannot specify explicit column names in INSERT INTO for a table with metadata columns
Hi,
This was fixed in 1.17.0; see https://issues.apache.org/jira/browse/FLINK-30922 for details.

best wishes,
tanjialiang.

---- Original message ----
| From | jinzhuguang |
| Date | October 13, 2023, 10:39 |
| To | user-zh |
| Subject | Flink SQL cannot specify explicit column names in INSERT INTO for a table with metadata columns |
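Until upgrading to 1.17.0 is possible, a workaround on 1.16.x follows from the thread's own "normal case": drop the explicit target column list so the planner maps SELECT columns to sink columns by position. A minimal sketch, using the tables defined in the original question:

```sql
-- Workaround on Flink 1.16.x: omit the target column list.
-- Columns are then mapped from the SELECT to the sink by position,
-- which sidesteps the "Unknown target column 'timestamp'" validation
-- error triggered by naming a persisted metadata column explicitly.
INSERT INTO kafka_sink
SELECT user_id, name, `timestamp` FROM orders;
```

Note the SELECT must then produce every sink column, in schema order.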
Flink SQL cannot specify explicit column names in INSERT INTO for a table with metadata columns
First of all, my Flink version is 1.16.0.
To make this easier to follow, I'll use Kafka as the example. I have the following two tables:

CREATE TABLE orders (
    user_id BIGINT,
    name STRING,
    `timestamp` TIMESTAMP(3) METADATA VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.group.id' = 'test_join_tempral',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);

CREATE TABLE kafka_sink (
    user_id BIGINT,
    name STRING,
    `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_sink',
    'properties.group.id' = 'test_join_tempral',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);

The normal case:

Flink SQL> insert into kafka_sink select user_id, name, `timestamp` from orders;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e419ae9d2cad4c3c2a2c1150c1a86653

The failing case:

Flink SQL> insert into kafka_sink(user_id, name, `timestamp`) select user_id, name, `timestamp` from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 'timestamp'

This is strange. Why does it fail as soon as I specify the column names explicitly? And why can't it find the `timestamp` column? The kafka_sink schema is:

Flink SQL> describe kafka_sink;
+-----------+--------------+------+-----+---------------------------+-----------+
|      name |         type | null | key |                    extras | watermark |
+-----------+--------------+------+-----+---------------------------+-----------+
|   user_id |       BIGINT | TRUE |     |                           |           |
|      name |       STRING | TRUE |     |                           |           |
| timestamp | TIMESTAMP(3) | TRUE |     | METADATA FROM 'timestamp' |           |
+-----------+--------------+------+-----+---------------------------+-----------+

Any help would be much appreciated!
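For readers hitting this today: per the reply earlier in the thread, FLINK-30922 fixed this in Flink 1.17.0, so on 1.17+ the exact statement that fails above is expected to validate. A sketch, assuming the same two tables:

```sql
-- On Flink 1.17.0+ (after FLINK-30922), an explicit target column list
-- that names a persisted metadata column is accepted by the validator.
INSERT INTO kafka_sink (user_id, name, `timestamp`)
SELECT user_id, name, `timestamp` FROM orders;
```

On 1.16.x the same statement still fails with "Unknown target column 'timestamp'"; the only options there are the positional INSERT or an upgrade.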