Re: Suspected leak of znodes under Flink JOB_MANAGER_LEADER_PATH
Flink itself does not provide a mechanism to clean up the residual znodes on ZooKeeper after a job is cancelled, nor some of the leftover data on HDFS; if you want them removed, do it manually or implement the cleanup yourself. On 2020-06-28 09:12:41, "林恬" wrote: >Hi all: > I am currently using Flink 1.9.2 with ZooKeeper HA. I found that the /leader/${job_id} znodes on ZK are not cleaned up even after the job is cancelled, so after running for a long time there are many empty job_id znodes under /leader/. When is this supposed to be cleaned up? Or is this un-cleaned behavior a bug in 1.9.2? > > >
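Since Flink leaves these znodes behind, any cleanup has to happen outside Flink. Below is a minimal sketch of the selection logic only: given the job IDs currently reported as running (e.g. from the REST endpoint /jobs) and the child znode names under /leader/, it returns the znodes that look stale. The class and method names are hypothetical; actually listing and deleting znodes (e.g. via zkCli.sh or Curator) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class StaleZnodeFinder {
    // Return the children of /leader/ whose name does not match any
    // currently running job ID; these are candidates for manual deletion.
    public static List<String> staleZnodes(Set<String> runningJobIds,
                                           List<String> leaderChildren) {
        List<String> stale = new ArrayList<>();
        for (String child : leaderChildren) {
            if (!runningJobIds.contains(child)) {
                stale.add(child);
            }
        }
        return stale;
    }
}
```

Each returned name would then be deleted by hand, e.g. `delete /leader/<job_id>` in zkCli.sh; doing this while jobs are being submitted is racy, so run it against a quiescent cluster.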
Re: Native K8S IAM Role?
Hi kevin, If you mean to add annotations to the Flink native K8s session pods, you could use "kubernetes.jobmanager.annotations" and "kubernetes.taskmanager.annotations"[1]. However, they are only supported from release-1.11. Maybe you could wait a little bit longer; 1.11 will be released soon, and it adds more features for the native K8s integration (e.g. application mode, labels, annotations, tolerations, etc.). [1]. https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes Best, Yang Bohinski, Kevin wrote on Fri, Jun 26, 2020 at 3:09 AM: > Hi, > > > > How do we attach an IAM role to the native K8S sessions? > > > > Typically for our other pods we use the following in our yamls: > > spec: > > template: > > metadata: > > annotations: > > iam.amazonaws.com/role: ROLE_ARN > > > > Best > > kevin >
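For reference, once on 1.11 the two options Yang mentions take comma-separated key:value pairs in flink-conf.yaml. A hedged sketch, reusing the ROLE_ARN placeholder from the question:

```yaml
kubernetes.jobmanager.annotations: iam.amazonaws.com/role:ROLE_ARN
kubernetes.taskmanager.annotations: iam.amazonaws.com/role:ROLE_ARN
```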
Re: flink1.9 on yarn
Question 1: every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254. After yarn application -kill application_1567067657620_0254, how can I resubmit with ./bin/flink run -m yarn-cluster without the app id incrementing? Question 2: after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I resubmit to the same app id?
Re: Does the flink REST API support the -C option?
The Flink CLI applies the -C arguments to the JobGraph generated on the client side, then submits that JobGraph. When submitting via the REST API, setting a classpath for a single job is indeed not supported at the moment. I think this is a reasonable request; feel free to open a JIRA for it. The current workaround is to put the jars into the cluster configuration: pass -C/--classpath or -D pipeline.classpaths=xxx,yyy when starting the session, so every job will add them to its classpath. Best, Yang chenxuying wrote on Wed, Jun 24, 2020 at 6:13 PM: > Currently using flink 1.10.0 > Background: > the REST API has an endpoint for submitting a job > endpoint /jars/:jarid/run > > with parameters entryClass, programArgs, parallelism, jobId, allowNonRestoredState, savepointPath > > > If I submit the job via the command line: > flink run -C file:///usr/local/soft/flink/my-function-0.1.jar -c > cn.xuying.flink.table.sql.ParserSqlJob > /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar > the command line supports -C for providing extra jars, which Flink loads onto the classpath. > Question: > the current REST API does not offer the -C functionality that the command line has, so I would like to know whether it will be added in the future.
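The session-wide workaround Yang describes, expressed in flink-conf.yaml rather than on the command line, would look like this (using the jar path from the question; note it applies to every job on the session, not just one):

```yaml
pipeline.classpaths: file:///usr/local/soft/flink/my-function-0.1.jar
```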
flink table: TableException "Field reference expression expected" when registering a DataStream
Flink version 1.8.0, using flink table: tEnv.registerDataStream("t_data",dataStream,"f1-1"); throws org.apache.flink.table.api.TableException: Field reference expression expected.
Re: [Does Flink SQL support adding a column after a specified column of a table?]
Hi, could you describe your scenario and requirements more concretely? What is the data source, and did the source data originally contain the c4 column, or was c4 newly added? Also, the answer depends on your connector. Some connectors map columns by name (e.g. JSON, various databases), while others map by column position (e.g. CSV). If mapping is by name, then adding the c4 column in the Flink SQL DDL is enough to read c4's value, no matter where c4 is placed. Best, Jark On Sat, 27 Jun 2020 at 17:26, 忝忝向仧 <153488...@qq.com> wrote: > Hi, all: > > > Does Flink SQL support adding a column after a specified column of a table? For example, table A has columns c1, c2, c3, and I want to add a column c4 after c1 so that it becomes c1, c4, c2, c3, instead of appending it at the end. > > > Thanks.
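Jark's name-based vs. position-based distinction can be sketched as plain lookup logic. This is an illustration of the two mapping styles, not Flink connector code:

```java
import java.util.List;
import java.util.Map;

public class ColumnMapping {
    // Name-based (JSON-like, database-like): the order of columns in the
    // DDL is irrelevant, each declared column is looked up by its name.
    public static String byName(Map<String, String> record, String column) {
        return record.get(column);
    }

    // Position-based (CSV-like): the i-th declared column reads the i-th
    // value, so inserting c4 in the middle of the DDL shifts every later column.
    public static String byPosition(List<String> record, int index) {
        return record.get(index);
    }
}
```

With name-based mapping, adding c4 anywhere in the DDL works; with position-based mapping, the physical data would also have to carry c4 at that position.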
Re: Flink-1.10.0 source checkpoint occasionally takes a long time
Let me add the checkpoint UI screenshots: https://imgchr.com/i/NgCUgS https://imgchr.com/i/NgChDJ https://imgchr.com/i/NgCT4x > -- ** tivanli **
Re: Flink-1.10.0 source checkpoint occasionally takes a long time
Hi Tianwang Li, the checkpoint timeout only shows up once every day or two, so check whether some category of keys in your job suddenly grows in volume during those periods. Best, LakeShen zhisheng wrote on Sun, Jun 28, 2020 at 10:27 AM: > hi, Tianwang Li > > The three images appear broken; try uploading them to a third-party image host and posting the links here. Also: > > > The job often hits backpressure (especially when the windows emit) > > Check the operators downstream of the window, e.g. whether the window emits too much data while the sink parallelism stays the same as before, so processing cannot keep up, which causes the backpressure. > > > > Most checkpoints finish within 1 minute, but occasionally a checkpoint takes more than 30 minutes (not frequent, once every 1-2 days) > > This could also be a latency spike caused by HDFS being under heavy pressure at times. > > It would also help to attach the checkpoint-related screenshots from the UI and the logs, to locate the problem more precisely. > > > Best ! > zhisheng > > > Tianwang Li wrote on Sun, Jun 28, 2020 at 10:17 AM: > > About checkpoints occasionally taking a rather long time. > > > > *Environment and background:* > > Version: flink1.10.0 > > Volume: roughly 100k records per second; the source is Kafka. > > Logic: sliding-window aggregation; each window emits roughly 10-20 million records. > > Backpressure: the job often hits backpressure (especially when the windows emit). > > > > *Problem:* > > Most checkpoints finish within 1 minute, but occasionally a checkpoint takes more than 30 minutes (not frequent, once every 1-2 days). > > The source checkpoint is what takes long: the gap from Trigger checkpoint to Starting checkpoint is large. > > > > The checkpoint situation is roughly as follows: > > > > [image: image.png] > > [image: image.png] > > [image: image.png] > > > > (TaskExecutor/TaskManagerRunner log excerpts omitted here; the full logs are in the original message of this thread.)
Re: flink1.9 on yarn
hi, guanyq. The way you submit belongs to Flink on YARN per-job mode, and by design a new AppID is generated each time a new job is submitted. Best! zhisheng Yangze Guo wrote on Sun, Jun 28, 2020 at 9:59 AM: > I understand you need session mode, i.e. ./bin/yarn-session.sh [1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session > > Best, > Yangze Guo > > On Sun, Jun 28, 2020 at 9:10 AM guanyq wrote: > > > > Question 1 > > > > every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254 > > > > after yarn application -kill application_1567067657620_0254, > > > > how can I resubmit with ./bin/flink run -m yarn-cluster without the app id incrementing? > > > > Question 2 > > > > after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I resubmit to the same app id? > > >
Re: flink1.9 on yarn
Hi, guanyq. On question 1 (how to resubmit with ./bin/flink run -m yarn-cluster without the appid incrementing): the appid auto-increment policy is not generated by Flink; if necessary, you can investigate hadoop-yarn and draw your own conclusion. On question 2 (after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how to resubmit to the same appid): may I take this to mean that a flink yarn-session cluster better fits your workload? The submission mode in your question is per-job, and once the job shuts down, Flink shuts the cluster down as well. See: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-flink-session Best, Roc Marshal On 2020-06-28 09:09:43, "guanyq" wrote: >Question 1 > >every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254 > >after yarn application -kill application_1567067657620_0254, > >how can I resubmit with ./bin/flink run -m yarn-cluster without the appid incrementing? > >Question 2 > >after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I resubmit to the same appid? > >
Re: CEP use case !
Hi Aissa, Flink CEP is an API for matching patterns across multiple events, like (START MIDDLE+ END). If you can compute the "sensor_status" from a single record, I think the Flink DataStream API / Table & SQL API can already satisfy your requirement. Aissa Elaffani wrote on Thu, Jun 25, 2020 at 11:35 PM: > Hello Guys, > I am asking if the CEP API can resolve my use case. Actually, I have a lot > of sensors generating some data, and I want to apply a rules engine on > those sensors' data, in order to set a "sensor_status" to Normal or > Alert or Warning. For each record I want to apply some conditions (if > temperature>15, humidity>...) and then define the status of the sensor, whether > it is in Normal status or Alert status ... > And I am wondering if the CEP API can help me achieve that. > Thank you guys for your time! > Best, > AISSA > -- Best, Benchao Li
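Benchao's point, that a status depending only on a single record needs no multi-event pattern, can be sketched as plain per-record rule evaluation, the kind of logic that would fit inside a simple MapFunction. The thresholds and field names here are invented for illustration:

```java
public class SensorStatus {
    // Evaluate the rules against one record in isolation; no event
    // history or CEP pattern state is needed.
    public static String status(double temperature, double humidity) {
        if (temperature > 30 || humidity > 90) {
            return "Alert";
        } else if (temperature > 15 || humidity > 70) {
            return "Warning";
        }
        return "Normal";
    }
}
```

CEP would only become necessary if the status depended on a sequence of events, e.g. "Alert after three consecutive high readings".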
Re: Re: Can Flink use Canal to connect to Oracle?
Feel free to try it out and report back~ On Thu, 25 Jun 2020 at 14:55, gaolanf...@hotmail.com wrote: > Hello > > Many thanks! > > I checked the material you pointed to: > Canal supports MySQL, > Debezium supports MySQL, PostgreSQL, Oracle, etc. > >I have not used Debezium though, and relatively few people have; I'll go try it myself~ > > > > > gaolanf...@hotmail.com > > From: Leonard Xu > Sent: 2020-06-25 13:27 > To: user-zh > Subject: Re: Can Flink use Canal to connect to Oracle? > > Hello > > Flink 1.11 will support reading CDC, with formats for both canal and debezium; 1.11 is about to be released. Here is the documentation [1] for reference. > > Best, > Leonard Xu > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/canal.html > < > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/canal.html > > >
Re: Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow?
hi, 立志: given your description (the job can run 10+ days and you use the FsStateBackend), could you provide the GC time and count monitoring for the JobManager and TaskManagers? I suspect the problem is caused by full GC. Best! zhisheng 张立志 wrote on Sun, Jun 28, 2020 at 10:13 AM: > The back pressure looks normal in the monitoring backend. The flatMap task does exist, but it only connects to mysql and does nothing else. Another job > with no flatmap, just plain map reduce > aggregation, can run 10+ days; once state reaches 1GB+ it slows down noticeably and then fails after the 10-minute timeout. The error logs in the backend show no obvious exceptions, only messages from after the checkpoint > timed out. > On 2020-06-28 09:58:00, "LakeShen" wrote: > >Hi 张立志, > > > >When a checkpoint times out, first check whether your job has backpressure, e.g. in the sink stage, or caused by a flatMap operation somewhere. > > > >Then check whether your job has hot keys, etc. If everything looks normal, you can try RocksDB's incremental checkpoints, see [1]. > > > >[1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details > > > >Best, > >LakeShen > > > >张立志 wrote on Sun, Jun 28, 2020 at 9:52 AM: > > > >> flink version 1.8 > >> deployed on a yarn cluster > >> > >> > >> configuration code: > >> StreamExecutionEnvironment.stateBackend(new > >> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build(); > >> the business code is fairly simple, but memory usage is high > >> errors start after 10 minutes; when state reaches about 1.5G, checkpoints start taking longer > >> >
Re: Flink-1.10.0 source checkpoint occasionally takes a long time
hi, Tianwang Li The three images appear broken; try uploading them to a third-party image host and posting the links here. Also: > The job often hits backpressure (especially when the windows emit) Check the operators downstream of the window, e.g. whether the window emits too much data while the sink parallelism stays the same as before, so processing cannot keep up, which causes the backpressure. > Most checkpoints finish within 1 minute, but occasionally a checkpoint takes more than 30 minutes (not frequent, once every 1-2 days) This could also be a latency spike caused by HDFS being under heavy pressure at times. It would also help to attach the checkpoint-related screenshots from the UI and the logs, to locate the problem more precisely. Best ! zhisheng Tianwang Li wrote on Sun, Jun 28, 2020 at 10:17 AM: > About checkpoints occasionally taking a rather long time. > > *Environment and background:* > Version: flink1.10.0 > Volume: roughly 100k records per second; the source is Kafka. > Logic: sliding-window aggregation; each window emits roughly 10-20 million records. > Backpressure: the job often hits backpressure (especially when the windows emit). > > *Problem:* > Most checkpoints finish within 1 minute, but occasionally a checkpoint takes more than 30 minutes (not frequent, once every 1-2 days). > The source checkpoint is what takes long: the gap from Trigger checkpoint to Starting checkpoint is large. > > The checkpoint situation is roughly as follows: > > [image: image.png] > [image: image.png] > [image: image.png] > > (TaskExecutor/TaskManagerRunner log excerpts omitted here; the full logs are in the original message of this thread.)
Re: Optimal Flink configuration for Standalone cluster.
Hi Dimitris, Regarding your questions. a) For standalone clusters, the recommended way is to use `.flink.size` rather than `.process.size`. `.process.size` includes JVM metaspace and overhead in addition to `.flink.size`, which usually do not really matter for standalone clusters. b) In case of direct OOMs, you should increase `taskmanager.memory.task.off-heap.size`. There's no fraction for that. c) Your understanding is correct. And you can also specify the absolute network memory size by setting the min and max to the same value. Here are my suggestions according to what you described. 1. Since both off-heap and network memory seem insufficient, I would suggest increasing `taskmanager.memory.flink.size` to give your task managers more memory in total. 2. If 1) does not work, I would suggest not setting the total memory (i.e., configure neither `.flink.size` nor `.process.size`) but going for the fine-grained configuration where you explicitly specify the individual memory components. Flink will automatically add them up to derive the total memory. a. In addition to `.task.off-heap.size` and `.network.[min|max]`, you will also need to set `.task.heap.size` and `.managed.size`. b. If you don't know how much heap/managed memory to configure, you can look for the configuration options in the beginning of the TM logs (`-Dkey=value`). Those are the values derived from your current configuration. Thank you~ Xintong Song On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis < dimitrisvogiatzida...@gmail.com> wrote: > Hello, > > I'm having a bit of trouble understanding the memory configuration in > Flink. I'm using Flink 1.10.0 to read some datasets of edges and extract features. > I run this on a cluster consisting of 4 nodes, with 32 cores and 252GB RAM > each, and hopefully I can expand this as long as I can add extra nodes to > the cluster. > > So regarding the configuration file (flink-conf.yaml): > a) I can't understand when I should use .process.size and when .flink.size. > > b) From the detailed memory model I understand that direct memory is > included in both flink and process size, but if I don't specify > off-heap.task.size I get > "OutOfMemoryError: Direct buffer memory". Also, should I change > off-heap.fraction as well? > > c) When I fix this, I get a network buffers error, which, if I understand > correctly, means flink.size * network fraction should be between min and max. > > I can't find the 'perfect' configuration for my setup. What is the > optimal way to use the system I currently have? > > Thank you for your time. > > >
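The fine-grained configuration in suggestion 2 is additive: Flink derives `taskmanager.memory.flink.size` by summing the explicitly configured components. A toy sketch of that arithmetic (the component values are made-up examples, not recommendations):

```java
public class FlinkMemoryModel {
    // taskmanager.memory.flink.size is the sum of the task manager's
    // memory components: framework heap + task heap + managed memory
    // + framework off-heap + task off-heap + network. All values in MB.
    public static long flinkSizeMb(long frameworkHeap, long taskHeap, long managed,
                                   long frameworkOffHeap, long taskOffHeap, long network) {
        return frameworkHeap + taskHeap + managed + frameworkOffHeap + taskOffHeap + network;
    }
}
```

So raising `.task.off-heap.size` or `.network.[min|max]` while leaving the others fixed grows the derived total rather than squeezing the heap.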
Re: Re: Suspected leak of znodes under Flink JOB_MANAGER_LEADER_PATH
Yes. Best, Roc Marshal. On 2020-06-28 10:10:20, "林恬" wrote: >So you mean that these empty leader/${job_id} znodes left behind by cancelled jobs need to be cleaned up periodically by the user themselves? > > > > > > > >--Original-- >From: "Roc Marshal" Date: Sun, Jun 28, 2020 10:07 AM >To: "FLINK中国" >Subject: Re: Suspected leak of znodes under Flink JOB_MANAGER_LEADER_PATH > > > >Hi, 林恬. >First of all, thanks for the feedback. >Regarding cleanup of the information under the corresponding zk path: put simply, Flink depends on the zk component only within the scope of the functionality it needs. It does not maintain consistency between the whole cluster (or some path) and the Flink job information, i.e. it does not clean up stale entries, because under HA the conditions for deciding that a path is stale are much more complex. > > > > >Best, >Roc Marshal. > > > > >On 2020-06-28 09:12:41, "林恬" wrote: >Hi all: > I am currently using Flink 1.9.2 with ZooKeeper HA. I found that the /leader/${job_id} znodes on ZK are not cleaned up even after the job is cancelled, so after running for a long time there are many empty job_id znodes under /leader/. When is this supposed to be cleaned up? Or is this behavior a bug in 1.9.2?
Flink-1.10.0 source checkpoint occasionally takes a long time
About checkpoints occasionally taking a rather long time. *Environment and background:* Version: flink1.10.0 Volume: roughly 100k records per second; the source is Kafka. Logic: sliding-window aggregation; each window emits roughly 10-20 million records. Backpressure: the job often hits backpressure (especially when the windows emit). *Problem:* Most checkpoints finish within 1 minute, but occasionally a checkpoint takes more than 30 minutes (not frequent, once every 1-2 days). The source checkpoint is what takes long: the gap from Trigger checkpoint to Starting checkpoint is large. The checkpoint situation is roughly as follows: [image: image.png] [image: image.png] [image: image.png] 2020-06-24 21:09:53,369 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693. 2020-06-24 21:09:58,327 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor- Received heartbeat request from e88ea2f790430c9c160e540ef0546d60. 2020-06-24 21:09:59,266 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor- Received heartbeat request from b93d7167db364dfdcbda886944f1482f. 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424 MB (used/committed/max)] 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)] 2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] 2020-06-24 21:10:08,346 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor- Received heartbeat request from e88ea2f790430c9c160e540ef0546d60. 2020-06-24 21:10:09,286 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor- Received heartbeat request from b93d7167db364dfdcbda886944f1482f.
2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424 MB (used/committed/max)] 2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424 2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)] 2020-06-24 21:10:09,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] (omitted) 2020-06-24 21:55:39,875 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424 2020-06-24 21:55:39,875 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)] 2020-06-24 21:55:39,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6] 2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) 2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://chk-316, sharedStateDirectory=hdfs://shared, 
taskOwnedStateDirectory=hdfs://taskowned, metadataFilePath=hdfs://chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms. 2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://chk-316, sharedStateDirectory=hdfs://shared, taskOwnedStateDirectory=hdfs://taskowned, metadataFilePath=hdfs://chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink
Re: How to quickly locate the operator that slows things down
OK, thanks to you both, I'll give it a try. Sun.Zhu <17626017...@163.com> wrote on Thu, Jun 25, 2020 at 11:19 PM: > Even though the operators are chained together, you can still read each operator's individual indicators from the metrics. > Sun.Zhu > 17626017...@163.com > > > On Jun 25, 2020 at 00:51, 徐骁 wrote: > Both methods do work, but tracing it down takes a lot of time; it's really unfriendly to beginners. >
Re: flink sql GROUP BY on a ROW type
Hi, I don't recall anywhere saying that you cannot GROUP BY a ROW type. Could you tell us which Flink version and which planner you are using? An exception stack trace would be even better. sunfulin wrote on Thu, Jun 25, 2020 at 4:35 PM: > Hi, > A question: I defined a column A of type Row in a sink table. > When sinking through Flink SQL, I tried to GROUP BY A and got an error saying the ROW type cannot be grouped by. How should I handle this? -- Best, Benchao Li
Re: Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow?
The back pressure looks normal in the monitoring backend. The flatMap task does exist, but it only connects to mysql and does nothing else. Another job with no flatmap, just plain map reduce aggregation, can run 10+ days; once state reaches 1GB+ it slows down noticeably and then fails after the 10-minute timeout. The error logs in the backend show no obvious exceptions, only messages from after the checkpoint timed out. On 2020-06-28 09:58:00, "LakeShen" wrote: >Hi 张立志, > >When a checkpoint times out, first check whether your job has backpressure, e.g. in the sink stage, or caused by a flatMap operation somewhere. > >Then check whether your job has hot keys, etc. If everything looks normal, try RocksDB's incremental checkpoints, see [1]. > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details > >Best, >LakeShen > >张立志 wrote on Sun, Jun 28, 2020 at 9:52 AM: > >> flink version 1.8 >> deployed on a yarn cluster >> >> >> configuration code: >> StreamExecutionEnvironment.stateBackend(new >> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build(); >> the business code is fairly simple, but memory usage is high >> errors start after 10 minutes; when state reaches about 1.5G, checkpoints start taking longer >> >> >> >> >> >>
Re: Suspected leak of znodes under Flink JOB_MANAGER_LEADER_PATH
So you mean that these empty leader/${job_id} znodes left behind by cancelled jobs need to be cleaned up periodically by the user themselves? --Original-- From: "Roc Marshal"
Re: Suspected leak of znodes under Flink JOB_MANAGER_LEADER_PATH
Hi, 林恬. First of all, thanks for the feedback. Regarding cleanup of the information under the corresponding zk path: put simply, Flink depends on the zk component only within the scope of the functionality it needs. It does not maintain consistency between the whole cluster (or some path) and the Flink job information, i.e. it does not clean up stale entries, because under HA the conditions for deciding that a path is stale are much more complex. Best, Roc Marshal. On 2020-06-28 09:12:41, "林恬" wrote: >Hi all: > I am currently using Flink 1.9.2 with ZooKeeper HA. I found that the /leader/${job_id} znodes on ZK are not cleaned up even after the job is cancelled, so after running for a long time there are many empty job_id znodes under /leader/. When is this supposed to be cleaned up? Or is this behavior a bug in 1.9.2? > > >
Re: Heartbeat of TaskManager timed out.
Hi Ori, Here are some suggestions from my side. - Probably the most straightforward way is to try increasing the timeout to see if that helps. You can leverage the configuration option `heartbeat.timeout`[1]. The default is 50s. - It might be helpful to share your configuration setups (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to share the beginning part of your JM/TM logs, including the JVM parameters and all the loaded configurations. - You may want to look into the GC logs in addition to the metrics. In case of a CMS GC stop-the-world, you may not be able to see the most recent metrics due to the process not responding to the metric querying services. - You may also look into the status of the JM process. If the JM is under significant GC pressure, it could also happen that the heartbeat message from the TM is not handled in time before the timeout check. - Are there any metrics monitoring the network condition between the JM and the timed-out TM? Possibly some jitter? Thank you~ Xintong Song [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski wrote: > Hello, > > I'm running Flink 1.10 on EMR, reading from Kafka with 189 partitions, > and I have a parallelism of 189. > > Currently running with RocksDB, with checkpointing disabled. My state size > is approx. 500gb. > > I'm getting sporadic "Heartbeat of TaskManager timed out" errors with no > apparent reason. > > I check the container that gets the timeout for GC pauses, heap memory, > direct memory, mapped memory, offheap memory, CPU load, network load, total > out-records, total in-records, backpressure, and everything I can think of. > But all those metrics show that there's nothing unusual, and it has around > average values for all those metrics. There are a lot of other containers > which score higher. 
> > All the metrics are very low because every TaskManager runs on a > r5.2xlarge machine alone. > > I'm trying to debug this for days and I cannot find any explanation for it. > > Can someone explain why it's happening? > > java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id > container_1593074931633_0011_01_000127 timed out. > at org.apache.flink.runtime.jobmaster. > JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster > .java:1147) > at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run( > HeartbeatMonitorImpl.java:109) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: > 511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync( > AkkaRpcActor.java:397) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage( > AkkaRpcActor.java:190) > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor > .handleRpcMessage(FencedAkkaRpcActor.java:74) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage( > AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool > .java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java: > 1979) > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run( > ForkJoinWorkerThread.java:107) > > Thanks >
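Xintong's first suggestion, increasing the timeout, is a single flink-conf.yaml entry. The value is in milliseconds; 120000 below is only an example above the 50s default, not a recommendation:

```yaml
heartbeat.timeout: 120000
```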
Re: flink1.9 on yarn
I understand you need session mode, i.e. ./bin/yarn-session.sh [1] [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session Best, Yangze Guo On Sun, Jun 28, 2020 at 9:10 AM guanyq wrote: > > Question 1 > > every submission with ./bin/flink run -m yarn-cluster produces a new APP-ID, e.g. application_1567067657620_0254 > > after yarn application -kill application_1567067657620_0254, > > how can I resubmit with ./bin/flink run -m yarn-cluster without the app id incrementing? > > Question 2 > > after submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I resubmit to the same app id? > >
Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow
Hi 立志,

Could you provide some more information, such as the exception message, so we can better understand the background of this case? Thanks.

Best,
Roc Marshal

On 2020-06-28 09:52:10, "张立志" wrote:
>Flink version: 1.8
>Deployment: YARN cluster
>
>Configuration code:
>StreamExecutionEnvironment.stateBackend(new
>FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
>The business logic is fairly simple, but memory usage is high.
>Errors start after 10 minutes; once state reaches about 1.5 GB, checkpoint durations begin to grow.
Re: Why is the flink checkpoint Checkpoint Duration (Async) phase so slow
Hi 张立志,

When checkpoints time out, first check whether the job is backpressured, for example at the sink stage, or because of some flatMap operation. Then check whether the job has any data hotspot issues. If everything looks normal, you can try RocksDB incremental checkpoints; see [1] for details.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details

Best,
LakeShen

张立志 wrote on Sun, Jun 28, 2020 at 9:52 AM:
> Flink version: 1.8
> Deployment: YARN cluster
>
> Configuration code:
> StreamExecutionEnvironment.stateBackend(new
> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
> The business logic is fairly simple, but memory usage is high.
> Errors start after 10 minutes; once state reaches about 1.5 GB, checkpoint durations begin to grow.
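As a sketch of the suggestion above, incremental RocksDB checkpoints can also be enabled cluster-wide in flink-conf.yaml (keys valid for Flink 1.8–1.10; the HDFS path is the one from the original post):

```yaml
# Use RocksDB for state, so state spills to disk instead of living on the heap
state.backend: rocksdb

# Where checkpoint data is stored
state.checkpoints.dir: hdfs://nsstreaming/streaming/flink_checkpoint/state

# Only upload SST files that changed since the last completed checkpoint,
# which shortens the async phase for large, slowly-changing state
state.backend.incremental: true
```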
Re: flink1.9 on yarn
Hi guanyq,

Why do you want the app id to stay the same?

Best,
LakeShen

guanyq wrote on Sun, Jun 28, 2020 at 9:10 AM:
> Question 1:
> Every submission via ./bin/flink run -m yarn-cluster produces a new app id, e.g. application_1567067657620_0254.
> After running yarn application -kill application_1567067657620_0254,
> how do I resubmit with ./bin/flink run -m yarn-cluster without the app id incrementing?
>
> Question 2:
> After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I submit again to the same app id?
Why is the flink checkpoint Checkpoint Duration (Async) phase so slow
Flink version: 1.8
Deployment: YARN cluster

Configuration code:
StreamExecutionEnvironment.stateBackend(new FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();

The business logic is fairly simple, but memory usage is high. Errors start after 10 minutes; once state reaches about 1.5 GB, checkpoint durations begin to grow.
Suspected leak of Flink JOB_MANAGER_LEADER_PATH znodes
Hi all:

I'm currently using Flink 1.9.2 with ZooKeeper-based HA. I've noticed that the /leader/${job_id} znode on ZK is not cleaned up even after the job is cancelled, so after running for a long time there are many empty job_id znodes under /leader/. When is this cleanup supposed to happen? Or is the lack of cleanup a bug in 1.9.2?
flink1.9 on yarn
Question 1:
Every submission via ./bin/flink run -m yarn-cluster produces a new app id, e.g. application_1567067657620_0254.
After running yarn application -kill application_1567067657620_0254, how do I resubmit with ./bin/flink run -m yarn-cluster without the app id incrementing?

Question 2:
After submitting with ./bin/flink run -m yarn-cluster and cancelling the job, how do I submit again to the same app id?
Re: Running Apache Flink on the GraalVM as a Native Image
On Thu 25 Jun 2020 at 12:48, ivo.kn...@t-online.de wrote:
> What's up guys,
>
> I'm trying to run an Apache Flink application with the GraalVM Native Image but I get the following error: (check attached file)
>
> I suppose this happens because Flink uses a lot of low-level code and is highly optimized.
>
Actually I suspect the reason is that Flink uses dynamic classloading. GraalVM requires all the code to be available in order to produce a native image. You'd need to pre-bind the topology you want Flink to run into the native image. More fun, you'll actually need two images, one for the job manager and one for the task manager. And you'll need to convince GraalVM that the entry point of your topology needs reflection support enabled... plus whatever other classes use reflection in Flink. Sounds rather complex to me.

If native images are what is important to you, there seemed to be a strong contender in the Rust language community. It didn't provide as strong management as Flink, and you'd probably have more work managing things like checkpointing, but if native code is important that's where I'd be looking. Sadly I cannot remember the name and my google-foo is weak tonight.

> When I googled the combination of GraalVM Native Image and Apache Flink I got no results.
>
> Did anyone ever succeed in making it work, and how?
>
> Best regards,
>
> Ivo
>
-- Sent from my phone
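For reference, the reflection registration mentioned above is done by passing a JSON configuration file to native-image. A hypothetical fragment — the class name is a placeholder, not a real Flink entry point, and real Flink would need many more entries:

```json
[
  {
    "name": "com.example.MyTopologyEntryPoint",
    "allDeclaredConstructors": true,
    "allDeclaredMethods": true
  }
]
```

This file would be supplied via native-image's -H:ReflectionConfigurationFiles=reflect-config.json option.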
Re: Question about Watermarks within a KeyedProcessFunction
With an AscendingTimestampExtractor, watermarks are not created for every event, and as your job starts up, some events will be processed before the first watermark is generated. The impossible value you see is an initial value that's in place until the first real watermark is available. On the other hand, onTimer can not be called until some timer is triggered by the arrival of a watermark, at which point the watermark will have a reasonable value. On Sat, Jun 27, 2020 at 2:37 AM Marco Villalobos wrote: > > My source is a Kafka topic. > I am using Event Time. > I assign the event time with an AscendingTimestampExtractor > > I noticed when debugging that in the KeyedProcessFunction that > after my highest known event time of: 2020-06-23T00:46:30.000Z > > the processElement method had a watermark with an impossible date of: > -292275055-05-16T16:47:04.192Z > > but in the onTimer method it had a more reasonable value that trails the > highest known event time by 1 millisecond, which is this > value: 2020-06-23T00:46:29.999Z > > I want to know, why does the processElement method have an impossible > watermark value? > > >
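The "impossible" date in question is just the initial watermark value, Long.MIN_VALUE milliseconds, rendered as a timestamp. A minimal stdlib-only check of that correspondence (no Flink dependency; the rendering here assumes the framework formatted the value as a java.time.Instant):

```java
import java.time.Instant;

public class MinWatermark {
    public static void main(String[] args) {
        // Before any real watermark arrives, the current watermark
        // starts out at the smallest possible long value.
        long initialWatermark = Long.MIN_VALUE;

        // Interpreting that as epoch milliseconds yields the
        // "impossible" date from the original post,
        // -292275055-05-16T16:47:04.192Z.
        Instant asInstant = Instant.ofEpochMilli(initialWatermark);
        System.out.println(asInstant);
    }
}
```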
Optimal Flink configuration for Standalone cluster.
Hello,

I'm having a bit of trouble understanding the memory configuration in Flink. I'm using Flink 1.10.0 to read some datasets of edges and extract features. I run this on a cluster of 4 nodes, with 32 cores and 252 GB RAM each, and hopefully I can expand this as long as I can add extra nodes to the cluster. So regarding the configuration file (flink-conf.yaml):

a) I can't understand when I should use process.size and when flink.size.

b) From the detailed memory model I understand that direct memory is included in both flink size and process size; however, if I don't specify off-heap.task.size I get "OutOfMemoryError: Direct buffer memory". Should I change the off-heap fraction as well?

c) When I fix this, I get network buffer errors; if I understand correctly, flink.size * network fraction should be between the network min and max.

I can't find the 'perfect' configuration for my setup. What is the optimal way to use the system I currently have?

Thank you for your time.
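The relationships described in (a)–(c) can be sketched as a flink-conf.yaml fragment. All values below are illustrative, not recommendations for this cluster; the keys are the Flink 1.10 TaskManager memory options:

```yaml
# Set EITHER process.size (total JVM footprint, natural for containerized
# deployments) OR flink.size (Flink's own memory, excluding JVM metaspace
# and overhead, usually simpler for standalone clusters) -- not both.
taskmanager.memory.flink.size: 16g

# Direct memory available to user code is capped by the task off-heap size;
# raise it if you hit "OutOfMemoryError: Direct buffer memory".
taskmanager.memory.task.off-heap.size: 1g

# Network buffer memory is flink.size * fraction, clamped to [min, max].
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
```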
Flink SQL question (subject lost to an encoding error)
Hi, all: Flink SQL, table A with columns c1, c2, c3 ... c4 ... c1, c4, c2, c3. (The rest of this message was lost to an encoding error.)
Distributed Anomaly Detection using MIDAS
Hi All, I have recently been exploring MIDAS: an algorithm for Streaming Anomaly Detection. A production level parallel and distributed implementation of MIDAS should be quite useful to the industry. I feel that Flink is very well-suited for the same as MIDAS deals with streaming data. If anyone is interested to contribute/collaborate, please let me know. Currently, there exist C++, Python, Ruby, Rust, R, and Golang implementations. MIDAS repository: https://github.com/bhatiasiddharth/MIDAS MIDAS paper: https://www.comp.nus.edu.sg/~sbhatia/assets/pdf/midas.pdf Thanks! Shivin
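For anyone curious what a Flink operator for this would compute, below is a stdlib-only Java sketch of the core MIDAS idea — a count-min sketch of current-tick vs. all-time edge counts, scored with the paper's chi-squared statistic. It is single-threaded and illustrative only (hash function and sketch sizes are made up here), not the production or distributed implementation:

```java
import java.util.Arrays;

/** Minimal sketch of MIDAS chi-squared edge scoring (Bhatia et al.). */
public class MidasSketch {
    static final int ROWS = 2, WIDTH = 1024;
    final double[][] current = new double[ROWS][WIDTH]; // counts in current tick
    final double[][] total = new double[ROWS][WIDTH];   // counts over all ticks

    int bucket(int u, int v, int row) {
        // Simple per-row hash of edge (u, v); illustrative, not from the paper.
        int h = (u * 31 + v) * (row * 2 + 1) + row * 17;
        return Math.floorMod(h * 0x9E3779B1, WIDTH);
    }

    /** Reset the current-tick counters at the start of each time tick. */
    void newTick() {
        for (double[] row : current) Arrays.fill(row, 0);
    }

    /** Record edge (u, v) at tick t and return its anomaly score. */
    double addAndScore(int u, int v, int t) {
        double a = Double.MAX_VALUE, s = Double.MAX_VALUE;
        for (int r = 0; r < ROWS; r++) {
            int b = bucket(u, v, r);
            current[r][b] += 1;
            total[r][b] += 1;
            a = Math.min(a, current[r][b]); // count-min estimate, current tick
            s = Math.min(s, total[r][b]);   // count-min estimate, all ticks
        }
        if (t <= 1) return 0;
        double d = a - s / t;                 // deviation from per-tick mean
        return d * d * t * t / (s * (t - 1)); // chi-squared statistic
    }

    public static void main(String[] args) {
        MidasSketch m = new MidasSketch();
        double steady = 0, burst = 0;
        for (int t = 1; t <= 10; t++) {
            m.newTick();
            steady = m.addAndScore(1, 2, t);        // one occurrence per tick
            if (t == 10)
                for (int i = 0; i < 10; i++)
                    burst = m.addAndScore(3, 4, t); // sudden burst at t = 10
        }
        // The bursty edge scores far higher than the steady one.
        System.out.println("steady=" + steady + " burst=" + burst);
    }
}
```

A Flink version would key the edge stream and hold the sketch arrays in keyed or operator state, which is where the parallel/distributed interest lies.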
Is Flink HIPAA certified
Hi Community , Could anyone let me know if Flink is used in US healthcare tech space ? Thanks, Prasanna.