flink内存参数
大家好, 我正在做flink算子的性能测试,发现了以下问题。 flink加大数据量后运行时报OOM错误,但metrics中flink_taksmanager_Status_JVM_Memory_Direct_MemoryUsed,flink_taksmanager_Status_JVM_Memory_NonHeap_Used,flink_taksmanager_Status_JVM_Memory_Heap_Used一直没有波动(没有使用checkpoint,也没有网络方面的内存需求)。 想请问大家flink中这三个参数对应的内存具体是用来做什么的,OOM有可能是什么原因? 谢谢大家 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制
回复:jobgraph 生成
Very sorry for the wrong operation. I copied the wrong email address by the phone. Thank you for your reply. | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制 在2020年01月08日 11:08,tison 写道: Hi Zhang, I just notice that it is sent to user list. Please send to user-zh list(in cc) next time if you want to discuss in Chinese. Best, tison. tison 于2020年1月8日周三 上午11:06写道: 如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以 JsonPlanGenerator.generatePlan(jobGraph) 拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。 Best, tison. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos 张江 于2020年1月8日周三 上午11:01写道: 大家好, 通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做? flink里似乎没有直接的API可以调用,但是我在flink web ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的? 谢谢 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制
回复:jobgraph 生成
Very sorry for the wrong operation. I copied the wrong email address by the phone. Thank you for your reply. | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制 在2020年01月08日 11:08,tison 写道: Hi Zhang, I just notice that it is sent to user list. Please send to user-zh list(in cc) next time if you want to discuss in Chinese. Best, tison. tison 于2020年1月8日周三 上午11:06写道: 如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以 JsonPlanGenerator.generatePlan(jobGraph) 拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。 Best, tison. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos 张江 于2020年1月8日周三 上午11:01写道: 大家好, 通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做? flink里似乎没有直接的API可以调用,但是我在flink web ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的? 谢谢 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制
jobgraph 生成
大家好, 通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做? flink里似乎没有直接的API可以调用,但是我在flink web ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的? 谢谢 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制
Re:回复: 回复:如何获取算子处理一条数据记录的时间
好的,谢谢 在 2020-01-06 20:04:09,"戴嘉诚" 写道: >你可以在算子中计算,然后上传到自定义的Flink Metrics中,这样就能看到平均一个算子的时间了. > >发件人: 张江 >发送时间: 2020年1月2日 19:18 >收件人: user-zh >抄送: user-zh >主题: 回复:如何获取算子处理一条数据记录的时间 > >我其实是想知道算子的数据处理能力,得到一个算子每秒钟最多能处理多少条数据。比如说map算子,我需要知道它一秒钟最多能转换多少数据,之后根据source端的数据量来设置算子的并行度 > > > > >| | >张江 >| >| >邮箱:zjkingdom2...@163.com >| > >签名由 网易邮箱大师 定制 > >在2020年01月02日 10:28,猫猫 写道: >只有如下算子支持测流输出。 > >ProcessFunction > >CoProcessFunction > >ProcessWindowFunction > >ProcessAllWindowFunction > >如果要计时的话,需要将你的逻辑全部放到相关函数中。在逐条处理的时候,记录数据ID和时间,写成一个新的数据流并行输出出去。 >但实际上我们很少这样做,因为很难将所有逻辑全部放到一个算子中。 > > >比较常见的方式是,在整体上数据有流入和流出,所以增加流入和流出算子,在数据经过的时候,记录数据ID,并形成新的数据流,然后写入到数据库进行合并就行了。 >这种可以考虑采样的方式,例如根据ID取模获取数据,但缺点是监控嵌入了执行逻辑,并且必须有并行度为1的统计算子,可能会影响性能。 > >更为合理的看法是,当数据量不堆积的时候,时间也不是问题。 >我们只要看kafka-offset的消费速度就行了。一定时间消费多少条,平均下来就是速率的。适用于压满性能的时候用。 > >可能还有更好的处理方式,我还没有了解,flink好像自身也有一定的监控能力。 >但你的需求到底是什么?你为什么要知道一条数据的处理时间?而不是一批数据的处理时间? >你关心每个算子的时间,还是关心数据整体的处理时间?还是关心某个业务的执行时间? > >提供一下场景为佳。 > > > > > > >--原始邮件-- >发件人:"张江"发送时间:2020年1月2日(星期四) 上午9:27 >收件人:"user-zh"抄送:"user-zh"主题:回复:如何获取算子处理一条数据记录的时间 > > > >可以讲的详细点么 >非常感谢 > > >| | >张江 >| >| >邮箱:zjkingdom2...@163.com >| > >签名由 网易邮箱大师 定制 > >在2020年01月02日 08:09,猫猫 写道: >算子内记时,测流输出。 > > >--nbsp;原始邮件nbsp;-- >发件人:nbsp;"张江"发送时间:nbsp;2020年1月1日(星期三) 晚上8:44 >收件人:nbsp;"user-zh" >主题:nbsp;回复:如何获取算子处理一条数据记录的时间 > > > >有人知道么 > > > > >| | >张江 >| >| >邮箱:zjkingdom2...@163.com >| > >签名由 网易邮箱大师 定制 > >在2019年12月31日 17:14,张江 写道: >Hi, > > >我是刚学习flink的菜鸟,现在想对一个算子,获取其处理一条数据记录的时间。请问这个应该怎么做? > > >谢谢 > > >| | >张江 >| >| >邮箱:zjkingdom2...@163.com >| > >签名由 网易邮箱大师 定制 >
Re:Re: 回复:使用influxdb作为flink metrics reporter
好的,多谢 在 2020-01-06 01:38:22,"Yun Tang" 写道: >Hi 张江 > >这个invalid boolean >一般是tag和field中间穿插空格有关,导致influxDB识别匹配的时候出了问题,你的原始报错信息是什么,不要隐去你的operator >name和task name,另外task_id= 后面的那个空格是你粘贴时候的错误还是原先就是这样。 > >最后,这些只会是warning,不会导致你的其他metrics数据无法插入,不影响整体使用。 > >祝好 >唐云 > > >From: 张江 >Sent: Saturday, January 4, 2020 19:14 >To: user-zh ; myas...@live.com >Subject: 回复:使用influxdb作为flink metrics reporter > > >你好, > > >我看我这里报错的问题是invalid boolean,并不是NaN/infinity value造成的,不知道是什么原因? > > >而且我用的flink是1.9.1版本,influxdb是1.7.9版本。 > > >祝好, > ><https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=%E5%BC%A0%E6%B1%9F=zjkingdom2010%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22%E9%82%AE%E7%AE%B1%EF%BC%9Azjkingdom2010%40163.com%22%5D> >[https://mail-online.nosdn.127.net/qiyelogo/defaultAvatar.png] >张江 >邮箱:zjkingdom2...@163.com > >签名由 网易邮箱大师<https://mail.163.com/dashi/dlpro.html?from=mail88> 定制 > >在2020年01月04日 00:56,Yun Tang<mailto:myas...@live.com> 写道: >Hi 张江 > > > * Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention > policy. > * kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 > [1],在Flink-1.9 版本下可以忽略这些异常。 > >[1] https://issues.apache.org/jira/browse/FLINK-12147 > >祝好 >唐云 > >From: 张江 >Sent: Friday, January 3, 2020 21:22 >To: user-zh@flink.apache.org >Subject: 使用influxdb作为flink metrics reporter > >大家好, > > >我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置: >metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics >metrics.reporter.influxdb.password:qwerty >metrics.reporter.influxdb.retentionPolicy:one_hour >但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错: >error [500] - "retention policy not found: one_hour" {"log_id": >"OK6nejJI000", "service": "httpd"} [httpd] 10.90.*.* - flinkuser >[03/Jan/2020:19:35:58 +0800] "POST /write? >db=flink=one_hour=n=one HTTP/1.1" 500 49 "-" >"okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165 > > >我使用的是 flink 1.9.1,influxdb版本是1.79. > > >而且,当我不设置retentionPolicy时,还是会报错,提示: >org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: > partial write: unable to parse >"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id= > >cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02 > value=? 157805124760500": invalid boolean > > >求问各位大佬,这些问题怎么解决? >谢谢 > > >祝好, > > >
回复:使用influxdb作为flink metrics reporter
你好, 我看我这里报错的问题是invalid boolean,并不是NaN/infinity value造成的,不知道是什么原因? 而且我用的flink是1.9.1版本,influxdb是1.7.9版本。 祝好, | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制 在2020年01月04日 00:56,Yun Tang 写道: Hi 张江 * Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention policy. * kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 [1],在Flink-1.9 版本下可以忽略这些异常。 [1] https://issues.apache.org/jira/browse/FLINK-12147 祝好 唐云 From: 张江 Sent: Friday, January 3, 2020 21:22 To: user-zh@flink.apache.org Subject: 使用influxdb作为flink metrics reporter 大家好, 我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置: metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics metrics.reporter.influxdb.password:qwerty metrics.reporter.influxdb.retentionPolicy:one_hour 但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错: error [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", "service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] "POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 "-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165 我使用的是 flink 1.9.1,influxdb版本是1.79. 而且,当我不设置retentionPolicy时,还是会报错,提示: org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse "taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id= cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02 value=? 157805124760500": invalid boolean 求问各位大佬,这些问题怎么解决? 谢谢 祝好,
单个算子处理一条数据的真实处理时间
大家好, 之前问过类似的问题,可能表达不太清楚。向各位大佬再请教一下。 有人知道怎么获取以下的flink metrics么: 这个是flink forward asia 2019会议上马庆祥老师讲的flink动态资源调整里的内容。但我自己在flink官网上没有看到metrics有这个指标信息。 谢谢。 祝好,
使用influxdb作为flink metrics reporter
大家好, 我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置: metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics metrics.reporter.influxdb.password:qwerty metrics.reporter.influxdb.retentionPolicy:one_hour 但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错: error [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", "service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] "POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 "-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165 我使用的是 flink 1.9.1,influxdb版本是1.79. 而且,当我不设置retentionPolicy时,还是会报错,提示: org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse "taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id= cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02 value=? 157805124760500": invalid boolean 求问各位大佬,这些问题怎么解决? 谢谢 祝好,
回复:如何获取算子处理一条数据记录的时间
有人知道么 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制 在2019年12月31日 17:14,张江 写道: Hi, 我是刚学习flink的菜鸟,现在想对一个算子,获取其处理一条数据记录的时间。请问这个应该怎么做? 谢谢 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制
如何获取算子处理一条数据记录的时间
Hi, 我是刚学习flink的菜鸟,现在想对一个算子,获取其处理一条数据记录的时间。请问这个应该怎么做? 谢谢 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制