Hi Xingbo, thanks a lot, it works!

But I still think it's not obvious from a user's point of view that
*pyflink-shell.sh* doesn't use the provided flink-conf.yaml. Don't you
think this looks like an issue?

Thx !
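
P.S. For anyone who wants to keep a single flink-conf.yaml and still run scripts locally, here is a minimal sketch of a workaround. It assumes the file contains only flat "key: value" lines and "#" comments, which is a simplification of what Flink itself accepts. The names parse_flink_conf, apply_conf and FakeConfiguration are hypothetical helpers made up for this illustration, not part of PyFlink; the only PyFlink call relied on is set_string(key, value), as in Xingbo's reply.

```python
def parse_flink_conf(text):
    """Parse flat 'key: value' lines, skipping blank lines and '#' comments.

    Simplified sketch: nested YAML, lists and quoting are NOT handled.
    """
    conf = {}
    for raw_line in text.splitlines():
        # Drop anything after '#', then surrounding whitespace.
        line = raw_line.split("#", 1)[0].strip()
        if not line:
            continue
        key, sep, value = line.partition(": ")
        if not sep:
            continue  # not a 'key: value' line; ignored in this sketch
        conf[key.strip()] = value.strip()
    return conf


def apply_conf(configuration, conf):
    """Copy every entry into an object exposing set_string(key, value)."""
    for key, value in conf.items():
        configuration.set_string(key, value)


class FakeConfiguration:
    """Hypothetical stand-in for a PyFlink Configuration, used only for the demo."""

    def __init__(self):
        self.values = {}

    def set_string(self, key, value):
        self.values[key] = value


sample = """
# minimal example
taskmanager.memory.task.off-heap.size: 80m
parallelism.default: 1  # inline comment
"""

fake = FakeConfiguration()
apply_conf(fake, parse_flink_conf(sample))
print(fake.values)
```

In a real script, the object passed to apply_conf would be table_env.get_config().get_configuration() instead of FakeConfiguration, and the text would come from reading the flink-conf.yaml file. Whether every key is honoured when set this way is something I have not verified; it works for taskmanager.memory.task.off-heap.size.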

Tue, 13 Oct 2020 at 05:35, Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi,
>
> You can use the API to set the configuration:
> table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", "80m")
>
> Settings in flink-conf.yaml only take effect when the job is submitted
> through flink run. They do not take effect when the job runs in a
> minicluster (python xxx.py).
>
> Best,
> Xingbo
>
> Sharipov, Rinat <r.shari...@cleverdata.ru> wrote on Tue, 13 Oct 2020 at 1:56 AM:
>
>> Hi mates!
>>
>> I'm very new to PyFlink and am trying to register a custom UDF
>> using the Python API.
>> I've run into an issue in both the server environment and my local IDE
>> environment.
>>
>> When I try to execute the example below, I get an error message: *The
>> configured Task Off-Heap Memory 0 bytes is less than the least required
>> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
>> using the configuration key 'taskmanager.memory.task.off-heap.size'.*
>>
>> Of course, I've added the required property to *flink-conf.yaml* and
>> checked that *pyflink-shell.sh* initializes the env using the specified
>> configuration, but it makes no difference and I still get the error.
>>
>> I've also attached my flink-conf.yaml file.
>>
>> Thanks for your help!
>>
>> *Here is an example:*
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import BatchTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def test_udf(i):
>>     return i
>>
>>
>> if __name__ == "__main__":
>>     env = ExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>
>>     bt_env = BatchTableEnvironment.create(env)
>>     bt_env.register_function("test_udf", test_udf)
>>
>>     my_table = bt_env.from_elements(
>>         [
>>             ("user-1", "http://url/1"),
>>             ("user-2", "http://url/2"),
>>             ("user-1", "http://url/3"),
>>             ("user-3", "http://url/4"),
>>             ("user-1", "http://url/3")
>>         ],
>>         [
>>             "uid", "url"
>>         ]
>>     )
>>
>>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>>
>>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
>>
>>
>>
>>
