同学们好!

    flink Table API 中显示支持 Date 类型,实际应用中 报错异常:Caused by: 
org.apache.flink.table.api.TableException: Type is not supported: Date。
 

    应用场景:

        第一步: 从mysql 数据库中读取 Date  型 数据
                new JDBCInputFormat
                        .JDBCInputFormatBuilder()
                        .setRowTypeInfo(
                                new RowTypeInfo(
                                        
TypeInformation[BasicTypeInfo.DATE_TYPE_INFO]
                                )
                        )
                DataStreamSource dss = 
StreamExecutionEnvironment.createInput(JDBCInputFormat)
        第二步:注册为table表
                ((org.apache.flink.table.api.java.BatchTableEnvironment) tEnv) 
.registerOrReplaceBoundedStream(outTableName,dss,outField);
        第三步:查询 outTableName 表中的数据 sink
                Table sample3 = BatchTableEnvironment.sqlQuery(dataSql);
                JDBCAppendTableSink
                        .builder()
                        .setParameterTypes(InternalType[DataTypes.DATE])
                sample3.writeToSink(JDBCAppendTableSink);

    全部异常信息:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
************
ERR_ID:
     SQL-00120001
CAUSE:
     SQL validation failed:
     Type is not supported: Date
ACTION:
     Please see descriptions above. If it doesn't help, please contact customer 
support for this.
DETAIL:

************
at 
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:103)
at 
org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:1127)
at com.hundsun.futurex.template.JDBCSink.run(JDBCSink.java:64)  指向上面下划线标注的代码
at com.hundsun.futurex.template.WorkFlow.run(WorkFlow.java:35)
at com.hundsun.futurex.ProgramEntry.main(ProgramEntry.java:35)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: 
Date
at 
org.apache.flink.table.calcite.FlinkTypeFactory$.typeInfoToSqlTypeName(FlinkTypeFactory.scala:485)
at 
org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:84)
at 
org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromInternalType(FlinkTypeFactory.scala:71)
at 
org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:281)
at 
org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:275)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:275)
at 
org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:262)
at 
org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at 
org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at 
org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:113)
at 
org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:185)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3276)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3255)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3520)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:225)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:657)
at 
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
... 4 more







baiyg25...@hundsun.com

回复