# PyFlink pandas UDF: for each input row, query MySQL for one pole's `rt`
# readings within a trailing window of `days` days ending at `current_time`,
# and report the rt spread plus the timestamps of its maximum and minimum.
# (The original header comment was mis-encoded and could not be recovered.)
#
# NOTE(review): `udf` and `DataTypes` (PyFlink) and `timeconvert` are not
# imported/defined in this snippet; presumably they are provided elsewhere
# in the file — confirm.
import pymysql
import pandas as pd

# Milliseconds per day. The window arithmetic assumes `current_time` and the
# `test.ts` column are epoch milliseconds — NOTE(review): confirm.
_MS_PER_DAY = 24 * 60 * 60 * 1000

# Parameterized query: pymysql escapes the bound values instead of them being
# spliced into the SQL text with str.format.
_WINDOW_SQL = "select rt, ts from test where poleId = %s and ts between %s and %s"


def _connect():
    """Open a new connection to the MySQL server holding the `test` table."""
    # NOTE(review): the original passed 'test_database' as the 2nd positional
    # argument, which pymysql maps to `user`; it is presumably the database
    # name. No user/password is supplied here — confirm the real credentials.
    return pymysql.connect(host='192.1.1.1', database='test_database',
                           port=3306, charset='utf8')


def _fetch_window(conn, pole_id, start_ts, end_ts):
    """Return the (rt, ts) rows for `pole_id` with start_ts <= ts <= end_ts."""
    with conn.cursor() as cursor:
        # int() turns numpy scalars coming from the pandas batch into plain
        # ints, so pymysql renders them as numeric literals, not strings.
        cursor.execute(_WINDOW_SQL, (int(pole_id), int(start_ts), int(end_ts)))
        return cursor.fetchall()


def pole_record_perid1(poleId: int, current_time: int, days: int, conn=None):
    """Summarize one pole's `rt` readings over a trailing window.

    Args:
        poleId: value matched against `test.poleId`.
        current_time: end of the window (inclusive), in the same units as
            `test.ts` (treated as milliseconds).
        days: window length in days; the window starts at
            ``current_time - days * 86_400_000``.
        conn: optional open pymysql connection to reuse. When omitted, a
            connection is opened for this call and closed before returning.

    Returns:
        A list of four strings:
        ``[str(max(rt) - min(rt)), timeconvert(current_time),
        comma-joined timeconvert(ts) where rt is maximal,
        comma-joined timeconvert(ts) where rt is minimal]``.
        When no rows match, returns
        ``["0.0", timeconvert(current_time), "-1", "-1"]``.
    """
    window_start = current_time - days * _MS_PER_DAY
    if conn is None:
        own_conn = _connect()
        try:
            rows = _fetch_window(own_conn, poleId, window_start, current_time)
        finally:
            own_conn.close()  # never leak the per-call connection
    else:
        rows = _fetch_window(conn, poleId, window_start, current_time)

    info_df = pd.DataFrame(list(rows), columns=['rt', 'ts'])
    if info_df.empty:
        return ["0.0", timeconvert(current_time), "-1", "-1"]

    rt = info_df['rt']
    rt_max, rt_min = rt.max(), rt.min()
    # Keep every timestamp at which the extreme value occurs (ties included).
    max_rt_ts = ",".join(info_df.loc[rt == rt_max, 'ts'].apply(timeconvert))
    min_rt_ts = ",".join(info_df.loc[rt == rt_min, 'ts'].apply(timeconvert))
    return [str(rt_max - rt_min), timeconvert(current_time), max_rt_ts, min_rt_ts]


@udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def pole_record_perid(poleId, current_time, days):
    """Vectorized (pandas) UDF wrapper around `pole_record_perid1`.

    Receives equal-length pandas Series for one batch and returns a Series
    holding one 4-string list per row. A single MySQL connection is shared
    by all rows of the batch instead of reconnecting for every row.
    """
    conn = _connect()
    try:
        results = [
            pole_record_perid1(pole, end_ts, n_days, conn=conn)
            for pole, end_ts, n_days in zip(poleId, current_time, days)
        ]
    finally:
        conn.close()
    # dtype=object keeps each element a list (and handles an empty batch).
    return pd.Series(results, index=poleId.index, dtype=object)