Re: org.apache.flink.table.api.ValidationException

2023-03-29 Thread Hang Ruan
Hi,

This error occurs when a data type cannot be automatically extracted from the
class. You could read this part of the documentation for more details about
user-defined data types [1].
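
For illustration, here is a minimal sketch of the two options the error message
offers, modeled on the annotation examples in [1]; the fields of UserStatus
below are hypothetical stand-ins:

```java
import org.apache.flink.table.annotation.DataTypeHint;

// Hypothetical stand-in for the poster's UserStatus class.
public class UserStatus {

    // Option 1: keep the class extractable as a structured type, i.e. a
    // public no-argument constructor plus public fields (or getters/setters).
    public String name;
    public Integer level;

    // Option 2: for a member whose type cannot be extracted, force an
    // opaque RAW type, as shown in the linked documentation.
    public @DataTypeHint("RAW") Object details;
}
```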

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/types/#user-defined-data-types


柒朵 <1303809...@qq.com> wrote on Wed, Mar 29, 2023 at 17:52:

>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Could not extract a data type from 'class
> UserStatus'. Please pass the required data type manually or allow RAW types.
> --
> 柒朵
> 1303809...@qq.com
>


org.apache.flink.table.api.ValidationException

2023-03-29 Thread 柒朵
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class
UserStatus'. Please pass the required data type manually or allow RAW types.

柒朵
1303809...@qq.com


Re: py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. : org.apache.flink.table.api.ValidationException: Undefined function: createTime

2020-11-17 Thread Xingbo Huang
Hi,

As far as I know, a TimeWindow does not have an attribute named createTime.
What semantics do you want createTime to have? A group window alias only
exposes the properties w.start, w.end, w.rowtime, and w.proctime.
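
For illustration, here is a sketch of the corrected aggregation written in the
Java Table API, assuming a Flink 1.11-era setup where tEnv is a
TableEnvironment with the poster's mysource and mysink tables registered:

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;

// The window alias only exposes w.start, w.end, w.rowtime and w.proctime;
// the createTime column defines the window but is not a window property.
Table result = tEnv.from("mysource")
    .window(Slide.over(lit(10).minutes())
                 .every(lit(1).minutes())
                 .on($("createTime"))
                 .as("w"))
    .groupBy($("w"))
    .select($("w").start().as("b"),
            $("w").end().as("c"),
            $("uid").count().as("d"));
result.executeInsert("mysink");
```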

Best,
Xingbo

anfeng wrote on Tue, Nov 17, 2020 at 5:26 PM:

> st_env.from_path("mysource") \
>     .window(Slide.over("10.minutes").every("1.minutes").on("createTime").alias("w")) \
>     .group_by("w") \
>     .select("w.createTime as a, w.start as b, w.end as c, uid.count as d") \
>     .insert_into("mysink")
>
>
> .select("w.createTime as a, w.start as b, w.end as c, uid.count as d")
> \
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/table/table.py",
> line 784, in select
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
> : org.apache.flink.table.api.ValidationException: Undefined function:
> createTime
>
>
>
>
>
>


py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. : org.apache.flink.table.api.ValidationException: Undefined function: createTime

2020-11-17 Thread anfeng
st_env.from_path("mysource") \
    .window(Slide.over("10.minutes").every("1.minutes").on("createTime").alias("w")) \
    .group_by("w") \
    .select("w.createTime as a, w.start as b, w.end as c, uid.count as d") \
    .insert_into("mysink")


.select("w.createTime as a, w.start as b, w.end as c, uid.count as d") \
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/table/table.py",
line 784, in select
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
: org.apache.flink.table.api.ValidationException: Undefined function:
createTime







Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not resolve over call.

2020-10-21 Thread ??????
I'm learning from the official documentation on Over Window Aggregation.
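
For reference, a sketch of the documented pattern (adapted from the Table API
docs; orders and the fields a, b, rowtime are the documentation's example
names). A common cause of "Could not resolve over call." is that the alias
referenced in .over(...) does not match the alias defined in .window(...):

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.CURRENT_RANGE;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;

import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;

// orders is an existing Table with fields a, b and a rowtime attribute.
// The alias "w" defined in window(...) must be the one used in over(...).
Table result = orders
    .window(Over.partitionBy($("a"))
                .orderBy($("rowtime"))
                .preceding(UNBOUNDED_RANGE)
                .following(CURRENT_RANGE)
                .as("w"))
    .select($("a"),
            $("b").avg().over($("w")),
            $("b").max().over($("w")));
```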


My pojo is:
https://paste.ubuntu.com/p/kPydGd2Cwd/




Completed test code is:
https://paste.ubuntu.com/p/KyCVxBz254/


The error I got is:


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Could not resolve over call.
at 
org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.lambda$visit$0(OverWindowResolverRule.java:72)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.visit(OverWindowResolverRule.java:72)
at 
org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.visit(OverWindowResolverRule.java:58)
at 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at 
org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule.lambda$apply$0(OverWindowResolverRule.java:54)
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule.apply(OverWindowResolverRule.java:55)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:191)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:170)
at 
org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:963)
at OverWindowAggregation.main(OverWindowAggregation.java:39)





Could you tell me where my code is wrong?
Thanks for your help.

Re: org.apache.flink.table.api.ValidationException error with the blink planner

2020-01-19 Thread Kevin Liao
I recompiled from the latest master code (version 1.11-SNAPSHOT after
packaging), and changed this block:

.withSchema(new Schema()
//.field("logger_name", Types.STRING)
//.field("host", Types.STRING)
//.field("@timestamp", Types.SQL_TIMESTAMP)
//.field("_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)


to use DataTypes instead; now it runs.
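
For illustration, a sketch of what the DataTypes-based schema might look like;
only the handful of fields from the Doc fragment shared later in this thread
are shown, and the real class has 30-odd fields declared the same way:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Schema;

// Declare the nested type explicitly with DataTypes instead of
// Types.POJO(Doc.class), so the blink planner sees a plain ROW type
// rather than a LEGACY(PojoType) wrapper.
Schema schema = new Schema()
    .field("doc", DataTypes.ROW(
        DataTypes.FIELD("suv", DataTypes.STRING()),
        DataTypes.FIELD("factor", DataTypes.FLOAT()),
        DataTypes.FIELD("st", DataTypes.STRING()),
        DataTypes.FIELD("agentId", DataTypes.STRING()),
        DataTypes.FIELD("timestamp", DataTypes.BIGINT())));
```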



Re: org.apache.flink.table.api.ValidationException error with the blink planner

2020-01-13 Thread Kevin Liao
I'm using
https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
downloaded from the official site.

I'll try the latest master version you mentioned shortly, thanks.


Re: org.apache.flink.table.api.ValidationException error with the blink planner

2020-01-13 Thread JingsongLee
Thanks. Could you try the latest 1.9 release, or 1.10, or master? Some bugs
were fixed there, and I'm not sure whether this one still exists.

Best,
Jingsong Lee


--
From:Kevin Liao 
Send Time:2020年1月14日(星期二) 11:38
To:user-zh ; JingsongLee 
Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错

flink 版本是 1.9.1 release

Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 
30 多个字段,我理解这跟字段数关系不大

```
import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;
  ... // omit some, omit getters and setters
```

希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
JingsongLee  于2020年1月14日周二 上午11:25写道:
Hi Kevin,

 这是什么版本?
 Doc类能完整提供下吗?方便我们复现。

 Best,
 Jingsong Lee


 --
 From:Kevin Liao 
 Send Time:2020年1月13日(星期一) 17:37
 To:user-zh 
 Subject:blink planner的org.apache.flink.table.api.ValidationException报错

 tEnv.connect(new Kafka()
 .version("universal")
 .topic("xxx")
 .startFromLatest()
 .property("bootstrap.servers",
 "")
 .property("group.id", ""))
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(new Schema()
 //.field("logger_name", Types.STRING)
 //.field("host", Types.STRING)
 //.field("@timestamp", Types.SQL_TIMESTAMP)
 //.field("_rowtime", Types.SQL_TIMESTAMP)
 //.rowtime(
 //new
 Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
 .field("doc", Types.POJO(Doc.class))
 )
 .inAppendMode()
 .registerTableSource("xxx");

 Table result = tEnv.sqlQuery(
 "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

 //result.printSchema();
 tEnv.toAppendStream(result,
 new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
 STRING, STRING, STRING,
     STRING, LONG, STRING, INT, STRING, INT)).print();



 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


 、、、

 Exception in thread "main"
 org.apache.flink.table.api.ValidationException: Type
 LEGACY(PojoType) of
 table field 'doc' does not match with type
 PojoType of the field
 'doc' of the TableSource return type.
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
  at 
org.apache.flink.table.planner.plan.nodes.physica

Re: org.apache.flink.table.api.ValidationException error with the blink planner

2020-01-13 Thread Kevin Liao
The Flink version is the 1.9.1 release.

Sorry, I can't share the complete Doc class because it involves business
information, but here is a fragment. It's an ordinary POJO, only one level
deep; every type is a primitive (or its wrapper) plus String, roughly 30-odd
fields. I don't think this is related to the number of fields.

```

import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 PM.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;

  ... // omit some, omit getters and setters

```

Hope this helps; you can also reach me on DingTalk (DingTalk ID: ib1x1zy).

JingsongLee wrote on Tue, Jan 14, 2020 at 11:25 AM:

> Hi Kevin,
>
> Which version is this?
> Could you provide the complete Doc class, so that we can reproduce it?
>
> Best,
> Jingsong Lee
>

org.apache.flink.table.api.ValidationException error with the blink planner

2020-01-13 Thread Kevin Liao
tEnv.connect(new Kafka()
.version("universal")
.topic("xxx")
.startFromLatest()
.property("bootstrap.servers",
"")
.property("group.id", ""))
.withFormat(new Json().failOnMissingField(false).deriveSchema())
.withSchema(new Schema()
//.field("logger_name", Types.STRING)
//.field("host", Types.STRING)
//.field("@timestamp", Types.SQL_TIMESTAMP)
//.field("_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)
.inAppendMode()
.registerTableSource("xxx");

Table result = tEnv.sqlQuery(
"SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

//result.printSchema();
tEnv.toAppendStream(result,
new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
STRING, STRING, STRING,
    STRING, LONG, STRING, INT, STRING, INT)).print();



The code above runs under the flink planner, but switching to the blink
planner fails with the following error:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
LEGACY(PojoType) of
table field 'doc' does not match with type
PojoType of the field
'doc' of the TableSource return type.
at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)