Hi,

You can set this option through the API:
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")

Setting it in flink-conf.yaml only takes effect when the job is submitted
through flink run. When you run the script directly (python xxx.py), the job
runs in a minicluster and that setting does not take effect.
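
As a rough sketch of where that call could go in the script quoted below: set the option right after the table environment is created, before the UDF is registered and the job is executed. The helper name set_task_off_heap_size is only illustrative and is not part of PyFlink; it just wraps the call shown above.

```python
# Illustrative helper (the name is hypothetical, not a PyFlink API).
# It only wraps the get_config() call shown above.
OFF_HEAP_KEY = "taskmanager.memory.task.off-heap.size"


def set_task_off_heap_size(table_env, size="80m"):
    """Set the task off-heap memory on an existing table environment."""
    configuration = table_env.get_config().get_configuration()
    configuration.set_string(OFF_HEAP_KEY, size)
    return table_env


# Intended use in the quoted script, before the UDF is registered
# and before execute_sql() is called:
#
#     bt_env = BatchTableEnvironment.create(env)
#     set_task_off_heap_size(bt_env, "80m")
#     bt_env.register_function("test_udf", test_udf)
```

The 80m value is the one from the call above; the error message asks for at least 79 mb, so any size at or above that should satisfy the check.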

Best,
Xingbo

Sharipov, Rinat <r.shari...@cleverdata.ru> wrote on Tue, Oct 13, 2020 at 1:56 AM:

> Hi mates !
>
> I'm very new to pyflink and am trying to register a custom UDF using the
> Python API.
> I'm currently facing an issue in both the server environment and my local
> IDE environment.
>
> When I try to execute the example below, I get an error message: *The
> configured Task Off-Heap Memory 0 bytes is less than the least required
> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
> using the configuration key 'taskmanager.memory.task.off-heap.size'.*
>
> Of course I've added the required property to *flink-conf.yaml* and checked
> that *pyflink-shell.sh* initializes the env using the specified
> configuration, but it doesn't help and I still get the error.
>
> I've also attached my flink-conf.yaml file
>
> Thx for your help !
>
> *Here is an example:*
>
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import BatchTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def test_udf(i):
>     return i
>
>
> if __name__ == "__main__":
>     env = ExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     bt_env = BatchTableEnvironment.create(env)
>     bt_env.register_function("test_udf", test_udf)
>
>     my_table = bt_env.from_elements(
>         [
>             ("user-1", "http://url/1"),
>             ("user-2", "http://url/2"),
>             ("user-1", "http://url/3"),
>             ("user-3", "http://url/4"),
>             ("user-1", "http://url/3")
>         ],
>         [
>             "uid", "url"
>         ]
>     )
>
>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>
>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
>
>
>
>
