blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 Thread Kevin Liao
tEnv.connect(new Kafka()
.version("universal")
.topic("xxx")
.startFromLatest()
.property("bootstrap.servers",
"")
.property("group.id", ""))
.withFormat(new Json().failOnMissingField(false).deriveSchema())
.withSchema(new Schema()
//.field("logger_name", Types.STRING)
//.field("host", Types.STRING)
//.field("@timestamp", Types.SQL_TIMESTAMP)
//.field("_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)
.inAppendMode()
.registerTableSource("xxx");

Table result = tEnv.sqlQuery(
"SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

//result.printSchema();
tEnv.toAppendStream(result,
new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, LONG, STRING, INT, STRING, INT)).print();



以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


、、、

Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
LEGACY(PojoType) of
table field 'doc' does not match with type
PojoType of the field
'doc' of the TableSource return type.
at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlan

Re: Fail to deploy flink on k8s in minikube

2020-01-13 Thread Yang Wang
Hi, Jiangang

Glad to hear that you are looking to run Flink on Kubernetes.

It just because you are using the new Kubernetes version.The
extensions/v1beta1
has been removed since v1.16. Please use apps/v1 instead. The apps/v1 is
introduced
from v1.9.0. I will create a ticket fix the documentation.

Before release-1.10, you could use standalone per-job[1] or standalone
session[2] cluster on
K8s. There are some existing K8s operators to manage the application
lifecycle(e.g. google flink-on-k8s-operator[3],
lyft flink-k8s-operator[4]).

Running Flink native on K8s is supported from 1.10. You could find it here
[5]. It aims at to make
Flink users more convenient to deploy Flink workloads on K8s cluster.
However, we only support
session cluster now. The per-job mode is in development.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes
http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
[3] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[4] https://github.com/lyft/flinkk8soperator
[5]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html

Best,
Yang

刘建刚  于2020年1月13日周一 下午12:14写道:

>   I fail to deploy flink on k8s referring to
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
>   When I run the command 'kubectl create -f jobmanager-deployment.yaml',
> following error is reported:
> [image: image.png]
>   I am new to k8s. Our team want to deploy flink on k8s. Can anyone
> help me resolve this issue? Can anyone give me some tutorial about k8s and
> flink in product? Thank you very much.
>


flink checkpoint不生成

2020-01-13 Thread 起子
各位大佬好:
在执行flink job中发现设置了checkpoint,但是没有checkpoint生成,希望大佬们帮忙指点下,不胜感激,具体描述如下:
job 图如下:
 算子如下:
 
 checkpoint配置如下:
 
但是结果如下:
 

没有checkpoint生成



 部门 / 数据平台
 花名 / 起子
 Mobile :159 8810 1848
 WeChat :159 8810 1848
 Email :q...@dian.so
 Address :浙江省杭州市余杭区文一西路998号5#705



回复:flink checkpoint不生成

2020-01-13 Thread 起子
已经解决,因为里面有个算子完成了,状态为finished,所以不会生成checkpoint了


 部门 / 数据平台
 花名 / 起子
 Mobile :159 8810 1848
 WeChat :159 8810 1848
 Email :q...@dian.so
 Address :浙江省杭州市余杭区文一西路998号5#705


--
发件人:起子 
发送时间:2020年1月13日(星期一) 17:53
收件人:user-zh ; user 
主 题:flink checkpoint不生成

各位大佬好:
在执行flink job中发现设置了checkpoint,但是没有checkpoint生成,希望大佬们帮忙指点下,不胜感激,具体描述如下:
job 图如下:
 算子如下:
 
 checkpoint配置如下:
 
但是结果如下:
 

没有checkpoint生成



 部门 / 数据平台
 花名 / 起子
 Mobile :159 8810 1848
 WeChat :159 8810 1848
 Email :q...@dian.so
 Address :浙江省杭州市余杭区文一西路998号5#705



Re: 回复:flink checkpoint不生成

2020-01-13 Thread Yun Tang
Hi , 你好

恭喜解决问题,不过关于社区邮件列表的使用有几点小建议:

  1.  如果是全中文的邮件,就不要抄送英文社区邮件列表 
(u...@flink.apache.org)了,毕竟社区有很多看不懂中文的开发者。中文邮件列表(user-zh)还是很活跃的,相信大家可以一起帮助解决问题。
  2.  因为开源社区邮件列表对附件图片支持不友好,建议使用超链接的方式,有助于更快收到回复和解答。

祝好
唐云

From: 起子 
Sent: Monday, January 13, 2020 18:01
To: user-zh ; user 
Subject: 回复:flink checkpoint不生成

已经解决,因为里面有个算子完成了,状态为finished,所以不会生成checkpoint了

[cid:__aliyun157890968224248201]
 部门 / 数据平台
 花名 / 起子
 Mobile :159 8810 1848
 WeChat :159 8810 1848
 Email :q...@dian.so
 Address :浙江省杭州市余杭区文一西路998号5#705

--
发件人:起子 
发送时间:2020年1月13日(星期一) 17:53
收件人:user-zh ; user 
主 题:flink checkpoint不生成

各位大佬好:
在执行flink job中发现设置了checkpoint,但是没有checkpoint生成,希望大佬们帮忙指点下,不胜感激,具体描述如下:
job 图如下:[X]
 算子如下:
 [X]
 checkpoint配置如下:
 [X]
但是结果如下:
   [X]
[X]
没有checkpoint生成


[X]
 部门 / 数据平台
 花名 / 起子
 Mobile :159 8810 1848
 WeChat :159 8810 1848
 Email :q...@dian.so
 Address :浙江省杭州市余杭区文一西路998号5#705



Flink向量化读取parquet

2020-01-13 Thread faaron zheng
flink使用的是hadoop中的parquetfilereader,这个貌似不支持向量化读取,hive和spark目前都支持向量化读取,请加一下flink有什么计划吗?


Re: Flink向量化读取parquet

2020-01-13 Thread Kurt Young
据我所知,已经有这部分的计划了,不出意外的话应该会在 1.11 版本发布:
https://issues.apache.org/jira/browse/FLINK-11899

Best,
Kurt


On Mon, Jan 13, 2020 at 7:50 PM faaron zheng  wrote:

>
> flink使用的是hadoop中的parquetfilereader,这个貌似不支持向量化读取,hive和spark目前都支持向量化读取,请加一下flink有什么计划吗?
>


Flink Weekly | 每周社区动态更新 - 2020/01/14

2020-01-13 Thread Forward Xu
大家好

2020年已经到来了,本周是2020年1月的第二周,虽然临近春节但 Flink 的活力丝毫没有受到影响。

本周社区主要新闻是 Flink 1.10版本的发布测试,SQL catalog 读取关系数据库 schema 的相关建议以及 Flink
Forward 旧金山的演讲邀请。

Flink开发进展
==
* [**Release**] 社区仍在测试和修复*Flink 1.10*的错误。您可以在发布燃尽板[1]上进行操作。 估计第一个 RC
版本很快就来了 [1]。

* [**SQL**] Bowen 建议在Table API中添加* JDBC 和 Postgres Catalog* API。 这样,Flink
可以自动创建关系数据库中对应的表。 目前,用户需要手动在 Flink 上创建相应的表(包括 schema)[2] & [3]。

* [**configuration**] Xintong 建议更改 Flink 内存配置的一些默认值(FLIP-49),并正在寻求反馈 [4]。

* [**datastream api**] Congxian 建议统一从 statebackends 向* AppendingState
*添加“空(null)” 值的处理。 建议的原因是使所有 statebackends 拒绝“空(null)”值 [5]。

[1]
https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=349&projectKey=FLINK
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-92-JDBC-catalog-and-Postgres-catalog-tp36505.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Tuning-FLIP-49-configuration-default-values-td36528.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Make-AppendingState-add-refuse-to-add-null-element-tp36493.html


需要注意的一些缺陷
==
由于在发布测试,因此有很多活动,但是对于已经发布的版本,没有发现任何新的显著错误。

活动,博客文章,其他
===

** Flink Forward 旧金山的演讲邀请*即将结束,但是您仍然有机会将演讲提交给该演讲者(可能只有)北美的 Apache Flink
社区会议。 如有疑问或如果您不确定是否要提交参与,请随时与 Konstantin 联系 [6]。

* [**即将举行的聚会**]

* 1月18日* Preetdeep Kumar *将分享一些基本的 Flink DataStream processing
API,然后进行动手演示。 这将是在线活动。 在会议链接中可以查看更多详细信息 [7]。

* 1月22日 Konstantin 的同事 *Alexander Fedulov * 将在马德里的 Apache Flink 聚会上使 Flink
进行欺诈检测[8]。

[6] https://www.flink-forward.org/sf-2020
[7]
https://www.meetup.com/Hyderabad-Apache-Flink-Meetup-Group/events/267610014/
[8]
https://www.meetup.com/Meetup-de-Apache-Flink-en-Madrid/events/267744681/

中文邮件问题答疑汇总
===
* Flink 的 savepoint 为什么要设置成手动的?的问题解答:[9]

* Flink 消费 Kafka 没有数据问题的问题解答:[10]

* 关于 Flink 集群中调用 dubbo 服务的咨询:[11]

* 关于 Flink Plan Visualizer 什么时候会更新成1.9的样式的问题,tison 已经抄送给 Flink WebUI 重构的
Manager:[12]

* Flink 的每条数据既然都做了 checkpoint,做成全局分布式一致性快照,那还需要本地 state干啥呢?的问题解答:[13]

* 关于 Flink 遇到 valueState 自身的 NPE 的问题解答:[14]

* 关于流处理任务失败该如何追回之前的数据的问题解答:[15]

* 关于 Flink 是否可以通过代码设置 hadoop 的配置文件目录的问题解答:[16]

* 关于 Flink 算子状态查看的问题解答:[17]

* 关于疑似 ParquetTableSource Filter Pushdown bug 的问题解答:[18]

* 关于 Flink 1.10版本连接hive报错的问题解答:[19]

* 关于 Flink 不同 StateBackend ProcessWindowFunction 的差别的问题解答:[20]

* 关于 Jobgraph 生成的问题解答:[21]

* 关于注册 table 时 catalog 无法变更的问题解答:[22]

* 关于 Flink sql confluent schema avro topic 注册成表的问题解答:[23]

* 使用 Flink SQL时,碰到的【Window can only be defined over a time attribute
column】的问题解答:[24]

* 关于如何获取算子处理一条数据记录的时间的问题解答:[25]

[9]
http://apache-flink.147419.n8.nabble.com/flink-savepoint-checkpoint-td1229.html
[10] http://apache-flink.147419.n8.nabble.com/flink-Kafka-td1461.html
[11] http://apache-flink.147419.n8.nabble.com/flink-dubbo-td1467.html
[12]
http://apache-flink.147419.n8.nabble.com/Flink-Plan-Visualizer-1-9-td1404.html#a1429
[13] http://apache-flink.147419.n8.nabble.com/checkpoint-state-td1122.html
[14]
http://apache-flink.147419.n8.nabble.com/flink-valueState-NPE-td1447.html#a1459
[15] http://apache-flink.147419.n8.nabble.com/-td1016.html
[16] http://apache-flink.147419.n8.nabble.com/flink-hadoop-td1445.html
[17] http://apache-flink.147419.n8.nabble.com/flink-td1441.html
[18]
http://apache-flink.147419.n8.nabble.com/Re-ParquetTableSource-Filter-Pushdown-bug-tt1439.html
[19] http://apache-flink.147419.n8.nabble.com/flink1-10-hive-tt336.html
[20]
http://apache-flink.147419.n8.nabble.com/FLINK-StateBackend-ProcessWindowFunction-tt1418.html#a1419
[21] http://apache-flink.147419.n8.nabble.com/Re-jobgraph-tt1426.html
[22]
http://apache-flink.147419.n8.nabble.com/table-catalog-tt1417.html#a1425
[23]
http://apache-flink.147419.n8.nabble.com/flink-sql-confluent-schema-avro-topic-tt1264.html
[24]
http://apache-flink.147419.n8.nabble.com/Flink-SQL-Window-can-only-be-defined-over-a-time-attribute-column-tt1407.html
[25] http://apache-flink.147419.n8.nabble.com/-tt1357.html#a1412

祝好
徐前进


咨询一下 RocksDB 状态后端的调优经验

2020-01-13 Thread DONG, Weike
大家好,

我们在 YARN 容器内运行以 RocksDB 作为 State Backend 的 Flink 作业,状态数据比较大(50G
以上,难以放到内存中)。但是由于 YARN 本身的 pmem-check 限制,经常会因为内存用量的不受控而导致整个 Container 被强制
KILL.

目前调研了 https://issues.apache.org/jira/browse/FLINK-7289 这个提议,但是目前还未完全实现。
也按照 RocksDB 官方的调优指南
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide 设置了
state.backend.rocksdb.writebuffer.size
state.backend.rocksdb.writebuffer.count
state.backend.rocksdb.block.cache-size
state.backend.rocksdb.files.open
等等参数,但是目前观察到效果并不太明显,内存用量还是会不受控地越来越多。

请问各位是否有 RocksDB 作为状态后端的调优经验,例如在内存受限的情况下,尽量确保 RocksDB 的内存用量可控在一个封顶范围呢?

另外还有一个场景,假设内存够用的情况下,有哪些增加读写性能方面的建议呢?目前尝试使用 SSD 来存放 sst 文件,但是性能提升也不明显。

感谢 :)


