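Hi，报错信息已经说明 Python worker 最少需要 79 MB 的 Task Off-Heap 内存，而你的代码把 taskmanager.memory.task.off-heap.size 配置成了 '0m'，所以当然还会继续报错。把这个值改成不小于 79m 的大小（例如 '80m'）即可。 Best, Xingbo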
肖越 <18242988...@163.com> 于2021年2月3日周三 上午10:24写道: > pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积, > 结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 > 结果print报错: > Traceback (most recent call last): > File "C:*****/udtf_test.py", line 42, in <module> > env.execute_sql('INSERT INTO print_result SELECT > multi_production(YLDRATE) FROM query_result') > File > "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", > line 543, in execute_sql > return TableResult(self._j_tenv.executeSql(stmt)) > File > "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", > line 154, in deco > raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) > pyflink.util.exceptions.TableException: "The configured Task Off-Heap > Memory 0 bytes is less than the least required Python worker Memory 79 mb. > The Task Off-Heap Memory can be configured using the configuration key > 'taskmanager.memory.task.off-heap.size'." 
> > > 【代码如下】: > s_env = StreamExecutionEnvironment.get_execution_environment() > s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) > # s_env.set_parallelism(8) > env = StreamTableEnvironment.create(s_env, > > environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()) > env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", > '0m') > # 注册源表 > env.execute_sql(get_table_ddl('TP_GL_DAY')) > env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE')) > > > # 注册输出表 > out_ddl = ''' > CREATE TABLE print_result ( > yldrate1 DOUBLE > ) WITH ( > 'connector' = 'print' > ) > ''' > env.execute_sql(out_ddl) > # 定义及执行SQL > log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY > JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' " > view_table = env.sql_query(log_query) > env.register_table('query_result', view_table) > > > # 定义计算逻辑函数 > @udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), > udf_type="pandas") > def multi_production(yldrate): > yldrate_1 = yldrate + 1 > return np.prod(yldrate_1) - 1 > > > # 注册函数 > env.register_function('multi_production', multi_production) > env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) > FROM query_result') > query_result.print_schema() > env.execute('my_udf_job') > >