Sorry, I misread the error message. The problem is solved now; thanks for the help.
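
For anyone who lands on this thread with the same TableException: the message reports a configured size of 0 bytes, and the quoted script passes '0m' for taskmanager.memory.task.off-heap.size, so the value itself appears to be what falls short of the 79 mb the Python worker asks for. A minimal sketch of that comparison follows. `parse_size` and `off_heap_is_sufficient` are hypothetical helpers written for illustration (they are not PyFlink APIs), the unit table assumes binary multiples, and '80m' is only an assumed example of a value above the minimum.

```python
import re

# Hypothetical unit table (assumption: binary multiples, 1 mb = 1024 * 1024 bytes).
_UNITS = {
    "": 1, "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
}


def parse_size(text):
    """Convert a size string such as '0m' or '79 mb' into a number of bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-z]*)\s*", text.lower())
    if match is None or match.group(2) not in _UNITS:
        raise ValueError("unrecognised size: %r" % text)
    return int(match.group(1)) * _UNITS[match.group(2)]


# Minimum quoted in the error message ("least required Python worker Memory 79 mb").
REQUIRED_PYTHON_WORKER_BYTES = parse_size("79 mb")


def off_heap_is_sufficient(configured):
    """Return True if the configured off-heap size covers the quoted minimum."""
    return parse_size(configured) >= REQUIRED_PYTHON_WORKER_BYTES


print(off_heap_is_sufficient("0m"))   # value used in the quoted script
print(off_heap_is_sufficient("80m"))  # assumed example above the minimum
```

The same check applies to whatever value is passed to set_string in the script: it has to be at least the figure named in the error message.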

On 2021-02-03 10:23:32, "肖越" <18242988...@163.com> wrote:
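>I am testing a UDF in PyFlink 1.11. It takes one Double column of a table as input, computes the cumulative product of the values in that column,
>and returns a single Double value. I have already configured taskmanager.memory.task.off-heap.size, but it has no effect.
>Printing the result fails with this error: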
>Traceback (most recent call last):
>  File "C:*****/udtf_test.py", line 42, in <module>
>    env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
>  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 543, in execute_sql
>    return TableResult(self._j_tenv.executeSql(stmt))
>  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
>    answer, self.gateway_client, self.target_id, self.name)
>  File "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco
>    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'."
>
>
>[Code below]:
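>from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
>from pyflink.table.udf import udf
>import numpy as np
>
>s_env = StreamExecutionEnvironment.get_execution_environment()
>s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
># s_env.set_parallelism(8)
>env = StreamTableEnvironment.create(
>    s_env,
>    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
>env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '0m')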
># Register the source tables
>env.execute_sql(get_table_ddl('TP_GL_DAY'))
>env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))
>
>
># Register the output table
>out_ddl = '''
>        CREATE TABLE print_result (
>         yldrate1 DOUBLE
>        ) WITH (
>         'connector' = 'print'
>        )
>'''
>env.execute_sql(out_ddl)
># Define and run the SQL query
>log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
>view_table = env.sql_query(log_query)
>env.register_table('query_result', view_table)
>
>
># Define the computation function
>@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), udf_type="pandas")
>def multi_production(yldrate):
>    yldrate_1 = yldrate + 1
>    return np.prod(yldrate_1) - 1
>
>
># Register the function
>env.register_function('multi_production', multi_production)
>env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM query_result')
>view_table.print_schema()  # 'query_result' is only the registered table name; the Table object is view_table
>env.execute('my_udf_job')
>
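
The arithmetic that multi_production is meant to perform, compounding each rate as (1 + r) and subtracting 1 at the end, can be checked outside Flink. The sketch below is plain Python rather than a PyFlink UDF; `compound_return` and the sample rates are made up for illustration and do not come from the tables in the quoted script.

```python
from functools import reduce
from operator import mul


def compound_return(rates):
    """Compound a sequence of periodic rates: prod(1 + r) - 1."""
    # Start the product at 1.0 so an empty sequence yields a return of 0.0.
    return reduce(mul, (1.0 + r for r in rates), 1.0) - 1.0


# Illustrative rates only: +1%, +2%, -0.5%.
sample_rates = [0.01, 0.02, -0.005]
total = compound_return(sample_rates)
print(round(total, 6))  # 1.01 * 1.02 * 0.995 - 1
```

Comparing a hand-computed value like this against the job's printed output is one way to confirm the UDF aggregates the whole column rather than a single row or batch.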
