Hello everyone! The Flink Table API documentation shows the Date type as supported, but in practice I get this exception: Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date.

Scenario:

Step 1: Read Date-typed data from a MySQL database.

    new JDBCInputFormat
        .JDBCInputFormatBuilder()
        .setRowTypeInfo(
            new RowTypeInfo(
                TypeInformation[BasicTypeInfo.DATE_TYPE_INFO]
            )
        )
    DataStreamSource dss = StreamExecutionEnvironment.createInput(JDBCInputFormat)

Step 2: Register the stream as a table.

    ((org.apache.flink.table.api.java.BatchTableEnvironment) tEnv)
        .registerOrReplaceBoundedStream(outTableName, dss, outField);

Step 3: Query the data in the outTableName table and write it to the sink.

    Table sample3 = BatchTableEnvironment.sqlQuery(dataSql);
    JDBCAppendTableSink
        .builder()
        .setParameterTypes(InternalType[DataTypes.DATE])
    sample3.writeToSink(JDBCAppendTableSink);

Full exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException:
************
ERR_ID:
     SQL-00120001
CAUSE:
     SQL validation failed:
     Type is not supported: Date
ACTION:
     Please see descriptions above. If it doesn't help, please contact customer support for this.
DETAIL:
************
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:103)
    at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:1127)
    at com.hundsun.futurex.template.JDBCSink.run(JDBCSink.java:64)    <-- points to the code underlined above (the sqlQuery call in Step 3)
    at com.hundsun.futurex.template.WorkFlow.run(WorkFlow.java:35)
    at com.hundsun.futurex.ProgramEntry.main(ProgramEntry.java:35)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date
    at org.apache.flink.table.calcite.FlinkTypeFactory$.typeInfoToSqlTypeName(FlinkTypeFactory.scala:485)
    at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:84)
    at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromInternalType(FlinkTypeFactory.scala:71)
    at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:281)
    at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildRelDataType$2.apply(FlinkTypeFactory.scala:275)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:275)
    at org.apache.flink.table.calcite.FlinkTypeFactory.buildRelDataType(FlinkTypeFactory.scala:262)
    at org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105)
    at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
    at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
    at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
    at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:113)
    at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:185)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3276)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3255)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3520)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1012)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:972)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:225)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:657)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
    ... 4 more

baiyg25...@hundsun.com