Re: org.apache.flink.table.api.ValidationException
Hi, This error occurs when the data type can not be parsed. You could read this part to see more details about the User-Defined Data Types[1]. Best, Hang [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/types/#user-defined-data-types 柒朵 <1303809...@qq.com> 于2023年3月29日周三 17:52写道: > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Could not extract a data type from 'class > UserStatus'. Please pass the required data type manually or allow RAW types. > -- > 柒朵 > 1303809...@qq.com > > <https://wx.mail.qq.com/home/index?t=readmail_businesscard_midpage=true=%E6%9F%92%E6%9C%B5=http%3A%2F%2Fthirdqq.qlogo.cn%2Fg%3Fb%3Dsdk%26k%3DheXagJ7coEPyFt1caniaMyg%26s%3D100%26t%3D1644638307%3Frand%3D1646983371=1303809094%40qq.com=> > >
org.apache.flink.table.api.ValidationException
Exceptioninthread"main"org.apache.flink.table.api.ValidationException:Couldnotextractadatatypefrom'class UserStatus'.PleasepasstherequireddatatypemanuallyorallowRAWtypes. 1303809...@qq.com
Re: py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. : org.apache.flink.table.api.ValidationException: Undefined function: createTime
Hi, As far as I know, a TimeWindow does not have the attribute createTime? What is the semantics of createTime you want Best, Xingbo anfeng 于2020年11月17日周二 下午5:26写道: > st_env.from_path("mysource") \ > > > .window(Slide.over("10.minutes").every("1.minutes").on("createTime").alias("w")) > \ > .group_by("w") \ > .select("w.createTime as a, w.start as b, w.end as c, uid.count > as > d") \ > .insert_into("mysink") > > > .select("w.createTime as a, w.start as b, w.end as c, uid.count as d") > \ > File > > "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/table/table.py", > line 784, in select > File > > "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > File > > "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/util/exceptions.py", > line 147, in deco > File > > "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. > : org.apache.flink.table.api.ValidationException: Undefined function: > createTime > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. : org.apache.flink.table.api.ValidationException: Undefined function: createTime
st_env.from_path("mysource") \ .window(Slide.over("10.minutes").every("1.minutes").on("createTime").alias("w")) \ .group_by("w") \ .select("w.createTime as a, w.start as b, w.end as c, uid.count as d") \ .insert_into("mysink") .select("w.createTime as a, w.start as b, w.end as c, uid.count as d") \ File "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/table/table.py", line 784, in select File "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ File "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/util/exceptions.py", line 147, in deco File "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. : org.apache.flink.table.api.ValidationException: Undefined function: createTime -- Sent from: http://apache-flink.147419.n8.nabble.com/
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not resolve over call.
I'm learningOfficial document-Over Window Aggregation My pojo is: https://paste.ubuntu.com/p/kPydGd2Cwd/ Completed test code is: https://paste.ubuntu.com/p/KyCVxBz254/ The error I got is: Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not resolve over call. at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.lambda$visit$0(OverWindowResolverRule.java:72) at java.util.Optional.orElseThrow(Optional.java:290) at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.visit(OverWindowResolverRule.java:72) at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.visit(OverWindowResolverRule.java:58) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule.lambda$apply$0(OverWindowResolverRule.java:54) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule.apply(OverWindowResolverRule.java:55) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) at java.util.function.Function.lambda$andThen$1(Function.java:88) at java.util.function.Function.lambda$andThen$1(Function.java:88) at java.util.function.Function.lambda$andThen$1(Function.java:88) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:191) at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:170) at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:963) at OverWindowAggregation.main(OverWindowAggregation.java:39) Could you tell me where I am wrong in my code? Thanks for your help.
Re: blink planner的org.apache.flink.table.api.ValidationException报错
改用最新 master 代码编译(打包后版本 1.11-SNAPSHOT) 将这段 .withSchema(new Schema() //.field("logger_name", Types.STRING) //.field("host", Types.STRING) //.field("@timestamp", Types.SQL_TIMESTAMP) //.field("_rowtime", Types.SQL_TIMESTAMP) //.rowtime( //new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) .field("doc", Types.POJO(Doc.class)) ) 改成使用 DataTypes 后可以跑通 Kevin Liao 于2020年1月14日周二 上午11:52写道: > 我用的是 > https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz > 官网下载的 > > 您说的 master 最新的版本我稍后试一下,谢谢 > > JingsongLee 于2020年1月14日周二 上午11:51写道: > >> 谢谢, >> 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。 >> >> Best, >> Jingsong Lee >> >> -- >> From:Kevin Liao >> Send Time:2020年1月14日(星期二) 11:38 >> To:user-zh ; JingsongLee < >> lzljs3620...@aliyun.com> >> Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错 >> >> flink 版本是 1.9.1 release >> >> Doc >> 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 >> 30 多个字段,我理解这跟字段数关系不大 >> >> ``` >> >> import org.apache.commons.lang3.builder.ToStringBuilder; >> import >> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; >> >> /** >> * @author liaoxu Date: 2020/1/13 Time: 12:03 下午. >> */ >> @JsonIgnoreProperties(ignoreUnknown = true) >> public class Doc { >> >> private String suv; >> private Float factor = 1F; >> private String st; >> private String agentId; >> private Long timestamp; >> >> ... // omit some, omit getters and setters >> >> ``` >> >> 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy) >> >> JingsongLee 于2020年1月14日周二 上午11:25写道: >> Hi Kevin, >> >> 这是什么版本? >> Doc类能完整提供下吗?方便我们复现。 >> >> Best, >> Jingsong Lee >> >> >> -- >> From:Kevin Liao >> Send Time:2020年1月13日(星期一) 17:37 >> To:user-zh >> Subject:blink planner的org.apache.flink.table.api.ValidationException报错 >> >> tEnv.connect(new Kafka() >> .version("universal") >> .topic("xxx") >> .startFromLatest() >> .property("bootstrap.servers", >> "") >> .property("group.id", "")) >> .withFormat(new Json().failOnMissingField(false).deriveSchema()) >> .withSchema(new Schema() >> //.field("logger_name", Types.STRING) >> //.field("host", Types.STRING) >> //.field("@timestamp", Types.SQL_TIMESTAMP) >> //.field("_rowtime", Types.SQL_TIMESTAMP) >> //.rowtime( >> //new >> >> Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) >> .field("doc", Types.POJO(Doc.class)) >> ) >> .inAppendMode() >> .registerTableSource("xxx"); >> >> Table result = tEnv.sqlQuery( >> "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx"); >> >> //result.printSchema(); >> tEnv.toAppendStream(result, >> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, >> STRING, STRING, STRING, >> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING, >> STRING, STRING, STRING, >> STRING, LONG, STRING, INT, STRING, INT)).print(); >> >> >> >> 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下: >> >> >> 、、、 >> >> Exception in thread "main" >> org.apache.flink.table.api.ValidationException: Type >> LEGACY(PojoType) of >> table field 'doc' does not match with type >> PojoType of the field >> 'doc' of the TableSource return type. >> at >> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) >> at >> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> at >&g
Re: blink planner的org.apache.flink.table.api.ValidationException报错
我用的是 https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz 官网下载的 您说的 master 最新的版本我稍后试一下,谢谢 JingsongLee 于2020年1月14日周二 上午11:51写道: > 谢谢, > 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。 > > Best, > Jingsong Lee > > -- > From:Kevin Liao > Send Time:2020年1月14日(星期二) 11:38 > To:user-zh ; JingsongLee < > lzljs3620...@aliyun.com> > Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错 > > flink 版本是 1.9.1 release > > Doc > 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 > 30 多个字段,我理解这跟字段数关系不大 > > ``` > > import org.apache.commons.lang3.builder.ToStringBuilder; > import > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; > > /** > * @author liaoxu Date: 2020/1/13 Time: 12:03 下午. > */ > @JsonIgnoreProperties(ignoreUnknown = true) > public class Doc { > > private String suv; > private Float factor = 1F; > private String st; > private String agentId; > private Long timestamp; > > ... // omit some, omit getters and setters > > ``` > > 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy) > > JingsongLee 于2020年1月14日周二 上午11:25写道: > Hi Kevin, > > 这是什么版本? > Doc类能完整提供下吗?方便我们复现。 > > Best, > Jingsong Lee > > > -- > From:Kevin Liao > Send Time:2020年1月13日(星期一) 17:37 > To:user-zh > Subject:blink planner的org.apache.flink.table.api.ValidationException报错 > > tEnv.connect(new Kafka() > .version("universal") > .topic("xxx") > .startFromLatest() > .property("bootstrap.servers", > "") > .property("group.id", "")) > .withFormat(new Json().failOnMissingField(false).deriveSchema()) > .withSchema(new Schema() > //.field("logger_name", Types.STRING) > //.field("host", Types.STRING) > //.field("@timestamp", Types.SQL_TIMESTAMP) > //.field("_rowtime", Types.SQL_TIMESTAMP) > //.rowtime( > //new > > Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) > .field("doc", Types.POJO(Doc.class)) > ) > .inAppendMode() > .registerTableSource("xxx"); > > Table result = tEnv.sqlQuery( > "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx"); > > //result.printSchema(); > tEnv.toAppendStream(result, > new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, > STRING, STRING, STRING, > STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING, > STRING, STRING, STRING, > STRING, LONG, STRING, INT, STRING, INT)).print(); > > > > 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下: > > > 、、、 > > Exception in thread "main" > org.apache.flink.table.api.ValidationException: Type > LEGACY(PojoType) of > table field 'doc' does not match with type > PojoType of the field > 'doc' of the TableSource return type. > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) > at > org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.pl
Re: blink planner的org.apache.flink.table.api.ValidationException报错
谢谢, 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。 Best, Jingsong Lee -- From:Kevin Liao Send Time:2020年1月14日(星期二) 11:38 To:user-zh ; JingsongLee Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错 flink 版本是 1.9.1 release Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 30 多个字段,我理解这跟字段数关系不大 ``` import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; /** * @author liaoxu Date: 2020/1/13 Time: 12:03 下午. */ @JsonIgnoreProperties(ignoreUnknown = true) public class Doc { private String suv; private Float factor = 1F; private String st; private String agentId; private Long timestamp; ... // omit some, omit getters and setters ``` 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy) JingsongLee 于2020年1月14日周二 上午11:25写道: Hi Kevin, 这是什么版本? Doc类能完整提供下吗?方便我们复现。 Best, Jingsong Lee -- From:Kevin Liao Send Time:2020年1月13日(星期一) 17:37 To:user-zh Subject:blink planner的org.apache.flink.table.api.ValidationException报错 tEnv.connect(new Kafka() .version("universal") .topic("xxx") .startFromLatest() .property("bootstrap.servers", "") .property("group.id", "")) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(new Schema() //.field("logger_name", Types.STRING) //.field("host", Types.STRING) //.field("@timestamp", Types.SQL_TIMESTAMP) //.field("_rowtime", Types.SQL_TIMESTAMP) //.rowtime( //new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) .field("doc", Types.POJO(Doc.class)) ) .inAppendMode() .registerTableSource("xxx"); Table result = tEnv.sqlQuery( "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx"); //result.printSchema(); tEnv.toAppendStream(result, new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING, STRING, STRING, STRING, STRING, LONG, STRING, INT, STRING, INT)).print(); 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下: 、、、 Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(PojoType) of table field 'doc' does not match with type PojoType of the field 'doc' of the TableSource return type. at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154) at org.apache.flink.table.planner.plan.nodes.physica
Re: blink planner的org.apache.flink.table.api.ValidationException报错
flink 版本是 1.9.1 release Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 30 多个字段,我理解这跟字段数关系不大 ``` import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; /** * @author liaoxu Date: 2020/1/13 Time: 12:03 下午. */ @JsonIgnoreProperties(ignoreUnknown = true) public class Doc { private String suv; private Float factor = 1F; private String st; private String agentId; private Long timestamp; ... // omit some, omit getters and setters ``` 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy) JingsongLee 于2020年1月14日周二 上午11:25写道: > Hi Kevin, > > 这是什么版本? > Doc类能完整提供下吗?方便我们复现。 > > Best, > Jingsong Lee > > > -- > From:Kevin Liao > Send Time:2020年1月13日(星期一) 17:37 > To:user-zh > Subject:blink planner的org.apache.flink.table.api.ValidationException报错 > > tEnv.connect(new Kafka() > .version("universal") > .topic("xxx") > .startFromLatest() > .property("bootstrap.servers", > "") > .property("group.id", "")) > .withFormat(new Json().failOnMissingField(false).deriveSchema()) > .withSchema(new Schema() > //.field("logger_name", Types.STRING) > //.field("host", Types.STRING) > //.field("@timestamp", Types.SQL_TIMESTAMP) > //.field("_rowtime", Types.SQL_TIMESTAMP) > //.rowtime( > //new > > Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) > .field("doc", Types.POJO(Doc.class)) > ) > .inAppendMode() > .registerTableSource("xxx"); > > Table result = tEnv.sqlQuery( > "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx"); > > //result.printSchema(); > tEnv.toAppendStream(result, > new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, > STRING, STRING, STRING, > STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING, > STRING, STRING, STRING, > STRING, LONG, STRING, INT, STRING, INT)).print(); > > > > 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下: > > > 、、、 > > Exception in thread "main" > org.apache.flink.table.api.ValidationException: Type > LEGACY(PojoType) of > table field 'doc' does not match with type > PojoType of the field > 'doc' of the TableSource return type. > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) > at > org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185) > at > org.apache.flink.table.planner.pla
blink planner的org.apache.flink.table.api.ValidationException报错
tEnv.connect(new Kafka() .version("universal") .topic("xxx") .startFromLatest() .property("bootstrap.servers", "") .property("group.id", "")) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(new Schema() //.field("logger_name", Types.STRING) //.field("host", Types.STRING) //.field("@timestamp", Types.SQL_TIMESTAMP) //.field("_rowtime", Types.SQL_TIMESTAMP) //.rowtime( //new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) .field("doc", Types.POJO(Doc.class)) ) .inAppendMode() .registerTableSource("xxx"); Table result = tEnv.sqlQuery( "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx"); //result.printSchema(); tEnv.toAppendStream(result, new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING, STRING, STRING, STRING, STRING, LONG, STRING, INT, STRING, INT)).print(); 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下: 、、、 Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(PojoType) of table field 'doc' does not match with type PojoType of the field 'doc' of the TableSource return type. at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)