Hi Jesse, I think that the type of rowtime you declared on the source schema is DataTypes.Timestamp(), you also use DataTypes.Timestamp() on the sink schema
Best, Xingbo Jesse Lord <jl...@vectra.ai> 于2020年7月15日周三 下午11:41写道: > I am trying to sink the rowtime field in pyflink 1.10. I get the following > error > > > > For the source schema I use > > > > .field("rowtime", DataTypes.TIMESTAMP(2)) > > .rowtime( > > Rowtime() > > .timestamps_from_field("timestamp") > > .watermarks_periodic_ascending() > > ) > > > > To create the rowtime field and have tried variations on > > > > .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) > > > > In the sink schema. > > > > Trying all of the different types in DataTypes I get essentially the > following error: > > > > py4j.protocol.Py4JJavaError: An error occurred while calling > o56.insertInto. > > : org.apache.flink.table.api.ValidationException: Field types of query > result and registered TableSink > `default_catalog`.`default_database`.`output` do not match. > > Query result schema: [rowtime: LocalDateTime] > > TableSink schema: [rowtime: Timestamp] > > > > > > I know that in Java there is > org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python > documentation lists Types.SQL_TIMESTAMP, but I cannot find the > corresponding type in the python library. Can anyone help point me to the > correct type for the schema? > > > > Thanks, > > Jesse > > > > > > > > > > >