Re: Flink 1.11 CDC: using Flink SQL to operate on multiple tables in a single Kafka topic while preserving order?

2020-07-07 Thread Leonard Xu
Hello,
 
This is an interesting topic. As I understand it, it requires global consistency across multiple CDC sources: once the binlogs of several business tables are ingested into Flink via CDC, writing each source
to the target database needs a global consistency guarantee, and the underlying API probably cannot support that either.
One possible idea is to extract the committed ts from the metadata of the CDC records (the time of each change in the source database: the source.ts_ms field in Debezium,
the es field in Canal) and use that time to coordinate the processing speed of the multiple CDC sources. This is just an idea of mine.

What is fairly certain is that the current API cannot access this information and the current Flink framework cannot handle this data; it may be worth checking whether some CDC frameworks can do this.
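(Purely to illustrate the idea above: if one consumed the raw change records as strings and parsed them outside the built-in canal-json/debezium-json formats, the commit timestamp could be read roughly as below. This is only a sketch; it assumes Jackson on the classpath, Debezium JSON without the schema envelope, and standard Canal JSON, none of which is exposed by the current connectors.)

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeTimestampExtractor {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Debezium puts the original change time under source.ts_ms (ms since epoch)
    static long debeziumCommitTs(String rawJson) throws Exception {
        JsonNode root = MAPPER.readTree(rawJson);
        return root.path("source").path("ts_ms").asLong();
    }

    // Canal JSON carries the same information in the top-level "es" field
    static long canalCommitTs(String rawJson) throws Exception {
        return MAPPER.readTree(rawJson).path("es").asLong();
    }
}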

Best,
Leonard Xu


> On 2020-07-08 13:18, jindy_liu <286729...@qq.com> wrote:
> 
> Correct. It's just that in production, some schemas do not declare the foreign keys explicitly; the update order of the tables is guaranteed by the business process instead.
> So when consuming the change data, it also has to be consumed in order; otherwise consumers of the mirrored data may run into problems.
> 
> Question: besides the CDC feature of Flink SQL, can other Flink features support this scenario well? Do I need to write against a lower-level API?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: How to use periodic watermarks in Flink SQL?

2020-07-07 Thread Jark Wu
Sure, you can open an issue in JIRA describing your requirement~

On Wed, 8 Jul 2020 at 12:01, 1193216154 <1193216...@qq.com> wrote:

>  Jark, does Flink need to support this feature? I feel there are still some use cases for it.
>
>
>
>
> ---- Original message ----
> From: "Jark Wu"  Sent: Wednesday, July 8, 2020 11:48 AM
> To: "user-zh"
> Subject: Re: How to use periodic watermarks in Flink SQL?
>
>
>
> If none of the partitions have data and you still want the watermark to advance, then the idle source option indeed cannot solve this.
> There is indeed no good solution for it at the moment.
>
> Best,
> Jark
>
> On Wed, 8 Jul 2020 at 11:08, 1193216154 <1193216...@qq.com wrote:
>
>  hi Jark Wu.
> 
> 
> My understanding is that table.exec.source.idle-timeout only makes watermark alignment ignore a parallel instance that has no watermark; when none of the parallel instances has a watermark, the watermark still cannot advance.
> 
> I think what the original poster means is that when none of the Kafka partitions has data, the last window cannot fire (because no watermark larger than that window's end time will arrive anymore).
>  Is there a way, under event time, to periodically generate a watermark based on the current time (independent of the data), since new data may never arrive?
> 
> 
> 
> 
>  ---- Original message ----
>  From: "Jark Wu"  Sent: Tuesday, July 7, 2020 6:09 PM
>  To: "user-zh"
>  Subject: Re: How to use periodic watermarks in Flink SQL?
> 
> 
> 
>  Hi,
> 
>  As I understand it, this problem is actually unrelated to periodic watermarks; it is the idle source
>  problem. You can try adding the configuration table.exec.source.idle-timeout = 10s and see whether it solves your problem. [1]
> 
>  Best,
>  Jark
> 
>  [1]:
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
> 
> 
>  On Tue, 7 Jul 2020 at 17:35, noake  wrote:
>  > Dear All:
>  >
>  >
>  > Could anyone advise how to use periodic watermarks in Flink SQL?
>  > When consuming from Kafka, we want the watermark to keep advancing even when there is no data; we are using Flink SQL.

Re: Flink 1.11 CDC: using Flink SQL to operate on multiple tables in a single Kafka topic while preserving order?

2020-07-07 Thread jindy_liu
Correct. It's just that in production, some schemas do not declare the foreign keys explicitly; the update order of the tables is guaranteed by the business process instead.
So when consuming the change data, it also has to be consumed in order; otherwise consumers of the mirrored data may run into problems.

Question: besides the CDC feature of Flink SQL, can other Flink features support this scenario well? Do I need to write against a lower-level API?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.11 CDC: using Flink SQL to operate on multiple tables in a single Kafka topic while preserving order?

2020-07-07 Thread jindy_liu
https://github.com/apache/flink/tree/release-1.11.0
I saw that the release-1.11.0 tag had been published on GitHub, so I built that tag. We are currently doing some research on real-time computing; the first step is real-time data movement (across heterogeneous storage). I noticed Flink
1.11 has the CDC feature, so I followed it and tried it as soon as it was released, to see whether it can be used for real-time synchronization of change data.
1. From my trial: if the MySQL binlog arrives in Kafka ordered per single table, in a single topic with a single partition, Flink CDC synchronization is indeed very convenient; a few SQL statements are enough.
2. If the MySQL binlog arrives per DB instance, with multiple tables ordered, in a single topic with a single partition, I don't know how to define the DDL and how to guarantee ordered synchronization. (For example, there are logical foreign-key constraints between the tables; concretely, the status field of the test table is a foreign key, and if related records are both updated, the update order matters and must strictly follow the binlog order.) Looking at the source code today, the canal-json parsing seems to only extract the fields
and the operation type from the JSON. I feel this multi-table ordered scenario should be a fairly common requirement.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 1.11 on Kubernetes: cluster startup fails

2020-07-07 Thread SmileSmile
hi

I deployed a session cluster on Kubernetes following the documentation [1], and got the following errors while the cluster was starting up:


Starting Task Manager
sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only file 
system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only file 
system
/docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.


Has anyone hit the same problem? Any tips?



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions


a511955993
Email: a511955...@163.com

Signature customized by NetEase Mail Master

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Benchao Li
Congratulations!  Thanks Zhijiang & Piotr for the great work as release
managers.

Rui Li  于2020年7月8日周三 上午11:38写道:

> Congratulations! Thanks Zhijiang & Piotr for the hard work.
>
> On Tue, Jul 7, 2020 at 10:06 PM Zhijiang 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.11.0, which is the latest major release.
>>
>> Apache Flink® is an open-source stream processing framework for distributed,
>> high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this new major release:
>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made
>> this release possible!
>>
>> Cheers,
>> Piotr & Zhijiang
>>
>
>
> --
> Best regards!
> Rui Li
>


-- 

Best,
Benchao Li


Re: Job upgraded to Flink 1.11 fails to run in IDEA

2020-07-07 Thread SmileSmile
After adding the dependency it works now. It was probably caused by this:

https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.11.html#reversed-dependency-from-flink-streaming-java-to-flink-client-flink-15090
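(For completeness, the coordinates Yangze suggested below, written out as a pom entry; this assumes the scala.binary.version and flink.version properties are already defined in the pom.)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>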

thanks




a511955993
Email: a511955...@163.com

Signature customized by NetEase Mail Master

On 2020-07-08 11:30, Yangze Guo wrote:
Try adding this dependency:
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}

Best,
Yangze Guo

On Wed, Jul 8, 2020 at 11:27 AM SmileSmile  wrote:
>
>
> hi
>
> After upgrading the job's dependencies from 1.10.1 to 1.11.0, running it in IDEA fails with:
>
> Exception in thread "main" java.lang.IllegalStateException: No 
> ExecutorFactory found to execute the application.
>  at 
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()
>
> Where did it go wrong?
> a511955993
> Email: a511955...@163.com
>
> Signature customized by NetEase Mail Master


Re: Flink 1.11 CDC: using Flink SQL to operate on multiple tables in a single Kafka topic while preserving order?

2020-07-07 Thread Leonard Xu
Hello,
Let me restate your scenario: the test table and the status table in d1 are related, e.g. by a foreign key, so that after test updates a row, status also needs a cascading update.
You want Flink's CDC feature to synchronize these two tables to db2 such that, at any moment, the state of the two tables is atomic (the two tables correspond to one snapshot version of the two tables in d1). Is that the scenario?

If so, this is not supported yet.

Best,
Leonard Xu


> On 2020-07-08 11:59, Jark Wu  wrote:
> 
> Hi,
> 
> First, are you using the just-released 1.11.0, or a release-1.11 branch you built yourself?
> 
> Also, let me restate your requirement: db1.test is synchronized to db2.test, and db1.status to db2.status?
> What do you mean by *ordered* synchronization of multiple tables?
> My understanding is that you only need to define the db1.status binlog table the same way you defined db1.test -> db2.test, and then INSERT
> INTO the db2.status MySQL table.
> 
> Thanks for the feedback on the user experience.
> 
> Best,
> Jark
> 
> 
> On Wed, 8 Jul 2020 at 10:30, jindy_liu <286729...@qq.com> wrote:
> 
>> Scenario: after Canal parses the binlog, the changes of multiple related tables in the db1 instance are sent to a single Kafka topic with a single partition, so order is preserved.
>> If I want to synchronize the data to another MySQL instance db2, how do I operate on multiple tables with Flink SQL while keeping the order between tables?
>> For example, the MySQL instance db1 has the tables test and status:
>>
>> CREATE TABLE `test` (`id` int(11) NOT NULL, `name` varchar(255) NOT NULL, `time` datetime NOT NULL, `status` int(11) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
>> CREATE TABLE `status` (`status` int(11) NOT NULL, `name` varchar(255) NOT NULL, PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
>>
>> With Flink SQL I can define a corresponding test table and sink it to the mirror table test in the mirror MySQL instance db2, and likewise synchronize the status table. But how should the status table be handled, and how is order guaranteed? A single table already works and is indeed convenient. Help: how do I do ordered synchronization of multiple tables?
>>
>> CREATE TABLE test (`id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED) WITH (
>>   'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
>>   'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset',
>>   'format'='canal-json', 'canal-json.ignore-parse-errors'='true');
>>
>> CREATE TABLE status (`status` INT, `name` VARCHAR(255), PRIMARY KEY(name) NOT ENFORCED) WITH (
>>   'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
>>   'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset',
>>   'format'='canal-json', 'canal-json.ignore-parse-errors'='true');
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Re: Re: Re: Re: Flink 1.11 job execution exception

2020-07-07 Thread Jark Wu
This is probably caused by:
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.11.html#reversed-dependency-from-flink-streaming-java-to-flink-client-flink-15090

On Wed, 8 Jul 2020 at 09:21, sunfulin  wrote:

> hi, noake
> Thanks for sharing. After adding this dependency it works for me too. FYI everyone.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-07-07 22:15:05, "noake" wrote:
> >I hit the same problem on 1.11.0; adding the following dependency to the pom solved it:
> ><dependency>
> > <groupId>org.apache.flink</groupId>
> > <artifactId>flink-clients_${scala.binary.version}</artifactId>
> > <version>${flink.version}</version>
> ></dependency>
> >
> >
> >---- Original message ----
> >From: Congxian Qiu <qcx978132...@gmail.com>
> >To: user-zh <user...@flink.apache.org>
> >Cc: Jark Wu <imj...@gmail.com>; Jun Zhang <zhangjunemail...@gmail.com>
> >Sent: Tuesday, July 7, 2020 19:35
> >Subject: Re: Re: Re: Re: Flink 1.11 job execution exception
> >
> >
> >Hi, from this error it looks like loading some factory via the ServiceLoader failed (not found). You can check whether the
> >resources folder of the corresponding module contains the corresponding resource file. Best, Congxian
> >
> >sunfulin <sunfulin0...@163.com> wrote on 2020-07-07 18:29: hi, for local runs the provided scope in my pom is removed:
> ><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
> >It is indeed strange. Under what circumstances would the exception at
> >org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84) be triggered?
> >
> >Jark Wu <imj...@gmail.com> wrote on 2020-07-07 18:10: If you run it in IDEA, check whether the scope of the blink planner
> >dependency is set to provided. Remove provided and try again. Best, Jark
> >
> >sunfulin wrote on 2020-07-07 18:01: hi, @Jun Zhang I have been using the blink planner all along; that jar has always been
> >there. @Jark Wu I am running it directly in local IDEA, not packaged to a cluster yet. Is that related?
> >
> >Jark Wu wrote on 2020-07-07 15:40: Hi, are you running the packaged job on a cluster, or in IDEA? Best, Jark
> >
> >Jun Zhang <zhangjunemail...@gmail.com> wrote on 2020-07-07 15:31: hi sunfulin, have you imported the blink planner? Try adding:
> ><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
> >
> >sunfulin wrote on 2020-07-07 15:21: hi jark, my code is really simple, just the logic below. I don't know whether I am
> >missing some dependency configuration. Debugging the failure, it says DeployOptions.TARGET (execution.target) in the Flink
> >configuration did not match any setting? I had never paid attention to this option before.
> >  // build the StreamExecutionEnvironment
> >  public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >  // build EnvironmentSettings and pick the Blink planner
> >  private static final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >  // build the StreamTableEnvironment
> >  public static final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
> >  tEnv.executeSql("ddl sql"); // register the source as a table
> >  tEnv.createTemporaryView("test", ds, $("f0").as("id"), $("f1").as("first"), $("p").proctime());
> >  // join statement
> >  Table table = tEnv.sqlQuery("select b.* from test a left join my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
> >  // output
> >  tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
> >  env.execute("LookUpJoinJob");
> >
> >Jark Wu wrote on 2020-07-06 14:59: Could you share the job code that reproduces this? Best, Jark
> >
> >sunfulin <sunfulin0...@163.com> wrote on 2020-07-06 11:00: Hi, I am testing my job with the latest Flink 1.11 rc4 and it
> >reports: org.apache.flink.table.api.TableException: Failed to execute sql, caused by: java.lang.IllegalStateException: No
> >ExecutorFactory found to execute the application. at
> >org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> >What is the cause of this exception? Running the same logic on 1.10.1 raises no exception.


Re: How to use periodic watermarks in Flink SQL?

2020-07-07 Thread 1193216154
Jark, does Flink need to support this feature? I feel there are still some use cases for it.




---- Original message ----
From: "Jark Wu"
> [...] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

 On Tue, 7 Jul 2020 at 17:35, noake 

Re: Flink 1.11 CDC: using Flink SQL to operate on multiple tables in a single Kafka topic while preserving order?

2020-07-07 Thread Jark Wu
Hi,

First, are you using the just-released 1.11.0, or a release-1.11 branch you built yourself?

Also, let me restate your requirement: db1.test is synchronized to db2.test, and db1.status to db2.status?
What do you mean by *ordered* synchronization of multiple tables?
My understanding is that you only need to define the db1.status binlog table the same way you defined db1.test -> db2.test, and then INSERT
INTO the db2.status MySQL table.

Thanks for the feedback on the user experience.

Best,
Jark


On Wed, 8 Jul 2020 at 10:30, jindy_liu <286729...@qq.com> wrote:

> Scenario: after Canal parses the binlog, the changes of multiple related tables in the db1 instance are sent to a single Kafka topic with a single partition, so order is preserved.
> If I want to synchronize the data to another MySQL instance db2, how do I operate on multiple tables with Flink SQL while keeping the order between tables?
> For example, the MySQL instance db1 has the tables test and status:
>
> CREATE TABLE `test` (`id` int(11) NOT NULL, `name` varchar(255) NOT NULL, `time` datetime NOT NULL, `status` int(11) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> CREATE TABLE `status` (`status` int(11) NOT NULL, `name` varchar(255) NOT NULL, PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
>
> With Flink SQL I can define a corresponding test table and sink it to the mirror table test in the mirror MySQL instance db2, and likewise synchronize the status table. But how should the status table be handled, and how is order guaranteed? A single table already works and is indeed convenient. Help: how do I do ordered synchronization of multiple tables?
>
> CREATE TABLE test (`id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED) WITH (
>   'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
>   'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset',
>   'format'='canal-json', 'canal-json.ignore-parse-errors'='true');
>
> CREATE TABLE status (`status` INT, `name` VARCHAR(255), PRIMARY KEY(name) NOT ENFORCED) WITH (
>   'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
>   'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset',
>   'format'='canal-json', 'canal-json.ignore-parse-errors'='true');
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to use periodic watermarks in Flink SQL?

2020-07-07 Thread Jark Wu
If none of the partitions have data and you still want the watermark to advance, then the idle source option indeed cannot solve this.
There is indeed no good solution for it at the moment.
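(For reference, a minimal sketch of setting the option discussed in the quoted reply programmatically; it assumes a Blink-planner StreamTableEnvironment named tEnv already exists.)

// fragment only: tEnv is assumed to be an existing StreamTableEnvironment
tEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10 s");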

Best,
Jark

On Wed, 8 Jul 2020 at 11:08, 1193216154 <1193216...@qq.com> wrote:

> hi Jark Wu.
>
> My understanding is that table.exec.source.idle-timeout only makes watermark alignment ignore a parallel instance that has no watermark; when none of the parallel instances has a watermark, the watermark still cannot advance.
> I think what the original poster means is that when none of the Kafka partitions has data, the last window cannot fire (because no watermark larger than that window's end time will arrive anymore).
> Is there a way, under event time, to periodically generate a watermark based on the current time (independent of the data), since new data may never arrive?
>
>
>
>
> ---- Original message ----
> From: "Jark Wu"  Sent: Tuesday, July 7, 2020 6:09 PM
> To: "user-zh"
> Subject: Re: How to use periodic watermarks in Flink SQL?
>
>
>
> Hi,
>
> As I understand it, this problem is actually unrelated to periodic watermarks; it is the idle source
> problem. You can try adding the configuration table.exec.source.idle-timeout = 10s and see whether it solves your problem. [1]
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
>
> On Tue, 7 Jul 2020 at 17:35, noake  wrote:
>  Dear All:
> 
> 
>  Could anyone advise how to use periodic watermarks in Flink SQL?
>  When consuming from Kafka, we want the watermark to keep advancing even when there is no data; we are using Flink SQL.


Re: FlinkKafkaProducer cannot write to multiple topics

2020-07-07 Thread Leonard Xu
Hi,

夏帅's approach works: Kafka by default auto-creates a topic when a record is written to a topic that does not exist [1],
and that broker setting is enabled by default, so implementing a custom KafkaSerializationSchema is enough for your requirement.

Best,
Leonard Xu

[1] 
https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable
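(A minimal Java sketch of the same idea, showing how such a schema plugs into FlinkKafkaProducer. The DemoBean type and its getters are assumptions carried over from the Scala example quoted below; "default-topic" is only a fallback name, since the schema decides the real topic per record.)

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DynamicTopicSink {

    // hypothetical record type: carries its own target topic and payload
    public static class DemoBean {
        private final String topic;
        private final byte[] value;
        public DemoBean(String topic, byte[] value) { this.topic = topic; this.value = value; }
        public String getTopic() { return topic; }
        public byte[] getValue() { return value; }
    }

    // routes each record to the topic carried inside the record itself
    public static class DemoSerializationSchema implements KafkaSerializationSchema<DemoBean> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(DemoBean element, Long timestamp) {
            return new ProducerRecord<>(element.getTopic(), element.getValue());
        }
    }

    public static FlinkKafkaProducer<DemoBean> buildProducer(Properties kafkaProps) {
        return new FlinkKafkaProducer<>(
                "default-topic",
                new DemoSerializationSchema(),
                kafkaProps,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    }
}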
 


> On 2020-07-08 11:08, flink小猪 <18579099...@163.com> wrote:
> 
> Thanks, man.
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-08 11:04:40,"夏帅"  写道:
> 
> Hi, you can implement a custom KafkaSerializationSchema for your use case:
> class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
> override def serialize(element: DemoBean, timestamp: lang.Long): 
> ProducerRecord[Array[Byte], Array[Byte]] = {
> new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, 
> element.getValue)
>  }
> }
> --
> From: 18579099...@163.com <18579099...@163.com>
> Sent: Wednesday, July 8, 2020 10:59
> To: user-zh
> Subject: FlinkKafkaProducer cannot write to multiple topics
> 
> 
> I need to read data from one Kafka topic, process it with Flink, and write it out to multiple Kafka topics (the target topic is dynamic and can be parsed from the data),
> but FlinkKafkaProducer seems to only be able to write to a single topic?
> 
> 
> 
> 18579099...@163.com
> 
> 



Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Rui Li
Congratulations! Thanks Zhijiang & Piotr for the hard work.

On Tue, Jul 7, 2020 at 10:06 PM Zhijiang  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.11.0, which is the latest major release.
>
> Apache Flink® is an open-source stream processing framework for distributed,
> high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements for
> this new major release:
> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>
> We would like to thank all contributors of the Apache Flink community who made
> this release possible!
>
> Cheers,
> Piotr & Zhijiang
>


-- 
Best regards!
Rui Li


Re: Flink 1.10.1 writing to Hive in Parquet format

2020-07-07 Thread Rui Li
As long as your target Hive table is created with Parquet storage, the INSERT statement is no different from any other table type.
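(An illustrative sketch only; the table and column names are made up. The DDL is ordinary Hive DDL, executed on the Hive side or through Flink's Hive dialect, and the INSERT is plain Flink SQL run with the Hive catalog registered.)

-- Hive DDL: the only Parquet-specific part is STORED AS PARQUET
CREATE TABLE dst_tbl (id INT, name STRING) STORED AS PARQUET;

-- Flink SQL: identical to writing any other Hive table
INSERT INTO dst_tbl SELECT id, name FROM src_tbl;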

On Tue, Jul 7, 2020 at 10:05 AM lydata  wrote:

>  Hi,
>
> Could you share an example of Flink 1.10 writing to Hive in Parquet format?
>
> Best,
> lydata



-- 
Best regards!
Rui Li


Re: Job upgraded to Flink 1.11 fails to run in IDEA

2020-07-07 Thread Yangze Guo
Try adding this dependency:
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}

Best,
Yangze Guo

On Wed, Jul 8, 2020 at 11:27 AM SmileSmile  wrote:
>
>
> hi
>
> After upgrading the job's dependencies from 1.10.1 to 1.11.0, running it in IDEA fails with:
>
> Exception in thread "main" java.lang.IllegalStateException: No 
> ExecutorFactory found to execute the application.
>  at 
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()
>
> Where did it go wrong?
> a511955993
> Email: a511955...@163.com
>
> Signature customized by NetEase Mail Master


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Wesley

Nice news. Congrats!


Leonard Xu wrote:

Congratulations!

Thanks Zhijiang and Piotr for the great work, and thanks everyone involved!

Best,
Leonard Xu



Job upgraded to Flink 1.11 fails to run in IDEA

2020-07-07 Thread SmileSmile

hi

After upgrading the job's dependencies from 1.10.1 to 1.11.0, running it in IDEA fails with:

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory 
found to execute the application.
 at 
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()

Where did it go wrong?
a511955993
Email: a511955...@163.com

Signature customized by NetEase Mail Master

Re: Checkpointing for a simple pipeline

2020-07-07 Thread Yun Tang
Hi Prasanna

Using incremental checkpoints is generally better than not, as they are faster and
consume less memory.
However, incremental checkpoints are only supported by the RocksDB state backend.
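(A minimal sketch of turning this on, assuming the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint interval and HDFS path are placeholder values.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // take a checkpoint every 60 s
        // the boolean flag enables incremental checkpoints (RocksDB only)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // build the Kafka source -> map -> Kafka sink pipeline here, then call env.execute(...)
    }
}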


Best
Yun Tang

From: Prasanna kumar 
Sent: Tuesday, July 7, 2020 20:43
To: d...@flink.apache.org ; user 
Subject: Checkpointing for a simple pipeline

Hi,

I have a pipeline: Source -> Map (JSON transform) -> Sink.

Both source and sink are Kafka.

What is the best checkpointing mechanism?

Is setting checkpoints to incremental a good option? What should we be careful of?

I am running it on AWS EMR.

Will checkpointing slow things down?

Thanks,
Prasanna.


Re: Re:[ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Yun Tang
Congratulations to everyone involved, and thanks to Zhijiang and Piotr for their work
as release managers.

From: chaojianok 
Sent: Wednesday, July 8, 2020 10:51
To: Zhijiang 
Cc: dev ; user@flink.apache.org ; 
announce 
Subject: Re:[ANNOUNCE] Apache Flink 1.11.0 released


Congratulations!

Very happy to make some contributions to Flink!





At 2020-07-07 22:06:05, "Zhijiang"  wrote:

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.11.0, which is the latest major release.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2020/07/06/release-1.11.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Piotr & Zhijiang






Re: Re: FlinkKafkaProducer cannot write to multiple topics

2020-07-07 Thread flink小猪
Thanks, man.
















On 2020-07-08 11:04:40, "夏帅" wrote:

Hi, you can implement a custom KafkaSerializationSchema for your use case:
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
override def serialize(element: DemoBean, timestamp: lang.Long): 
ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue)
  }
}
--
> From: 18579099...@163.com <18579099...@163.com>
> Sent: Wednesday, July 8, 2020 10:59
> To: user-zh
> Subject: FlinkKafkaProducer cannot write to multiple topics


> I need to read data from one Kafka topic, process it with Flink, and write it out to multiple Kafka topics (the target topic is dynamic and can be parsed from the data),
> but FlinkKafkaProducer seems to only be able to write to a single topic?



18579099...@163.com




Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Leonard Xu
Congratulations!

Thanks Zhijiang and Piotr for the great work, and thanks everyone involved!

Best,
Leonard Xu



Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-07 Thread x
The approach you describe isn't supported in V1.10.1, is it? I see the method only takes a single String parameter:
void sqlUpdate(String stmt);




---- Original message ----
From: "seeksst"
> [...] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> x <35907418@qq.com> wrote on Monday, July 6, 2020 at 11:15:
> The version is 1.10.1, and the final sink indeed does a COUNT DISTINCT inside a window.
> Is it the case that as soon as the job contains a COUNT DISTINCT inside a window anywhere, none of the state expires and is cleaned up automatically? The state of my window step is actually small; it is the state of the GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') query that is large. The code is as follows:
>
>   val rt_totaluv_view: Table = tabEnv.sqlQuery(
>     """
>       SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
>       FROM source
>       GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
>     """)
>   tabEnv.createTemporaryView("rt_totaluv_view", rt_totaluv_view)
>
>   val totaluvTmp = tabEnv.toRetractStream[(String, Long)](rt_totaluv_view)
>     .filter(line => line._1 == true).map(line => line._2)
>
>   val totaluvTabTmp = tabEnv.fromDataStream(totaluvTmp)
>
>   tabEnv.sqlUpdate(
>     s"""
>       INSERT INTO mysql_totaluv
>       SELECT _1, MAX(_2)
>       FROM $totaluvTabTmp
>       GROUP BY _1
>     """)
>
> Benchao Li <libenchao@apache.org> wrote on Friday, July 3, 2020 at 21:47:
> Which version are you using? There was a similar known issue [1] where doing COUNT DISTINCT inside a window caused this; it has been fixed in 1.11.
> [1] https://issues.apache.org/jira/browse/FLINK-17942
>
> x <35907418@qq.com> wrote on Friday, July 3, 2020 at 16:34:
> Hello, after my program runs for a while I see the checkpoint files keep growing, so the state apparently never expires.
> I configured tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7)); in theory, the state keyed by the previous day's date should expire on the next day.
>
> Jark Wu <imjark@gmail.com> wrote on Thursday, June 18, 2020 at 12:16:
> Yes, I think that works around it.
>
> x <35907418@qq.com> wrote on Thursday, June 18, 2020 at 10:34:
> For 1.10, I implemented it by converting the table to a stream and back to a table. Does that look reasonable?
>
>   val resTmpTab: Table = tabEnv.sqlQuery(
>     """
>       SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str, COUNT(DISTINCT userkey) uv
>       FROM user_behavior
>       GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')
>     """)
>
>   val resTmpStream = tabEnv.toRetractStream[(String, Long)](resTmpTab)
>     .filter(line => line._1 == true).map(line => line._2)
>
>   val res = tabEnv.fromDataStream(resTmpStream)
>   tabEnv.sqlUpdate(
>     s"""
Re: How to use periodic watermarks in Flink SQL?

2020-07-07 Thread 1193216154
hi Jark Wu.
My understanding is that table.exec.source.idle-timeout only makes watermark alignment ignore a parallel instance that has no watermark; when none of the parallel instances has a watermark, the watermark still cannot advance.
I think what the original poster means is that when none of the Kafka partitions has data, the last window cannot fire (because no watermark larger than that window's end time will arrive anymore).
Is there a way, under event time, to periodically generate a watermark based on the current time (independent of the data), since new data may never arrive?




---- Original message ----
From: "Jark Wu"
> [...] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

On Tue, 7 Jul 2020 at 17:35, noake 

Re: FlinkKafkaProducer cannot write to multiple topics

2020-07-07 Thread 夏帅
Hi, you can implement a custom KafkaSerializationSchema for your use case:
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
  override def serialize(element: DemoBean, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue)
  }
}
--
From: 18579099...@163.com <18579099...@163.com>
Sent: Wednesday, July 8, 2020 10:59
To: user-zh
Subject: FlinkKafkaProducer cannot write to multiple topics

I need to read data from one Kafka topic, process it with Flink, and write it out to multiple Kafka topics (the target topic is dynamic and can be parsed from the data),
but FlinkKafkaProducer seems to only be able to write to a single topic?



18579099...@163.com



Flink Hadoop dependency issue

2020-07-07 Thread Z-Z
Hi everyone, I am using Flink
1.10.0 and have put flink-shaded-hadoop-2-uber-2.7.5-10.0.jar under the jobmanager's lib directory, but via the web UI / CLI I still get: Could
not find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this scheme
could be loaded. How can I fix this?

FlinkKafkaProducer cannot write to multiple topics

2020-07-07 Thread 18579099...@163.com
I need to read data from one Kafka topic, process it with Flink, and write it out to multiple Kafka topics (the target topic is dynamic and can be parsed from the data),
but FlinkKafkaProducer seems to only be able to write to a single topic?



18579099...@163.com


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Jingsong Li
Congratulations!

Thanks Zhijiang and Piotr as release managers, and thanks everyone.

Best,
Jingsong

On Wed, Jul 8, 2020 at 10:51 AM chaojianok  wrote:

> Congratulations!
>
> Very happy to make some contributions to Flink!
>
>
>
>
>
> At 2020-07-07 22:06:05, "Zhijiang"  wrote:
>
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.11.0, which is the latest major release.
>
> Apache Flink® is an open-source stream processing framework for distributed,
> high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements for
> this new major release:
> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>
> We would like to thank all contributors of the Apache Flink community who made
> this release possible!
>
> Cheers,
> Piotr & Zhijiang
>
>
>
>
>


-- 
Best, Jingsong Lee


Re:[ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread chaojianok
Congratulations! 

Very happy to make some contributions to Flink!













At 2020-07-07 22:06:05, "Zhijiang"  wrote:

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.11.0, which is the latest major release.


Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2020/07/06/release-1.11.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Piotr & Zhijiang

Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-07 Thread shadowell


Hi Fabian,


Thanks for your information!
Actually, I am not clear about how Flink SQL auto-generates operator IDs, nor about how operator state is mapped back from a savepoint.
I hope to clarify the details with the example below.


I have two SQL queries as samples:
old SQL:  select id, name, sum(salary) from user_info where id == '001' group
by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
new SQL:  select id, name, sum(salary) from user_info where id == '001' and
age >= '28' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
I just add an age condition in the new SQL. Now I want to switch the job from
the old query to the new one via a savepoint. Flink will generate operator IDs for the operators of the new SQL.
In this case, purely from a technical point of view, will the operator IDs in the
savepoint of the old SQL job match the operator IDs of the new SQL job?
My understanding is that Flink may reorder the operators and generate new IDs
for them, and the new IDs may not match the old ones.
This would cause some state to fail to be mapped back from the old job's
savepoint, which naturally leads to inaccurate results.
I wonder if my understanding is correct.
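(For contrast, in the DataStream API the mapping is pinned explicitly with uid(); a minimal sketch with made-up topic and operator names.)

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer<>("user_info", new SimpleStringSchema(), props))
                .uid("kafka-source");                  // state maps back by this stable ID
        events.filter(v -> v.contains("\"id\":\"001\""))
                .uid("id-filter")                      // changing other operators leaves this mapping intact
                .print();

        env.execute("uid-example");
    }
}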


Thanks~ 
Jie


shadowell
shadow...@126.com
Signature customized by NetEase Mail Master
On 7/7/2020 17:23,Fabian Hueske wrote:
Hi Jie Feng,


As you said, Flink translates SQL queries into streaming programs with 
auto-generated operator IDs.
In order to start a SQL query from a savepoint, the operator IDs in the 
savepoint must match the IDs in the newly translated program.
Right now this can only be guaranteed if you translate the same query with the 
same Flink version (optimizer changes might change the structure of the 
resulting plan even if the query is the same).
This is of course a significant limitation, that the community is aware of and 
planning to improve in the future.


I'd also like to add that it can be very difficult to assess whether it is 
meaningful to start a query from a savepoint that was generated with a 
different query.
A savepoint holds intermediate data that is needed to compute the result of a 
query.
If you update a query it is very well possible that the result computed by 
Flink won't be equal to the actual result of the new query.



Best, Fabian



On Mon, 6 Jul 2020 at 10:50, shadowell  wrote:



Hello, everyone,
I have a few unclear points about using Flink SQL and hope to get an
answer, or a pointer to where I can find one.
When using the DataStream API, in order to ensure that the job can 
recover the state from savepoint after adjustment, it is necessary to specify 
the uid for the operator. However, when using Flink SQL, the uid of the 
operator is automatically generated. If the SQL logic changes (operator order 
changes), when the task is restored from savepoint, will it cause some of the 
operator states to be unable to be mapped back, resulting in state loss?


Thanks~
Jie Feng 
shadowell
shadow...@126.com
Signature customized by NetEase Mail Master

Re: Parsing exception for string arrays in nested JSON

2020-07-07 Thread Jun Zou
Hi,
Thanks for your guidance!

Best regards!

Leonard Xu  wrote on Tuesday, July 7, 2020 at 21:49:

> Hi,
>
> I took a look at the code; this is indeed a bug in Flink 1.9 [1]. The cause is that the source does not handle the legacy type and the new
> type correctly. The issue has not been fixed on the 1.9 branch; you can upgrade to 1.10.1 and try again.
>
> Best,
> Leonard Xu
> [1]https://issues.apache.org/jira/browse/FLINK-16622 <
> https://issues.apache.org/jira/browse/FLINK-16622?focusedCommentId=17061790=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17061790
> >
>
>


Using a Guava Cache inside state

2020-07-07 Thread op
I keep a Guava Cache in a ValueState[Cache] and read it with value().



Can the cache be used like a map directly, i.e. just put() into it, or do I have to update() the state with the whole cache again?

Flink 1.11 CDC: using Flink SQL to operate on multiple tables in a single Kafka topic while preserving order?

2020-07-07 Thread jindy_liu
Scenario: after Canal parses the binlog, the changes of multiple related tables in the db1 instance are sent to a single Kafka topic with a single partition, so order is preserved.
If I want to synchronize the data to another MySQL instance db2, how do I operate on multiple tables with Flink SQL while keeping the order between tables?
For example, the MySQL instance db1 has the tables test and status:

CREATE TABLE `test` (`id` int(11) NOT NULL, `name` varchar(255) NOT NULL, `time` datetime NOT NULL, `status` int(11) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `status` (`status` int(11) NOT NULL, `name` varchar(255) NOT NULL, PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;

With Flink SQL I can define a corresponding test table and sink it to the mirror table test in the mirror MySQL instance db2, and likewise synchronize the status table. But how should the status table be handled, and how is order guaranteed? A single table already works and is indeed convenient. Help: how do I do ordered synchronization of multiple tables?

CREATE TABLE test (`id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED) WITH (
  'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
  'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset',
  'format'='canal-json', 'canal-json.ignore-parse-errors'='true');

CREATE TABLE status (`status` INT, `name` VARCHAR(255), PRIMARY KEY(name) NOT ENFORCED) WITH (
  'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
  'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset',
  'format'='canal-json', 'canal-json.ignore-parse-errors'='true');



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Yangze Guo
Thanks, Zhijiang and Piotr. Congrats to everyone involved!

Best,
Yangze Guo

On Wed, Jul 8, 2020 at 10:19 AM Jark Wu  wrote:
>
> Congratulations!
> Thanks Zhijiang and Piotr for the great work as release manager, and thanks
> everyone who makes the release possible!
>
> Best,
> Jark
>
> On Wed, 8 Jul 2020 at 10:12, Paul Lam  wrote:
>
> > Finally! Thanks for Piotr and Zhijiang being the release managers, and
> > everyone that contributed to the release!
> >
> > Best,
> > Paul Lam
> >
> > On 2020-07-07 22:06, Zhijiang  wrote:
> >
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.11.0, which is the latest major release.
> >
> > Apache Flink® is an open-source stream processing framework for distributed,
> > high-performing, always-available, and accurate data streaming
> > applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the improvements 
> > for
> > this new major release:
> > https://flink.apache.org/news/2020/07/06/release-1.11.0.html
> >
> > The full release notes are available in Jira:
> >
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
> >
> > We would like to thank all contributors of the Apache Flink community who 
> > made
> > this release possible!
> >
> > Cheers,
> > Piotr & Zhijiang
> >
> >
> >


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Jark Wu
Congratulations!
Thanks Zhijiang and Piotr for the great work as release manager, and thanks
everyone who makes the release possible!

Best,
Jark

On Wed, 8 Jul 2020 at 10:12, Paul Lam  wrote:

> Finally! Thanks for Piotr and Zhijiang being the release managers, and
> everyone that contributed to the release!
>
> Best,
> Paul Lam
>
> On 2020-07-07 22:06, Zhijiang  wrote:
>
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.11.0, which is the latest major release.
>
> Apache Flink® is an open-source stream processing framework for distributed,
> high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements for
> this new major release:
> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>
> We would like to thank all contributors of the Apache Flink community who made
> this release possible!
>
> Cheers,
> Piotr & Zhijiang
>
>
>


Re: [Third-party Tool] Flink memory calculator

2020-07-07 Thread Yangze Guo
Hi, there,

As Flink 1.11.0 released, we provide a new calculator[1] for this
version. Feel free to try it and any feedback or suggestion is
welcomed!

[1] 
https://github.com/KarmaGYZ/flink-memory-calculator/blob/master/calculator-1.11.sh
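(Based on the usage described further down in this thread: append any dynamic options to the script call. The script name below comes from the link above; the memory size is just an example value and the exact invocation may differ.)

# simulate the Flink 1.11 memory configuration without starting a cluster
bash calculator-1.11.sh -Dtaskmanager.memory.process.size=4g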

Best,
Yangze Guo

On Wed, Apr 1, 2020 at 9:45 PM Yangze Guo  wrote:
>
> @Marta
> Thanks for the tip! I'll do that.
>
> Best,
> Yangze Guo
>
> On Wed, Apr 1, 2020 at 8:05 PM Marta Paes Moreira  wrote:
> >
> > Hey, Yangze.
> >
> > I'd like to suggest that you submit this tool to Flink Community Pages [1]. 
> > That way it can get more exposure and it'll be easier for users to find it.
> >
> > Thanks for your contribution!
> >
> > [1] https://flink-packages.org/
> >
> > On Tue, Mar 31, 2020 at 9:09 AM Yangze Guo  wrote:
> >>
> >> Hi, there.
> >>
> >> In the latest version, the calculator supports dynamic options. You
> >> could append all your dynamic options to the end of "bin/calculator.sh
> >> [-h]".
> >> Since "-tm" will be deprecated eventually, please replace it with
> >> "-Dtaskmanager.memory.process.size=".
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Mon, Mar 30, 2020 at 12:57 PM Xintong Song  
> >> wrote:
> >> >
> >> > Hi Jeff,
> >> >
> >> > I think the purpose of this tool it to allow users play with the memory 
> >> > configurations without needing to actually deploy the Flink cluster or 
> >> > even have a job. For sanity checks, we currently have them in the 
> >> > start-up scripts (for standalone clusters) and resource managers (on 
> >> > K8s/Yarn/Mesos).
> >> >
> >> > I think it makes sense do the checks earlier, i.e. on the client side. 
> >> > But I'm not sure if JobListener is the right place. IIUC, JobListener is 
> >> > invoked before submitting a specific job, while the mentioned checks 
> >> > validate Flink's cluster level configurations. It might be okay for a 
> >> > job cluster, but does not cover the scenarios of session clusters.
> >> >
> >> > Thank you~
> >> >
> >> > Xintong Song
> >> >
> >> >
> >> >
> >> > On Mon, Mar 30, 2020 at 12:03 PM Yangze Guo  wrote:
> >> >>
> >> >> Thanks for your feedbacks, @Xintong and @Jeff.
> >> >>
> >> >> @Jeff
> >> >> I think it would always be good to leverage exist logic in Flink, such
> >> >> as JobListener. However, this calculator does not only target to check
> >> >> the conflict, it also targets to provide the calculating result to
> >> >> user before the job is actually deployed in case there is any
> >> >> unexpected configuration. It's a good point that we need to parse the
> >> >> dynamic configs. I prefer to parse the dynamic configs and cli
> >> >> commands in bash instead of adding hook in JobListener.
> >> >>
> >> >> Best,
> >> >> Yangze Guo
> >> >>
> >> >> On Mon, Mar 30, 2020 at 10:32 AM Jeff Zhang  wrote:
> >> >> >
> >> >> > Hi Yangze,
> >> >> >
> >> >> > Does this tool just parse the configuration in flink-conf.yaml ?  
> >> >> > Maybe it could be done in JobListener [1] (we should enhance it via 
> >> >> > adding hook before job submission), so that it could all the cases 
> >> >> > (e.g. parameters coming from command line)
> >> >> >
> >> >> > [1] 
> >> >> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35
> >> >> >
> >> >> >
> >> >> > Yangze Guo  于2020年3月30日周一 上午9:40写道:
> >> >> >>
> >> >> >> Hi, Yun,
> >> >> >>
> >> >> >> I'm sorry that it currently could not handle it. But I think it is a
> >> >> >> really good idea and that feature would be added to the next version.
> >> >> >>
> >> >> >> Best,
> >> >> >> Yangze Guo
> >> >> >>
> >> >> >> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang  wrote:
> >> >> >> >
> >> >> >> > Very interesting and convenient tool, just a quick question: could 
> >> >> >> > this tool also handle deployment cluster commands like "-tm" mixed 
> >> >> >> > with configuration in `flink-conf.yaml` ?
> >> >> >> >
> >> >> >> > Best
> >> >> >> > Yun Tang
> >> >> >> > 
> >> >> >> > From: Yangze Guo 
> >> >> >> > Sent: Friday, March 27, 2020 18:00
> >> >> >> > To: user ; user...@flink.apache.org 
> >> >> >> > 
> >> >> >> > Subject: [Third-party Tool] Flink memory calculator
> >> >> >> >
> >> >> >> > Hi, there.
> >> >> >> >
> >> >> >> > In release-1.10, the memory setup of task managers has changed a 
> >> >> >> > lot.
> >> >> >> > I would like to provide here a third-party tool to simulate and get
> >> >> >> > the calculation result of Flink's memory configuration.
> >> >> >> >
> >> >> >> >  Although there is already a detailed setup guide[1] and migration
> >> >> >> > guide[2] officially, the calculator could further allow users to:
> >> >> >> > - Verify if there is any conflict in their configuration. The
> >> >> >> > calculator is more lightweight than starting a Flink cluster,
> >> >> >> > especially when running Flink on Yarn/Kubernetes. User could make 
> >> >> >> > sure
> >> >> >> > their configuration is correct locally before deploying it to 
> >> >> >> > external
> 

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Dian Fu
Thanks Piotr and Zhijiang for the great work and everyone who contributed to 
this release!

Regards,
Dian

> On 2020-07-08 10:12, Paul Lam  wrote:
> 
> Finally! Thanks for Piotr and Zhijiang being the release managers, and 
> everyone that contributed to the release!
> 
> Best,
> Paul Lam
> 
>> On 2020-07-07 22:06, Zhijiang  wrote:
>> 
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.11.0, which is the latest major release.
>> 
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html 
>> 
>> 
>> Please check out the release blog post for an overview of the improvements 
>> for this new major release:
>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html 
>> 
>> 
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>>  
>> 
>> 
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>> 
>> Cheers,
>> Piotr & Zhijiang
> 



Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Paul Lam
Finally! Thanks for Piotr and Zhijiang being the release managers, and everyone 
that contributed to the release!

Best,
Paul Lam

> On 2020-07-07 22:06, Zhijiang  wrote:
> 
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.11.0, which is the latest major release.
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html 
> 
> 
> Please check out the release blog post for an overview of the improvements 
> for this new major release:
> https://flink.apache.org/news/2020/07/06/release-1.11.0.html 
> 
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>  
> 
> 
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
> 
> Cheers,
> Piotr & Zhijiang



Re: Manual allocation of slot usage

2020-07-07 Thread Xintong Song
Hi Mu,
Regarding your questions.

   - The feature `spread out tasks evenly across task managers` is
   introduced in Flink 1.10.0, and backported to Flink 1.9.2, per the JIRA
   ticket [1]. That means if you configure this option in Flink 1.9.0, it
   should not take any effect.
   - Please be aware that this feature ATM only works for standalone
   deployment (including standalone Kubernetes deployment). For the native
   Kubernetes, Yarn and Mesos deployment, it is a known issue that this
   feature does not work as expected.
   - Regarding the scheduling behavior changes, we would need more
   information to explain this. To provide the information needed, the easiest
   way is probably to provide the jobmanager log files, if you're okay with
   sharing them. If you cannot share the logs, then it would be better to
   answer the following questions
  - What Flink deployment are you using? (Standalone/K8s/Yarn/Mesos)
  - How many times have you tried with and without
  `cluster.evenly-spread-out-slots`? In other words, the described
behaviors
  before and after setting `cluster.evenly-spread-out-slots`, can they be
  stably reproduced?
  - How many TMs do you have? And how many slots does each TM has?


Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-12122
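(For reference, the option under discussion is a cluster-level entry in flink-conf.yaml; shown only as a sketch, and per the notes above it currently only behaves as intended for standalone deployments.)

# flink-conf.yaml
cluster.evenly-spread-out-slots: true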

On Tue, Jul 7, 2020 at 8:33 PM Mu Kong  wrote:

> Hi, Guo,
>
> Thanks for helping out.
>
> My application has a kafka source with 60 subtasks(parallelism), and we
> have 15 task managers with 15 slots on each.
>
> *Before I applied the cluster.evenly-spread-out-slots,* meaning it is set
> to default false, the operator 'kafka source" has 11 subtasks allocated in
> one single task manager,
> while the remaining 49 subtasks of "kafka source" distributed to the
> remaining 14 task managers.
>
> *After I set cluster.evenly-spread-out-slots to true*, the 60 subtasks of
> "kafka source" were allocated to only 4 task managers, and they took 15
> slots on each of these 4 TMs.
>
> What I thought is that this config will make the subtasks of one operator
> more evenly spread among the task managers, but it seems it made them
> allocated in the same task manager as much as possible.
>
> The version I'm deploying is 1.9.0.
>
> Best regards,
> Mu
>
> On Tue, Jul 7, 2020 at 7:10 PM Yangze Guo  wrote:
>
>> Hi, Mu,
>>
>> IIUC, cluster.evenly-spread-out-slots would fulfill your demand. Why
>> do you think it does the opposite of what you want. Do you run your
>> job in active mode? If so, cluster.evenly-spread-out-slots might not
>> work very well because there could be insufficient task managers when
>> request slot from ResourceManager. This has been discussed in
>> https://issues.apache.org/jira/browse/FLINK-12122 .
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Jul 7, 2020 at 5:44 PM Mu Kong  wrote:
>> >
>> > Hi community,
>> >
>> > I'm running an application to consume data from kafka, and process it
>> then put data to the druid.
>> > I wonder if there is a way where I can allocate the data source
>> consuming process evenly across the task manager to maximize the usage of
>> the network of task managers.
>> >
>> > So, for example, I have 15 task managers and I set parallelism for the
>> kafka source as 60, since I have 60 partitions in kafka topic.
>> > What I want is flink cluster will put 4 kafka source subtasks on each
>> task manager.
>> >
>> > Is that possible? I have gone through the document, the only thing we
>> found is
>> >
>> > cluster.evenly-spread-out-slots
>> >
>> > which does exact the opposite of what I want. It will put the subtasks
>> of the same operator onto one task manager as much as possible.
>> >
>> > So, is some kind of manual resource allocation available?
>> > Thanks in advance!
>> >
>> >
>> > Best regards,
>> > Mu
>>
>


Re: Manual allocation of slot usage

2020-07-07 Thread Yangze Guo
Hi, Mu,

AFAIK, this feature was added in 1.9.2. Since you are on 1.9.0, would you
like to upgrade your Flink distribution?

Best,
Yangze Guo

On Tue, Jul 7, 2020 at 8:33 PM Mu Kong  wrote:
>
> Hi, Guo,
>
> Thanks for helping out.
>
> My application has a kafka source with 60 subtasks(parallelism), and we have 
> 15 task managers with 15 slots on each.
>
> Before I applied the cluster.evenly-spread-out-slots, meaning it is set to 
> default false, the operator 'kafka source" has 11 subtasks allocated in one 
> single task manager,
> while the remaining 49 subtasks of "kafka source" distributed to the 
> remaining 14 task managers.
>
> After I set cluster.evenly-spread-out-slots to true, the 60 subtasks of 
> "kafka source" were allocated to only 4 task managers, and they took 15 slots 
> on each of these 4 TMs.
>
> What I thought is that this config will make the subtasks of one operator 
> more evenly spread among the task managers, but it seems it made them 
> allocated in the same task manager as much as possible.
>
> The version I'm deploying is 1.9.0.
>
> Best regards,
> Mu
>
> On Tue, Jul 7, 2020 at 7:10 PM Yangze Guo  wrote:
>>
>> Hi, Mu,
>>
>> IIUC, cluster.evenly-spread-out-slots would fulfill your demand. Why
>> do you think it does the opposite of what you want. Do you run your
>> job in active mode? If so, cluster.evenly-spread-out-slots might not
>> work very well because there could be insufficient task managers when
>> request slot from ResourceManager. This has been discussed in
>> https://issues.apache.org/jira/browse/FLINK-12122 .
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Jul 7, 2020 at 5:44 PM Mu Kong  wrote:
>> >
>> > Hi community,
>> >
>> > I'm running an application to consume data from kafka, and process it then 
>> > put data to the druid.
>> > I wonder if there is a way where I can allocate the data source consuming 
>> > process evenly across the task manager to maximize the usage of the 
>> > network of task managers.
>> >
>> > So, for example, I have 15 task managers and I set parallelism for the 
>> > kafka source as 60, since I have 60 partitions in kafka topic.
>> > What I want is flink cluster will put 4 kafka source subtasks on each task 
>> > manager.
>> >
>> > Is that possible? I have gone through the document, the only thing we 
>> > found is
>> >
>> > cluster.evenly-spread-out-slots
>> >
>> > which does exact the opposite of what I want. It will put the subtasks of 
>> > the same operator onto one task manager as much as possible.
>> >
>> > So, is some kind of manual resource allocation available?
>> > Thanks in advance!
>> >
>> >
>> > Best regards,
>> > Mu


Re: Flink 1.11 JDBC connector dependency resolution failure

2020-07-07 Thread Leonard Xu
Hello,

I checked: it is in the Maven repository [1], and the official docs also have a download link [2]. Is the dependency in your pom written correctly? In 1.11 the JDBC connector module was renamed from
flink-jdbc to flink-connector-jdbc.
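(The corresponding coordinates, matching the repository link in [1]; the Scala 2.11 build is shown.)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.0</version>
</dependency>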

Best,
Leonard Xu

[1] 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.0/
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
 



> On 2020-07-08 08:15, Zhou Zach  wrote:
> 
> hi all,
> After upgrading Flink to 1.11, IDEA fails to resolve flink-connector-jdbc,
> and I could not find it in the Maven repository either. Do I need to build the 1.11 source manually to install the dependency?
> 



Re: [Flink shuffle mode]

2020-07-07 Thread 夏帅
A supplement: in 1.11 the default value of the shuffle-mode option is ALL_EDGES_BLOCKING.
The available values are:
ALL_EDGES_BLOCKING (equivalent to batch)
FORWARD_EDGES_PIPELINED
POINTWISE_EDGES_PIPELINED
ALL_EDGES_PIPELINED (equivalent to pipelined)
Compared with pipelined alone, this adds two more choices.


--
From: 忝忝向仧 <153488...@qq.com>
Sent: Tuesday, July 7, 2020 23:37
To: user-zh
Subject: Re: [Flink shuffle mode]

If running in batch mode, how do I specify the shuffle_mode in the application code?
Also, it was mentioned below that streaming computation is always pipelined.
Then why, when I do a keyBy on a DataStream and trace through the source code, is its mode UNDEFINED?
Thanks.




---- Original message ----
From: "Jingsong Li"

Re: [Flink shuffle mode]

2020-07-07 Thread 夏帅
Hi:
Question 1, specifying the shuffle_mode:
tEnv.getConfig.getConfiguration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
 "pipeline")
Question 2, what an UNDEFINED mode means:
Using UNDEFINED does not mean the mode is unset; it leaves the decision to the framework itself:
The shuffle mode is undefined. It leaves it up to the framework to decide the 
shuffle mode.



--
From: 忝忝向仧 <153488...@qq.com>
Sent: Tuesday, July 7, 2020 23:37
To: user-zh
Subject: Re: [Flink shuffle mode]

If running in batch mode, how do I specify the shuffle_mode in the application code?
Also, it was mentioned below that streaming computation is always pipelined.
Then why, when I do a keyBy on a DataStream and trace through the source code, is its mode UNDEFINED?
Thanks.




---- Original message ----
From: "Jingsong Li"

Re: Re: Re: Re: Re: Flink 1.11 job execution exception

2020-07-07 Thread sunfulin
hi, noake
Thanks for sharing. After adding this dependency it works for me too. FYI everyone.

















On 2020-07-07 22:15:05, "noake" wrote:
>I hit the same problem on 1.11.0; adding the following dependency to the pom solved it:
><dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
></dependency>
>
>
>---- Original message ----
>From: Congxian Qiu <qcx978132...@gmail.com>
>To: user-zh <user...@flink.apache.org>
>Cc: Jark Wu <imj...@gmail.com>; Jun Zhang <zhangjunemail...@gmail.com>
>Sent: Tuesday, July 7, 2020 19:35
>Subject: Re: Re: Re: Re: Flink 1.11 job execution exception
>
>
>Hi, from this error it looks like loading some factory via the ServiceLoader failed (not found). You can check whether the
>resources folder of the corresponding module contains the corresponding resource file. Best, Congxian
>
>sunfulin <sunfulin0...@163.com> wrote on 2020-07-07 18:29: hi, for local runs the provided scope in my pom is removed:
><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
>It is indeed strange. Under what circumstances would the exception at
>org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84) be triggered?
>
>Jark Wu <imj...@gmail.com> wrote on 2020-07-07 18:10: If you run it in IDEA, check whether the scope of the blink planner
>dependency is set to provided. Remove provided and try again. Best, Jark
>
>sunfulin wrote on 2020-07-07 18:01: hi, @Jun Zhang I have been using the blink planner all along; that jar has always been
>there. @Jark Wu I am running it directly in local IDEA, not packaged to a cluster yet. Is that related?
>
>Jark Wu wrote on 2020-07-07 15:40: Hi, are you running the packaged job on a cluster, or in IDEA? Best, Jark
>
>Jun Zhang <zhangjunemail...@gmail.com> wrote on 2020-07-07 15:31: hi sunfulin, have you imported the blink planner? Try adding:
><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
>
>sunfulin wrote on 2020-07-07 15:21: hi jark, my code is really simple, just the logic below. I don't know whether I am
>missing some dependency configuration. Debugging the failure, it says DeployOptions.TARGET (execution.target) in the Flink
>configuration did not match any setting? I had never paid attention to this option before.
>  // build the StreamExecutionEnvironment
>  public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>  // build EnvironmentSettings and pick the Blink planner
>  private static final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>  // build the StreamTableEnvironment
>  public static final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
>  tEnv.executeSql("ddl sql"); // register the source as a table
>  tEnv.createTemporaryView("test", ds, $("f0").as("id"), $("f1").as("first"), $("p").proctime());
>  // join statement
>  Table table = tEnv.sqlQuery("select b.* from test a left join my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>  // output
>  tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>  env.execute("LookUpJoinJob");
>
>Jark Wu wrote on 2020-07-06 14:59: Could you share the job code that reproduces this? Best, Jark
>
>sunfulin <sunfulin0...@163.com> wrote on 2020-07-06 11:00: Hi, I am testing my job with the latest Flink 1.11 rc4 and it
>reports: org.apache.flink.table.api.TableException: Failed to execute sql, caused by: java.lang.IllegalStateException: No
>ExecutorFactory found to execute the application. at
>org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>What is the cause of this exception? Running the same logic on 1.10.1 raises no exception.


Re: Heterogeneous or Dynamic Stream Processing

2020-07-07 Thread Rob Shepherd
Thank you for the excellent clarifications.
I couldn't quite figure out how to map the above to my domain.

Nevertheless i have a working demo that performs the following pseudo code:

Let's say that each "channel" has slightly different stream requirements
and we can look up the list of operations needed to be performed using a
channel key.
(an operation is our term for some element of processing, a FaaS call or
local routine maybe)

1. extract channel key from incoming message
2. lookup channel info and enrich the stream object with channel info and a
list of operations
3. i...n using the iterative stream API, loop around each operation in the
list from (2).
4. sink

https://gist.github.com/robshep/bf38b7753062e9d49d365e505e86385e#file-dynamicstramjob-java-L52

I've some work to do to understand storing and retrieving state, as my demo
just stores the loop-state in my main stream object - I don't know whether
this is harmful or simply bad practice.
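For the state question, a minimal sketch of what keyed state could look like here; the event type
(a (channelKey, payload) Tuple2) and the "last completed step" field are assumptions standing in
for the real domain objects:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keeps the index of the last completed operation per channel key in keyed state,
// instead of carrying it on the stream object; the state is checkpointed by Flink.
public class OperationProgressFunction
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<Integer> lastCompletedStep;

    @Override
    public void open(Configuration parameters) {
        lastCompletedStep = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastCompletedStep", Integer.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event,
                               Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        Integer step = lastCompletedStep.value();      // null the first time this key is seen
        int next = (step == null) ? 0 : step + 1;
        lastCompletedStep.update(next);
        out.collect(event);
    }
}

// usage: events.keyBy(e -> e.f0).process(new OperationProgressFunction())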

I'd be really grateful if anyone can cast their eye on this little demo and
see if there are any gotchas or pitfalls I'm likely to succumb to with this
approach.

With thanks for your help getting started



Rob Shepherd BEng PhD



On Tue, 7 Jul 2020 at 19:26, Arvid Heise  wrote:

> Hi Rob,
>
> 1. When you start a flink application, you actually just execute a Java
> main called the driver. This driver submits a job graph to the job manager,
> which executes the job. Since the driver is an ordinary Java program that
> uses the Flink API, you can compose the job graph in any way you want. Have
> a look at one example to see what I mean [1]. It's not hard to imagine that
> you can compose a query such as
>
> List extractorQueries = new ArrayList<>();
> Table table = tableEnvironment.from("testCatalog.`default`.testTable");
> Table errors = tableEnvironment.fromValues();
> for (int index = 0; index < extractorQueries.size(); index++) {
>String extractorQuery = extractorQueries.get(index);
>table = table.addColumns($(extractorQuery).as("extractedValue" + index, 
> "error"));
>errors = errors.unionAll(table.filter($("error").isNotNull()));
>table = table.filter($("error").isNull()).dropColumns($("error"));
> }
> // write table and errors
>
> This query first loads the data from a testTable and then successively
> applies sql expressions that calculate one value + one error column each.
> The value is stored in extractedValue0...99 (assuming 100 extractor
> queries). All values that cause errors, will have a value in the error
> column set. These are collected in the table "errors" for side output (very
> useful for debugging and improving the extractor queries). All good records
> (error IS NULL) are retained for further processing and the error column
> gets dropped.
>
> Btw there is also a Python entry point available, which offers you more or
> less the same. I just haven't tried it yet. [2]
>
> Lastly, currently all extractors are executed in succession. Of course, it
> is also possible to run them independently if you have different source
> streams. You can then later join / union them.
>
> 2. The downside of this driver approach is that changes in the
> configuration are not directly reflected. However, upon restart Flink will
> adapt the changes and recover from the last checkpoint [3] (= snapshot of
> the current processing state, which can be done every second in your case
> as the state is rather small). So now you just need to find a way to force
> a restart.
>
> One approach is to kill it manually and start again, but that's not
> scaling well. However, Flink's fault tolerance feature can be somewhat
> exploited: You can have one part of your program fail on config change,
> which will restart the whole application automatically if configured
> correctly and thus using the latest configuration.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java#L77-L100
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/python_udfs.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#checkpointing
>
> On Tue, Jul 7, 2020 at 6:12 PM Rob Shepherd  wrote:
>
>> Very helpful thank you Arvid.
>>
>> I've been reading up but I'm not sure I grasp all of that just yet.
>> Please may I ask for clarification?
>>
>> 1. Could I summarise correctly that I may build a list of functions from
>> an SQL call which can then be looped over?
>> This looping sounds appealing and you are right that "1 or 100" is a big
>> bonus.
>>
>> 2. "during the start of the application and restart to reflect changes"
>> "during the start" do you mean when the job first boots, or immediately
>> upon ingress of the data event from the queue?
>> "restart" is this an API call to maybe abort an execution of a piece of
>> data but with more up-to-date context.
>>
>>
>> Trying to be a fast learner, 

flink 1.11 connector jdbc dependency resolution failure

2020-07-07 Thread Zhou Zach
hi all,
After upgrading Flink to 1.11, flink-connector-jdbc cannot be resolved in IDEA, and I could not find
it in the Maven repository either. Do I need to build the 1.11 sources manually to install the dependency?



TaskManager docker image for Beam WordCount failing with ClassNotFound Exception

2020-07-07 Thread Avijit Saha
Hi,
I am trying to run the Beam WordCount example on the Flink runner using docker
containers for 'Jobcluster' and 'TaskManager'.

When I put the Beam Wordcount custom jar in the /opt/flink/usrlib/ dir -
the 'taskmanager' docker image  fails at runtime with ClassNotFound
Exception for the following:
Caused by: java.lang.ClassNotFoundException:
org.apache.beam.runners.core.metrics.MetricUpdates$MetricUpdate:
taskmanager_1  | Caused by: java.lang.ClassNotFoundException:
org.apache.beam.runners.core.metrics.MetricUpdates$MetricUpdate
taskmanager_1  |at
java.net.URLClassLoader.findClass(URLClassLoader.java:382)
taskmanager_1  |at
java.lang.ClassLoader.loadClass(ClassLoader.java:424)
taskmanager_1  |at
org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
taskmanager_1  |at
java.lang.ClassLoader.loadClass(ClassLoader.java:357)
taskmanager_1  |... 68 more

If I  instead put the Beam WordCount jar in the "/opt/flink-1.10.1/lib" dir
as follows,
$ ls
flink-dist_2.12-1.10.1.jar
flink-table-blink_2.12-1.10.1.jar
flink-table_2.12-1.10.1.jar
log4j-1.2.17.jar
 slf4j-log4j12-1.7.15.jar
 word-count-beam-bundled-0.1.jar

It runs without any Exception!

Is this the expected behavior? Do we need to always bundle the job-jar in
the same lib location as other flink jars?


Re: Heterogeneous or Dynamic Stream Processing

2020-07-07 Thread Arvid Heise
Hi Rob,

1. When you start a flink application, you actually just execute a Java
main called the driver. This driver submits a job graph to the job manager,
which executes the job. Since the driver is an ordinary Java program that
uses the Flink API, you can compose the job graph in any way you want. Have
a look at one example to see what I mean [1]. It's not hard to imagine that
you can compose a query such as

List extractorQueries = new ArrayList<>();
Table table = tableEnvironment.from("testCatalog.`default`.testTable");
Table errors = tableEnvironment.fromValues();
for (int index = 0; index < extractorQueries.size(); index++) {
   String extractorQuery = extractorQueries.get(index);
   table = table.addColumns($(extractorQuery).as("extractedValue" +
index, "error"));
   errors = errors.unionAll(table.filter($("error").isNotNull()));
   table = table.filter($("error").isNull()).dropColumns($("error"));
}
// write table and errors

This query first loads the data from a testTable and then successively
applies sql expressions that calculate one value + one error column each.
The value is stored in extractedValue0...99 (assuming 100 extractor
queries). All values that cause errors, will have a value in the error
column set. These are collected in the table "errors" for side output (very
useful for debugging and improving the extractor queries). All good records
(error IS NULL) are retained for further processing and the error column
gets dropped.

Btw there is also a Python entry point available, which offers you more or
less the same. I just haven't tried it yet. [2]

Lastly, currently all extractors are executed in succession. Of course, it
is also possible to run them independently if you have different source
streams. You can then later join / union them.

2. The downside of this driver approach is that changes in the
configuration are not directly reflected. However, upon restart Flink will
adapt the changes and recover from the last checkpoint [3] (= snapshot of
the current processing state, which can be done every second in your case
as the state is rather small). So now you just need to find a way to force
a restart.

One approach is to kill it manually and start again, but that's not scaling
well. However, Flink's fault tolerance feature can be somewhat exploited:
You can have one part of your program fail on config change, which will
restart the whole application automatically if configured correctly and
thus using the latest configuration.
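A minimal sketch of that forced-restart trick; the external config store and the version check are
assumptions, not an existing Flink facility:

import org.apache.flink.api.common.functions.RichMapFunction;

// Fails the job on purpose when the externally stored config version changes,
// so the configured restart strategy brings the job back up and the new config is re-read.
public class ConfigVersionGuard<T> extends RichMapFunction<T, T> {

    private final long builtWithVersion;   // version the job graph was built from

    public ConfigVersionGuard(long builtWithVersion) {
        this.builtWithVersion = builtWithVersion;
    }

    @Override
    public T map(T value) {
        long current = fetchCurrentConfigVersion();  // placeholder: poll a DB or config service
        if (current != builtWithVersion) {
            throw new IllegalStateException("Configuration changed, forcing restart");
        }
        return value;
    }

    private long fetchCurrentConfigVersion() {
        // placeholder implementation; a real one would look up the latest config version
        return builtWithVersion;
    }
}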

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java#L77-L100
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/python_udfs.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#checkpointing

On Tue, Jul 7, 2020 at 6:12 PM Rob Shepherd  wrote:

> Very helpful thank you Arvid.
>
> I've been reading up but I'm not sure I grasp all of that just yet.
> Please may I ask for clarification?
>
> 1. Could I summarise correctly that I may build a list of functions from
> an SQL call which can then be looped over?
> This looping sounds appealing and you are right that "1 or 100" is a big
> bonus.
>
> 2. "during the start of the application and restart to reflect changes"
> "during the start" do you mean when the job first boots, or immediately
> upon ingress of the data event from the queue?
> "restart" is this an API call to maybe abort an execution of a piece of
> data but with more up-to-date context.
>
>
> Trying to be a fast learner, and very grateful for the pointers.
>
> With thanks and best regards
>
> Rob
>
>
>
>
> Rob Shepherd BEng PhD
>
>
>
> On Tue, 7 Jul 2020 at 15:33, Arvid Heise  wrote:
>
>> Hi Rob,
>>
>> In the past I used a mixture of configuration and template queries to
>> achieve a similar goal (I had only up to 150 of these jobs per
>> application). My approach was not completely dynamic as you have described
>> but rather to compose a big query from a configuration during the start of
>> the application and restart to reflect changes.
>>
>> For the simple extractor/mapper, I'd use Table API and plug in SQL
>> statements [1] that could be easily given by experienced
>> end-users/analysts. Abort logic should be added programmatically to each of
>> the extractor/mapper through Table API (for example, extractor can output
>> an error column that also gives an explanation and this column is then
>> checked for non-null). The big advantage of using Table API over a simple
>> SQL query is that you can add structural variance: your application may use
>> 1 extractor or 100; it's just a matter of a loop.
>>
>> Note that async IO is currently not available in Table API, but you can
>> easily switch back and forth between Table API and Datastream. I'd
>> definitely suggest to use async IO for your described use cases.
>>
>> So please consider to also use that less 

FlinkKinesisProducer blocking ?

2020-07-07 Thread Vijay Balakrishnan
Hi,
current setup.

Kinesis stream 1 -> Kinesis Analytics Flink -> Kinesis stream 2
                                            |
                                            +-> Firehose Delivery stream

Curl error:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
 - [2020-07-02 15:22:32.203053] [0x07f4][0x7ffbced15700] [error]
[AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

But I am still seeing tons of the curl 28 errors. I use a parallelism of 80
for the sink to the Kinesis Data Stream (KDS), which seems to point to KDS being
pounded with too many requests - 80 (parallelism) * 10 (thread pool size)
= 800 requests. Is my understanding correct? So maybe reduce the parallelism
of 80?
I still don't understand why the logs are stuck at just
FlinkKinesisProducer for around 4s (blocking calls?), with the rest of the
Flink Analytics application not producing any logs while this happens.
I noticed that the FlinkKinesisProducer took about 3.785s, 3.984s and
4.223s in between other application logs in Kibana when the Kinesis
GetIterator Age peaked. It seemed like FlinkKinesisProducer was blocking
for that long while the Flink app was not able to generate any other logs.

Looked at this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

Could use this:
producerConfig.put("RequestTimeout", "1"); // from 6000

But that doesn't really solve the problem when trying to maintain a real-time
processing system.
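For reference, a sketch of where these producer knobs live; the stream name, region and values
below are placeholders, not recommendations:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisSinkConfig {
    public static FlinkKinesisProducer<String> buildSink() {
        Properties producerConfig = new Properties();
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-west-2");  // placeholder region
        producerConfig.put("RequestTimeout", "10000");   // KPL request timeout in ms (example value)
        producerConfig.put("ThreadPoolSize", "10");      // KPL thread pool per sink subtask

        FlinkKinesisProducer<String> producer =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        producer.setDefaultStream("output-stream");      // placeholder stream name
        // Bound the number of in-flight records so backpressure kicks in earlier
        // instead of requests piling up and timing out.
        producer.setQueueLimit(1000);
        return producer;
    }
}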

TIA


Re: Heterogeneous or Dynamic Stream Processing

2020-07-07 Thread Rob Shepherd
Very helpful thank you Arvid.

I've been reading up but I'm not sure I grasp all of that just yet.  Please
may I ask for clarification?

1. Could I summarise correctly that I may build a list of functions from an
SQL call which can then be looped over?
This looping sounds appealing and you are right that "1 or 100" is a big
bonus.

2. "during the start of the application and restart to reflect changes"
"during the start" do you mean when the job first boots, or immediately
upon ingress of the data event from the queue?
"restart" is this an API call to maybe abort an execution of a piece of
data but with more up-to-date context.


Trying to be a fast learner, and very grateful for the pointers.

With thanks and best regards

Rob




Rob Shepherd BEng PhD



On Tue, 7 Jul 2020 at 15:33, Arvid Heise  wrote:

> Hi Rob,
>
> In the past I used a mixture of configuration and template queries to
> achieve a similar goal (I had only up to 150 of these jobs per
> application). My approach was not completely dynamic as you have described
> but rather to compose a big query from a configuration during the start of
> the application and restart to reflect changes.
>
> For the simple extractor/mapper, I'd use Table API and plug in SQL
> statements [1] that could be easily given by experienced
> end-users/analysts. Abort logic should be added programmatically to each of
> the extractor/mapper through Table API (for example, extractor can output
> an error column that also gives an explanation and this column is then
> checked for non-null). The big advantage of using Table API over a simple
> SQL query is that you can add structural variance: your application may use
> 1 extractor or 100; it's just a matter of a loop.
>
> Note that async IO is currently not available in Table API, but you can
> easily switch back and forth between Table API and Datastream. I'd
> definitely suggest to use async IO for your described use cases.
>
> So please consider to also use that less dynamic approach; you'd get much
> for free: SQL support with proper query validation and meaningful error
> messages. And it's also much easier to test/debug.
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#sql
>
> On Tue, Jul 7, 2020 at 4:01 PM Rob Shepherd  wrote:
>
>> Hi All,
>>
>> It'd be great to consider stream processing as a platform for our
>> upcoming projects. Flink seems to be the closest match.
>>
>> However we have numerous stream processing workloads and would want to be
>> able to scale up to 1000's different streams;  each quite similar in
>> structure/sequence but with the functional logic being very different in
>> each.
>>
>> For example, there is always a "validate" stage - but what that means is
>> dependant on the client/data/context etc and would typically map to a few
>> line of script to perform.
>>
>> In essence, our sequences can often be deconstructed down to 8-12 python
>> snippets and the serverless/functional paradigm seems to fit well.
>>
>> Whilst we can deploy our functions readily to a faas/k8s or something
>> (which seems to fit the bill with remote functions) I don't yet see how to
>> quickly draw these together in a dynamic stream.
>>
>> My initial thoughts would be to create a very general purpose stream job
>> that then works through the context of mapping functions to flink tasks
>> based on the client dataset.
>>
>> E.g. some pseudo code:
>>
>> ingress()
>> extract()
>> getDynamicStreamFunctionDefs()
>> getFunction1()
>> runFunction1()
>> abortOnError()
>> getFunction2()
>> runFunction2()
>> abortOnError()
>> ...
>> getFunction10()
>> runFunction10()
>> sinkData()
>>
>> Most functions are not however simple lexical operations, or
>> extractors/mappers - but on the whole require a few database/API calls to
>> retrieve things like previous data, configurations etc.
>>
>> They are not necessarily long running but certainly Async is a
>> consideration.
>>
>> I think every stage will be UDFs (and then Meta-UDFs at that)
>>
>> As a result I'm not sure if we can get this to fit without a brittle set
>> of workarounds, and ruin any benefit of running through flink etc...
>> but it would great to hear opinions of others who might have tackled this
>> kind of dynamic tasking.
>>
>>
>> I'm happy to explain this better if it isn't clear.
>>
>> With best regards
>>
>> Rob
>>
>>
>>
>>
>> Rob Shepherd BEng PhD
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: "No match found for function signature" exception when calling a UDTF in a Flink 1.11 SQL job

2020-07-07 Thread 邹云鹤
OK, got it.




邹云鹤
Email: kevinyu...@163.com

On Jul 7, 2020 at 23:27, Benchao Li wrote:
I think this should be a bug in the new UDF stack; I can reproduce it locally as well.
I have created an issue [1] to track it.

[1] https://issues.apache.org/jira/browse/FLINK-18520

邹云鹤  wrote on Tue, Jul 7, 2020 at 9:43 PM:

>
>
> hi all
> I am using a UDTF in a Flink SQL job on a Flink 1.11 SNAPSHOT build. The UDTF is defined as follows:
>
>
> @FunctionHint(
> input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")},
> output = @DataTypeHint("STRING")
> )
> public class Split extends TableFunction<String> {
>   public Split(){}
>   public void eval(String str, String ch) {
> if (str == null || str.isEmpty()) {
>   return;
> } else {
>   String[] ss = str.split(ch);
>   for (String s : ss) {
> collect(s);
>   }
> }
>   }
> }
>
>
> In Flink SQL I create this function with create function splitByChar as '**.**.Split',
> register it by calling executeSql() on the tableEnv, and then call this UDTF in the
> downstream computation as follows:
> create view view_source_1 as select `dateTime,`itime`,
> lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page
> from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as
> T(s) on true;
>
>
> It keeps failing with the following error message:
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 25 to line 3, column 47: No match found for function
> signature splitByChar(, )
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629)
> 
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 3, column 25 to line 3, column 47: r(, )
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> at
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at
> 

Re: [Flink's shuffle mode]

2020-07-07 Thread 忝忝向仧
In batch mode, how do I specify the shuffle_mode in the application?
Also, it is mentioned below that streaming jobs always run in pipelined mode.
Then why, after a keyBy on a DataStream, does the mode show up as UNDEFINED when I trace the source code?
Thanks.




------ Original message ------
From: "Jingsong Li"

Re: "No match found for function signature" exception when calling a UDTF in a Flink 1.11 SQL job

2020-07-07 Thread Benchao Li
I think this should be a bug in the new UDF stack; I can reproduce it locally as well.
I have created an issue [1] to track it.

[1] https://issues.apache.org/jira/browse/FLINK-18520

邹云鹤  wrote on Tue, Jul 7, 2020 at 9:43 PM:

>
>
> hi all
> I am using a UDTF in a Flink SQL job on a Flink 1.11 SNAPSHOT build. The UDTF is defined as follows:
>
>
> @FunctionHint(
> input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")},
> output = @DataTypeHint("STRING")
> )
> public class Split extends TableFunction<String> {
>   public Split(){}
>   public void eval(String str, String ch) {
> if (str == null || str.isEmpty()) {
>   return;
> } else {
>   String[] ss = str.split(ch);
>   for (String s : ss) {
> collect(s);
>   }
> }
>   }
> }
>
>
> In Flink SQL I create this function with create function splitByChar as '**.**.Split',
> register it by calling executeSql() on the tableEnv, and then call this UDTF in the
> downstream computation as follows:
> create view view_source_1 as select `dateTime,`itime`,
> lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page
> from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as
> T(s) on true;
>
>
> It keeps failing with the following error message:
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 25 to line 3, column 47: No match found for function
> signature splitByChar(, )
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629)
> 
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 3, column 25 to line 3, column 47: r(, )
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> at
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> 

Re: Decompressing Tar Files for Batch Processing

2020-07-07 Thread Austin Cawley-Edwards
On Tue, Jul 7, 2020 at 10:53 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Xiaolong,
>
> Thanks for the suggestions. Just to make sure I understand, are you saying
> to run the download and decompression in the Job Manager before executing
> the job?
>
> I think another way to ensure the tar file is not downloaded more than
> once is a source w/ parallelism 1. The issue I can't get past is after
> decompressing the tarball, how would I pass those OutputStreams for each
> entry through Flink?
>
> Best,
> Austin
>
>
>
> On Tue, Jul 7, 2020 at 5:56 AM Xiaolong Wang 
> wrote:
>
>> It seems like to me that it can not be done by Flink, for code will be
>> run across all task managers. That way, there will be multiple downloads of
>> you tar file, which is unnecessary.
>>
>> However, you can do it  on your code before initializing a Flink runtime,
>> and the code will be run only on the client side.
>>
>> On Tue, Jul 7, 2020 at 7:31 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I need to ingest a tar file containing ~1GB of data in around 10 CSVs.
>>> The data is fairly connected and needs some cleaning, which I'd like to do
>>> with the Batch Table API + SQL (but have never used before). I've got a
>>> small prototype loading the uncompressed CSVs and applying the necessary
>>> SQL, which works well.
>>>
>>> I'm wondering about the task of downloading the tar file and unzipping
>>> it into the CSVs. Does this sound like something I can/ should do in Flink,
>>> or should I set up another process to download, unzip, and store in a
>>> filesystem to then read with the Flink Batch job? My research is leading me
>>> towards doing it separately but I'd like to do it all in the same job if
>>> there's a creative way.
>>>
>>> Thanks!
>>> Austin
>>>
>>


Re: Decompressing Tar Files for Batch Processing

2020-07-07 Thread Austin Cawley-Edwards
Hey Chesnay,

Thanks for the advice, and easy enough to do it in a separate process.
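For reference, a rough sketch of what that separate step could look like, assuming the tarball is
gzip-compressed, Apache Commons Compress is on the classpath, and the paths are placeholders:

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

// Standalone pre-processing step: unpack data.tar.gz into CSV files that the
// Flink batch job then reads from the target directory.
public class UnpackTar {
    public static void main(String[] args) throws IOException {
        Path tarFile = Paths.get("data.tar.gz");       // placeholder input
        Path targetDir = Paths.get("/tmp/csv-input");  // placeholder output dir
        Files.createDirectories(targetDir);

        try (InputStream in = new BufferedInputStream(Files.newInputStream(tarFile));
             TarArchiveInputStream tar =
                     new TarArchiveInputStream(new GzipCompressorInputStream(in))) {
            TarArchiveEntry entry;
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                Path out = targetDir.resolve(entry.getName()).normalize();
                Files.createDirectories(out.getParent());
                Files.copy(tar, out);  // the stream is positioned at the current entry's data
            }
        }
    }
}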

Best,
Austin

On Tue, Jul 7, 2020 at 10:29 AM Chesnay Schepler  wrote:

> I would probably go with a separate process.
>
> Downloading the file could work with Flink if it is already present in
> some supported filesystem. Decompressing the file is supported for
> selected formats (deflate, gzip, bz2, xz), but this seems to be an
> undocumented feature, so I'm not sure how usable it is in reality.
>
> On 07/07/2020 01:30, Austin Cawley-Edwards wrote:
> > Hey all,
> >
> > I need to ingest a tar file containing ~1GB of data in around 10 CSVs.
> > The data is fairly connected and needs some cleaning, which I'd like
> > to do with the Batch Table API + SQL (but have never used before).
> > I've got a small prototype loading the uncompressed CSVs and applying
> > the necessary SQL, which works well.
> >
> > I'm wondering about the task of downloading the tar file and unzipping
> > it into the CSVs. Does this sound like something I can/ should do in
> > Flink, or should I set up another process to download, unzip, and
> > store in a filesystem to then read with the Flink Batch job? My
> > research is leading me towards doing it separately but I'd like to do
> > it all in the same job if there's a creative way.
> >
> > Thanks!
> > Austin
>
>
>


Computing UV with the DataStream API

2020-07-07 Thread ?g???U?[????

With the DataStream API, there seem to be two ways to compute a daily UV (unique visitor) count:
1. A one-day tumbling window (Time.days(1)) that computes the UV, combined with a trigger.
2. Keeping the seen users in state with a state TTL.

Which of the two is the better fit for DataStream?

Re: Heterogeneous or Dynamic Stream Processing

2020-07-07 Thread Arvid Heise
Hi Rob,

In the past I used a mixture of configuration and template queries to
achieve a similar goal (I had only up to 150 of these jobs per
application). My approach was not completely dynamic as you have described
but rather to compose a big query from a configuration during the start of
the application and restart to reflect changes.

For the simple extractor/mapper, I'd use Table API and plug in SQL
statements [1] that could be easily given by experienced
end-users/analysts. Abort logic should be added programmatically to each of
the extractor/mapper through Table API (for example, extractor can output
an error column that also gives an explanation and this column is then
checked for non-null). The big advantage of using Table API over a simple
SQL query is that you can add structural variance: your application may use
1 extractor or 100; it's just a matter of a loop.

Note that async IO is currently not available in Table API, but you can
easily switch back and forth between Table API and Datastream. I'd
definitely suggest to use async IO for your described use cases.
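As a rough sketch of the async IO shape (the lookup is a placeholder; a real client would be
initialized in open() and the timeout/capacity values are examples only):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Enriches each record with the result of a non-blocking lookup (DB / REST call).
public class AsyncEnrichment extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // placeholder: replace with a real asynchronous client call
        CompletableFuture.supplyAsync(() -> input + ",enriched")
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    public static DataStream<String> apply(DataStream<String> in) {
        // at most 100 in-flight requests, 30s timeout per request
        return AsyncDataStream.unorderedWait(in, new AsyncEnrichment(), 30, TimeUnit.SECONDS, 100);
    }
}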

So please consider to also use that less dynamic approach; you'd get much
for free: SQL support with proper query validation and meaningful error
messages. And it's also much easier to test/debug.


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#sql

On Tue, Jul 7, 2020 at 4:01 PM Rob Shepherd  wrote:

> Hi All,
>
> It'd be great to consider stream processing as a platform for our upcoming
> projects. Flink seems to be the closest match.
>
> However we have numerous stream processing workloads and would want to be
> able to scale up to 1000's different streams;  each quite similar in
> structure/sequence but with the functional logic being very different in
> each.
>
> For example, there is always a "validate" stage - but what that means is
> dependant on the client/data/context etc and would typically map to a few
> line of script to perform.
>
> In essence, our sequences can often be deconstructed down to 8-12 python
> snippets and the serverless/functional paradigm seems to fit well.
>
> Whilst we can deploy our functions readily to a faas/k8s or something
> (which seems to fit the bill with remote functions) I don't yet see how to
> quickly draw these together in a dynamic stream.
>
> My initial thoughts would be to create a very general purpose stream job
> that then works through the context of mapping functions to flink tasks
> based on the client dataset.
>
> E.g. some pseudo code:
>
> ingress()
> extract()
> getDynamicStreamFunctionDefs()
> getFunction1()
> runFunction1()
> abortOnError()
> getFunction2()
> runFunction2()
> abortOnError()
> ...
> getFunction10()
> runFunction10()
> sinkData()
>
> Most functions are not however simple lexical operations, or
> extractors/mappers - but on the whole require a few database/API calls to
> retrieve things like previous data, configurations etc.
>
> They are not necessarily long running but certainly Async is a
> consideration.
>
> I think every stage will be UDFs (and then Meta-UDFs at that)
>
> As a result I'm not sure if we can get this to fit without a brittle set
> of workarounds, and ruin any benefit of running through flink etc...
> but it would great to hear opinions of others who might have tackled this
> kind of dynamic tasking.
>
>
> I'm happy to explain this better if it isn't clear.
>
> With best regards
>
> Rob
>
>
>
>
> Rob Shepherd BEng PhD
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Decompressing Tar Files for Batch Processing

2020-07-07 Thread Chesnay Schepler

I would probably go with a separate process.

Downloading the file could work with Flink if it is already present in 
some supported filesystem. Decompressing the file is supported for 
selected formats (deflate, gzip, bz2, xz), but this seems to be an 
undocumented feature, so I'm not sure how usable it is in reality.


On 07/07/2020 01:30, Austin Cawley-Edwards wrote:

Hey all,

I need to ingest a tar file containing ~1GB of data in around 10 CSVs. 
The data is fairly connected and needs some cleaning, which I'd like 
to do with the Batch Table API + SQL (but have never used before). 
I've got a small prototype loading the uncompressed CSVs and applying 
the necessary SQL, which works well.


I'm wondering about the task of downloading the tar file and unzipping 
it into the CSVs. Does this sound like something I can/ should do in 
Flink, or should I set up another process to download, unzip, and 
store in a filesystem to then read with the Flink Batch job? My 
research is leading me towards doing it separately but I'd like to do 
it all in the same job if there's a creative way.


Thanks!
Austin





Computing UV with Flink DataStream

2020-07-07 Thread ?g???U?[????

With the DataStream API, there seem to be two ways to compute a daily UV (unique visitor) count:
1. A one-day tumbling window (Time.days(1)) that computes the UV, combined with a trigger.
2. Keeping the seen users in state with a state TTL.

Thanks,
Jiazhi
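A rough sketch of the first approach, under the assumption that events arrive as (dimension key,
user id) pairs and that a 10-second output interval is acceptable; the set-based accumulator is
only workable while the per-key user set fits comfortably in state:

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class UvExample {
    // events are (dimensionKey, userId) pairs
    public static DataStream<Long> uvPerDay(DataStream<Tuple2<String, String>> events) {
        return events
                .keyBy(e -> e.f0)
                .window(TumblingProcessingTimeWindows.of(Time.days(1)))
                // emit an updated count every 10 seconds instead of once per day
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .aggregate(new DistinctUsers());
    }

    // Accumulates user ids in a set; the UV is the set size.
    public static class DistinctUsers
            implements AggregateFunction<Tuple2<String, String>, Set<String>, Long> {
        @Override public Set<String> createAccumulator() { return new HashSet<>(); }
        @Override public Set<String> add(Tuple2<String, String> value, Set<String> acc) {
            acc.add(value.f1);
            return acc;
        }
        @Override public Long getResult(Set<String> acc) { return (long) acc.size(); }
        @Override public Set<String> merge(Set<String> a, Set<String> b) {
            a.addAll(b);
            return a;
        }
    }
}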

Re: Re: Re: Re: flink 1.11 job execution exception

2020-07-07 Thread noake
I ran into the same problem in 1.11.0; adding the dependency below to the pom solved it.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>


Original message
From: Congxian Qiu qcx978132...@gmail.com
To: user-zh user...@flink.apache.org
Cc: Jark Wu imj...@gmail.com; Jun Zhang zhangjunemail...@gmail.com
Sent: Tuesday, July 7, 2020 19:35
Subject: Re: Re: Re: Re: flink 1.11 job execution exception


Hi, from this error it looks like loading some factories via the ServiceLoader failed (not
found). You could check whether the resources folder of the corresponding module contains the
matching resource file. Best, Congxian

sunfulin sunfulin0...@163.com wrote on Tue, Jul 7, 2020 at 6:29 PM: hi, when I run locally the
provided scopes in my pom are all removed.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
It is quite puzzling. Under what circumstances is the exception at
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
triggered?

On 2020-07-07 18:10:58, "Jark Wu" imj...@gmail.com wrote: if you are running it inside IDEA,
check whether the scope of the blink planner dependency is set to provided. Remove provided and
try again. Best, Jark

On Tue, 7 Jul 2020 at 18:01, sunfulin sunfulin0...@163.com wrote: hi, @Jun Zhang I have been
using the blink planner all along, that jar has always been there. @Jark Wu I run it directly in
local IDEA, it has not been packaged and run on a cluster yet -- does that matter?

On 2020-07-07 15:40:17, "Jark Wu" imj...@gmail.com wrote: Hi, do you run the packaged job on a
cluster, or inside IDEA? Best, Jark

On Tue, 7 Jul 2020 at 15:31, Jun Zhang zhangjunemail...@gmail.com wrote: hi sunfulin, have you
imported the blink planner? Try adding this:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

sunfulin sunfulin0...@163.com wrote on Tue, Jul 7, 2020 at 3:21 PM: hi, jark, my code is really
simple, just the logic below. I am not sure whether I am missing some dependency or
configuration. Debugging the exception, it seems the DeployOptions.TARGET (execution.target) in
the Flink configuration did not match any setting? I never paid attention to this option before.
// build the StreamExecutionEnvironment
public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build EnvironmentSettings and choose the Blink planner
private static final EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// build the StreamTableEnvironment
public static final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
tEnv.executeSql("ddl sql"); // register the source as a table
tEnv.createTemporaryView("test", ds, $("f0").as("id"), $("f1").as("first"), $("p").proctime());
// join statement
Table table = tEnv.sqlQuery("select b.* from test a left join my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
// output
tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
env.execute("LookUpJoinJob");

On 2020-07-06 14:59:17, "Jark Wu" imj...@gmail.com wrote: could you share the job code that
reproduces this? Best, Jark

On Mon, 6 Jul 2020 at 11:00, sunfulin sunfulin0...@163.com wrote: Hi, I tested my job with the
latest Flink 1.11 rc4 and it reported the following exception:
org.apache.flink.table.api.TableException: Failed to execute sql
caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application. at
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
What is the cause of this exception? The same logic runs without any exception on 1.10.1.

[ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Zhijiang
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.11.0, which is the latest major release.

Apache Flink(r) is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2020/07/06/release-1.11.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Piotr & Zhijiang

Re: Timeout when using RockDB to handle large state in a stream app

2020-07-07 Thread Felipe Gutierrez
I figured out that for my stream job the best was just to use the
default MemoryStateBackend. I load a table from a file of 725MB in a
UDF. I am also not using Flink ListState since I don't have to change
the values of this table; I only do a lookup.

The only thing that I needed was more memory for the TM and a slightly larger
timeout. Currently my configuration is the following; I am not sure if
there is a better configuration:
heartbeat.timeout: 10
taskmanager.memory.flink.size: 12g
taskmanager.memory.jvm-overhead.max: 4g
taskmanager.memory.jvm-metaspace.size: 2048m # default: 1024m

Another thing that is not working is this parameter: when I set it,
I get a JVM argument error and the TM does not start.

taskmanager.memory.task.heap.size: 2048m # default: 1024m # Flink error

Best,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Jul 7, 2020 at 2:17 PM Yun Tang  wrote:
>
> Hi Felipe
>
> flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed cannot tell you how 
> much memory is used by RocksDB as it mallocate memory from os directly 
> instead from JVM.
>
> Moreover, I cannot totally understand why you ask how to increase the memory 
> of the JM and TM when using the PredefinedOptions.SPINNING_DISK_OPTIMIZED for 
> RocksDB.
> Did you mean how to increase the total process memory? If so, as Flink uses 
> managed memory to control RocksDB [1] by default, you could increase total 
> memory by increasing managed memory [2][3]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-rocksdb-memory-managed
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-fraction
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-size
>
> Best,
> Yun Tang
>
>
>
> 
> From: Felipe Gutierrez 
> Sent: Monday, July 6, 2020 19:17
> To: Yun Tang 
> Cc: Ori Popowski ; user 
> Subject: Re: Timeout when using RockDB to handle large state in a stream app
>
> Hi all,
>
> I tested the two TPC-H query 03 [1] and 10 [2] using Datastream API on
> the cluster with RocksDB state backend. One thing that I did that
> improved a lot was to replace the List POJO to a
> List>. Then I could load a table of 200MB in memory as my
> state. However, the original table is 725MB, and turned out that I
> need another configuration. I am not sure what I can do more to reduce
> the size of my state. If one of you have an idea I am thankful to
> hear.
>
> Now, speaking about the flink-conf.yaml file and the RocksDB
> configuration. When I use these configurations on the flink-conf.yaml
> the stream job still runs out of memory.
> jobmanager.heap.size: 4g # default: 2048m
> heartbeat.timeout: 10
> taskmanager.memory.process.size: 2g # default: 1728m
>
> Then I changed for this configuration which I can set
> programmatically. The stream job seems to behave better. It starts to
> process something, then the metrics disappear for some time and appear
> again. The available and used memory on the TM
> (flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
> the available and used memory on the JM
> (flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
> guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
> overwriting the configuration on the flink-conf.yaml file.
>
> RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir, true);
> stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
> env.setStateBackend(stateBackend);
>
> How can I increase the memory of the JM and TM when I am still using
> the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?
>
> [1] 
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
> [2] 
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
>  wrote:
> >
> > yes. I agree. because RocsDB will spill data to disk if there is not
> > enough space in memory.
> > Thanks
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Fri, Jul 3, 2020 at 8:27 AM Yun Tang  wrote:
> > >
> > > Hi Felipe,
> > >
> > > I noticed my previous mail has a typo: RocksDB is executed in task main 
> > > thread which does not take the role to respond to heart beat. Sorry for 
> > > previous typo, and the key point I want to clarify is that RocksDB should 
> > > not have business for heartbeat problem.
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Felipe Gutierrez 
> > > Sent: Tuesday, June 30, 2020 17:46
> > > To: Yun Tang 
> > > Cc: 

Heterogeneous or Dynamic Stream Processing

2020-07-07 Thread Rob Shepherd
Hi All,

It'd be great to consider stream processing as a platform for our upcoming
projects. Flink seems to be the closest match.

However we have numerous stream processing workloads and would want to be
able to scale up to 1000s of different streams; each quite similar in
structure/sequence but with the functional logic being very different in
each.

For example, there is always a "validate" stage - but what that means is
dependent on the client/data/context etc. and would typically map to a few
lines of script to perform.

In essence, our sequences can often be deconstructed down to 8-12 python
snippets and the serverless/functional paradigm seems to fit well.

Whilst we can deploy our functions readily to a faas/k8s or something
(which seems to fit the bill with remote functions) I don't yet see how to
quickly draw these together in a dynamic stream.

My initial thoughts would be to create a very general purpose stream job
that then works through the context of mapping functions to flink tasks
based on the client dataset.

E.g. some pseudo code:

ingress()
extract()
getDynamicStreamFunctionDefs()
getFunction1()
runFunction1()
abortOnError()
getFunction2()
runFunction2()
abortOnError()
...
getFunction10()
runFunction10()
sinkData()

Most functions are not however simple lexical operations, or
extractors/mappers - but on the whole require a few database/API calls to
retrieve things like previous data, configurations etc.

They are not necessarily long running but certainly Async is a
consideration.

I think every stage will be UDFs (and then Meta-UDFs at that)

As a result I'm not sure if we can get this to fit without a brittle set of
workarounds, and ruin any benefit of running through flink etc...
but it would great to hear opinions of others who might have tackled this
kind of dynamic tasking.


I'm happy to explain this better if it isn't clear.

With best regards

Rob




Rob Shepherd BEng PhD


Re: Parsing exception for a string array inside nested JSON

2020-07-07 Thread Leonard Xu
Hi, 

I took a look at the code, and this is indeed a bug in Flink 1.9 [1]. The cause is that the source
does not handle the legacy type and the new type correctly. The issue has not been fixed on the
1.9 branch; you could upgrade to 1.10.1 and try again.

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-16622




"No match found for function signature" exception when calling a UDTF in a Flink 1.11 SQL job

2020-07-07 Thread 邹云鹤


hi all
I am using a UDTF in a Flink SQL job on a Flink 1.11 SNAPSHOT build. The UDTF is defined as follows:


@FunctionHint(
input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")},
output = @DataTypeHint("STRING")
)
public class Split extends TableFunction<String> {
  public Split(){}
  public void eval(String str, String ch) {
if (str == null || str.isEmpty()) {
  return;
} else {
  String[] ss = str.split(ch);
  for (String s : ss) {
collect(s);
  }
}
  }
} 


In Flink SQL I create this function with create function splitByChar as '**.**.Split', register it
by calling executeSql() on the tableEnv, and then call this UDTF in the downstream computation as
follows:
create view view_source_1 as select `dateTime,`itime`,  lng,lat,net,event_info, 
cast(split_index(T.s, '_', 0) as int) as time_page from view_source as a left 
join LATERAL TABLE (splitByChar('a,b,c',',')) as T(s) on true;


It keeps failing with the following error message:
org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 3, column 25 to line 3, column 47: No match found for function signature 
splitByChar(, )
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629)

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, 
column 25 to line 3, column 47: r(, )
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match 
found for function signature splitByChar(, )
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 

Check pointing for simple pipeline

2020-07-07 Thread Prasanna kumar
Hi ,

I have a pipeline: Source -> Map (JSON transform) -> Sink.

Both source and sink are Kafka.

What is the best checkpointing mechanism?

Is making checkpoints incremental a good option? What should I be careful
of?

I am running it on AWS EMR.

Will checkpointing slow down processing?

Thanks,
Prasanna.
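A hedged sketch of a common setup for such a Kafka-to-Kafka job; the interval, backend choice and
paths below are placeholders, and incremental checkpoints only apply when the RocksDB state
backend dependency is on the classpath:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static StreamExecutionEnvironment configure() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 60s with exactly-once semantics
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        // keep the last checkpoint if the job is cancelled, so it can be restored later
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // incremental checkpoints (second argument true) are a RocksDB feature;
        // for a small, mostly stateless map job the default backend is usually sufficient
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true)); // placeholder path
        return env;
    }
}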


Re: Manual allocation of slot usage

2020-07-07 Thread Mu Kong
Hi, Guo,

Thanks for helping out.

My application has a kafka source with 60 subtasks(parallelism), and we
have 15 task managers with 15 slots on each.

Before I applied cluster.evenly-spread-out-slots, meaning it was set
to the default false, the operator "kafka source" had 11 subtasks allocated on
a single task manager,
while the remaining 49 subtasks of "kafka source" were distributed across the
remaining 14 task managers.

After I set cluster.evenly-spread-out-slots to true, the 60 subtasks of
"kafka source" were allocated to only 4 task managers, and they took all 15
slots on each of these 4 TMs.

What I thought is that this config will make the subtasks of one operator
more evenly spread among the task managers, but it seems it made them
allocated in the same task manager as much as possible.

The version I'm deploying is 1.9.0.

Best regards,
Mu

On Tue, Jul 7, 2020 at 7:10 PM Yangze Guo  wrote:

> Hi, Mu,
>
> IIUC, cluster.evenly-spread-out-slots would fulfill your demand. Why
> do you think it does the opposite of what you want. Do you run your
> job in active mode? If so, cluster.evenly-spread-out-slots might not
> work very well because there could be insufficient task managers when
> request slot from ResourceManager. This has been discussed in
> https://issues.apache.org/jira/browse/FLINK-12122 .
>
>
> Best,
> Yangze Guo
>
> On Tue, Jul 7, 2020 at 5:44 PM Mu Kong  wrote:
> >
> > Hi community,
> >
> > I'm running an application to consume data from kafka, and process it
> then put data to the druid.
> > I wonder if there is a way where I can allocate the data source
> consuming process evenly across the task manager to maximize the usage of
> the network of task managers.
> >
> > So, for example, I have 15 task managers and I set parallelism for the
> kafka source as 60, since I have 60 partitions in kafka topic.
> > What I want is flink cluster will put 4 kafka source subtasks on each
> task manager.
> >
> > Is that possible? I have gone through the document, the only thing we
> found is
> >
> > cluster.evenly-spread-out-slots
> >
> > which does exact the opposite of what I want. It will put the subtasks
> of the same operator onto one task manager as much as possible.
> >
> > So, is some kind of manual resource allocation available?
> > Thanks in advance!
> >
> >
> > Best regards,
> > Mu
>


Re: Timeout when using RockDB to handle large state in a stream app

2020-07-07 Thread Yun Tang
Hi Felipe

flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed cannot tell you how much
memory is used by RocksDB, as RocksDB allocates memory directly from the OS
instead of from the JVM.

Moreover, I don't fully understand why you ask how to increase the memory of
the JM and TM when using PredefinedOptions.SPINNING_DISK_OPTIMIZED for
RocksDB.
Did you mean how to increase the total process memory? If so, since Flink uses
managed memory to control RocksDB [1] by default, you could increase the total
memory by increasing the managed memory [2][3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-rocksdb-memory-managed
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-fraction
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-managed-size
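
For illustration, a flink-conf.yaml sketch of the knobs referenced above (the sizes
are made-up examples, not recommendations):

# give the whole TM process more memory ...
taskmanager.memory.process.size: 4g
# ... and/or give a larger share of it to managed memory (used by RocksDB)
taskmanager.memory.managed.fraction: 0.6
# alternatively pin an absolute managed-memory size instead of a fraction
# taskmanager.memory.managed.size: 2g
# RocksDB only respects managed memory while this stays at its default (true)
state.backend.rocksdb.memory.managed: true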

Best,
Yun Tang




From: Felipe Gutierrez 
Sent: Monday, July 6, 2020 19:17
To: Yun Tang 
Cc: Ori Popowski ; user 
Subject: Re: Timeout when using RockDB to handle large state in a stream app

Hi all,

I tested the two TPC-H queries 03 [1] and 10 [2] using the DataStream API on
the cluster with the RocksDB state backend. One thing that improved things a
lot was to replace the List of POJOs with a List of Tuples. With that I could
load a table of 200MB in memory as my state. However, the original table is
725MB, and it turned out that I need another configuration. I am not sure
what more I can do to reduce the size of my state. If any of you have an
idea I would be thankful to hear it.

Now, speaking about the flink-conf.yaml file and the RocksDB
configuration. When I use these configurations on the flink-conf.yaml
the stream job still runs out of memory.
jobmanager.heap.size: 4g # default: 2048m
heartbeat.timeout: 10
taskmanager.memory.process.size: 2g # default: 1728m

Then I changed to this configuration, which I can set
programmatically. The stream job seems to behave better: it starts to
process something, then the metrics disappear for some time and appear
again. The available and used memory on the TM
(flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
the available and used memory on the JM
(flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
overwriting the configuration on the flink-conf.yaml file.

RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir, true);
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(stateBackend);

How can I increase the memory of the JM and TM when I am still using
the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?

[1] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
[2] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
 wrote:
>
> yes. I agree. because RocsDB will spill data to disk if there is not
> enough space in memory.
> Thanks
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Jul 3, 2020 at 8:27 AM Yun Tang  wrote:
> >
> > Hi Felipe,
> >
> > I noticed my previous mail has a typo: RocksDB is executed in task main 
> > thread which does not take the role to respond to heart beat. Sorry for 
> > previous typo, and the key point I want to clarify is that RocksDB should 
> > not have business for heartbeat problem.
> >
> > Best
> > Yun Tang
> > 
> > From: Felipe Gutierrez 
> > Sent: Tuesday, June 30, 2020 17:46
> > To: Yun Tang 
> > Cc: Ori Popowski ; user 
> > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> >
> > Hi,
> >
> > I reduced the size of the tables that I am loading on a ListState and
> > the query worked. One of them was about 700MB [1] [2].
> >
> > Now I am gonna deploy it on the cluster and check if it works. I will
> > probably need to increase the heartbeat timeout.
> >
> > Thanks,
> > Felipe
> > [1] 
> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> > [2] 
> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Jun 30, 2020 at 10:51 AM Yun Tang  wrote:
> > >
> > > Hi Felipe
> > >
> > > RocksDB is executed in task main thread which does take the role to 
> > > respond to heart beat and RocksDB mainly use native memory which is 
> > > decoupled from JVM heap to not bring any GC 

Re: Stateful Functions: Deploying to existing Cluster

2020-07-07 Thread Jan Brusch

Hi Igal,

just as feedback for you and anyone else reading this: it worked like a
charm. Thanks again for your quick help!



Best regards

Jan


On 06.07.20 14:02, Igal Shilman wrote:

Hi Jan,

Stateful Functions will look at the Java classpath for the module.yaml,
so one way would be to include the module.yaml in your
src/main/resources/ directory.


Good luck,
Igal.
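
For reference, a minimal sketch of what such a module.yaml under src/main/resources/
could look like (the function type, endpoint and state name are made up; please
double-check the exact schema against the StateFun 2.1 packaging docs):

version: "1.0"

module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/greeter                      # made-up namespace/name
          spec:
            endpoint: http://functions:8000/statefun   # made-up endpoint
            states:
              - seen_count
            maxNumBatchRequests: 500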


On Mon, Jul 6, 2020 at 12:39 PM Jan Brusch > wrote:


Hi,

quick question about Deploying a Flink Stateful Functions
Application to
an existing cluster: The Documentation says to integrate
"statefun-flink-distribution" as additional maven Dependency in
the fat
jar.

(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/packaging.html#flink-jar)

But how and where do I upload my module.yml for external function
definitions in that scenario...?


Best regards

Jan


--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501



Re: SSL for QueryableStateClient

2020-07-07 Thread Chesnay Schepler

Queryable state does not support SSL.

On 06/07/2020 22:42, mail2so...@yahoo.co.in wrote:

Hello,

I am running Flink on Kubernetes, and from the outside, access through the
Ingress to a proxy on Kubernetes is via SSL on port 443 only.


Can you please provide guidance on how to set up SSL for
QueryableStateClient, the client used to query the state.



Please let me know if any other details is needed.

Thanks & Regards
Souma Suvra Ghosh





Re: Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 Thread Congxian Qiu
Hi

从这个报错看上去是尝试通过 serviceLoader 加载一些 factory 的时候出错了(找不到),可以看看对应的 module 的
resources 文件下是否有对应的 resource 文件

Best,
Congxian
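
补充一个相关的常见坑(仅在打 fat jar 时适用,下面只是示意):用 maven-shade-plugin 打包时需要合并各依赖
META-INF/services 下的 SPI 文件,否则 ServiceLoader 可能找不到对应的 factory:

<!-- 示意:放在 maven-shade-plugin 的 <configuration> 里,合并各依赖的 SPI 资源文件 -->
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>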


sunfulin  于2020年7月7日周二 下午6:29写道:

>
>
>
> hi,
> 我的pom文件本地执行时,scope的provided都是去掉的。
> 
> org.apache.flink
>
>  flink-table-planner-blink_${scala.binary.version}
>${flink.version}
> 
>
>
> 确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> 这个异常在啥情况下会触发到。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-07 18:10:58,"Jark Wu"  写道:
> >如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉
> provided
> >再试试看?
> >
> >Best,
> >Jark
> >
> >On Tue, 7 Jul 2020 at 18:01, sunfulin  wrote:
> >
> >> hi,
> >>  @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
> >>
> >>  @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-07 15:40:17,"Jark Wu"  写道:
> >> >Hi,
> >> >
> >> >你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Tue, 7 Jul 2020 at 15:31, Jun Zhang 
> >> wrote:
> >> >
> >> >> hi.sunfulin
> >> >> 你有没有导入blink的planner呢,加入这个试试
> >> >>
> >> >> 
> >> >> org.apache.flink
> >> >>
> >>
> flink-table-planner-blink_${scala.binary.version}
> >> >> ${flink.version}
> >> >> 
> >> >>
> >> >>
> >> >> sunfulin  于2020年7月7日周二 下午3:21写道:
> >> >>
> >> >>>
> >> >>>
> >> >>>
> >> >>> hi, jark
> >> >>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> >> >>> configuration里的DeployOptions.TARGET
> >> >>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
> >> >>>
> >> >>>
> >> >>> //构建StreamExecutionEnvironment
> >> >>> public static final StreamExecutionEnvironment env =
> >> >>> StreamExecutionEnvironment.getExecutionEnvironment();
> >> >>>
> >> >>> //构建EnvironmentSettings 并指定Blink Planner
> >> >>> private static final EnvironmentSettings bsSettings =
> >> >>>
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> >>>
> >> >>> //构建StreamTableEnvironment
> >> >>> public static final StreamTableEnvironment tEnv =
> >> >>> StreamTableEnvironment.create(env, bsSettings);
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>tEnv.executeSql(“ddl sql”);
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> //source注册成表
> >> >>>
> >> >>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
> >> >>> $("f1").as("first"), $("p").proctime());
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> //join语句
> >> >>>
> >> >>> Table table = tEnv.sqlQuery("select b.* from test a left
> join
> >> >>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> //输出
> >> >>>
> >> >>> tEnv.toAppendStream(table,
> Row.class).print("LookUpJoinJob");
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> env.execute("LookUpJoinJob");
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
> >> >>> >能分享下复现的作业代码不?
> >> >>> >
> >> >>> >Best,
> >> >>> >Jark
> >> >>> >
> >> >>> >On Mon, 6 Jul 2020 at 11:00, sunfulin 
> wrote:
> >> >>> >
> >> >>> >> Hi,
> >> >>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
> >> >>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
> >> >>> >>
> >> >>> >>
> >> >>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory
> >> found
> >> >>> to
> >> >>> >> execute the application.
> >> >>> >>   at
> >> >>> >>
> >> >>>
> >>
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> >> >>> >>
> >> >>> >>
> >> >>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
> >> >>>
> >> >>
> >>
>


Re: 嵌套 json 中string 数组的解析异常

2020-07-07 Thread Jun Zou
Hi, Leonard Xu:

我使用的 sql 如下,

> SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
> COUNT(`fruit`) AS `cnt`
> FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
> GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit


从调试日志来看,应该是一开始就挂掉了,我贴一下相关的日志

INFO - Initializing heap keyed state backend with stream factory.

INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (d8c5f92b850811595dbdc130c04f9e58) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be
> cast to [Ljava.lang.String;
> at
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> ... 10 more
>

另外,如果我把string 数组的类型从  BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
改为 ObjectArrayTypeInfo.getInfoFor(Types.STRING), 即schema 从

> root
>  |-- parsedResponse: LEGACY(BasicArrayTypeInfo)
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>
变为

> root
>  |-- parsedResponse: ARRAY
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>

也仍然会发生相同的错误,但日志执行有些不同

> INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (36b79032354b9e9ab70a30d98b1de903) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> 

Re: 一个source多个sink的同步问题

2020-07-07 Thread lgs
是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble
window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: Flink从SavePoint启动任务,修改的代码不生效

2020-07-07 Thread milan183sansiro
好的,感谢。


在2020年7月7日 10:28,Paul Lam 写道:
估计你是用同一个 Kafka Source 消费 A B 两个 Topic? 如果是,看起来像是 Kafka Connector 早期的一个问题。

作业停止的时候,Topic B 的 partition offset 被存储到 Savepoint 中,然后在恢复的时候,尽管代码中 Topic B 
已经被移除,但它的 partition offset 还是被恢复了。

这个问题在后来的版本,估计是 1.8 或 1.9,被修复了。

Best,
Paul Lam

2020年7月6日 20:55,milan183sansiro  写道:

你好:
1.没有给算子手动设置id
2.设置savepoint恢复的路径是正确的


在2020年7月6日 20:32,wujunxi<462329...@qq.com> 写道:
你好,确认以下两个点
1.是否给每个算子设置了id
2.设置savepoint恢复的路径是否正确



--原始邮件--
发件人:"milan183sansiro"

Re:Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 Thread sunfulin



hi,
我的pom文件本地执行时,scope的provided都是去掉的。

org.apache.flink
   flink-table-planner-blink_${scala.binary.version}
   ${flink.version}



确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 这个异常在啥情况下会触发到。














在 2020-07-07 18:10:58,"Jark Wu"  写道:
>如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉 provided
>再试试看?
>
>Best,
>Jark
>
>On Tue, 7 Jul 2020 at 18:01, sunfulin  wrote:
>
>> hi,
>>  @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
>>
>>  @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-07 15:40:17,"Jark Wu"  写道:
>> >Hi,
>> >
>> >你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
>> >
>> >Best,
>> >Jark
>> >
>> >On Tue, 7 Jul 2020 at 15:31, Jun Zhang 
>> wrote:
>> >
>> >> hi.sunfulin
>> >> 你有没有导入blink的planner呢,加入这个试试
>> >>
>> >> 
>> >> org.apache.flink
>> >>
>>  flink-table-planner-blink_${scala.binary.version}
>> >> ${flink.version}
>> >> 
>> >>
>> >>
>> >> sunfulin  于2020年7月7日周二 下午3:21写道:
>> >>
>> >>>
>> >>>
>> >>>
>> >>> hi, jark
>> >>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>> >>> configuration里的DeployOptions.TARGET
>> >>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>> >>>
>> >>>
>> >>> //构建StreamExecutionEnvironment
>> >>> public static final StreamExecutionEnvironment env =
>> >>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >>>
>> >>> //构建EnvironmentSettings 并指定Blink Planner
>> >>> private static final EnvironmentSettings bsSettings =
>> >>>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >>>
>> >>> //构建StreamTableEnvironment
>> >>> public static final StreamTableEnvironment tEnv =
>> >>> StreamTableEnvironment.create(env, bsSettings);
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>tEnv.executeSql(“ddl sql”);
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //source注册成表
>> >>>
>> >>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>> >>> $("f1").as("first"), $("p").proctime());
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //join语句
>> >>>
>> >>> Table table = tEnv.sqlQuery("select b.* from test a left join
>> >>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //输出
>> >>>
>> >>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> env.execute("LookUpJoinJob");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>> >>> >能分享下复现的作业代码不?
>> >>> >
>> >>> >Best,
>> >>> >Jark
>> >>> >
>> >>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>> >>> >
>> >>> >> Hi,
>> >>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>> >>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>> >>> >>
>> >>> >>
>> >>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory
>> found
>> >>> to
>> >>> >> execute the application.
>> >>> >>   at
>> >>> >>
>> >>>
>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>> >>> >>
>> >>> >>
>> >>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>> >>>
>> >>
>>


回复:rocksdb的block cache usage应该如何使用

2020-07-07 Thread SmileSmile
hi yun tang!

下午通过配置yaml的方式修改env成功生成内存文件,目前在重新复现和获取文件ing! tanks!具体内存dump在获取ing



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月07日 17:47,Yun Tang 写道:
Hi

你的jemalloc有带debug的重新编译么? 例如用下面的命令重新编译jemalloc得到相关的so文件
./configure --enable-prof --enable-stats --enable-debug --enable-fill
make

其次最好指定dump文件的输出地址,例如在 MALLOC_CONF中加上前缀的配置  prof_prefix:/tmp/jeprof.out 
,以确保文件位置可写。

最后,由于你是在容器中跑,在容器退出前要保证相关文件能上传或者退出时候hang住一段时间,否则相关dump的文件无法看到了

祝好
唐云

From: SmileSmile 
Sent: Monday, July 6, 2020 14:15
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

我在容器内加入了libjemalloc.so.2并且在配置中加上了
containerized.master.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc.so.2"
containerized.master.env.MALLOC_CONF: 
"prof:true,lg_prof_interval:25,lg_prof_sample:17"

请问要如何可以得到内存文件?试着kill一个tm,找不到对应的heap文件。求助



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 19:13,Yun Tang 写道:
hi

有采集过内存使用情况么,推荐使用jemalloc的预先加载方式[1][2]来sample 
JVM的内存使用,观察是否有malloc的内存存在超用的场景。需要配置相关参数 
containerized.taskmanager.env.MALLOC_CONF 和 
containerized.taskmanager.env.LD_PRELOAD


[1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
[2] https://www.evanjones.ca/java-native-leak-bug.html

祝好
唐云



From: SmileSmile 
Sent: Friday, July 3, 2020 15:22
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 Thread Jark Wu
如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉 provided
再试试看?

Best,
Jark

On Tue, 7 Jul 2020 at 18:01, sunfulin  wrote:

> hi,
>  @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
>
>  @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-07 15:40:17,"Jark Wu"  写道:
> >Hi,
> >
> >你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
> >
> >Best,
> >Jark
> >
> >On Tue, 7 Jul 2020 at 15:31, Jun Zhang 
> wrote:
> >
> >> hi.sunfulin
> >> 你有没有导入blink的planner呢,加入这个试试
> >>
> >> 
> >> org.apache.flink
> >>
>  flink-table-planner-blink_${scala.binary.version}
> >> ${flink.version}
> >> 
> >>
> >>
> >> sunfulin  于2020年7月7日周二 下午3:21写道:
> >>
> >>>
> >>>
> >>>
> >>> hi, jark
> >>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> >>> configuration里的DeployOptions.TARGET
> >>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
> >>>
> >>>
> >>> //构建StreamExecutionEnvironment
> >>> public static final StreamExecutionEnvironment env =
> >>> StreamExecutionEnvironment.getExecutionEnvironment();
> >>>
> >>> //构建EnvironmentSettings 并指定Blink Planner
> >>> private static final EnvironmentSettings bsSettings =
> >>>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >>>
> >>> //构建StreamTableEnvironment
> >>> public static final StreamTableEnvironment tEnv =
> >>> StreamTableEnvironment.create(env, bsSettings);
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>tEnv.executeSql(“ddl sql”);
> >>>
> >>>
> >>>
> >>>
> >>> //source注册成表
> >>>
> >>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
> >>> $("f1").as("first"), $("p").proctime());
> >>>
> >>>
> >>>
> >>>
> >>> //join语句
> >>>
> >>> Table table = tEnv.sqlQuery("select b.* from test a left join
> >>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
> >>>
> >>>
> >>>
> >>>
> >>> //输出
> >>>
> >>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
> >>>
> >>>
> >>>
> >>>
> >>> env.execute("LookUpJoinJob");
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
> >>> >能分享下复现的作业代码不?
> >>> >
> >>> >Best,
> >>> >Jark
> >>> >
> >>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
> >>> >
> >>> >> Hi,
> >>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
> >>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
> >>> >>
> >>> >>
> >>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory
> found
> >>> to
> >>> >> execute the application.
> >>> >>   at
> >>> >>
> >>>
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> >>> >>
> >>> >>
> >>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
> >>>
> >>
>


Re: Manual allocation of slot usage

2020-07-07 Thread Yangze Guo
Hi, Mu,

IIUC, cluster.evenly-spread-out-slots would fulfill your demand. Why
do you think it does the opposite of what you want. Do you run your
job in active mode? If so, cluster.evenly-spread-out-slots might not
work very well because there could be insufficient task managers when
request slot from ResourceManager. This has been discussed in
https://issues.apache.org/jira/browse/FLINK-12122 .


Best,
Yangze Guo
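
For reference, a minimal sketch of how the option is switched on (it is a
cluster-level setting in flink-conf.yaml and, per the caveat above, mainly helps
when all TMs are already registered):

# flink-conf.yaml of the standalone/session cluster
cluster.evenly-spread-out-slots: true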

On Tue, Jul 7, 2020 at 5:44 PM Mu Kong  wrote:
>
> Hi community,
>
> I'm running an application to consume data from kafka, and process it then 
> put data to the druid.
> I wonder if there is a way where I can allocate the data source consuming 
> process evenly across the task manager to maximize the usage of the network 
> of task managers.
>
> So, for example, I have 15 task managers and I set parallelism for the kafka 
> source as 60, since I have 60 partitions in kafka topic.
> What I want is flink cluster will put 4 kafka source subtasks on each task 
> manager.
>
> Is that possible? I have gone through the document, the only thing we found is
>
> cluster.evenly-spread-out-slots
>
> which does exact the opposite of what I want. It will put the subtasks of the 
> same operator onto one task manager as much as possible.
>
> So, is some kind of manual resource allocation available?
> Thanks in advance!
>
>
> Best regards,
> Mu


Re: 如何在Flink SQL中使用周期性水印?

2020-07-07 Thread Jark Wu
Hi,

这个问题我理解其实和周期性水印没有关系,是属于 idle source
的问题,你可以尝试下加上配置 table.exec.source.idle-timeout = 10s 能不能解决你的问题。[1]

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
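
一个设置该参数的简单示意(假设通过 TableEnvironment 的 TableConfig 来配置,超时时长按需调整):

// 示意:空闲超过 10s 的 source 分区不再阻塞 watermark 推进
tEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "10 s");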

On Tue, 7 Jul 2020 at 17:35, noake  wrote:

> Dear All:
>
>
> 大佬们, 请教下如何在Flink SQL中使用周期性的水印。
> 我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。


Re: 一个source多个sink的同步问题

2020-07-07 Thread Jark Wu
watermark 的计算是跟数据上的 event-time 相关的。你的数据是不是间隔一小时来一波的呢?
比如 10:00 的数据之后,就是 11:00 的数据,但是要1小时后才到来?


Best,
Jark

On Tue, 7 Jul 2020 at 17:20, lgs <9925...@qq.com> wrote:

> source是kafka,有一个rowtime定义:
>
> .field("rowtime", DataTypes.TIMESTAMP(0))
> .rowtime(Rowtime()
> .timestamps_from_field("actionTime")
> .watermarks_periodic_bounded(6)
> )
>
> 有两个sink,第一个sink是直接把kafa的数据保存到postgres。
> 第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。
> st_env.scan("source") \
>  .window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
> \
>  .group_by("hourlywindow") \
>  .select("udf(...)")
>  ...
>
>
> 现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。
>
> 有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:Re: Re: flink 1.11 作业执行异常

2020-07-07 Thread sunfulin
hi, 
 @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。

 @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?


















在 2020-07-07 15:40:17,"Jark Wu"  写道:
>Hi,
>
>你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
>
>Best,
>Jark
>
>On Tue, 7 Jul 2020 at 15:31, Jun Zhang  wrote:
>
>> hi.sunfulin
>> 你有没有导入blink的planner呢,加入这个试试
>>
>> 
>> org.apache.flink
>> 
>> flink-table-planner-blink_${scala.binary.version}
>> ${flink.version}
>> 
>>
>>
>> sunfulin  于2020年7月7日周二 下午3:21写道:
>>
>>>
>>>
>>>
>>> hi, jark
>>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>>> configuration里的DeployOptions.TARGET
>>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>>>
>>>
>>> //构建StreamExecutionEnvironment
>>> public static final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> //构建EnvironmentSettings 并指定Blink Planner
>>> private static final EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>
>>> //构建StreamTableEnvironment
>>> public static final StreamTableEnvironment tEnv =
>>> StreamTableEnvironment.create(env, bsSettings);
>>>
>>>
>>>
>>>
>>>
>>>tEnv.executeSql(“ddl sql”);
>>>
>>>
>>>
>>>
>>> //source注册成表
>>>
>>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>>> $("f1").as("first"), $("p").proctime());
>>>
>>>
>>>
>>>
>>> //join语句
>>>
>>> Table table = tEnv.sqlQuery("select b.* from test a left join
>>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>>>
>>>
>>>
>>>
>>> //输出
>>>
>>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>>>
>>>
>>>
>>>
>>> env.execute("LookUpJoinJob");
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>>> >能分享下复现的作业代码不?
>>> >
>>> >Best,
>>> >Jark
>>> >
>>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>>> >
>>> >> Hi,
>>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>>> >>
>>> >>
>>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found
>>> to
>>> >> execute the application.
>>> >>   at
>>> >>
>>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>>> >>
>>> >>
>>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>>>
>>


Re: flink sql 读写写kafka表的时候可以指定消息的key吗

2020-07-07 Thread Jark Wu
Hi,

可以描述下你的业务场景么? 为什么一定要去获取 key 的信息呢,因为按照我的理解,一般来说 key 的信息一般在 value 中也有。

Best,
Jark

On Tue, 7 Jul 2020 at 17:17, op <520075...@qq.com> wrote:

> 感谢
>
>
>
>
> --原始邮件--
> 发件人:"Leonard Xu" 发送时间:2020年7月7日(星期二) 下午5:15
> 收件人:"user-zh"
> 主题:Re: flink sql 读写写kafka表的时候可以指定消息的key吗
>
>
>
> Hi,
>
> 目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。
>
> 祝好,
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
> ;
>
>  在 2020年7月7日,17:01,op <520075...@qq.com 写道:
> 
>  hi:
>  nbsp; flink sql 写kafka表的时候可以指定消息的key吗?
>  看官网的kafka connector没有找到消息key相关的说明
>  如果可以的话,如何指定?
>  nbsp;谢谢


Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-07 Thread Yun Tang
Hi

你的jemalloc有带debug的重新编译么? 例如用下面的命令重新编译jemalloc得到相关的so文件
./configure --enable-prof --enable-stats --enable-debug --enable-fill
make

其次最好指定dump文件的输出地址,例如在 MALLOC_CONF中加上前缀的配置  prof_prefix:/tmp/jeprof.out 
,以确保文件位置可写。

最后,由于你是在容器中跑,在容器退出前要保证相关文件能上传或者退出时候hang住一段时间,否则相关dump的文件无法看到了
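
按上面的建议补全后,flink-conf.yaml 大致是这样(dump 输出路径仅为示例):

containerized.taskmanager.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc.so.2"
containerized.taskmanager.env.MALLOC_CONF: "prof:true,lg_prof_interval:25,lg_prof_sample:17,prof_prefix:/tmp/jeprof.out"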

祝好
唐云

From: SmileSmile 
Sent: Monday, July 6, 2020 14:15
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

我在容器内加入了libjemalloc.so.2并且在配置中加上了
containerized.master.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc.so.2"
containerized.master.env.MALLOC_CONF: 
"prof:true,lg_prof_interval:25,lg_prof_sample:17"

请问要如何可以得到内存文件?试着kill一个tm,找不到对应的heap文件。求助



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 19:13,Yun Tang 写道:
hi

有采集过内存使用情况么,推荐使用jemalloc的预先加载方式[1][2]来sample 
JVM的内存使用,观察是否有malloc的内存存在超用的场景。需要配置相关参数 
containerized.taskmanager.env.MALLOC_CONF 和 
containerized.taskmanager.env.LD_PRELOAD


[1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
[2] https://www.evanjones.ca/java-native-leak-bug.html

祝好
唐云



From: SmileSmile 
Sent: Friday, July 3, 2020 15:22
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Manual allocation of slot usage

2020-07-07 Thread Mu Kong
Hi community,

I'm running an application that consumes data from Kafka, processes it, and
then writes the data to Druid.
I wonder if there is a way where I can allocate the data source consuming
process evenly across the task manager to maximize the usage of the network
of task managers.

So, for example, I have 15 task managers and I set parallelism for the
kafka source as 60, since I have 60 partitions in kafka topic.
What I want is flink cluster will put 4 kafka source subtasks on each task
manager.

Is that possible? I have gone through the documentation; the only thing we
found is
cluster.evenly-spread-out-slots
which does exactly the opposite of what I want: it will put the subtasks of
the same operator onto one task manager as much as possible.

So, is some kind of manual resource allocation available?
Thanks in advance!


Best regards,
Mu


Re: flink kafka connector中获取kafka元数据

2020-07-07 Thread Dream-底限
好的

On Tue, Jul 7, 2020 at 5:30 PM Leonard Xu  wrote:

> 嗯,这个在FLIP-107里会支持,目前没法拿到这些meta数据,可以关注下FLIP-107的进展。
>
> Best,
> Leonard Xu
>
> > 在 2020年7月7日,17:26,Dream-底限  写道:
> >
> > hi
> > 是的,想以下面这种方式获取
> >
> > CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
> > ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> >
> >
> > On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> >
> >> Hi,
> >> kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
> >> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >>>
> >>
> >>> 在 2020年7月7日,17:12,Dream-底限  写道:
> >>>
> >>> kafka元数据
> >>
> >>
>
>


嵌套 json 中string 数组的解析异常

2020-07-07 Thread Jun Zou
Hi all:
我使用 flink 1.9 处理嵌套 json, 它嵌套了一个string数组,构造出的 table schema结构为:
Row(parsedResponse: BasicArrayTypeInfo, timestamp: Long)
执行作业后会发生报错如下,出现 object 类型和string 类型的转换错误
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast
to [Ljava.lang.String;
at
org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

大佬们知道该怎么修改么?

我的json 的结构如下:
{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}
 P.S:
如果把 string 数组改为 long 数组或者 double 数组执行对应的操作可以正确运行,目前来看只有 string 数组出现问题。
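
构造这个 schema 的一个简化示意(省略 import,字段名与正文一致,仅供参考):

// Row(parsedResponse: String[], timestamp: Long) 对应的 TypeInformation
TypeInformation<Row> schema = Types.ROW_NAMED(
        new String[]{"parsedResponse", "timestamp"},
        BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,   // string 数组字段
        Types.LONG);                                 // 事件时间戳(毫秒)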


如何在Flink SQL中使用周期性水印?

2020-07-07 Thread noake
Dear All:


大佬们, 请教下如何在Flink SQL中使用周期性的水印。
我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。

Re: flink kafka connector中获取kafka元数据

2020-07-07 Thread Leonard Xu
嗯,这个在FLIP-107里会支持,目前没法拿到这些meta数据,可以关注下FLIP-107的进展。

Best,
Leonard Xu

> 在 2020年7月7日,17:26,Dream-底限  写道:
> 
> hi
> 是的,想以下面这种方式获取
> 
> CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
> ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> 
> 
> On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> 
>> Hi,
>> kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
>> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
>> 
>> 祝好,
>> Leonard Xu
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
>>> 
>> 
>>> 在 2020年7月7日,17:12,Dream-底限  写道:
>>> 
>>> kafka元数据
>> 
>> 



Re: flink kafka connector中获取kafka元数据

2020-07-07 Thread Dream-底限
hi
是的,想以下面这种方式获取

CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...)


On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:

> Hi,
>  kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
>
> 祝好,
> Leonard Xu
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >
>
> > 在 2020年7月7日,17:12,Dream-底限  写道:
> >
> > kafka元数据
>
>


Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-07 Thread Fabian Hueske
Hi Jie Feng,

As you said, Flink translates SQL queries into streaming programs with
auto-generated operator IDs.
In order to start a SQL query from a savepoint, the operator IDs in the
savepoint must match the IDs in the newly translated program.
Right now this can only be guaranteed if you translate the same query with
the same Flink version (optimizer changes might change the structure of the
resulting plan even if the query is the same).
This is of course a significant limitation, that the community is aware of
and planning to improve in the future.

I'd also like to add that it can be very difficult to assess whether it is
meaningful to start a query from a savepoint that was generated with a
different query.
A savepoint holds intermediate data that is needed to compute the result of
a query.
If you update a query it is very well possible that the result computed by
Flink won't be equal to the actual result of the new query.
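
For comparison, this is the kind of explicit ID pinning the DataStream API offers
and that SQL queries cannot expose today (operator and class names below are made up):

DataStream<Event> parsed = input
        .map(new ParseFn())
        .uid("parse-events");        // stable ID used to match state in a savepoint

parsed.keyBy(e -> e.getUserId())
        .process(new CountPerUserFn())
        .uid("count-per-user");      // changing this ID would orphan the operator's state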

Best, Fabian

Am Mo., 6. Juli 2020 um 10:50 Uhr schrieb shadowell :

>
> Hello, everyone,
> I have some unclear points when using Flink SQL. I hope to get an
> answer or tell me where I can find the answer.
> When using the DataStream API, in order to ensure that the job can
> recover the state from savepoint after adjustment, it is necessary to
> specify the uid for the operator. However, when using Flink SQL, the uid of
> the operator is automatically generated. If the SQL logic changes (operator
> order changes), when the task is restored from savepoint, will it cause
> some of the operator states to be unable to be mapped back, resulting in
> state loss?
>
> Thanks~
> Jie Feng
> shadowell
> shadow...@126.com
>
> 
> 签名由 网易邮箱大师  定制
>


一个source多个sink的同步问题

2020-07-07 Thread lgs
source是kafka,有一个rowtime定义:

.field("rowtime", DataTypes.TIMESTAMP(0))
.rowtime(Rowtime()
.timestamps_from_field("actionTime")
.watermarks_periodic_bounded(6)
)

有两个sink,第一个sink是直接把kafa的数据保存到postgres。
第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。
st_env.scan("source") \
 .window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
\
 .group_by("hourlywindow") \
 .select("udf(...)")
 ...


现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。

有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink kafka connector中获取kafka元数据

2020-07-07 Thread Leonard Xu
Hi,
 kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。

祝好,
Leonard Xu

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
 


> 在 2020年7月7日,17:12,Dream-底限  写道:
> 
> kafka元数据



回复: flink sql 读写写kafka表的时候可以指定消息的key吗

2020-07-07 Thread op
感谢




------------------ 原始邮件 ------------------
发件人:"Leonard Xu"
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
 


Re: flink sql 读写写kafka表的时候可以指定消息的key吗

2020-07-07 Thread Leonard Xu
Hi,

目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。

祝好,
Leonard Xu
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
 


> 在 2020年7月7日,17:01,op <520075...@qq.com> 写道:
> 
> hi:
>  flink sql 写kafka表的时候可以指定消息的key吗?
> 看官网的kafka connector没有找到消息key相关的说明
> 如果可以的话,如何指定?
> 谢谢



flink kafka connector中获取kafka元数据

2020-07-07 Thread Dream-底限
hi、
flink table/sql api中,有办法获取kafka元数据吗?

tableEnvironment.sqlUpdate(CREATE TABLE MyUserTable (...) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...))


flink sql 读写写kafka表的时候可以指定消息的key吗

2020-07-07 Thread op
hi:
 flink sql 写kafka表的时候可以指定消息的key吗?
看官网的kafka connector没有找到消息key相关的说明
如果可以的话,如何指定?
谢谢



Any idea for data skew in hash join

2020-07-07 Thread faaron zheng
Hi all, I use Flink 1.10 to run a SQL job and I find that almost 60% of the data
is concentrated on one parallel subtask. Are there any good ideas for this scenario?

Re: 作业从flink1.9.0迁移到1.10.1,LogicalTypeRoot变更后无法从CP恢复:No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY

2020-07-07 Thread Jark Wu
Hi,

问一下,你是指用1.10去恢复 1.9 作业的 savepoint/checkpoint 吗?还是指迁移到 1.10 后,无法从 failover
中恢复?
如果是前者的话,Flink SQL 目前没有保证跨大版本的 state 兼容性。所以当你从 1.9 升级到 1.10 时,作业需要放弃状态重跑。

Best,
Jark

On Tue, 7 Jul 2020 at 15:54, 吴磊  wrote:

> 各位好:
> 当我把作业从flink1.9.0迁移到1.10.1,且作业中使用了'group by'形式的语法时,会导致无法从cp/sp恢复,
> 代码:
>
>
> 报错如下:
>
> switched from RUNNING to FAILED.switched from RUNNING to 
> FAILED.java.lang.Exception: Exception while creating 
> StreamOperatorStateContext. at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at 
> java.lang.Thread.run(Thread.java:748)Caused by: 
> org.apache.flink.util.FlinkException: Could not restore keyed state backend 
> for KeyedProcessOperator_d26553d858836b90d15e66f459fbcb50_(2/3) from any of 
> the 1 provided restore options. at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>  ... 9 moreCaused by: 
> org.apache.flink.runtime.state.BackendBuildingException: Failed when trying 
> to restore heap backend at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>  at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>  at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>  at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>  ... 11 moreCaused by: java.io.InvalidObjectException: enum constant ANY does 
> not exist in class org.apache.flink.table.types.logical.LogicalTypeRoot at 
> java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2013) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1569) at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
>  at 
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot.readSnapshot(BaseRowSerializer.java:306)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>  at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>  at 
> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:169)
>  at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:133)
>  at 
> 

作业从flink1.9.0迁移到1.10.1,LogicalTypeRoot变更后无法从CP恢复:No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY

2020-07-07 Thread 吴磊
各位好:
当我把作业从flink1.9.0迁移到1.10.1,且作业中使用了'group by'形式的语法时,会导致无法从cp/sp恢复,
代码:




报错如下:
switched from RUNNING to FAILED.switched from RUNNING to 
FAILED.java.lang.Exception: Exception while creating 
StreamOperatorStateContext. at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at 
java.lang.Thread.run(Thread.java:748)Caused by: 
org.apache.flink.util.FlinkException: Could not restore keyed state backend for 
KeyedProcessOperator_d26553d858836b90d15e66f459fbcb50_(2/3) from any of the 1 
provided restore options. at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
 ... 9 moreCaused by: org.apache.flink.runtime.state.BackendBuildingException: 
Failed when trying to restore heap backend at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
 at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ... 11 moreCaused by: java.io.InvalidObjectException: enum constant ANY does 
not exist in class org.apache.flink.table.types.logical.LogicalTypeRoot at 
java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2013) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1569) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
 at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot.readSnapshot(BaseRowSerializer.java:306)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 at 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
 at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:169)
 at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:133)
 at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
 ... 15 moreCaused by: java.lang.IllegalArgumentException: No enum constant 
org.apache.flink.table.types.logical.LogicalTypeRoot.ANY at 
java.lang.Enum.valueOf(Enum.java:238) 




 

Re: Re: flink 1.11 作业执行异常

2020-07-07 Thread Jark Wu
Hi,

你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?

Best,
Jark

On Tue, 7 Jul 2020 at 15:31, Jun Zhang  wrote:

> hi.sunfulin
> 你有没有导入blink的planner呢,加入这个试试
>
> 
> org.apache.flink
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> 
>
>
> sunfulin  于2020年7月7日周二 下午3:21写道:
>
>>
>>
>>
>> hi, jark
>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>> configuration里的DeployOptions.TARGET
>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>>
>>
>> //构建StreamExecutionEnvironment
>> public static final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> //构建EnvironmentSettings 并指定Blink Planner
>> private static final EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>
>> //构建StreamTableEnvironment
>> public static final StreamTableEnvironment tEnv =
>> StreamTableEnvironment.create(env, bsSettings);
>>
>>
>>
>>
>>
>>tEnv.executeSql(“ddl sql”);
>>
>>
>>
>>
>> //source注册成表
>>
>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>> $("f1").as("first"), $("p").proctime());
>>
>>
>>
>>
>> //join语句
>>
>> Table table = tEnv.sqlQuery("select b.* from test a left join
>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>>
>>
>>
>>
>> //输出
>>
>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>>
>>
>>
>>
>> env.execute("LookUpJoinJob");
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>> >能分享下复现的作业代码不?
>> >
>> >Best,
>> >Jark
>> >
>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>> >
>> >> Hi,
>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>> >>
>> >>
>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found
>> to
>> >> execute the application.
>> >>   at
>> >>
>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>> >>
>> >>
>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>>
>

