I am using the build downloaded from the official site: https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

I will try the latest master version you mentioned a bit later. Thanks.

JingsongLee <lzljs3620...@aliyun.com> wrote on Tuesday, January 14, 2020 at 11:51 AM:

> Thanks,
> Could you try the latest 1.9 release, 1.10, or master? Some bugs have been fixed in this area, and I am not sure whether this one still exists.
>
> Best,
> Jingsong Lee
>
> ------------------------------------------------------------------
> From: Kevin Liao <lia...@gmail.com>
> Send Time: Tuesday, January 14, 2020 11:38
> To: user-zh <user-zh@flink.apache.org>; JingsongLee <lzljs3620...@aliyun.com>
> Subject: Re: org.apache.flink.table.api.ValidationException error with the blink planner
>
> The Flink version is the 1.9.1 release.
>
> I cannot easily share the complete Doc class because it contains business information, sorry, but I can give a fragment. It is just an ordinary POJO with a single level; every field is a basic type (or a derivative of one) or a String, and there are a bit more than 30 fields. As I understand it, the number of fields should not matter much.
>
> ```
>
> import org.apache.commons.lang3.builder.ToStringBuilder;
> import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
>
> /**
>  * @author liaoxu Date: 2020/1/13 Time: 12:03 PM.
>  */
> @JsonIgnoreProperties(ignoreUnknown = true)
> public class Doc {
>
>   private String suv;
>   private Float factor = 1F;
>   private String st;
>   private String agentId;
>   private Long timestamp;
>
>   ... // omit some, omit getters and setters
>
> ```
>
> I hope this helps. You can also contact me on DingTalk (DingTalk ID ib1x1zy).
>
> JingsongLee <lzljs3620...@aliyun.com.invalid> wrote on Tuesday, January 14, 2020 at 11:25 AM:
>
> Hi Kevin,
>
> Which version is this?
> Could you provide the complete Doc class? That would make it easier for us to reproduce the problem.
>
> Best,
> Jingsong Lee
>
> ------------------------------------------------------------------
> From: Kevin Liao <lia...@gmail.com>
> Send Time: Monday, January 13, 2020 17:37
> To: user-zh <user-zh@flink.apache.org>
> Subject: org.apache.flink.table.api.ValidationException error with the blink planner
>
> tEnv.connect(new Kafka()
>         .version("universal")
>         .topic("xxx")
>         .startFromLatest()
>         .property("bootstrap.servers", "xxxx")
>         .property("group.id", "xxxx"))
>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>     .withSchema(new Schema()
>         // .field("logger_name", Types.STRING)
>         // .field("host", Types.STRING)
>         // .field("@timestamp", Types.SQL_TIMESTAMP)
>         // .field("_rowtime", Types.SQL_TIMESTAMP)
>         // .rowtime(
>         //     new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
>         .field("doc", Types.POJO(Doc.class))
>     )
>     .inAppendMode()
>     .registerTableSource("xxx");
>
> Table result = tEnv.sqlQuery(
>     "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx");
>
> // result.printSchema();
> tEnv.toAppendStream(result,
>     new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING,
>         STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
>         STRING, STRING, STRING,
>         STRING, LONG, STRING, INT, STRING, INT)).print();
>
> The code above runs under the flink planner, but after switching to the blink planner it fails with the following error:
>
> ```
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]>) of table field 'doc' does not match with type PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]> of the field 'doc' of the TableSource return type.
>     at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>     at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>     at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
>     at com.sogou.qidian.BatchJob.main(BatchJob.java:83)
>
> Execution failed for task ':BatchJob.main()'.
>
> Process 'command '/Users/liaoxu/.sdkman/candidates/java/current/bin/java'' finished with non-zero exit value 1
> ```
>
> I carefully compared the two Doc types in the error log, and they are identical.
>
> Thanks