Re: 咨询一下 RocksDB 状态后端的调优经验

2020-01-13 Thread Yun Tang
Hi Dong

RocksDB无论如何都是要使用native内存的,您的YARN pmem-check相比JVM heap的buffer空间是多少,是否合适呢?

FLINK-7289的基本所需task都已经完成在release-1.10 分支中了,您可以直接使用release-1.10 
分支打包,最近也要发布1.10的rc版本,欢迎试用该功能。

如果你的所有checkpoint size是50GB,其实不是很大,但是如果单个state 
backend有50GB的话,对于Flink这种低延迟流式场景是稍大的,建议降低单并发state数据量。

至于目前的问题,也就是您加了相关参数,但是内存用量仍然在涨,可以用以下思路排查一下:

  1.  首先,确保使用release-1.10 分支
  2.  开启 size-all-mem-tables  [1] 和 block-cache-usage [2]的metrics监控
  3.  在默认没有enable "state.backend.rocksdb.memory.managed" [3] 的情况下,对column 
family进行如下配置,核心思路就是将主要的内存使用都放在cache中,方便观察内存使用:

rocksDBStateBackend.setOptions(new OptionsFactory() {
 @Override
  public DBOptions createDBOptions(DBOptions currentOptions) {
  return currentOptions;
  }

  @Override
   public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
 BlockBasedTableConfig blockBasedTableConfig = new 
BlockBasedTableConfig();
 blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
 blockBasedTableConfig.pinL0FilterAndIndexBlocksInCache();
 currentOptions.setTableFormatConfig(blockBasedTableConfig);
 return currentOptions;
  }
});

