Hi,

You can set this option through the Table API configuration:

table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")
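Applied to the example from your message, a minimal sketch could look like the following. It assumes the PyFlink 1.11-era API used in your example (`BatchTableEnvironment`, `register_function`, `execute_sql`). The constant `OFF_HEAP_KEY` and the helper `configure_off_heap` are hypothetical names introduced only for illustration. The `group_by`/`collect` step is left out to keep it short. The one essential change is setting the option on the table environment before the job is executed.

```python
# Minimal sketch, assuming the PyFlink 1.11-era API used in the question.
# OFF_HEAP_KEY and configure_off_heap are hypothetical helper names.
OFF_HEAP_KEY = "taskmanager.memory.task.off-heap.size"


def configure_off_heap(table_env, size="80m"):
    """Set task off-heap memory on the TableEnvironment's configuration.

    80m is just above the 79 mb the error message says the Python worker
    needs. Call this before the job is executed.
    """
    table_env.get_config().get_configuration().set_string(OFF_HEAP_KEY, size)
    return table_env


def main():
    # PyFlink imports live here so configure_off_heap can be imported on its own.
    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import BatchTableEnvironment, DataTypes
    from pyflink.table.udf import udf

    @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
    def test_udf(i):
        return i

    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Set the option on the table environment itself: a value in
    # flink-conf.yaml does not take effect for `python xxx.py` runs.
    bt_env = configure_off_heap(BatchTableEnvironment.create(env))
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [("user-1", "http://url/1"), ("user-2", "http://url/2"), ("user-3", "http://url/4")],
        ["uid", "url"],
    )
    bt_env.create_temporary_view("my_temp_table", my_table)

    # The Python UDF runs here; this is where the "Task Off-Heap Memory
    # 0 bytes" error from the question surfaced when the option was missing.
    bt_env.execute_sql("select test_udf(uid) as uid, url from my_temp_table").print()
```

To run it as a script, call `main()` under an `if __name__ == "__main__":` guard, as in your original example.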
Setting the option in flink-conf.yaml only takes effect when the job is submitted through `flink run`. It does not take effect when the job runs in a mini cluster (i.e. `python xxx.py`).

Best,
Xingbo

Sharipov, Rinat <r.shari...@cleverdata.ru> wrote on Tuesday, October 13, 2020 at 1:56 AM:

> Hi mates!
>
> I'm very new to PyFlink and am trying to register a custom UDF using the
> Python API.
> I'm currently facing an issue both in the server environment and in my
> local IDE environment.
>
> When I try to execute the example below, I get this error message:
> The configured Task Off-Heap Memory 0 bytes is less than the least required
> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
> using the configuration key 'taskmanager.memory.task.off-heap.size'
>
> Of course, I've added the required property to flink-conf.yaml and checked
> that pyflink-shell.sh initializes the environment using the specified
> configuration, but it has no effect and I still get the error.
>
> I've also attached my flink-conf.yaml file.
>
> Thanks for your help!
>
> Here is an example:
>
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import BatchTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def test_udf(i):
>     return i
>
>
> if __name__ == "__main__":
>     env = ExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     bt_env = BatchTableEnvironment.create(env)
>     bt_env.register_function("test_udf", test_udf)
>
>     my_table = bt_env.from_elements(
>         [
>             ("user-1", "http://url/1"),
>             ("user-2", "http://url/2"),
>             ("user-1", "http://url/3"),
>             ("user-3", "http://url/4"),
>             ("user-1", "http://url/3")
>         ],
>         [
>             "uid", "url"
>         ]
>     )
>
>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>
>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()