Hi, >>> 1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢? flink的table作业目前没法单独为每一个算子设置并发度,所以你设置并发度为8,就会输出8个文件。我觉得你这数据量不大,本质还是from_pandas的问题,你先把它换了,先用一个并发度玩就行。
>>> 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗? 其实跑pandas udf的模型是有一个java进程和对应一个Python进程,你的udf在Python进程跑着,数据从Java进程批量发送过去,一次发送多少数据是由python.fn-execution.bundle.size这个配置控制的,对于pandas udf来说,因为需要把这个数据组织成pandas.series,所以还会有这个配置python.fn-execution.arrow.batch.size。举个例子就是说比如python.fn-execution.bundle.size=6,python.fn-execution.arrow.batch.size=2,那么就是我就会把6条数据,组织称3个pandas.series一次性发送到Python进程,一个pandas.series会调用一次的pandas udf。所以这里就是调用3次。很明显了,你提高arrow.batch.size的好处是,一个是你组成的pandas.series的数量会更少,很显然每个pandas.series都是有meta信息放在数据头部,越少的pandas.series,那么你传送的数据少一点,通信开销会少一点,另一个好处是你调用udf的次数会减少。当然了你的python.fn-execution.arrow.batch.size是没法超过python.fn-execution.bundle.size,至于说不断增大python.fn-execution.bundle.size是不是就一定是好的也不一定,太大显然你是要buffer数据的,会增大延迟的,而且这时候python进程是空闲的,没有充分调度起来。 关于这两个参数的配置你可以参考文档[1] >>> 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试? 调节参数大小,你要根据你具体作业来调节。一般来说我们提供的默认值都是较优的,不需要调节。 >>> 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗? 你这应该是24个。你可以通过env.get_parallelism()拿到 我说的可能有点多,希望对你有所帮助。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_config.html#python-options Best, Xingbo 洗你的头 <1264386...@qq.com> 于2020年10月26日周一 下午11:04写道: > 感谢您的解答,原来from_pandas的性能会差点哦,我明天会改一下读取的方式 > > > 然后我尝试了设置并行数为8,使用400万数据测试了一下,env.set_parallelism(8),400万的数据耗时耗时12分钟,感觉是比之前快了点 > > > > 1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢? > > > 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗? > > > 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试? > > > 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗? > > 最后,再次感谢您的细心解答,祝您工作顺利,身体健康!我的问题可能比较多,并且比较初级,真的十分感谢您能细心回答,对我的帮助太大了。 > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > hxbks...@gmail.com>; > 发送时间: 2020年10月26日(星期一) 晚上8:47 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: Re: pyflink 如何正确设置高速度?(如何提速) > > > > Hi, > > 1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。 > 2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你 > > Best, > Xingbo > > 洗你的头 <1264386...@qq.com> 于2020年10月26日周一 下午4:32写道: > > > 尊敬的开发者您好, > > 我的需求是这样的, > > 拥有数据: > > > 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id) > > 需要做什么? > > > 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。 > > 我现在的代码如下: > > import&nbsp;pandas as&nbsp;pd > > import&nbsp;numpy as&nbsp;np > > from&nbsp;pyflink.datastream > import&nbsp;StreamExecutionEnvironment > > from&nbsp;pyflink.table import&nbsp;StreamTableEnvironment, > DataTypes > > from&nbsp;pyflink.table.descriptors import&nbsp;Schema, > OldCsv, FileSystem > > from&nbsp;pyflink.table.udf import&nbsp;udf > > import&nbsp;os > > import&nbsp;time > > # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用) > > > > env =&nbsp;StreamExecutionEnvironment.get_execution_environment() > > env.set_parallelism(1) > > t_env =&nbsp;StreamTableEnvironment.create(env) > > > t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", > > '80m') > > > t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", > > '100000') > > # 输出表创建 > > if&nbsp;os.path.exists('output'): > > &nbsp;&nbsp;&nbsp; os.remove('output') > > > > t_env.connect(FileSystem().path('output')) \ > > &nbsp;&nbsp;&nbsp; .with_format(OldCsv() > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field('id', DataTypes.BIGINT())) \ > > &nbsp;&nbsp;&nbsp; .with_schema(Schema() > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > .field('id', DataTypes.BIGINT())) \ > > &nbsp;&nbsp;&nbsp; .create_temporary_table('mySink') > > # 交叉口经纬度数据读取 > > data =&nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv') > > coor_o =&nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'], > > data['O_Y'])))).T > > coor_d =&nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'], > > data['D_Y'])))).T > > coor =&nbsp;coor_o.append(coor_d).drop_duplicates() > > coor.columns =&nbsp;['lng', 'lat'] > > coor =&nbsp;coor.sort_index() > > coor =&nbsp;coor.to_numpy() > > # udf编写与注册 > > > > > > > > @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(), > > &nbsp;&nbsp;&nbsp;&nbsp; > DataTypes.ARRAY(DataTypes.FLOAT()), > > DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT()) > > def&nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], > lat2=coor[:, 1]): > > &nbsp;&nbsp;&nbsp; temp > =&nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&nbsp; > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > > +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2) > > &nbsp;&nbsp;&nbsp; distance > =&nbsp;2*np.arctan2(np.sqrt(temp), > > np.sqrt(1-temp)) > > &nbsp;&nbsp;&nbsp; distance > =&nbsp;distance*3958.8*1609.344 > > > > &nbsp;&nbsp;&nbsp; buffer=100 > > &nbsp;&nbsp;&nbsp; if&nbsp;(distance > <=&nbsp;buffer).sum() &gt;&nbsp;0: > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return&nbsp;distance.argmin() > > &nbsp;&nbsp;&nbsp; else: > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return&nbsp;-1 > > # 出行起点数据读取 > > > > df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv') > > use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']] > > # 处理流程 > > t_env.from_pandas(use_data) \ > > &nbsp;&nbsp;&nbsp;&nbsp; > .select("distance_meters(pickup_longitude, > > pickup_latitude)") \ > > &nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink') > > # 执行与计时 > > > > start_time =&nbsp;time.time() > > t_env.execute("tutorial_job") > > print(time.time() -&nbsp;start_time) > > 我电脑的CPU为12核24线程 > > > > > > > > > > > > > > > > > > > > > > > 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧.. > > 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢) > > 请问,我这种情况应该如何去提速呢?可否解释一下batch.size? > > 期待您的回答,感谢!