4. 由于没有enable cache共享,所以需要将每个column 
family的size-all-mem-tables和block-cache-usage进行相加,观察相关指数变化,看是否超过了你的pmem-check 限制。

相应地,您也可以启用"state.backend.rocksdb.memory.managed" [3] 该功能 或者 自行配置 
"state.backend.rocksdb.memory.fixed-per-slot" [4] 设置期望的rocksDB per slot memory 
size,此时只需要观察block-cache-usage的指标,由于这里使用共享cache的逻辑,所以并不需要相加,只要观察per 
slot的情况即可(同一个TM内,相同subtask index的rocksDB state其实是用的同一块cache),观察内存限制功能是否生效。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-size-all-mem-tables
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-block-cache-usage
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-memory-managed
[4] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-memory-fixed-per-slot


祝好
唐云


From: DONG, Weike 
Sent: Tuesday, January 14, 2020 10:02
To: user-zh@flink.apache.org 
Subject: 咨询一下 RocksDB 状态后端的调优经验

大家好,

我们在 YARN 容器内运行以 RocksDB 作为 State Backend 的 Flink 作业,状态数据比较大(50G
以上,难以放到内存中)。但是由于 YARN 本身的 pmem-check 限制,经常会因为内存用量的不受控而导致整个 Container 被强制
KILL.

