Hi Dian,

Thanks for the confirmation. I have created a ticket:
https://issues.apache.org/jira/browse/FLINK-22605

Best,
Yik San

On Sat, May 8, 2021 at 2:32 PM Dian Fu <dian0511...@gmail.com> wrote:

> There is still no such optimization at the framework level. However, I think
> this may be a good point for us to optimize. Would you like to create a
> ticket for this?
>
> Regards,
> Dian
>
> On May 8, 2021, at 2:27 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> Hi Dian,
>
> Thanks for pointing that out. It is a workaround that I have also
> considered.
>
> Is there a framework-level optimization for this, so that a UDF is
> initialized only once, no matter how many times it is called?
>
> Thank you!
>
> Best,
> Yik San
>
> On Sat, May 8, 2021 at 1:32 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Yik San,
>>
>> Is it acceptable to rewrite the UDF a bit so that it accepts multiple
>> parameters, and then rewrite the program as follows:
>>
>> ```
>> SELECT
>> LABEL_ENCODE(a, b, c)
>> ...
>> ```
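
Dian's suggestion can be sketched in plain Python to show why it helps: with a single LABEL_ENCODE(a, b, c) call site there is one function instance, so open() and load_encoder() run once while eval() encodes all three columns. This is a framework-free illustration, not PyFlink code: the class only mirrors the ScalarFunction open()/eval() lifecycle, load_encoder() is a hypothetical in-memory stand-in that counts its calls, and mapping unknown labels to -1 is an assumption. In a real job the class would subclass pyflink.table.udf.ScalarFunction, the udf(...) wrapper would have to declare a composite result type for the three encoded values, and a Pandas UDF's eval() would receive pandas.Series rather than scalars.

```python
import logging

# Hypothetical stand-in for the real load_encoder(): builds a tiny
# in-memory vocabulary instead of doing I/O, and counts how often it runs.
LOAD_CALLS = 0


def load_encoder():
    global LOAD_CALLS
    LOAD_CALLS += 1
    vocab = ["apple", "banana", "cherry"]
    return {label: idx for idx, label in enumerate(vocab)}


class LabelEncode:
    """Mirrors the ScalarFunction open()/eval() lifecycle without PyFlink."""

    def open(self, function_context):
        logging.info("LabelEncode.open")
        self.encoder = load_encoder()  # runs once per function instance

    def eval(self, *columns):
        # Encode every argument with the same encoder (assumption: unknown -> -1).
        return tuple(self.encoder.get(value, -1) for value in columns)


# Simulate one call site, LABEL_ENCODE(a, b, c), processing two rows.
f = LabelEncode()
f.open(function_context=None)
rows = [("apple", "cherry", "banana"), ("banana", "durian", "apple")]
encoded = [f.eval(a, b, c) for a, b, c in rows]
print(encoded)     # [(0, 2, 1), (1, -1, 0)]
print(LOAD_CALLS)  # 1
```
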
>>
>> Regards,
>> Dian
>>
>> On May 8, 2021, at 11:56 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>
>> Hi community,
>>
>> I am using PyFlink with a Pandas UDF in my job.
>>
>> The job executes a SQL query like this:
>>
>> ```
>> SELECT
>> LABEL_ENCODE(a),
>> LABEL_ENCODE(b),
>> LABEL_ENCODE(c)
>> ...
>> ```
>>
>> And my LABEL_ENCODE UDF is defined below:
>>
>> ```
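>> import logging
>>
>> from pyflink.table.udf import ScalarFunction, udf
>>
>> class LabelEncode(ScalarFunction):
>>     def open(self, function_context):
>>         # Runs once per UDF instance; load_encoder() performs I/O.
>>         logging.info("LabelEncode.open")
>>         self.encoder = load_encoder()
>>
>>     def eval(self, x):
>>         ...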
>>
>> labelEncode = udf(LabelEncode(), ...)
>> ```
>>
>> When I run the job, the TaskManager log shows "LabelEncode.open" printed
>> 3 times, which matches the number of LABEL_ENCODE calls in the query.
>>
>> Since every call to LabelEncode.open performs I/O (in load_encoder()), is
>> it possible to initialize the UDF only once and reuse it for all 3 calls?
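
Short of a framework-level change, one way to keep three separate LABEL_ENCODE calls but pay for the I/O only once is to memoize the loader at module level, so every instance opened in the same Python process reuses one encoder. This sketch assumes the three UDF instances run in the same Python worker process (if they do not, each process still loads once); load_encoder_cached() and its in-memory vocabulary are hypothetical, the class is framework-free, and the shared encoder must be treated as read-only.

```python
import functools
import logging

LOAD_CALLS = 0


@functools.lru_cache(maxsize=None)
def load_encoder_cached():
    # Hypothetical loader standing in for the real load_encoder(); the cache
    # makes every caller in this Python process share one result object.
    global LOAD_CALLS
    LOAD_CALLS += 1
    return {"apple": 0, "banana": 1, "cherry": 2}


class LabelEncode:
    def open(self, function_context):
        logging.info("LabelEncode.open")
        self.encoder = load_encoder_cached()  # real load only on the first call

    def eval(self, x):
        return self.encoder.get(x, -1)


# Three call sites -> three instances -> three open() calls, as in the log.
instances = [LabelEncode() for _ in range(3)]
for inst in instances:
    inst.open(function_context=None)

print(LOAD_CALLS)                                   # 1
print([inst.eval("banana") for inst in instances])  # [1, 1, 1]
```

The "LabelEncode.open" log line would still appear three times; only the expensive load is shared.
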
>>
>> Thank you!
>>
>> Best,
>> Yik San
>>
>>
>>
>
