??????????????????from_pandas???????????????????????????????????????? ????????????????????????8??????400??????????????????env.set_parallelism(8)??400????????????????12????????????????????????
1.????????????output??????????????8????????????????????1????????????????????????????????????????????????????????????????????????????????????????????????????1?????????? 2.arrow.batch.size????????????????????????????????????????arrow.batch.size???????????????????????? 3.??????????????????????????????????????arrow.batch.size???????????????????????????????????????????? 4.??????????12??24??????CPU????????????????????????????????????????12???? ?????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? ------------------ ???????? ------------------ ??????: "user-zh" <hxbks...@gmail.com>; ????????: 2020??10??26??(??????) ????8:47 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: pyflink ???????????????????????????????? Hi?? 1. from_pandas????????????????????????????????????????????flink??csv??connector?????????????????? 2. arrow.batch.size????????????????????????????????pandas.series??????????????udf?????????????? Best, Xingbo ???????? <1264386...@qq.com> ??2020??10??26?????? ????4:32?????? > ?????????????????? > ?????????????????? > ?????????? > ??????????????????????????????????????????????13782492????????????????????????????????????4000??????????????????????id????0??????id?? > ???????????? > ??????????????????????????????????????????????????????????????id??????????????????????100m??????????????????????????100m????????-1?? > ?????????????????? > import&nbsp;pandas as&nbsp;pd > import&nbsp;numpy as&nbsp;np > from&nbsp;pyflink.datastream import&nbsp;StreamExecutionEnvironment > from&nbsp;pyflink.table import&nbsp;StreamTableEnvironment, DataTypes > from&nbsp;pyflink.table.descriptors import&nbsp;Schema, OldCsv, FileSystem > from&nbsp;pyflink.table.udf import&nbsp;udf > import&nbsp;os > import&nbsp;time > # ??????????????????????????????1??batchsize??10?????????????????????????? > > env =&nbsp;StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > t_env =&nbsp;StreamTableEnvironment.create(env) > t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", > '80m') > t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", > '100000') > # ?????????? > if&nbsp;os.path.exists('output'): > &nbsp;&nbsp;&nbsp; os.remove('output') > > t_env.connect(FileSystem().path('output')) \ > &nbsp;&nbsp;&nbsp; .with_format(OldCsv() > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field('id', DataTypes.BIGINT())) \ > &nbsp;&nbsp;&nbsp; .with_schema(Schema() > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .field('id', DataTypes.BIGINT())) \ > &nbsp;&nbsp;&nbsp; .create_temporary_table('mySink') > # ???????????????????? > data =&nbsp;pd.read_csv(r'D:\??????\????????\data\trip\graph_data.csv') > coor_o =&nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'], > data['O_Y'])))).T > coor_d =&nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'], > data['D_Y'])))).T > coor =&nbsp;coor_o.append(coor_d).drop_duplicates() > coor.columns =&nbsp;['lng', 'lat'] > coor =&nbsp;coor.sort_index() > coor =&nbsp;coor.to_numpy() > # udf?????????? > > > > @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(), > &nbsp;&nbsp;&nbsp;&nbsp; DataTypes.ARRAY(DataTypes.FLOAT()), > DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT()) > def&nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]): > &nbsp;&nbsp;&nbsp; temp =&nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&nbsp; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2) > &nbsp;&nbsp;&nbsp; distance =&nbsp;2*np.arctan2(np.sqrt(temp), > np.sqrt(1-temp)) > &nbsp;&nbsp;&nbsp; distance =&nbsp;distance*3958.8*1609.344 > > &nbsp;&nbsp;&nbsp; buffer=100 > &nbsp;&nbsp;&nbsp; if&nbsp;(distance <=&nbsp;buffer).sum() &gt;&nbsp;0: > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;distance.argmin() > &nbsp;&nbsp;&nbsp; else: > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;-1 > # ???????????????? > > df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv') > use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']] > # ???????? > t_env.from_pandas(use_data) \ > &nbsp;&nbsp;&nbsp;&nbsp; .select("distance_meters(pickup_longitude, > pickup_latitude)") \ > &nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink') > # ?????????? > > start_time =&nbsp;time.time() > t_env.execute("tutorial_job") > print(time.time() -&nbsp;start_time) > ????????CPU??12??24???? > > > > > > > > > > > ??????????????????????????????????2607????43????????????????????????????????????????????????????????????????1??????????10????????????????????????.. > ????????????????????????8????????????????????8?????????????????????????????????????????????????????????????????????? > ??????????????????????????????????????????????batch.size?? > ????????????????????