目前调研了 https://issues.apache.org/jira/browse/FLINK-7289 这个提议,但是目前还未完全实现。
也按照 RocksDB 官方的调优指南
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide 设置了
state.backend.rocksdb.writebuffer.size
state.backend.rocksdb.writebuffer.count
state.backend.rocksdb.block.cache-size
state.backend.rocksdb.files.open
等等参数,但是目前观察到效果并不太明显,内存用量还是会不受控地越来越多。

请问各位是否有 RocksDB 作为状态后端的调优经验,例如在内存受限的情况下,尽量确保 RocksDB 的内存用量可控在一个封顶范围呢?

另外还有一个场景,假设内存够用的情况下,有哪些增加读写性能方面的建议呢?目前尝试使用 SSD 来存放 sst 文件,但是性能提升也不明显。

感谢 :)


Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 Thread JingsongLee
Hi Kevin,

这是什么版本?
Doc类能完整提供下吗?方便我们复现。

Best,
Jingsong Lee


--
From:Kevin Liao 
Send Time:2020年1月13日(星期一) 17:37
To:user-zh 
Subject:blink planner的org.apache.flink.table.api.ValidationException报错

tEnv.connect(new Kafka()
.version("universal")
.topic("xxx")
.startFromLatest()
.property("bootstrap.servers",
"")
.property("group.id", ""))
.withFormat(new Json().failOnMissingField(false).deriveSchema())
.withSchema(new Schema()
//.field("logger_name", Types.STRING)
//.field("host", Types.STRING)
//.field("@timestamp", Types.SQL_TIMESTAMP)
//.field("_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)
.inAppendMode()
.registerTableSource("xxx");

Table result = tEnv.sqlQuery(
"SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

//result.printSchema();
tEnv.toAppendStream(result,
new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, LONG, STRING, INT, STRING, INT)).print();



以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


、、、

Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
LEGACY(PojoType) of
table field 'doc' does not match with type
PojoType of the field
'doc' of the TableSource return type.
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.planner.delegation.StreamPlan

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 Thread Kevin Liao
flink 版本是 1.9.1 release

Doc
完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约
30 多个字段,我理解这跟字段数关系不大

```

import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;

  ... // omit some, omit getters and setters

```

希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)

