flink内存参数

2020-01-16 Thread
大家好,


我正在做flink算子的性能测试,发现了以下问题。
flink加大数据量后运行时报OOM错误,但metrics中flink_taksmanager_Status_JVM_Memory_Direct_MemoryUsed,flink_taksmanager_Status_JVM_Memory_NonHeap_Used,flink_taksmanager_Status_JVM_Memory_Heap_Used一直没有波动(没有使用checkpoint,也没有网络方面的内存需求)。
想请问大家flink中这三个参数对应的内存具体是用来做什么的,OOM有可能是什么原因?


谢谢大家


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

回复:jobgraph 生成

2020-01-07 Thread
Very sorry for the wrong operation. I copied the wrong email address by the 
phone.


Thank you for your reply.


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

在2020年01月08日 11:08,tison 写道:
Hi Zhang,


I just notice that it is sent to user list. Please send to user-zh list(in cc) 
next time if you want to discuss in Chinese.


Best,
tison.




tison  于2020年1月8日周三 上午11:06写道:

如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以


JsonPlanGenerator.generatePlan(jobGraph)


拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。


Best,
tison.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos




张江  于2020年1月8日周三 上午11:01写道:

大家好,


通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink 
visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?


flink里似乎没有直接的API可以调用,但是我在flink web ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?




谢谢


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

回复:jobgraph 生成

2020-01-07 Thread
Very sorry for the wrong operation. I copied the wrong email address by the 
phone.


Thank you for your reply.


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

在2020年01月08日 11:08,tison 写道:
Hi Zhang,


I just notice that it is sent to user list. Please send to user-zh list(in cc) 
next time if you want to discuss in Chinese.


Best,
tison.




tison  于2020年1月8日周三 上午11:06写道:

如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以


JsonPlanGenerator.generatePlan(jobGraph)


拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。


Best,
tison.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos




张江  于2020年1月8日周三 上午11:01写道:

大家好,


通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink 
visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?


flink里似乎没有直接的API可以调用,但是我在flink web ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?




谢谢


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

jobgraph 生成

2020-01-07 Thread
大家好,


通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink 
visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?


flink里似乎没有直接的API可以调用,但是我在flink web ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?




谢谢


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

Re:回复: 回复:如何获取算子处理一条数据记录的时间

2020-01-06 Thread
好的,谢谢

在 2020-01-06 20:04:09,"戴嘉诚"  写道:
>你可以在算子中计算,然后上传到自定义的Flink Metrics中,这样就能看到平均一个算子的时间了.
>
>发件人: 张江
>发送时间: 2020年1月2日 19:18
>收件人: user-zh
>抄送: user-zh
>主题: 回复:如何获取算子处理一条数据记录的时间
>
>我其实是想知道算子的数据处理能力,得到一个算子每秒钟最多能处理多少条数据。比如说map算子,我需要知道它一秒钟最多能转换多少数据,之后根据source端的数据量来设置算子的并行度
>
>
>
>
>| |
>张江
>|
>|
>邮箱:zjkingdom2...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年01月02日 10:28,猫猫 写道:
>只有如下算子支持测流输出。
>
>ProcessFunction
>
>CoProcessFunction
>
>ProcessWindowFunction
>
>ProcessAllWindowFunction
>
>如果要计时的话,需要将你的逻辑全部放到相关函数中。在逐条处理的时候,记录数据ID和时间,写成一个新的数据流并行输出出去。
>但实际上我们很少这样做,因为很难将所有逻辑全部放到一个算子中。
>
>
>比较常见的方式是,在整体上数据有流入和流出,所以增加流入和流出算子,在数据经过的时候,记录数据ID,并形成新的数据流,然后写入到数据库进行合并就行了。
>这种可以考虑采样的方式,例如根据ID取模获取数据,但缺点是监控嵌入了执行逻辑,并且必须有并行度为1的统计算子,可能会影响性能。
>
>更为合理的看法是,当数据量不堆积的时候,时间也不是问题。
>我们只要看kafka-offset的消费速度就行了。一定时间消费多少条,平均下来就是速率的。适用于压满性能的时候用。
>
>可能还有更好的处理方式,我还没有了解,flink好像自身也有一定的监控能力。
>但你的需求到底是什么?你为什么要知道一条数据的处理时间?而不是一批数据的处理时间?
>你关心每个算子的时间,还是关心数据整体的处理时间?还是关心某个业务的执行时间?
>
>提供一下场景为佳。
>
>
>
>
>
>
>--原始邮件--
>发件人:"张江"发送时间:2020年1月2日(星期四) 上午9:27
>收件人:"user-zh"抄送:"user-zh"主题:回复:如何获取算子处理一条数据记录的时间
>
>
>
>可以讲的详细点么
>非常感谢
>
>
>| |
>张江
>|
>|
>邮箱:zjkingdom2...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年01月02日 08:09,猫猫 写道:
>算子内记时,测流输出。
>
>
>--nbsp;原始邮件nbsp;--
>发件人:nbsp;"张江"发送时间:nbsp;2020年1月1日(星期三) 晚上8:44
>收件人:nbsp;"user-zh"
>主题:nbsp;回复:如何获取算子处理一条数据记录的时间
>
>
>
>有人知道么
>
>
>
>
>| |
>张江
>|
>|
>邮箱:zjkingdom2...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2019年12月31日 17:14,张江 写道:
>Hi,
>
>
>我是刚学习flink的菜鸟,现在想对一个算子,获取其处理一条数据记录的时间。请问这个应该怎么做?
>
>
>谢谢
>
>
>| |
>张江
>|
>|
>邮箱:zjkingdom2...@163.com
>|
>
>签名由 网易邮箱大师 定制
>


Re:Re: 回复:使用influxdb作为flink metrics reporter

