?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-18 文章 xuzh
??udfudfjar??














--  --
??: 
   "user-zh"



?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-18 文章 xuzh
??udfudfjar??













--  --
??: 
   "user-zh"



Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-18 文章 Lei Wang
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。
程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> 应该和OSS没关系吧,毕竟只是个存储。
> 我们CPU 你先看看消耗在哪个线程或者方法类呗
>
>
>
> 在 2021-10-08 16:34:47,"Lei Wang"  写道:
>
>
>
> flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
> 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
>
>
>
>
>
>
> 这个可能的原因是什么?会跟 OSS 有关吗?
>
>
> 谢谢,
> 王磊


Re:flink作业的停止

2021-10-18 文章 Yuepeng Pan
Hi, 
lei-tian.
基于你的描述,我推测(flink-1.10+)会存在这几种可能。
1. 使用了 
flink的yarn-session模式,然后进行了相应的作业提交。这种情况表现为,作业最终完成后,yarn中对应flink集群的taskmanager 
container能够被释放掉,但是只是保留了Jobmanager组件的容器没有释放。在 flink的yarn-session模式 的部署方式中,这是正常的。
2. 不论是在flink yarn 的per-job部署模式或者yarn-session部署模式中,如果负责hbase source/sink的 
算子与其他任意一种流模式的算子进行connect或者union等多流的计算,那么将会导致hbase 
IO结束后,剩余的流式算子还是处于正常运行状态,这种情况下的大概表现为 yarn中 flink taskmanager 
container和jobmanager container 都未释放。
3.其他。
如果作业所有的souce都是读取"批模式"的数据源,比如 mysql/hbase 而非包含kafka/pulsar等,那么你可以尝试flink on yarn 
的per-job的部署方式运行任务。




祝好。
Roc











在 2021-10-18 21:31:21,"lei-tian"  写道:
>您好:
>  
> 我用flink读取hbase或者文件的数据,读完数据之后页面显示状态为finished,但是这个任务在yarn上的资源还没有被释放,仍然在yarn的running列表里面,需要通过命令行kill掉这个任务,有什么解决的方法么。我没有设置batch或者streaming的模式,用的是默认的。
>
>
>| |
>lei-tian
>|
>|
>totorobabyf...@163.com
>|
>签名由网易邮箱大师定制


Re: pyflink 1.14.0 udf 执行报错,根据官网写的代码

2021-10-18 文章 Dian Fu
我试了一下是可以运行的,可以发一下报错吗?

On Mon, Oct 18, 2021 at 6:44 PM xuzh  wrote:

> from pyflink.table import ScalarFunction, EnvironmentSettings, 
> TableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.table.expressions import call, row
>
>
> class HashCode(ScalarFunction):
> def __init__(self):
> self.factor = 12
>
> def eval(self, s):
> return hash(s) * self.factor
>
>
> env_settings = EnvironmentSettings.in_batch_mode()
> btenv = TableEnvironment.create(env_settings)
>
> hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
> # 在 SQL API 中使用 Python 自定义函数
> btenv.create_temporary_function("hash_code", udf(HashCode(), 
> result_type=DataTypes.BIGINT()))
> tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
>   DataTypes.ROW([DataTypes.FIELD("a", 
> DataTypes.INT()),
>  DataTypes.FIELD("b", 
> DataTypes.STRING()),
>  DataTypes.FIELD("c", 
> DataTypes.FLOAT())]))
> btenv.create_temporary_view("tb2", tb2)
> tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
> print(tb2.to_pandas())
>
> # 3. 创建 sink 表
> btenv.execute_sql("""
>CREATE TABLE rs (
>a int,
>b string,
>m bigint
>) WITH (
>'connector' = 'print'
>)
>""")
>
> tb2.execute_insert("rs").wait()
> print(tb2.to_pandas())
> #
>
>
>
>
>


flink作业的停止

2021-10-18 文章 lei-tian
您好:
  
我用flink读取hbase或者文件的数据,读完数据之后页面显示状态为finished,但是这个任务在yarn上的资源还没有被释放,仍然在yarn的running列表里面,需要通过命令行kill掉这个任务,有什么解决的方法么。我没有设置batch或者streaming的模式,用的是默认的。


| |
lei-tian
|
|
totorobabyf...@163.com
|
签名由网易邮箱大师定制

pyflink 1.14.0 udf ??????????????????????????

2021-10-18 文章 xuzh
from pyflink.table import ScalarFunction, EnvironmentSettings, 
TableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import call, row


class HashCode(ScalarFunction):
def __init__(self):
self.factor = 12

def eval(self, s):
return hash(s) * self.factor


env_settings = EnvironmentSettings.in_batch_mode()
btenv = TableEnvironment.create(env_settings)

hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
# ?? SQL API ?? Python ??
btenv.create_temporary_function("hash_code", udf(HashCode(), 
result_type=DataTypes.BIGINT()))
tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
  DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
 DataTypes.FIELD("b", 
DataTypes.STRING()),
 DataTypes.FIELD("c", 
DataTypes.FLOAT())]))
btenv.create_temporary_view("tb2", tb2)
tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
print(tb2.to_pandas())

# 3.  sink ??
btenv.execute_sql("""
   CREATE TABLE rs (
   a int,
   b string,
   m bigint
   ) WITH (
   'connector' = 'print'
   )
   """)

tb2.execute_insert("rs").wait()
print(tb2.to_pandas())
#