?????? pyflink 1.14.0 udf ??????????????????????????
??udfudfjar?? -- -- ??: "user-zh"
?????? pyflink 1.14.0 udf ??????????????????????????
??udfudfjar?? -- -- ??: "user-zh"
Re: flink 以阿里云 oss 作为 checkpoint cpu 过高
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。 程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G On Mon, Oct 18, 2021 at 10:44 AM Michael Ran wrote: > 应该和OSS没关系吧,毕竟只是个存储。 > 我们CPU 你先看看消耗在哪个线程或者方法类呗 > > > > 在 2021-10-08 16:34:47,"Lei Wang" 写道: > > > > flink 程序以 RocksDB 作为 stateBackend, aliyun OSS 作为 checkpoint 数据最终的物理位置。 > 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。 > > > > > > > 这个可能的原因是什么?会跟 OSS 有关吗? > > > 谢谢, > 王磊
Re:flink作业的停止
Hi, lei-tian. 基于你的描述,我推测(flink-1.10+)会存在这几种可能。 1. 使用了 flink的yarn-session模式,然后进行了相应的作业提交。这种情况表现为,作业最终完成后,yarn中对应flink集群的taskmanager container能够被释放掉,但是只是保留了Jobmanager组件的容器没有释放。在 flink的yarn-session模式 的部署方式中,这是正常的。 2. 不论是在flink yarn 的per-job部署模式或者yarn-session部署模式中,如果负责hbase source/sink的 算子与其他任意一种流模式的算子进行connect或者union等多流的计算,那么将会导致hbase IO结束后,剩余的流式算子还是处于正常运行状态,这种情况下的大概表现为 yarn中 flink taskmanager container和jobmanager container 都未释放。 3.其他。 如果作业所有的souce都是读取"批模式"的数据源,比如 mysql/hbase 而非包含kafka/pulsar等,那么你可以尝试flink on yarn 的per-job的部署方式运行任务。 祝好。 Roc 在 2021-10-18 21:31:21,"lei-tian" 写道: >您好: > > 我用flink读取hbase或者文件的数据,读完数据之后页面显示状态为finished,但是这个任务在yarn上的资源还没有被释放,仍然在yarn的running列表里面,需要通过命令行kill掉这个任务,有什么解决的方法么。我没有设置batch或者streaming的模式,用的是默认的。 > > >| | >lei-tian >| >| >totorobabyf...@163.com >| >签名由网易邮箱大师定制
Re: pyflink 1.14.0 udf 执行报错,根据官网写的代码
我试了一下是可以运行的,可以发一下报错吗? On Mon, Oct 18, 2021 at 6:44 PM xuzh wrote: > from pyflink.table import ScalarFunction, EnvironmentSettings, > TableEnvironment, DataTypes > from pyflink.table.udf import udf > from pyflink.table.expressions import call, row > > > class HashCode(ScalarFunction): > def __init__(self): > self.factor = 12 > > def eval(self, s): > return hash(s) * self.factor > > > env_settings = EnvironmentSettings.in_batch_mode() > btenv = TableEnvironment.create(env_settings) > > hash_code = udf(HashCode(), result_type=DataTypes.BIGINT()) > # 在 SQL API 中使用 Python 自定义函数 > btenv.create_temporary_function("hash_code", udf(HashCode(), > result_type=DataTypes.BIGINT())) > tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)], > DataTypes.ROW([DataTypes.FIELD("a", > DataTypes.INT()), > DataTypes.FIELD("b", > DataTypes.STRING()), > DataTypes.FIELD("c", > DataTypes.FLOAT())])) > btenv.create_temporary_view("tb2", tb2) > tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2") > print(tb2.to_pandas()) > > # 3. 创建 sink 表 > btenv.execute_sql(""" >CREATE TABLE rs ( >a int, >b string, >m bigint >) WITH ( >'connector' = 'print' >) >""") > > tb2.execute_insert("rs").wait() > print(tb2.to_pandas()) > # > > > > >
flink作业的停止
您好: 我用flink读取hbase或者文件的数据,读完数据之后页面显示状态为finished,但是这个任务在yarn上的资源还没有被释放,仍然在yarn的running列表里面,需要通过命令行kill掉这个任务,有什么解决的方法么。我没有设置batch或者streaming的模式,用的是默认的。 | | lei-tian | | totorobabyf...@163.com | 签名由网易邮箱大师定制
pyflink 1.14.0 udf ??????????????????????????
from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes from pyflink.table.udf import udf from pyflink.table.expressions import call, row class HashCode(ScalarFunction): def __init__(self): self.factor = 12 def eval(self, s): return hash(s) * self.factor env_settings = EnvironmentSettings.in_batch_mode() btenv = TableEnvironment.create(env_settings) hash_code = udf(HashCode(), result_type=DataTypes.BIGINT()) # ?? SQL API ?? Python ?? btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT())) tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)], DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", DataTypes.FLOAT())])) btenv.create_temporary_view("tb2", tb2) tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2") print(tb2.to_pandas()) # 3. sink ?? btenv.execute_sql(""" CREATE TABLE rs ( a int, b string, m bigint ) WITH ( 'connector' = 'print' ) """) tb2.execute_insert("rs").wait() print(tb2.to_pandas()) #