2020-01-06 Thread
好的,多谢
在 2020-01-06 01:38:22,"Yun Tang"  写道:
>Hi 张江
>
>这个invalid boolean 
>一般是tag和field中间穿插空格有关,导致influxDB识别匹配的时候出了问题,你的原始报错信息是什么,不要隐去你的operator 
>name和task name,另外task_id= 后面的那个空格是你粘贴时候的错误还是原先就是这样。
>
>最后,这些只会是warning,不会导致你的其他metrics数据无法插入,不影响整体使用。
>
>祝好
>唐云
>
>
>From: 张江 
>Sent: Saturday, January 4, 2020 19:14
>To: user-zh ; myas...@live.com 
>Subject: 回复:使用influxdb作为flink metrics reporter
>
>
>你好,
>
>
>我看我这里报错的问题是invalid boolean,并不是NaN/infinity value造成的,不知道是什么原因?
>
>
>而且我用的flink是1.9.1版本,influxdb是1.7.9版本。
>
>
>祝好,
>
><https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=%E5%BC%A0%E6%B1%9F=zjkingdom2010%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22%E9%82%AE%E7%AE%B1%EF%BC%9Azjkingdom2010%40163.com%22%5D>
>[https://mail-online.nosdn.127.net/qiyelogo/defaultAvatar.png]
>张江
>邮箱:zjkingdom2...@163.com
>
>签名由 网易邮箱大师<https://mail.163.com/dashi/dlpro.html?from=mail88> 定制
>
>在2020年01月04日 00:56,Yun Tang<mailto:myas...@live.com> 写道:
>Hi 张江
>
>
> *   Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention 
> policy.
> *   kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 
> [1],在Flink-1.9 版本下可以忽略这些异常。
>
>[1] https://issues.apache.org/jira/browse/FLINK-12147
>
>祝好
>唐云
>
>From: 张江 
>Sent: Friday, January 3, 2020 21:22
>To: user-zh@flink.apache.org 
>Subject: 使用influxdb作为flink metrics reporter
>
>大家好,
>
>
>我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置:
>metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics
>metrics.reporter.influxdb.password:qwerty
>metrics.reporter.influxdb.retentionPolicy:one_hour
>但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错:
>error  [500] - "retention policy not found: one_hour" {"log_id": 
>"OK6nejJI000", "service": "httpd"} [httpd] 10.90.*.* - flinkuser 
>[03/Jan/2020:19:35:58 +0800] "POST /write? 
>db=flink=one_hour=n=one HTTP/1.1" 500 49 "-" 
>"okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165
>
>
>我使用的是 flink 1.9.1,influxdb版本是1.79.
>
>
>而且,当我不设置retentionPolicy时,还是会报错,提示:
>org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
> partial write: unable to parse 
>"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=
> 
>cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02
> value=? 157805124760500": invalid boolean
>
>
>求问各位大佬,这些问题怎么解决?
>谢谢
>
>
>祝好,
>
>
>


回复:使用influxdb作为flink metrics reporter

2020-01-04 Thread
你好,




我看我这里报错的问题是invalid boolean,并不是NaN/infinity value造成的,不知道是什么原因?




而且我用的flink是1.9.1版本,influxdb是1.7.9版本。




祝好,



| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

在2020年01月04日 00:56,Yun Tang 写道:
Hi 张江


 *   Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention 
policy.
 *   kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 
[1],在Flink-1.9 版本下可以忽略这些异常。

[1] https://issues.apache.org/jira/browse/FLINK-12147

祝好
唐云

From: 张江 
Sent: Friday, January 3, 2020 21:22
To: user-zh@flink.apache.org 
Subject: 使用influxdb作为flink metrics reporter

大家好,


我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置:
metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics
metrics.reporter.influxdb.password:qwerty
metrics.reporter.influxdb.retentionPolicy:one_hour
但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错:
error  [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", 
"service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] 
"POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 
"-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165


我使用的是 flink 1.9.1,influxdb版本是1.79.


而且,当我不设置retentionPolicy时,还是会报错,提示:
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
 partial write: unable to parse 
"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=
 
cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02
 value=? 157805124760500": invalid boolean


求问各位大佬,这些问题怎么解决?
谢谢


祝好,





单个算子处理一条数据的真实处理时间

2020-01-03 Thread
大家好,



之前问过类似的问题,可能表达不太清楚。向各位大佬再请教一下。

有人知道怎么获取以下的flink metrics么:




这个是flink forward asia 
2019会议上马庆祥老师讲的flink动态资源调整里的内容。但我自己在flink官网上没有看到metrics有这个指标信息。

谢谢。




祝好,







使用influxdb作为flink metrics reporter

2020-01-03 Thread
大家好,


我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置:
metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics
metrics.reporter.influxdb.password:qwerty 
metrics.reporter.influxdb.retentionPolicy:one_hour
但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错:
error  [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", 
"service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] 
"POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 
"-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165


我使用的是 flink 1.9.1,influxdb版本是1.79.


而且,当我不设置retentionPolicy时,还是会报错,提示:
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
 partial write: unable to parse 
"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=
 
cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02
 value=? 157805124760500": invalid boolean


求问各位大佬,这些问题怎么解决?
谢谢


祝好,





回复:如何获取算子处理一条数据记录的时间

2020-01-01 Thread
有人知道么




| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

在2019年12月31日 17:14,张江 写道:
Hi,


我是刚学习flink的菜鸟,现在想对一个算子,获取其处理一条数据记录的时间。请问这个应该怎么做?


谢谢


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

如何获取算子处理一条数据记录的时间

2019-12-31 Thread
Hi,


我是刚学习flink的菜鸟,现在想对一个算子,获取其处理一条数据记录的时间。请问这个应该怎么做?


谢谢


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制