[ 
https://issues.apache.org/jira/browse/FLINK-18548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-18548:
-------------------------------
    Description: 
The reason Flink does not support computed columns in dimension (temporal) 
tables is a Calcite limitation on the Snapshot node: a computed column is 
a Project node on top of the TableScan, which hits that limitation.

We can upgrade Calcite to 1.23 to fix this, or override *SqlToRelConverter* to 
fix this issue.

 

Flink 1.10 introduced the computed column feature, but I found that dimension 
tables do not support this feature.

public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
 " currency_id BIGINT,\n" +
 " currency_name STRING,\n" +
 " rate DECIMAL(38, 4),\n" +
 " currency_time TIMESTAMP(3),\n" +
 " country STRING,\n" +
 " timestamp6 TIMESTAMP(6),\n" +
 " currency_next as currency_id + 1,\n" +
 " time6 TIME(6),\n" +
 " gdp DECIMAL(10, 4)\n" +
 ") WITH (\n" +
 " 'connector.type' = 'jdbc',\n" +
 " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
 " 'connector.username' = 'root'," +
 " 'connector.table' = 'currency',\n" +
 " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
 " 'connector.lookup.cache.max-rows' = '500', \n" +
 " 'connector.lookup.cache.ttl' = '10s',\n" +
 " 'connector.lookup.max-retries' = '3'" +
 ")";

 

//

// 
 Exception in thread "main" java.lang.ClassCastException: 
org.apache.calcite.rel.logical.LogicalProject cannot be cast to 
org.apache.calcite.rel.core.TableScan
 Exception in thread "main" java.lang.ClassCastException: 
org.apache.calcite.rel.logical.LogicalProject cannot be cast to 
org.apache.calcite.rel.core.TableScan
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.snapshotTemporalTable(SqlToRelConverter.java:2438)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2062)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2085)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:148)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:523)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:437)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:343)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
 at job.KafkaJoinJdbc2Jdbc.main(KafkaJoinJdbc2Jdbc.java:59)
 Process finished with exit code 1

 

  was:
In Flink 1.10, we bring computed column feature, but I found dimension table do 
not support this feature.

public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
" currency_id BIGINT,\n" +
" currency_name STRING,\n" +
" rate DECIMAL(38, 4),\n" +
" currency_time TIMESTAMP(3),\n" +
" country STRING,\n" +
" timestamp6 TIMESTAMP(6),\n" +
" currency_next as currency_id + 1,\n" +
" time6 TIME(6),\n" +
" gdp DECIMAL(10, 4)\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
" 'connector.username' = 'root'," +
" 'connector.table' = 'currency',\n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
" 'connector.lookup.cache.max-rows' = '500', \n" +
" 'connector.lookup.cache.ttl' = '10s',\n" +
" 'connector.lookup.max-retries' = '3'" +
")";

 

//

// 
Exception in thread "main" java.lang.ClassCastException: 
org.apache.calcite.rel.logical.LogicalProject cannot be cast to 
org.apache.calcite.rel.core.TableScanException in thread "main" 
java.lang.ClassCastException: org.apache.calcite.rel.logical.LogicalProject 
cannot be cast to org.apache.calcite.rel.core.TableScan at 
org.apache.calcite.sql2rel.SqlToRelConverter.snapshotTemporalTable(SqlToRelConverter.java:2438)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2062)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2085)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:148)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:523)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:437)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:343)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
 at job.KafkaJoinJdbc2Jdbc.main(KafkaJoinJdbc2Jdbc.java:59)
Process finished with exit code 1

 


> support flexible syntax for Temporal table
> ------------------------------------------
>
>                 Key: FLINK-18548
>                 URL: https://issues.apache.org/jira/browse/FLINK-18548
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Leonard Xu
>            Assignee: Leonard Xu
>            Priority: Major
>              Labels: pull-request-available
>
> The reason Flink does not support computed columns in dimension (temporal) 
> tables is a Calcite limitation on the Snapshot node: a computed column is 
> a Project node on top of the TableScan, which hits that limitation.
> We can upgrade Calcite to 1.23 to fix this, or override *SqlToRelConverter* to 
> fix this issue.
>  
> Flink 1.10 introduced the computed column feature, but I found that dimension 
> tables do not support this feature.
> public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
>  " currency_id BIGINT,\n" +
>  " currency_name STRING,\n" +
>  " rate DECIMAL(38, 4),\n" +
>  " currency_time TIMESTAMP(3),\n" +
>  " country STRING,\n" +
>  " timestamp6 TIMESTAMP(6),\n" +
>  " currency_next as currency_id + 1,\n" +
>  " time6 TIME(6),\n" +
>  " gdp DECIMAL(10, 4)\n" +
>  ") WITH (\n" +
>  " 'connector.type' = 'jdbc',\n" +
>  " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
>  " 'connector.username' = 'root'," +
>  " 'connector.table' = 'currency',\n" +
>  " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
>  " 'connector.lookup.cache.max-rows' = '500', \n" +
>  " 'connector.lookup.cache.ttl' = '10s',\n" +
>  " 'connector.lookup.max-retries' = '3'" +
>  ")";
>  
> //
> // 
>  Exception in thread "main" java.lang.ClassCastException: 
> org.apache.calcite.rel.logical.LogicalProject cannot be cast to 
> org.apache.calcite.rel.core.TableScan
>  Exception in thread "main" java.lang.ClassCastException: 
> org.apache.calcite.rel.logical.LogicalProject cannot be cast to 
> org.apache.calcite.rel.core.TableScan
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.snapshotTemporalTable(SqlToRelConverter.java:2438)
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2062)
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2085)
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>  at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:148)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:523)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:437)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:343)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>  at job.KafkaJoinJdbc2Jdbc.main(KafkaJoinJdbc2Jdbc.java:59)
>  Process finished with exit code 1
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to