JingsongLee  于2020年1月14日周二 上午11:25写道:

> Hi Kevin,
>
> 这是什么版本?
> Doc类能完整提供下吗?方便我们复现。
>
> Best,
> Jingsong Lee
>
>
> --
> From:Kevin Liao 
> Send Time:2020年1月13日(星期一) 17:37
> To:user-zh 
> Subject:blink planner的org.apache.flink.table.api.ValidationException报错
>
> tEnv.connect(new Kafka()
> .version("universal")
> .topic("xxx")
> .startFromLatest()
> .property("bootstrap.servers",
> "")
> .property("group.id", ""))
> .withFormat(new Json().failOnMissingField(false).deriveSchema())
> .withSchema(new Schema()
> //.field("logger_name", Types.STRING)
> //.field("host", Types.STRING)
> //.field("@timestamp", Types.SQL_TIMESTAMP)
> //.field("_rowtime", Types.SQL_TIMESTAMP)
> //.rowtime(
> //new
>
> Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
> .field("doc", Types.POJO(Doc.class))
> )
> .inAppendMode()
> .registerTableSource("xxx");
>
> Table result = tEnv.sqlQuery(
> "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");
>
> //result.printSchema();
> tEnv.toAppendStream(result,
> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, LONG, STRING, INT, STRING, INT)).print();
>
>
>
> 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:
>
>
> 、、、
>
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Type
> LEGACY(PojoType) of
> table field 'doc' does not match with type
> PojoType of the field
> 'doc' of the TableSource return type.
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>  at
> org.apache.flink.table.planner.delegation

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 Thread JingsongLee
谢谢,
你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。

Best,
Jingsong Lee


--
From:Kevin Liao 
Send Time:2020年1月14日(星期二) 11:38
To:user-zh ; JingsongLee 
Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错

