Hi,

不好意思回复这么晚。关于pandas
udaf,我有专门测试过框架层的开销(函数用普通的均值计算)。和java相比,差距也就3,4倍左右,具体可以参考代码[1]。关于你这个代码,我怀疑是因为你函数实现的问题。你这个函数构造df是会有额外的开销。你为啥不直接使用j来进行计算。当然了,你也可以根据调整一些参数来提高性能,比如python.fn-execution.bundle.size和python.fn-execution.bundle.time,具体可以参考文档[2]。



[1]
https://github.com/HuangXingBo/pyflink-performance-demo/blob/master/python/flink/flink-pandas-udaf-test.py
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-fn-execution-bundle-size

Best,
Xingbo

xiao...@ysstech.com <xiao...@ysstech.com> 于2021年3月2日周二 下午1:38写道:

> Hi,
>     是的,就是在batch模式下,我是只在本机local下执行的,不是集群模式,把全部代码贴一下吧。
>     python版:
>     # 建立环境(udaf仅支持批环境)
>     env_settings =
> EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
>     env = BatchTableEnvironment.create(environment_settings=env_settings)
>     # 表1 1千万行
>      source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID
> VARCHAR(8),IS_EXCH_DAY DECIMAL
>                         ) WITH (
>                         'connector' = 'jdbc',
>                         'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
>                         'driver' = 'com.mysql.cj.jdbc.Driver',
>                         'username' = 'root',
>                         'password' = 'xxx',
>                         'table-name' = 'TP_GL_DAY')
>                 """
>     #表2 700多行
>     source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\
>                     SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\
>                     CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)
>                     ) WITH (
>                     'connector' = 'jdbc',
>                     'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
>                     'driver' = 'com.mysql.cj.jdbc.Driver',
>                     'username' = 'root',
>                     'password' = 'xxx',
>                     'table-name' = 'TS_PF_SEC_YLDRATE')
>             """
>    # sink
>      print_sink_ddl = """
>                       CREATE TABLE print(
>                         pf_id VARCHAR(10),
>                         out_put FLOAT
>                     ) WITH (
>                       'connector' = 'print'
>                     )
>                 """
>     # 源表
>     env.execute_sql(source_ddl1)
>     env.execute_sql(source_ddl2)
>    # sink
>      env.execute_sql(print_sink_ddl)
>
>     sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN
> TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID =
> '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"
>
>     # 获取Query结果
>     query_table = env.sql_query(sql)
>     # 执行udaf
>     # udaf 聚合函数计算
>     @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
>     def logReturn(i, j):
>         df = pd.DataFrame({'pf_id': i, 'yldrate': j})
>         df['yldrate1'] = df['yldrate'] + 1
>         return np.prod(df['yldrate1']) - 1
>     # 执行并打印
>     result =
> query_table.group_by(query_table.PF_ID).select(query_table.PF_ID,
>                                                     logReturn(
>                                                         query_table.PF_ID,
>
> query_table.YLDRATE)).execute_insert('print').wait()
>
>     Java版本:
>     Java选用的环境是流环境:
>         StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings streamSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, streamSettings);
>         streamEnv.execute("");
>     计算部分:
>         java这边的queryData也是通过定义connector DDL注册源表后,执行sql获取的。
>         tableEnv.registerFunction("add", new addFunction());
>         tableEnv.registerFunction("prod", new ProductUdaf());
>         Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as
> yldrate FROM queryData");
>         tableEnv.createTemporaryView("addedTable", addedTable);
>         Table resultTable = tableEnv.sqlQuery("SELECT
> pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id");
>
> 因为java版本代码,是同事写的,但逻辑按照python这边的逻辑,执行时间上python看本机的cpu占用情况(每次执行时不超过8%)会跑400或500s不等,基本维持在400s左右;我的电脑是win10
> 64位,RAM16GB,主频2.3GHz, 内核4,逻辑处理器8.
>
>
> xiao...@ysstech.com
>
> 发件人: Xingbo Huang
> 发送时间: 2021-03-02 11:59
> 收件人: user-zh
> 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> Hi,
>
> 首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。
>
> 然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟?
>
> Best,
> Xingbo
>
> xiao...@ysstech.com <xiao...@ysstech.com> 于2021年3月2日周二 上午9:57写道:
>
> > Hi,
> >     我是用的flink1.12的pandas类型的udaf, 代码如下:
> >     @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> >     def logReturn(i, j):
> >         df = pd.DataFrame({'id': i, 'rate': j})
> >         df['rate1'] = df['rate'] + 1
> >         return numpy.prod(df['rate1']) - 1
> >     调用方式为:
> >      result =
> > query_table.group_by(query_table.PF_ID).select(query_table.ID,
> >                                                      logReturn(
> >                                                          query_table.ID,
> >
> >  query_table.RATE)).execute_insert('print').wait()
> > 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
> > java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
> > 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
> > 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
> > 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
> >
> >
> >
> > xiao...@ysstech.com
> >
> > 发件人: Xingbo Huang
> > 发送时间: 2021-03-02 09:42
> > 收件人: user-zh
> > 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> > Hi,
> >
> > 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
> >
> >
> udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
> >
> > Best,
> > Xingbo
> >
> > xiaoyue <18242988...@163.com> 于2021年3月1日周一 上午10:34写道:
> >
> > > Hi, Xingbo
> > >     非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> > >     项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> > >     也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
> > >
> > >     所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or
> > numpy中的矩阵计算,非常感谢~!
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2021-03-01 09:54:49,"Xingbo Huang" <hxbks...@gmail.com> 写道:
> > > >Hi,
> > > >
> > >
> > >
> >
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> > > >
> > > >Best
> > > >Xingbo
> > > >
> > > >xiaoyue <18242988...@163.com> 于2021年2月26日周五 下午12:38写道:
> > > >
> > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> > > >>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> > > source1.ID
> > > >> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> > > >> '20170307'"
> > > >> # 获取Query结果
> > > >>     query_table = env.sql_query(sql)
> > > >>     query_table.to_pandas()
> > > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> > > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> > > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> > > >>
> > > >>
> > >
> >
>

回复