??????????????????from_pandas????????????????????????????????????????

????????????????????????8??????400??????????????????env.set_parallelism(8)??400????????????????12????????????????????????


1.????????????output??????????????8????????????????????1????????????????????????????????????????????????????????????????????????????????????????????????????1??????????


2.arrow.batch.size????????????????????????????????????????arrow.batch.size????????????????????????


3.??????????????????????????????????????arrow.batch.size????????????????????????????????????????????


4.??????????12??24??????CPU????????????????????????????????????????12????

??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????


------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<hxbks...@gmail.com&gt;;
????????:&nbsp;2020??10??26??(??????) ????8:47
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: pyflink ????????????????????????????????



Hi??

1. 
from_pandas????????????????????????????????????????????flink??csv??connector??????????????????
2. 
arrow.batch.size????????????????????????????????pandas.series??????????????udf??????????????

Best,
Xingbo

???????? <1264386...@qq.com&gt; ??2020??10??26?????? ????4:32??????

&gt; ??????????????????
&gt; ??????????????????
&gt; ??????????
&gt; 
??????????????????????????????????????????????13782492????????????????????????????????????4000??????????????????????id????0??????id??
&gt; ????????????
&gt; 
??????????????????????????????????????????????????????????????id??????????????????????100m??????????????????????????100m????????-1??
&gt; ??????????????????
&gt; import&amp;nbsp;pandas as&amp;nbsp;pd
&gt; import&amp;nbsp;numpy as&amp;nbsp;np
&gt; from&amp;nbsp;pyflink.datastream import&amp;nbsp;StreamExecutionEnvironment
&gt; from&amp;nbsp;pyflink.table import&amp;nbsp;StreamTableEnvironment, 
DataTypes
&gt; from&amp;nbsp;pyflink.table.descriptors import&amp;nbsp;Schema, OldCsv, 
FileSystem
&gt; from&amp;nbsp;pyflink.table.udf import&amp;nbsp;udf
&gt; import&amp;nbsp;os
&gt; import&amp;nbsp;time
&gt; # ??????????????????????????????1??batchsize??10??????????????????????????
&gt;
&gt; env =&amp;nbsp;StreamExecutionEnvironment.get_execution_environment()
&gt; env.set_parallelism(1)
&gt; t_env =&amp;nbsp;StreamTableEnvironment.create(env)
&gt; 
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
&gt; '80m')
&gt; 
t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
&gt; '100000')
&gt; # ??????????
&gt; if&amp;nbsp;os.path.exists('output'):
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; os.remove('output')
&gt;
&gt; t_env.connect(FileSystem().path('output')) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_format(OldCsv()
&gt; 
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field('id', DataTypes.BIGINT())) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_schema(Schema()
&gt; 
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field('id', DataTypes.BIGINT())) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .create_temporary_table('mySink')
&gt; # ????????????????????
&gt; data =&amp;nbsp;pd.read_csv(r'D:\??????\????????\data\trip\graph_data.csv')
&gt; coor_o =&amp;nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
&gt; data['O_Y'])))).T
&gt; coor_d =&amp;nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
&gt; data['D_Y'])))).T
&gt; coor =&amp;nbsp;coor_o.append(coor_d).drop_duplicates()
&gt; coor.columns =&amp;nbsp;['lng', 'lat']
&gt; coor =&amp;nbsp;coor.sort_index()
&gt; coor =&amp;nbsp;coor.to_numpy()
&gt; # udf??????????
&gt;
&gt;
&gt;
&gt; @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
DataTypes.ARRAY(DataTypes.FLOAT()),
&gt; DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
&gt; def&amp;nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; temp 
=&amp;nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&amp;nbsp;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; 
+np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance 
=&amp;nbsp;2*np.arctan2(np.sqrt(temp),
&gt; np.sqrt(1-temp))
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance =&amp;nbsp;distance*3958.8*1609.344
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; buffer=100
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; if&amp;nbsp;(distance 
<=&amp;nbsp;buffer).sum() &amp;gt;&amp;nbsp;0:
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
return&amp;nbsp;distance.argmin()
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; else:
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
return&amp;nbsp;-1
&gt; # ????????????????
&gt;
&gt; df =&amp;nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
&gt; use_data =&amp;nbsp;df[['pickup_longitude', 'pickup_latitude']]
&gt; # ????????
&gt; t_env.from_pandas(use_data) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
.select("distance_meters(pickup_longitude,
&gt; pickup_latitude)") \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .insert_into('mySink')
&gt; # ??????????
&gt;
&gt; start_time =&amp;nbsp;time.time()
&gt; t_env.execute("tutorial_job")
&gt; print(time.time() -&amp;nbsp;start_time)
&gt; ????????CPU??12??24????
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; 
??????????????????????????????????2607????43????????????????????????????????????????????????????????????????1??????????10????????????????????????..
&gt; 
????????????????????????8????????????????????8??????????????????????????????????????????????????????????????????????
&gt; ??????????????????????????????????????????????batch.size??
&gt; ????????????????????

回复