[ 
https://issues.apache.org/jira/browse/FLINK-16345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17062254#comment-17062254
 ] 

Danny Chen commented on FLINK-16345:
------------------------------------

Let me restate the cause of the issue in detail.

Assume we have the following DDL:

{code:sql}
create table t(
  a timestamp,
  b AS a - INTERVAL '5' HOUR,
  WATERMARK FOR a AS a
)
{code}

In SqlToOperationConverter, during TableSchema generation, when we infer the 
type of column b, we regard column a as a normal timestamp column, so the 
result type of b is also a normal timestamp.[A] 
The generated schema is therefore:

--
  +- a: TIMESTAMP
  +- b as a - INTERVAL '5' HOUR: TIMESTAMP

Then we generate the CatalogSchemaTable and patch the time attributes into the 
row type ROW(a: TIMESTAMP, b: TIMESTAMP). Based on the watermark definition, a 
is a rowtime attribute, so the row type becomes 
ROW(a: RowTimeType, b: TIMESTAMP).

Then in CatalogSourceTable, we generate the node for column b from the 
expression "a - INTERVAL '5' HOUR" against the patched row type 
ROW(a: RowTimeType, b: TIMESTAMP). This time we infer that the result type is 
a rowtime type.[B]
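
The three steps above can be sketched as a small toy model. This is not Flink 
code: the class, enum, and method names are all hypothetical, and the rule that 
"column - INTERVAL" takes the type of the referenced column is an assumption 
chosen to match the behavior described for [A] and [B].

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (not Flink code) of the two inconsistent inferences [A] and [B]. */
public class TimeAttributeInferenceSketch {

    /** Hypothetical stand-ins for the planner's timestamp types. */
    enum ColumnType { TIMESTAMP, ROWTIME }

    /**
     * Assumed rule for "column - INTERVAL": the result takes the type of the
     * referenced column as it appears in the given row type.
     */
    static ColumnType inferTimestampMinusInterval(
            Map<String, ColumnType> rowType, String column) {
        ColumnType type = rowType.get(column);
        if (type == null) {
            throw new IllegalArgumentException("Unknown column: " + column);
        }
        return type;
    }

    /** Marks the watermark column as rowtime; all other columns are kept as-is. */
    static Map<String, ColumnType> patchTimeAttributes(
            Map<String, ColumnType> rowType, String watermarkColumn) {
        Map<String, ColumnType> patched = new LinkedHashMap<>(rowType);
        patched.put(watermarkColumn, ColumnType.ROWTIME);
        return patched;
    }

    /** [A]: schema generation infers b while a is still a plain timestamp. */
    static Map<String, ColumnType> schemaAtStepA() {
        Map<String, ColumnType> schema = new LinkedHashMap<>();
        schema.put("a", ColumnType.TIMESTAMP);
        schema.put("b", inferTimestampMinusInterval(schema, "a"));
        return schema;
    }

    /** Type of b in the patched row type: only a was changed by the patch. */
    static ColumnType declaredTypeOfB() {
        return patchTimeAttributes(schemaAtStepA(), "a").get("b");
    }

    /** [B]: the expression for b is inferred again, now against the patched row type. */
    static ColumnType convertedTypeOfB() {
        Map<String, ColumnType> patched = patchTimeAttributes(schemaAtStepA(), "a");
        return inferTimestampMinusInterval(patched, "a");
    }

    public static void main(String[] args) {
        System.out.println("declared b:  " + declaredTypeOfB());
        System.out.println("converted b: " + convertedTypeOfB());
        System.out.println("consistent:  " + (declaredTypeOfB() == convertedTypeOfB()));
    }
}
```

In this model the declared type of b stays TIMESTAMP while the re-inferred type 
is ROWTIME, which is the same shape of mismatch as the "validated type" versus 
"converted type" in the reported AssertionError.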

Basically, I think that:
- CatalogSchemaTable and CatalogSourceTable both have the correct patched row 
type as an unexpanded logical table;
- There is no need to erase any types, because 1: there is already a tool for 
that, RelTimeIndicatorConverter (although it does nothing to TableScan); 2: if 
we want to erase the time attribute, why generate it in the first place?

The root cause is that logic [A] and [B] are inconsistent, even though they 
are actually the same logic. Erasing the time attributes may work for this 
case but seems hacky.

> Computed column can not refer time attribute column 
> ----------------------------------------------------
>
>                 Key: FLINK-16345
>                 URL: https://issues.apache.org/jira/browse/FLINK-16345
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: Leonard Xu
>            Assignee: Jark Wu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.1, 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If a computed column refers to a time attribute column, the computed column 
> will lose the time attribute and cause validation to fail.
> {code:java}
> CREATE TABLE orders (
>   order_id STRING,
>   order_time TIMESTAMP(3),
>   amount DOUBLE,
>   amount_kg as amount * 1000,
>   -- cannot select computed column standard_ts, which is derived from 
>   -- column order_time, which is used as the WATERMARK
>   standard_ts as order_time + INTERVAL '8' HOUR,
>   WATERMARK FOR order_time AS order_time
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.10',
>   'connector.topic' = 'flink_orders',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
> {code}
> The query `select amount_kg from orders` runs normally, but 
> the query `select standard_ts from orders` throws a validation exception 
> with the following message:
> {noformat}
> [ERROR] Could not execute SQL statement. Reason:
>  java.lang.AssertionError: Conversion to relational algebra failed to 
> preserve datatypes:
>  validated type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME 
> ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) 
> ts) NOT NULL
>  converted type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME 
> ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME 
> ATTRIBUTE(ROWTIME) ts) NOT NULL
>  rel:
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], 
> ts=[$4])
>  LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 
> 1000)], ts=[+($1, 28800000:INTERVAL HOUR)])
>  LogicalTableScan(table=[[default_catalog, default_database, orders, source: 
> [Kafka010TableSource(order_id, order_time, amount)]]])
>  {noformat}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
