# Header comment garbled by an encoding error; only the words "udf" and "MySQL" survived.
import pymysql
import pandas as pd
def pole_record_perid1(poleId: int, current_time: int, days: int):
    """Summarize one pole's ``rt`` readings over a trailing window of ``days``.

    Selects ``rt, ts`` from MySQL table ``test`` for rows with the given
    ``poleId`` whose ``ts`` lies in the inclusive range
    ``[current_time - days * 24 * 60 * 60 * 1000, current_time]``. The
    ``* 1000`` factor implies ``current_time`` and ``ts`` are epoch
    milliseconds (presumably; confirm against the table schema).

    Args:
        poleId: Pole identifier matched against ``test.poleId``.
        current_time: End of the window; also formatted into the result.
        days: Window length in days.

    Returns:
        A list of four strings:
        ``[str(max(rt) - min(rt)), timeconvert(current_time),
        comma-joined timeconvert(ts) of every row tied for max rt,
        comma-joined timeconvert(ts) of every row tied for min rt]``.
        When no rows match: ``["0.0", timeconvert(current_time), "-1", "-1"]``.
    """
    # The caller feeds values from a row-wise pandas apply, which can upcast
    # integer columns to float64; normalize to the declared int type so the
    # SQL parameters are plain integers.
    poleId, current_time, days = int(poleId), int(current_time), int(days)
    window_start = current_time - days * 24 * 60 * 60 * 1000

    # Parameterized query: the driver escapes values instead of splicing
    # them into the SQL text with str.format.
    sql = "SELECT rt, ts FROM test WHERE poleId = %s AND ts BETWEEN %s AND %s"

    # NOTE(review): the original passed 'test_database' as the second
    # positional argument, which pymysql binds to ``user`` rather than
    # ``database``, so no schema was selected for the unqualified ``test``
    # table. Pass settings by keyword; add user/password for your deployment.
    conn = pymysql.connect(
        host="192.1.1.1", port=3306, database="test_database", charset="utf8"
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, (poleId, window_start, current_time))
            rows = cursor.fetchall()
    finally:
        # A new connection is opened on every call; always release it,
        # even if the query raises.
        conn.close()

    info_df = pd.DataFrame(list(rows), columns=["rt", "ts"])
    if info_df.empty:
        return ["0.0", timeconvert(current_time), "-1", "-1"]

    rt = info_df["rt"]
    rt_max = rt.max()
    rt_min = rt.min()
    # Every row tied for an extreme contributes its (converted) timestamp.
    max_rt_ts = ",".join(info_df.loc[rt == rt_max, "ts"].apply(timeconvert).tolist())
    min_rt_ts = ",".join(info_df.loc[rt == rt_min, "ts"].apply(timeconvert).tolist())
    return [str(rt_max - rt_min), timeconvert(current_time), max_rt_ts, min_rt_ts]


@udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def pole_record_perid(poleId, current_time, days):
    """Vectorized (pandas) UDF: apply ``pole_record_perid1`` to every input row.

    Args:
        poleId, current_time, days: Columns of one input batch. With
            ``func_type="pandas"`` these are pandas Series of equal length.

    Returns:
        A pandas Series, indexed like ``poleId``, holding one 4-element list
        of strings per input row (the declared ``ARRAY<STRING>`` result).
    """
    # Walk the columns in lockstep instead of building a DataFrame and using
    # row-wise DataFrame.apply. That path builds a Series per row, which
    # upcasts mixed int/float values. On an empty batch it also returns a
    # DataFrame, which cannot be assigned to a single column.
    results = [
        pole_record_perid1(pid, ts, n_days)
        for pid, ts, n_days in zip(poleId, current_time, days)
    ]
    # dtype=object keeps each list as one cell and gives empty batches a type.
    return pd.Series(results, index=poleId.index, dtype=object)

# (Trailing forum-page text "Reply" removed here; it was not part of the code.)