[ 
https://issues.apache.org/jira/browse/FLINK-16345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-16345:
----------------------------
    Fix Version/s: 1.11.0

> Computed column can not refer time attribute column 
> ----------------------------------------------------
>
>                 Key: FLINK-16345
>                 URL: https://issues.apache.org/jira/browse/FLINK-16345
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: Leonard Xu
>            Priority: Major
>             Fix For: 1.10.1, 1.11.0
>
>
> If a computed column refers to a time attribute column, the computed column 
>  loses the time attribute and validation fails.
> {code:java}
> CREATE TABLE orders (
>   order_id STRING,
>   order_time TIMESTAMP(3),
>   amount DOUBLE,
>   amount_kg as amount * 1000,
>   -- cannot select the computed column standard_ts, which is derived from
>   -- order_time, the column used for the WATERMARK
>   standard_ts as order_time + INTERVAL '8' HOUR,
>   WATERMARK FOR order_time AS order_time
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.10',
>   'connector.topic' = 'flink_orders',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
> {code}
> The query `select amount_kg from orders` runs normally, but 
> the query `select standard_ts from orders` throws a validation exception 
> with the following message:
> {noformat}
> [ERROR] Could not execute SQL statement. Reason:
>  java.lang.AssertionError: Conversion to relational algebra failed to 
> preserve datatypes:
>  validated type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME 
> ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) 
> ts) NOT NULL
>  converted type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME 
> ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME 
> ATTRIBUTE(ROWTIME) ts) NOT NULL
>  rel:
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], 
> ts=[$4])
>  LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 
> 1000)], ts=[+($1, 28800000:INTERVAL HOUR)])
>  LogicalTableScan(table=[[default_catalog, default_database, orders, source: 
> [Kafka010TableSource(order_id, order_time, amount)]]])
>  {noformat}
>  
>  
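
The two record types in the error message above differ in a single field, which is easy to miss in the wrapped log. The following is a minimal Python sketch for reading the message, not Flink code: the helper name `diff_record_types` is hypothetical, and the field lists are transcribed by hand from the log (which names the computed column `ts`).

```python
# Field lists transcribed by hand from the "validated type" and
# "converted type" sections of the error message (name, type).
validated = [
    ("order_id", 'VARCHAR(2147483647) CHARACTER SET "UTF-16LE"'),
    ("order_time", "TIME ATTRIBUTE(ROWTIME)"),
    ("amount", "DOUBLE"),
    ("amount_kg", "DOUBLE"),
    ("ts", "TIMESTAMP(3)"),
]
converted = [
    ("order_id", 'VARCHAR(2147483647) CHARACTER SET "UTF-16LE"'),
    ("order_time", "TIME ATTRIBUTE(ROWTIME)"),
    ("amount", "DOUBLE"),
    ("amount_kg", "DOUBLE"),
    ("ts", "TIME ATTRIBUTE(ROWTIME)"),
]


def diff_record_types(validated, converted):
    """Hypothetical helper: list fields whose types differ, position by position."""
    return [
        (name, v_type, c_type)
        for (name, v_type), (_, c_type) in zip(validated, converted)
        if v_type != c_type
    ]


mismatches = diff_record_types(validated, converted)
for name, v_type, c_type in mismatches:
    print(f"{name}: validated as {v_type}, converted as {c_type}")
```

Only the computed column is reported: it is validated as `TIMESTAMP(3)` but converted as `TIME ATTRIBUTE(ROWTIME)`, which is the mismatch the assertion complains about.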



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
