Hi Xingbo, thanks a lot, it works! But I'm still sure that it's not obvious from a user's point of view that *pyflink-shell.sh* doesn't use the provided flink-conf.yaml. Don't you think this looks like an issue?
Thanks!

On Tue, 13 Oct 2020 at 05:35, Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi,
>
> You can use the API to set the configuration:
> table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
>
> The flink-conf.yaml approach only takes effect when the job is submitted
> through flink run; with the minicluster approach (python xxx.py) it does
> not take effect.
>
> Best,
> Xingbo
>
> On Tue, 13 Oct 2020 at 1:56 AM, Sharipov, Rinat <r.shari...@cleverdata.ru> wrote:
>
>> Hi mates!
>>
>> I'm very new to pyflink and am trying to register a custom UDF
>> using the Python API.
>> I've run into an issue in both the server environment and my local IDE
>> environment.
>>
>> When I try to execute the example below, I get an error message: *The
>> configured Task Off-Heap Memory 0 bytes is less than the least required
>> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
>> using the configuration key 'taskmanager.memory.task.off-heap.size*
>>
>> Of course I've added the required property to *flink-conf.yaml* and
>> checked that *pyflink-shell.sh* initializes the env using the specified
>> configuration, but it doesn't make any difference and I still get the error.
>>
>> I've also attached my flink-conf.yaml file.
>>
>> Thanks for your help!
>>
>> *Here is an example:*
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import BatchTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def test_udf(i):
>>     return i
>>
>>
>> if __name__ == "__main__":
>>     env = ExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>
>>     bt_env = BatchTableEnvironment.create(env)
>>     bt_env.register_function("test_udf", test_udf)
>>
>>     my_table = bt_env.from_elements(
>>         [
>>             ("user-1", "http://url/1"),
>>             ("user-2", "http://url/2"),
>>             ("user-1", "http://url/3"),
>>             ("user-3", "http://url/4"),
>>             ("user-1", "http://url/3")
>>         ],
>>         [
>>             "uid", "url"
>>         ]
>>     )
>>
>>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>>
>>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
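
For anyone finding this thread later, here is a minimal sketch of how the workaround quoted above might be wired into the example. The helper name apply_config_overrides and the constant OFF_HEAP_OVERRIDES are my own (hypothetical, not part of PyFlink); the only PyFlink call used is the get_config().get_configuration().set_string(...) chain from Xingbo's reply, and the '80m' value comes from the same reply.

```python
# Hypothetical helper, not part of PyFlink: it only wraps the call chain
# quoted in the thread so several settings can be applied in one place.
def apply_config_overrides(table_env, overrides):
    """Set each key/value pair on the table environment's configuration."""
    configuration = table_env.get_config().get_configuration()
    for key, value in overrides.items():
        # set_string is the call suggested in the thread
        configuration.set_string(key, value)
    return table_env


# Key and value taken from the thread; the error message there asks for
# at least 79 mb of task off-heap memory for the Python worker.
OFF_HEAP_OVERRIDES = {"taskmanager.memory.task.off-heap.size": "80m"}
```

In the example above this would presumably be called as apply_config_overrides(bt_env, OFF_HEAP_OVERRIDES) right after BatchTableEnvironment.create(env) and before the query is executed, since the local run (python xxx.py) does not pick the value up from flink-conf.yaml.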