flink 版本是 1.9.1 release

Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 
30 多个字段,我理解这跟字段数关系不大

```
import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;
  ... // omit some, omit getters and setters
```

希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
JingsongLee  于2020年1月14日周二 上午11:25写道:
Hi Kevin,

 这是什么版本?
 Doc类能完整提供下吗?方便我们复现。

 Best,
 Jingsong Lee


 --
 From:Kevin Liao 
 Send Time:2020年1月13日(星期一) 17:37
 To:user-zh 
 Subject:blink planner的org.apache.flink.table.api.ValidationException报错

 tEnv.connect(new Kafka()
 .version("universal")
 .topic("xxx")
 .startFromLatest()
 .property("bootstrap.servers",
 "")
 .property("group.id", ""))
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(new Schema()
 //.field("logger_name", Types.STRING)
 //.field("host", Types.STRING)
 //.field("@timestamp", Types.SQL_TIMESTAMP)
 //.field("_rowtime", Types.SQL_TIMESTAMP)
 //.rowtime(
 //new
 Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
 .field("doc", Types.POJO(Doc.class))
 )
 .inAppendMode()
 .registerTableSource("xxx");

 Table result = tEnv.sqlQuery(
 "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

 //result.printSchema();
 tEnv.toAppendStream(result,
 new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, LONG, STRING, INT, STRING, INT)).print();



 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


 、、、

 Exception in thread "main"
 org.apache.flink.table.api.ValidationException: Type
 LEGACY(PojoType) of
 table field 'doc' does not match with type
 PojoType of the field
 'doc' of the TableSource return type.
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(Ex

Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 Thread Kevin Liao
我用的是
https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
官网下载的

您说的 master 最新的版本我稍后试一下,谢谢

JingsongLee  于2020年1月14日周二 上午11:51写道:

> 谢谢,
> 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。
>
> Best,
> Jingsong Lee
>
> --
> From:Kevin Liao 
> Send Time:2020年1月14日(星期二) 11:38
> To:user-zh ; JingsongLee <
> lzljs3620...@aliyun.com>
> Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错
>
> flink 版本是 1.9.1 release
>
> Doc
> 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约
> 30 多个字段,我理解这跟字段数关系不大
>
> ```
>
> import org.apache.commons.lang3.builder.ToStringBuilder;
> import 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
>
> /**
>  * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
>  */
> @JsonIgnoreProperties(ignoreUnknown = true)
> public class Doc {
>
>   private String suv;
>   private Float factor = 1F;
>   private String st;
>   private String agentId;
>   private Long timestamp;
>
>   ... // omit some, omit getters and setters
>
> ```
>
> 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
>
> JingsongLee  于2020年1月14日周二 上午11:25写道:
> Hi Kevin,
>
> 这是什么版本?
> Doc类能完整提供下吗?方便我们复现。
>
> Best,
> Jingsong Lee
>
>
> --
> From:Kevin Liao 
> Send Time:2020年1月13日(星期一) 17:37
> To:user-zh 
> Subject:blink planner的org.apache.flink.table.api.ValidationException报错
>
> tEnv.connect(new Kafka()
> .version("universal")
> .topic("xxx")
> .startFromLatest()
> .property("bootstrap.servers",
> "")
> .property("group.id", ""))
> .withFormat(new Json().failOnMissingField(false).deriveSchema())
> .withSchema(new Schema()
> //.field("logger_name", Types.STRING)
> //.field("host", Types.STRING)
> //.field("@timestamp", Types.SQL_TIMESTAMP)
> //.field("_rowtime", Types.SQL_TIMESTAMP)
> //.rowtime(
> //new
>
> Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
> .field("doc", Types.POJO(Doc.class))
> )
> .inAppendMode()
> .registerTableSource("xxx");
>
> Table result = tEnv.sqlQuery(
> "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");
>
> //result.printSchema();
> tEnv.toAppendStream(result,
> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, LONG, STRING, INT, STRING, INT)).print();
>
>
>
> 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:
>
>
> 、、、
>
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Type
> LEGACY(PojoType) of
> table field 'doc' does not match with type
> PojoType of the field
> 'doc' of the TableSource return type.
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(Stre