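抱歉，之前对报错信息理解有误，问题已经解决，感谢大佬。（补充说明：报错的原因是代码中把 taskmanager.memory.task.off-heap.size 设成了 '0m'，而 Python worker 至少需要 79 mb 的 Task Off-Heap Memory；将该配置设为不小于 79 mb 的值（例如 '80m'）即可。）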
在 2021-02-03 10:23:32,"肖越" <18242988...@163.com> 写道: >pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积, >结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 >结果print报错: >Traceback (most recent call last): > File "C:*****/udtf_test.py", line 42, in <module> > env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) > FROM query_result') > File > "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", > line 543, in execute_sql > return TableResult(self._j_tenv.executeSql(stmt)) > File > "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "C:\****\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", > line 154, in deco > raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) >pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 >bytes is less than the least required Python worker Memory 79 mb. The Task >Off-Heap Memory can be configured using the configuration key >'taskmanager.memory.task.off-heap.size'." 
> > >【代码如下】: >s_env = StreamExecutionEnvironment.get_execution_environment() >s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) ># s_env.set_parallelism(8) >env = StreamTableEnvironment.create(s_env, > > environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()) >env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", > '0m') ># 注册源表 >env.execute_sql(get_table_ddl('TP_GL_DAY')) >env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE')) > > ># 注册输出表 >out_ddl = ''' > CREATE TABLE print_result ( > yldrate1 DOUBLE > ) WITH ( > 'connector' = 'print' > ) >''' >env.execute_sql(out_ddl) ># 定义及执行SQL >log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN >TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' " >view_table = env.sql_query(log_query) >env.register_table('query_result', view_table) > > ># 定义计算逻辑函数 >@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), >udf_type="pandas") >def multi_production(yldrate): > yldrate_1 = yldrate + 1 > return np.prod(yldrate_1) - 1 > > ># 注册函数 >env.register_function('multi_production', multi_production) >env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) >FROM query_result') >query_result.print_schema() >env.execute('my_udf_job') >