Re: Flink JOB_MANAGER_LEADER_PATH suspected znode leak

2020-06-27 Thread 于汝国



Flink itself does not provide a feature or mechanism to clean up the znodes left on ZooKeeper after a job is cancelled (nor some of the data on HDFS). If you want them cleaned up, do it manually or implement the cleanup yourself.
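If you do clean it up manually, the ZooKeeper CLI might be used roughly like this. This is only a hedged sketch: the actual path depends on your `high-availability.zookeeper.path.root` and cluster-id settings, and `<job_id>` and the server address are placeholders.

```shell
# Illustrative manual cleanup of a leftover leader znode for a cancelled job.
# On ZooKeeper 3.4 the recursive delete command is `rmr`; on 3.5+ use `deleteall`.
echo "rmr /flink/default/leader/<job_id>" | ./bin/zkCli.sh -server zk-host:2181
```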

At 2020-06-28 09:12:41, "林恬" wrote:
>Hi all:
>  I am currently on Flink 1.9.2 with ZooKeeper-based HA. I have found that the /leader/${job_id} 
>znodes on ZK are not removed even after a job is cancelled, so after running for a while there are many empty job_id znodes under /leader/. When are these supposed to be cleaned up? Or is this lack of cleanup a bug in 1.9.2?
>
>
>


Re: Native K8S IAM Role?

2020-06-27 Thread Yang Wang
Hi kevin,

If you mean to add annotations to the pods of a Flink native K8s session, you
could use "kubernetes.jobmanager.annotations"
and "kubernetes.taskmanager.annotations"[1]. However, they are only
supported from release-1.11, so you may want to wait a little longer;
1.11 will be released soon. We have also added more features to the
native K8s integration in 1.11
(e.g. application mode, labels, annotations, tolerations, etc.).
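Once 1.11 is out, passing those annotations when starting a session might look roughly like this. This is a hedged sketch: the option names follow the linked docs, and the cluster id and role ARN are placeholders.

```shell
# Illustrative only (Flink 1.11+): start a native K8s session with IAM-role
# annotations on both the JobManager and TaskManager pods.
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-session \
  -Dkubernetes.jobmanager.annotations=iam.amazonaws.com/role:ROLE_ARN \
  -Dkubernetes.taskmanager.annotations=iam.amazonaws.com/role:ROLE_ARN
```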


[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes

Best,
Yang

Bohinski, Kevin wrote on Fri, Jun 26, 2020 at 3:09 AM:

> Hi,
>
>
>
> How do we attach an IAM role to the native K8S sessions?
>
>
>
> Typically for our other pods we use the following in our yamls:
>
> spec:
>
>   template:
>
> metadata:
>
>   annotations:
>
> iam.amazonaws.com/role: ROLE_ARN
>
>
>
> Best
>
> kevin
>


Re: flink1.9 on yarn

2020-06-27 Thread 17610775...@163.com
Question 1

Every submission with ./bin/flink run -m 
yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.

After running yarn application -kill application_1567067657620_0254,

how do I keep this appid from incrementing on the next ./bin/flink run -m yarn-cluster?

Question 2

After submitting a job with ./bin/flink run -m yarn-cluster and cancelling it, how do I submit the next job to the same appid?

 

Re: Does the Flink REST API support the -C parameter

2020-06-27 Thread Yang Wang
The Flink CLI applies the -C arguments to the JobGraph generated on the client side, and then submits that JobGraph to run.
When submitting via the REST API, setting a classpath for a single job is indeed not supported yet. I think this is a reasonable request; feel free to open a JIRA for it.

For now the only workaround is to put it into the cluster configuration: when starting the session, use -C/--classpath
or -D pipeline.classpaths=xxx,yyy, so that every job gets those jars added to its classpath.
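As a hedged sketch of that workaround (the jar path is illustrative; flags per the Flink 1.10 CLI docs):

```shell
# Illustrative: start a YARN session with an extra jar added to the classpath
# of every job submitted to this session.
./bin/yarn-session.sh -d \
  -Dpipeline.classpaths=file:///usr/local/soft/flink/my-function-0.1.jar
```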


Best,
Yang

chenxuying wrote on Wed, Jun 24, 2020 at 6:13 PM:

> Currently using flink 1.10.0.
> Background:
> The REST API has an endpoint for submitting a job:
> /jars/:jarid/run
>
> with the parameters entryClass, programArgs, parallelism, jobId, allowNonRestoredState, savepointPath.
>
>
> If I submit the job from the command line instead:
> flink run -C file:///usr/local/soft/flink/my-function-0.1.jar -c
> cn.xuying.flink.table.sql.ParserSqlJob
> /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
> the CLI supports -C for supplying extra jars, which Flink adds to the classpath.
> Question:
> The REST API does not seem to offer anything like the CLI's -C option, so I would like to know whether this feature will be added in the future.


flink table registerDataStream field expression error

2020-06-27 Thread wujunxi
Flink version: 1.8.0
When registering a DataStream with the flink table API:
tEnv.registerDataStream("t_data",dataStream,"f1-1");
the following exception is thrown:
org.apache.flink.table.api.TableException: 
Field reference expression expected.

Re: [Does Flink SQL support adding a field after a specified column of a table]

2020-06-27 Thread Jark Wu
Hi,

Could you describe your scenario and requirements in more detail? What is the data source, and did the source data originally contain the c4 column, or is c4 newly added?


Also, it depends on what your connector is. Some connectors map fields by column name (e.g.
JSON and most databases), while others map by column order (e.g. CSV).
If the mapping is by column name, then adding a c4 column in the Flink SQL DDL will let you read c4's value, no matter which position c4 is declared in.

Best,
Jark


On Sat, 27 Jun 2020 at 17:26, 忝忝向仧 <153488...@qq.com> wrote:

> Hi, all:
>
>
> Does Flink SQL support adding a field after a specified column of a table? For example, table A has columns c1, c2, c3, and I want to add a field c4 after c1 so the table becomes c1, c4, c2, c3, rather than appending at the end.
>
>
> Thanks.


Re: Flink-1.10.0: source checkpoint occasionally takes a long time

2020-06-27 Thread Tianwang Li
To add to my earlier mail, here are screenshots of the checkpoint UI:


https://imgchr.com/i/NgCUgS

https://imgchr.com/i/NgChDJ

https://imgchr.com/i/NgCT4x



>

-- 
**
 tivanli
**


Re: Flink-1.10.0: source checkpoint occasionally takes a long time

2020-06-27 Thread LakeShen
Hi Tianwang Li,

Since the checkpoint timeout only shows up once every day or two, check whether some category of keys in your job suddenly spikes during those periods.

Best,
LakeShen

zhisheng wrote on Sun, Jun 28, 2020 at 10:27 AM:

> hi, Tianwang Li
>
> The three images appear to be broken; try uploading them to a third-party image host and pasting the links here. Also:
>
> > The job often experiences backpressure (especially when the windows emit)
>
> Check the operators downstream of the window, e.g. whether the window emits too much data while the sink parallelism stays the same, so processing cannot keep up, causing the backpressure.
>
>
> > Most checkpoints complete within 1 minute, but occasionally a checkpoint takes over 30 minutes (not frequent, once every 1-2 days)
>
> This could also be a spike caused by occasional heavy load on HDFS.
>
> It would also help to attach the checkpoint-related screenshots from the UI and the log messages, to better locate the problem.
>
>
> Best !
> zhisheng
>
>
> Tianwang Li wrote on Sun, Jun 28, 2020 at 10:17 AM:
>
> > About the problem of occasionally very long Flink checkpoints.
> >
> > *Environment and background:*
> > Version: flink1.10.0
> > Volume: roughly 100k records per second; the data source is kafka.
> > Logic: sliding-window aggregation; each window emits roughly 10-20 million records.
> > Backpressure: the job often experiences backpressure (especially when the windows emit).
> >
> > *Problem:*
> > Most checkpoints complete within 1 minute, but occasionally a checkpoint takes over 30 minutes (not frequent, once every 1-2 days).
> > The source's checkpoint takes a long time; the span from Trigger checkpoint to Starting checkpoint is long.
> >
> > The checkpoint situation looks roughly like this:
> >
> > [image: image.png]
> > [image: image.png]
> > [image: image.png]
> >
> > 2020-06-24 21:09:53,369 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger
> > checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
> >
> > 2020-06-24 21:09:58,327 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> >
> > 2020-06-24 21:09:59,266 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> > 111/114/424 MB (used/committed/max)]
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > Used Memory: 583911424
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> >
> > 2020-06-24 21:10:08,346 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> >
> > 2020-06-24 21:10:09,286 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> > 111/114/424 MB (used/committed/max)]
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > Used Memory: 583911424
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> >
> >
> > (omitted)
> >
> >
> > 2020-06-24 21:55:39,875 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > Used Memory: 583911424
> >
> > 2020-06-24 21:55:39,875 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> >
> > 2020-06-24 21:55:39,876 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> > COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> >
> > 2020-06-24 21:55:41,721 DEBUG
> > org.apache.flink.streaming.runtime.tasks.StreamTask   - Starting
> > checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map ->
> Filter
> > -> Timestamps/Watermarks (4/10)
> >
> > 2020-06-24 21:55:41,721 DEBUG
> > org.apache.flink.runtime.state.AbstractSnapshotStrategy   -
> > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd
> ,
> > checkpointDirectory=hdfs://chk-316,
> > 

Re: flink1.9 on yarn

2020-06-27 Thread zhisheng
hi, guanyq

The submission mode you describe is Flink on YARN per-job mode. By design, every newly submitted job gets a new AppID.
Best!
zhisheng

Yangze Guo wrote on Sun, Jun 28, 2020 at 9:59 AM:

> I think what you need is session mode, i.e. ./bin/yarn-session.sh [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session
>
> Best,
> Yangze Guo
>
> On Sun, Jun 28, 2020 at 9:10 AM guanyq  wrote:
> >
> > Question 1
> >
> > Every submission with ./bin/flink run -m
> yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.
> >
> > After running yarn application -kill application_1567067657620_0254,
> >
> > how do I keep this appid from incrementing on the next ./bin/flink run -m yarn-cluster?
> >
> > Question 2
> >
> > After submitting a job with ./bin/flink run -m yarn-cluster and cancelling it, how do I submit the next job to the same appid?
> >
> >
>


Re:flink1.9 on yarn

2020-06-27 Thread Roc Marshal
Hi, guanyq.

On question 1 (how to keep the appid from incrementing when submitting with ./bin/flink run -m yarn-cluster):
the appid's increment policy is not determined by Flink. If necessary, you can investigate hadoop-yarn and draw your own conclusions.



On question 2 (after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how to submit to the same appid):
may I take this to mean that a flink 
yarn-session cluster would better fit your jobs? The submission mode in your question is per-job, and once the job is closed, Flink shuts the cluster down.
See: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-flink-session
Best,
Roc Marshal

At 2020-06-28 09:09:43, "guanyq" wrote:
>Question 1
>
>Every submission with ./bin/flink run -m 
>yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254.
>
>After running yarn application -kill application_1567067657620_0254,
>
>how do I keep this appid from incrementing on the next ./bin/flink run -m yarn-cluster?
>
>Question 2
>
>After submitting a job with ./bin/flink run -m yarn-cluster and cancelling it, how do I submit the next job to the same appid?
>
> 


Re: CEP use case !

2020-06-27 Thread Benchao Li
Hi Aissa,

Flink CEP is an API for matching a pattern across multiple events,
like (START MIDDLE+ END).
If you can compute the "sensor_status" from a single record, I think the Flink
DataStream API / Table & SQL API
could already satisfy your requirement.

Aissa Elaffani wrote on Thu, Jun 25, 2020 at 11:35 PM:

> Hello Guys,
> I am asking if the CEP API can resolve my use case. Actually, I have a lot
> of sensors generating some data, and I want to apply a rules engine on
> those sensors' data, in order to set a "sensor_status" of Normal,
> Alert, or Warning. For each record I want to apply some conditions (if
> temperature>15, humidity>...) and then define the status of the sensor,
> whether it is in Normal status or Alert status...
> And I am wondering if the CEP API can help me achieve that.
> Thank you guys for your time!
> Best,
> AISSA
>


-- 

Best,
Benchao Li


Re: Re: Can Flink use Canal to connect to Oracle?

2020-06-27 Thread Jark Wu
You are welcome to try it out and report back your experience~

On Thu, 25 Jun 2020 at 14:55, gaolanf...@hotmail.com 
wrote:

> Hello
>
> Thanks a lot!
>
> I checked the material you referenced:
> Canal supports MySQL;
> Debezium supports MySQL, PostgreSQL, Oracle, and more.
>
> I have never used Debezium, though, and relatively few people use it; I will go try it out myself~
>
>
>
>
> gaolanf...@hotmail.com
>
> From: Leonard Xu
> Sent: 2020-06-25 13:27
> To: user-zh
> Subject: Re: Can Flink use Canal to connect to Oracle?
>
> Hello
>
> Flink 1.11 will support reading CDC, with both canal and debezium formats. 1.11 is about to be released; here is the documentation [1] for reference.
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/canal.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/canal.html
> >
>


Re: Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow

2020-06-27 Thread zhisheng
hi, 立志:

From your description (the job can run for 10+ days and uses FsStateBackend), could you provide GC time and count
monitoring for the JobManager and TaskManager? I suspect the problem is caused by Full GC.

Best!
zhisheng

张立志 wrote on Sun, Jun 28, 2020 at 10:13 AM:

> The back pressure shown in the monitoring backend is normal. The flatMap task does exist, but it only opens a MySQL connection and does nothing else. Another job
> without flatMap, a plain map/reduce
> aggregation, can run for 10+ days; once state reaches about 1 GB it clearly slows down, and it fails after the 10-minute timeout. The backend error logs show no obvious
> exceptions, only messages after the checkpoint timeout.
> At 2020-06-28 09:58:00, "LakeShen" wrote:
> >Hi 张立志,
> >
> >For checkpoint timeouts, first check whether your job has backpressure, e.g. at the sink stage, or caused by a flatMap somewhere.
> >
> >Then check whether the job has hot-key problems. If everything looks normal, you can try RocksDB incremental checkpoints; see [1].
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details
> >
> >Best,
> >LakeShen
> >
> >张立志 wrote on Sun, Jun 28, 2020 at 9:52 AM:
> >
> >> flink version 1.8
> >> deployed on a YARN cluster
> >>
> >>
> >> Configuration code:
> >> StreamExecutionEnvironment.stateBackend(new
> >>
> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
> >> The business code is fairly simple but uses a lot of memory.
> >> Errors start after the 10-minute timeout; when state reaches about 1.5 GB, checkpoints start taking longer.
> >>
> >>
> >>
> >>
> >>
> >>
>


Re: Flink-1.10.0: source checkpoint occasionally takes a long time

2020-06-27 Thread zhisheng
hi, Tianwang Li

The three images appear to be broken; try uploading them to a third-party image host and pasting the links here. Also:

> The job often experiences backpressure (especially when the windows emit)

Check the operators downstream of the window, e.g. whether the window emits too much data while the sink parallelism stays the same, so processing cannot keep up, causing the backpressure.


> Most checkpoints complete within 1 minute, but occasionally a checkpoint takes over 30 minutes (not frequent, once every 1-2 days)

This could also be a spike caused by occasional heavy load on HDFS.

It would also help to attach the checkpoint-related screenshots from the UI and the log messages, to better locate the problem.


Best !
zhisheng


Tianwang Li wrote on Sun, Jun 28, 2020 at 10:17 AM:

> About the problem of occasionally very long Flink checkpoints.
>
> *Environment and background:*
> Version: flink1.10.0
> Volume: roughly 100k records per second; the data source is kafka.
> Logic: sliding-window aggregation; each window emits roughly 10-20 million records.
> Backpressure: the job often experiences backpressure (especially when the windows emit).
>
> *Problem:*
> Most checkpoints complete within 1 minute, but occasionally a checkpoint takes over 30 minutes (not frequent, once every 1-2 days).
> The source's checkpoint takes a long time; the span from Trigger checkpoint to Starting checkpoint is long.
>
> The checkpoint situation looks roughly like this:
>
> [image: image.png]
> [image: image.png]
> [image: image.png]
>
> 2020-06-24 21:09:53,369 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger
> checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
>
> 2020-06-24 21:09:58,327 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:09:59,266 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:10:08,346 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:10:09,286 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
>
> (omitted)
>
>
> 2020-06-24 21:55:39,875 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:55:39,875 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:55:39,876 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Starting
> checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter
> -> Timestamps/Watermarks (4/10)
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy   -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://chk-316,
> sharedStateDirectory=hdfs://shared,
> taskOwnedStateDirectory=hdfs://taskowned,
> metadataFilePath=hdfs://chk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Source: Custom Source -> Map -> Filter ->
> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
>

Re: Optimal Flink configuration for Standalone cluster.

2020-06-27 Thread Xintong Song
Hi Dimitris,

Regarding your questions.
a) For standalone clusters, the recommended way is to use `.flink.size`
rather than `.process.size`. `.process.size` includes JVM metaspace and
overhead in addition to `.flink.size`, which usually do not really matter
for standalone clusters.
b) In case of direct OOMs, you should increase
`taskmanager.memory.task.off-heap.size`. There's no fraction for that.
c) Your understanding is correct. And you can also specify the absolute
network memory size by setting the min and max to the same value.

Here are my suggestions according to what you described.

   1. Since both the off-heap and network memory seem insufficient, I would
   suggest increasing `taskmanager.memory.flink.size` to give your task
   managers more memory in total.
   2. If 1) does not work, I would suggest not setting the total memory
   (i.e., configure neither `.flink.size` nor `.process.size`), but going for the
   fine-grained configuration where you explicitly specify the individual memory
   components. Flink will automatically add them up to derive the total memory.
      1. In addition to `.task.off-heap.size` and `.network.[min|max]`, you
      will also need to set `.task.heap.size` and `.managed.size`.
      2. If you don't know how much heap/managed memory to configure, you
      can look for the configuration options at the beginning of the TM logs
      (`-Dkey=value`). Those are the values derived from your current
      configuration.
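To make suggestion 2 concrete, a minimal flink-conf.yaml sketch of the fine-grained approach might look as follows. The sizes are placeholders, not recommendations; replace them with the values derived from your own setup.

```yaml
# Fine-grained TaskManager memory configuration: no total size is set, and
# Flink derives the total by summing the explicit components below.
# All sizes here are illustrative placeholders.
taskmanager.memory.task.heap.size: 8g
taskmanager.memory.managed.size: 8g
taskmanager.memory.task.off-heap.size: 2g
taskmanager.memory.network.min: 1g
taskmanager.memory.network.max: 1g
```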


Thank you~

Xintong Song



On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <
dimitrisvogiatzida...@gmail.com> wrote:

> Hello,
>
> I'm having a bit of trouble understanding the memory configuration on
> flink.
> I'm using Flink 1.10.0 to read some datasets of edges and extract features.
> I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram
> each, and hopefully I could expand this as long as I can add extra nodes to
> the cluster.
>
> So regarding the configuration file (flink-conf.yaml).
> a) I can't understand when I should use .process.size and when .flink.size.
>
> b) From the detailed memory model I understand that Direct memory is
> included in both of flink and process size, however if I don't specify
> off-heap.task.size I get
> " OutOfMemoryError: Direct buffer memory " .  Also should I change
> off-heap.fraction as well?
>
> c) When I fix this, I get network buffer errors, which, if I understand
> correctly, means flink.size * network fraction should be between min and max.
>
> I can't find the 'perfect' configuration regarding my setup. What is the
> optimal way to use the system I have currently?
>
> Thank you for your time.
>
>
>


Re: Re: Flink JOB_MANAGER_LEADER_PATH suspected znode leak

2020-06-27 Thread Roc Marshal
Yes.


Best,
Roc Marshal.

At 2020-06-28 10:10:20, "林恬" wrote:
>You mean that these empty leader/${job_id} znodes left behind by cancelled jobs need to be cleaned up periodically by the user?
>
>
>
>
>--Original--
>From: "Roc Marshal"  Date: Sun, Jun 28, 2020 10:07 AM
>To: "FLINK中国"
>Subject: Re: Flink JOB_MANAGER_LEADER_PATH suspected znode leak
>
>
>
>Hi, 林恬.
>First of all, thanks for your feedback.
>Regarding cleaning up the information under the corresponding ZooKeeper paths: you can simply think of Flink's dependency on the zk component as limited to the functionality it relies on. Flink does not maintain consistency between Flink 
>job information and the whole cluster or any given path, i.e. it does not clean up stale entries, because in an HA scenario the conditions for judging a path stale are much more complex.
>
>
>
>
>Best,
>Roc Marshal.
>
>At 2020-06-28 09:12:41, "林恬" wrote:
>Hi all:
>  I am currently on Flink 1.9.2 with ZooKeeper-based HA. I have found that the /leader/${job_id} 
>znodes on ZK are not removed even after a job is cancelled, so after running for a while there are many empty job_id znodes under /leader/. When are these supposed to be cleaned up? Or is this lack of cleanup a bug in 1.9.2?


Flink-1.10.0: source checkpoint occasionally takes a long time

2020-06-27 Thread Tianwang Li
About the problem of occasionally very long Flink checkpoints.

*Environment and background:*
Version: flink1.10.0
Volume: roughly 100k records per second; the data source is kafka.
Logic: sliding-window aggregation; each window emits roughly 10-20 million records.
Backpressure: the job often experiences backpressure (especially when the windows emit).

*Problem:*
Most checkpoints complete within 1 minute, but occasionally a checkpoint takes over 30 minutes (not frequent, once every 1-2 days).
The source's checkpoint takes a long time; the span from Trigger checkpoint to Starting checkpoint is long.

The checkpoint situation looks roughly like this:

[image: image.png]
[image: image.png]
[image: image.png]

2020-06-24 21:09:53,369 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger
checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.

2020-06-24 21:09:58,327 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:09:59,266 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:10:08,346 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:10:09,286 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]


(omitted)


2020-06-24 21:55:39,875 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:55:39,875 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:55:39,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask   - Starting
checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter
-> Timestamps/Watermarks (4/10)

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy   -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://chk-316,
sharedStateDirectory=hdfs://shared,
taskOwnedStateDirectory=hdfs://taskowned,
metadataFilePath=hdfs://chk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy   -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://chk-316,
sharedStateDirectory=hdfs://shared,
taskOwnedStateDirectory=hdfs://taskowned,
metadataFilePath=hdfs://chk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink 

Re: How to quickly locate the operator that slows things down

2020-06-27 Thread 徐骁
OK, thanks to you both; I will give it a try.

Sun.Zhu <17626017...@163.com> wrote on Thu, Jun 25, 2020 at 11:19 PM:

> Even though the operators are chained together, you can still see each operator's individual metrics in the metrics system.
>
> Sun.Zhu
> 17626017...@163.com
>
>
On 2020-06-25 00:51, 徐骁 wrote:
Both approaches do work, but tracing through them takes a lot of time; it is not very friendly to beginners.
>


Re: flink sql GROUP BY on a ROW type

2020-06-27 Thread Benchao Li
Hi,
I don't recall anywhere saying you cannot GROUP BY a ROW type. Could you tell us which Flink version and which planner you are using?
Attaching the exception stack trace would help even more.

sunfulin wrote on Thu, Jun 25, 2020 at 4:35 PM:

> Hi,
> A question for everyone: I defined a column A of type Row in a sink table.
> When sinking with Flink SQL, I tried to GROUP BY A and got an error saying a ROW type cannot be grouped by. How should I handle this?



-- 

Best,
Benchao Li


Re: Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow

2020-06-27 Thread 张立志
The back pressure shown in the monitoring backend is normal. The flatMap task does exist, but it only opens a MySQL connection and does nothing else. Another job 
without flatMap, a plain map/reduce 
aggregation, can run for 10+ days; once state reaches about 1 GB it clearly slows down, and it fails after the 10-minute timeout. The backend error logs show no obvious exceptions, only messages after the checkpoint timeout.
At 2020-06-28 09:58:00, "LakeShen" wrote:
>Hi 张立志,
>
>For checkpoint timeouts, first check whether your job has backpressure, e.g. at the sink stage, or caused by a flatMap somewhere.
>
>Then check whether the job has hot-key problems. If everything looks normal, you can try RocksDB incremental checkpoints; see [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details
>
>Best,
>LakeShen
>
>张立志 wrote on Sun, Jun 28, 2020 at 9:52 AM:
>
>> flink version 1.8
>> deployed on a YARN cluster
>>
>>
>> Configuration code:
>> StreamExecutionEnvironment.stateBackend(new
>> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
>> The business code is fairly simple but uses a lot of memory.
>> Errors start after the 10-minute timeout; when state reaches about 1.5 GB, checkpoints start taking longer.
>>
>>
>>
>>
>>
>>


Re: Flink JOB_MANAGER_LEADER_PATH suspected znode leak

2020-06-27 Thread 林恬
You mean that these empty leader/${job_id} znodes left behind by cancelled jobs need to be cleaned up periodically by the user?







--Original--
From: "Roc Marshal"

Re: Flink JOB_MANAGER_LEADER_PATH suspected znode leak

2020-06-27 Thread Roc Marshal
Hi, 林恬.
First of all, thanks for your feedback.
Regarding cleaning up the information under the corresponding ZooKeeper paths: you can simply think of Flink's dependency on the zk component as limited to the functionality it relies on. Flink does not maintain consistency between Flink 
job information and the whole cluster or any given path, i.e. it does not clean up stale entries, because in an HA scenario the conditions for judging a path stale are much more complex.




Best,
Roc Marshal.

At 2020-06-28 09:12:41, "林恬" wrote:
>Hi all:
>  I am currently on Flink 1.9.2 with ZooKeeper-based HA. I have found that the /leader/${job_id} 
>znodes on ZK are not removed even after a job is cancelled, so after running for a while there are many empty job_id znodes under /leader/. When are these supposed to be cleaned up? Or is this lack of cleanup a bug in 1.9.2?
>
>
>


Re: Heartbeat of TaskManager timed out.

2020-06-27 Thread Xintong Song
Hi Ori,

Here are some suggestions from my side.

   - Probably the most straightforward way is to try increasing the timeout
   to see if that helps. You can leverage the configuration option
   `heartbeat.timeout`[1]. The default is 50s.
   - It might be helpful to share your configuration setups (e.g., the TM
   resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
   share the beginning part of your JM/TM logs, including the JVM parameters
   and all the loaded configurations.
   - You may want to look into the GC logs in addition to the metrics. In
   case of a CMS GC stop-the-world, you may not be able to see the most recent
   metrics due to the process not responding to the metric querying services.
   - You may also look into the status of the JM process. If the JM is under
   significant GC pressure, it could also happen that the heartbeat message
   from the TM is not handled in time before the timeout check.
   - Are there any metrics monitoring the network condition between the JM
   and the timed-out TM? Possibly some jitter?
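For the first suggestion, the timeout can be raised in flink-conf.yaml. A hedged sketch (values are in milliseconds; 120000 is only an example, not a recommendation):

```yaml
# Heartbeat settings between JobManager and TaskManagers, in milliseconds.
# The defaults are interval 10000 and timeout 50000.
heartbeat.interval: 10000
heartbeat.timeout: 120000
```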


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout

On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski  wrote:

> Hello,
>
> I'm running Flink 1.10 on EMR and reading from Kafka with 189 partitions
> and I have parallelism of 189.
>
> Currently running with RocksDB, with checkpointing disabled. My state size
> is appx. 500gb.
>
> I'm getting sporadic "Heartbeat of TaskManager timed out" errors with no
> apparent reason.
>
> I check the container that gets the timeout for GC pauses, heap memory,
> direct memory, mapped memory, offheap memory, CPU load, network load, total
> out-records, total in-records, backpressure, and everything I can think of.
> But all those metrics show that there's nothing unusual, and it has around
> average values for all those metrics. There are a lot of other containers
> which score higher.
>
> All the metrics are very low because every TaskManager runs on a
> r5.2xlarge machine alone.
>
> I'm trying to debug this for days and I cannot find any explanation for it.
>
> Can someone explain why it's happening?
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> container_1593074931633_0011_01_000127 timed out.
> at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Thanks
>


Re: flink1.9 on yarn

2020-06-27 Thread Yangze Guo
As I understand it, you need to use session mode, i.e. ./bin/yarn-session.sh [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session

Best,
Yangze Guo
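
As a rough sketch of that workflow (memory values and the application id below are placeholders, assuming the Flink 1.9 CLI):

```shell
# Start a long-running YARN session once; this creates a single YARN
# application whose id stays fixed for the lifetime of the session.
./bin/yarn-session.sh -jm 1024m -tm 4096m -d

# Submit jobs against that session; -yid points at the session's
# application id. Cancelling a job does not kill the YARN application,
# so later submissions reuse the same app id.
./bin/flink run -yid application_1567067657620_0254 ./examples/streaming/WordCount.jar
./bin/flink cancel -yid application_1567067657620_0254 <job-id>
```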

On Sun, Jun 28, 2020 at 9:10 AM guanyq  wrote:
>
> Question 1
>
> Every submission with ./bin/flink run -m yarn-cluster creates a new
> app id, e.g. application_1567067657620_0254.
>
> After running yarn application -kill application_1567067657620_0254,
>
> how can I keep the app id from incrementing on the next ./bin/flink run -m
> yarn-cluster submission?
>
> Question 2
>
> After submitting a job with ./bin/flink run -m yarn-cluster and cancelling
> the job, how can I resubmit to the same app id?
>
>


Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow

2020-06-27 Thread Roc Marshal
Hi, 立志.
Could you provide more information, such as the exception message, to help us
understand the background of this case?


Thanks.


Best,
Roc Marshal


On 2020-06-28 09:52:10, "张立志" wrote:
>Flink version 1.8
>Deployed on a YARN cluster
>
>
>Configuration code:
>StreamExecutionEnvironment.stateBackend(new 
>FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
>The business code is fairly simple, but memory usage is large
>Errors start after more than 10 minutes; once the state reaches about 1.5 GB, the checkpoint duration starts to grow
>
>
>
>
>


Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow

2020-06-27 Thread LakeShen
Hi 张立志,

For checkpoint timeouts in general, first check whether your job has backpressure, e.g. at the sink stage, or caused by a flatMap operation somewhere.

Then check whether your job has hot-spot (data skew) issues. If everything looks normal, you can try RocksDB's incremental checkpoints; see [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details

Best,
LakeShen
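
For reference, a minimal flink-conf.yaml sketch for switching to the RocksDB backend with incremental checkpoints (the HDFS path is a placeholder; the same can also be configured per-job in code):

```yaml
# Keep state in RocksDB, with checkpoints going to a durable filesystem.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
# Upload only the diff against the previous checkpoint instead of a full
# snapshot, which keeps the async phase short once state grows large.
state.backend.incremental: true
```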

张立志 wrote on Sunday, June 28, 2020 at 9:52 AM:

> Flink version 1.8
> Deployed on a YARN cluster
>
>
> Configuration code:
> StreamExecutionEnvironment.stateBackend(new
> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
> The business code is fairly simple, but memory usage is large
> Errors start after more than 10 minutes; once the state reaches about 1.5 GB, the checkpoint duration starts to grow
>
>
>
>
>
>


Re: flink1.9 on yarn

2020-06-27 Thread LakeShen
Hi guanyq,

Why do you want the app id to stay the same?

Best,
LakeShen

guanyq wrote on Sunday, June 28, 2020 at 9:10 AM:

> Question 1
>
> Every submission with ./bin/flink run -m yarn-cluster creates a new
> app id, e.g. application_1567067657620_0254.
>
> After running yarn application -kill application_1567067657620_0254,
>
> how can I keep the app id from incrementing on the next ./bin/flink run -m
> yarn-cluster submission?
>
> Question 2
>
> After submitting a job with ./bin/flink run -m yarn-cluster and cancelling
> the job, how can I resubmit to the same app id?
>
>


Why is the flink checkpoint Checkpoint Duration (Async) phase so slow

2020-06-27 Thread 张立志
Flink version 1.8
Deployed on a YARN cluster


Configuration code:
StreamExecutionEnvironment.stateBackend(new 
FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
The business code is fairly simple, but memory usage is large
Errors start after more than 10 minutes; once the state reaches about 1.5 GB, the checkpoint duration starts to grow







Flink JOB_MANAGER_LEADER_PATH znode suspected leak

2020-06-27 Thread 林恬
Hi all:
  I'm currently using Flink 1.9.2 with ZooKeeper for HA. I've found that the /leader/${job_id} 
znode on ZooKeeper is not cleaned up even after the job is cancelled, so after running for a long time there are many empty job_id znodes under /leader/. When is this supposed to be cleaned up? Or is this un-cleaned-up behavior a bug in 1.9.2?




flink1.9 on yarn

2020-06-27 Thread guanyq
Question 1

Every submission with ./bin/flink run -m yarn-cluster creates a new
app id, e.g. application_1567067657620_0254.

After running yarn application -kill application_1567067657620_0254,

how can I keep the app id from incrementing on the next ./bin/flink run -m yarn-cluster submission?

Question 2

After submitting a job with ./bin/flink run -m yarn-cluster and cancelling the job, how can I resubmit to the same app id?

 

Re: Running Apache Flink on the GraalVM as a Native Image

2020-06-27 Thread Stephen Connolly
On Thu 25 Jun 2020 at 12:48, ivo.kn...@t-online.de 
wrote:

> Whats up guys,
>
>
>
> I'm trying to run an Apache Flink Application with the GraalVM Native
> Image but I get the following error: (check attached file)
>
>
>
> I suppose this happens, because Flink uses a lot of low-level-code and is
> highly optimized.
>

Actually I suspect the reason is that Flink uses dynamic classloading.

GraalVM requires all the code available in order to produce a native image.

You’d need to pre-bind the topology you want Flink to run into the native
image.

More fun, you’ll actually need two images, one for the job manager and one
for the task manager.

And you’ll need to convince GraalVM that the entry-point is your topology
needs reflection support enabled... plus whatever other classes use
reflection in Flink.

Sounds rather complex to me. If native images are what is important to you,
there seemed to be a strong contender in the Rust language community; it
didn't provide as strong management as Flink, and you'd probably have more
work managing things like checkpointing, but if native code is important
that's where I'd be looking. Sadly I cannot remember the name, and my
google-foo is weak tonight.
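
As a rough illustration of what enabling reflection support entails: GraalVM's native-image accepts a reflection configuration file (passed via -H:ReflectionConfigurationFiles=reflect-config.json). The class names below are hypothetical placeholders for your topology's entry point and whatever Flink loads reflectively:

```json
[
  {
    "name": "com.example.MyTopologyEntryPoint",
    "allDeclaredConstructors": true,
    "allDeclaredMethods": true
  },
  {
    "name": "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
    "allDeclaredConstructors": true,
    "allDeclaredMethods": true
  }
]
```

In practice you would need entries like these for a large part of Flink's runtime, which is why dynamic classloading makes native images so painful.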


>
> When I googled the combination of GraalVM Native Image and Apache Flink I
> get no results.
>
>
>
> Did anyone ever succeeded in making it work and how?
>
>
>
> Best regards,
>
>
>
> Ivo
> 
>
-- 
Sent from my phone


Re: Question about Watermarks within a KeyedProcessFunction

2020-06-27 Thread David Anderson
With an AscendingTimestampExtractor, watermarks are not created for every
event, and as your job starts up, some events will be processed before the
first watermark is generated.

The impossible value you see is an initial value that's in place until the
first real watermark is available. On the other hand, onTimer can not be
called until some timer is triggered by the arrival of a watermark, at
which point the watermark will have a reasonable value.
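
That initial value is simply Long.MIN_VALUE interpreted as epoch milliseconds, which a quick self-contained check (plain JDK, no Flink dependency) makes visible:

```java
import java.time.Instant;

public class InitialWatermark {
    public static void main(String[] args) {
        // Flink keeps the current watermark at Long.MIN_VALUE until the
        // first real watermark arrives; rendered as a timestamp, that is
        // exactly the "impossible" date seen in processElement.
        Instant initial = Instant.ofEpochMilli(Long.MIN_VALUE);
        System.out.println(initial); // -292275055-05-16T16:47:04.192Z
    }
}
```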

On Sat, Jun 27, 2020 at 2:37 AM Marco Villalobos 
wrote:

>
> My source is a Kafka topic.
> I am using Event Time.
> I assign the event time with an AscendingTimestampExtractor
>
> I noticed when debugging that in the KeyedProcessFunction that
> after my highest known event time of:  2020-06-23T00:46:30.000Z
>
> the processElement method had a watermark with an impossible date of:
> -292275055-05-16T16:47:04.192Z
>
> but in the onTimer method it had a more reasonable value that trails the
> highest known event time by 1 millisecond, which is this
> value:  2020-06-23T00:46:29.999Z
>
> I want to know, why does the processElement method have an impossible
> watermark value?
>
>
>


Optimal Flink configuration for Standalone cluster.

2020-06-27 Thread Dimitris Vogiatzidakis
Hello,

I'm having a bit of trouble understanding the memory configuration in Flink.
I'm using Flink 1.10.0 to read some datasets of edges and extract features.
I run this on a cluster of 4 nodes, each with 32 cores and 252 GB of RAM, and
hopefully I can expand this as long as I can add extra nodes to the cluster.

So regarding the configuration file (flink-conf.yaml).
a) I can't understand when I should use process.size and when flink.size.

b) From the detailed memory model I understand that direct memory is included
in both the flink and process sizes. However, if I don't specify
off-heap.task.size I get
"OutOfMemoryError: Direct buffer memory". Also, should I change the
off-heap fraction as well?

c) When I fix this, I get a network buffers error; if I understand
correctly, flink.size * network fraction should be between the min and max.

I can't find the 'perfect' configuration regarding my setup. What is the
optimal way to use the system I have currently?

Thank you for your time.
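
For what it's worth, a flink-conf.yaml sketch of how the Flink 1.10 options mentioned above relate to each other (all values are placeholders to illustrate the relationships, not tuned recommendations):

```yaml
# Either cap the whole TaskManager process (heap + off-heap + JVM overhead
# + metaspace)...
taskmanager.memory.process.size: 64g
# ...or cap only the Flink-managed part and let JVM overhead be added on
# top; set one of the two, not both.
# taskmanager.memory.flink.size: 56g

# Direct-buffer headroom for user code (the "Direct buffer memory" OOM):
taskmanager.memory.task.off-heap.size: 1g

# Network buffers are taken as a fraction of total Flink memory, clamped
# between min and max:
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 256mb
taskmanager.memory.network.max: 4gb
```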


Flink Sql: column order when adding a new column

2020-06-27 Thread ????????
Hi, all:


A Flink SQL question: table A has columns c1, c2, c3; I'd like to add a new column c4 after c1 so that the resulting column order is c1, c4, c2, c3.



Distributed Anomaly Detection using MIDAS

2020-06-27 Thread Shivin Srivastava
Hi All,

I have recently been exploring MIDAS: an algorithm for Streaming Anomaly
Detection. A production level parallel and distributed implementation of
MIDAS should be quite useful to the industry. I feel that Flink is very
well-suited for the same as MIDAS deals with streaming data. If anyone is
interested to contribute/collaborate, please let me know. Currently, there
exist C++, Python, Ruby, Rust, R, and Golang implementations.

MIDAS repository: https://github.com/bhatiasiddharth/MIDAS
MIDAS paper: https://www.comp.nus.edu.sg/~sbhatia/assets/pdf/midas.pdf

Thanks!
Shivin


Is Flink HIPAA certified

2020-06-27 Thread Prasanna kumar
Hi Community ,

Could anyone let me know if Flink is used in US healthcare tech space ?

Thanks,
Prasanna.