Thanks, Guowei, I see your point.
But I'm afraid there is no direct connection between delivery semantics and 
TimeService. 
Yes, obviously, a Java Timer is the first thing that comes to mind, but it 
requires an extra thread to perform background work. That approach has 
drawbacks, for example when only a strictly limited amount of resources is 
available to the TM (as when a job runs in a YARN or Kubernetes cluster). I 
just wanted to piggyback on the already existing TimeService.
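
For reference, the extra-thread approach mentioned above could look roughly 
like the sketch below. It is plain Java with no Flink dependency; 
PeriodicReloader and its methods are hypothetical names made up for this 
illustration, not part of any Flink API. The assumption is that an instance 
would be created in open() and closed in close() of a RichAsyncFunction, and 
that the loader supplies whatever configuration object the function needs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: refreshes a value periodically on one daemon thread. */
final class PeriodicReloader<T> implements AutoCloseable {
    private final Supplier<T> loader;
    private final ScheduledExecutorService scheduler;
    // volatile so that readers on other threads see the latest reloaded value
    private volatile T current;

    PeriodicReloader(Supplier<T> loader, long periodMillis) {
        this.loader = loader;
        this.current = loader.get(); // initial load happens synchronously
        this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "config-reloader");
            thread.setDaemon(true); // do not keep the JVM alive for reloads
            return thread;
        });
        this.scheduler.scheduleWithFixedDelay(
                this::reload, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    private void reload() {
        try {
            current = loader.get();
        } catch (RuntimeException e) {
            // Keep the previous value; an exception escaping this method
            // would cancel all subsequent scheduled runs.
        }
    }

    /** Returns the most recently loaded value. */
    T get() {
        return current;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The cost is the one described above: one additional background thread per 
function instance. Reloads are also not synchronized with element processing, 
which is why the value is only ever read through a volatile field.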

The only possible reason for not sharing a TimeService with AsyncFunction is 
that Flink guarantees that onTimer(...) and processElement(...) calls are 
synchronized. But these guarantees could have worked for AsyncFunction as well.
So my question still remains open: why is TimeService unavailable for 
RichAsyncFunctions?

Thank you all for sharing your thoughts!

Kind Regards,
Mike Pryakhin

> On 28 Apr 2019, at 04:30, Guowei Ma <guowei....@gmail.com> wrote:
> 
> Hi,
> 
> AFAIK, TimeService in Flink could guarantee the semantics of "at least 
> once/exactly once" after failure.
> If you only want to reload config periodically I think you could use Java 
> native Timer yourself.
> 
> Best,
> Guowei
> 
> 
> Guowei Ma <guowei....@gmail.com> wrote on Sun, 28 Apr 2019 at 9:25 AM:
> Hi
> AFAIK, TimeService in Flink could guarantee the semantics of 
> Best,
> Guowei
> 
> 
> Mikhail Pryakhin <m.prya...@gmail.com> wrote on Fri, 26 Apr 2019 at 7:57 PM:
> Hi David, 
> Thank you!
> 
> Yes, fair enough, but take for instance the BucketingSink class [1]. It is a 
> RichFunction which employs the TimeService to execute time-based logic that 
> is not directly associated with the event flow, for example closing files 
> every n minutes. In an AsyncFunction I intended to use the TimeService the 
> same way, for example to periodically reload configuration from outside. 
> Does it make sense?
> 
> [1] 
> https://github.com/apache/flink/blob/24c2e17c8d52ae2f0f897a5806a3a44fdf62b0a5/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L405
> 
> Kind Regards,
> Mike Pryakhin
> 
>> On 25 Apr 2019, at 10:59, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> 
>> Hi Mike,
>> 
>> I think the reason there is no access to TimerService in an async function 
>> is that, because the function is asynchronous, there are no guarantees about 
>> when and where (at which stage of the pipeline) it will actually be 
>> executed. This characteristic doesn't align with TimerService and timely 
>> callbacks.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 19/04/2019 17:41, Mikhail Pryakhin wrote:
>>> Hello, Flink community!
>>> 
>>> It happens that I need to access a timer service in a RichAsyncFunction 
>>> implementation. I know this is normally accomplished via the 
>>> StreamingRuntimeContext instance available in a RichFunction. 
>>> Unfortunately, RichAsyncFunction, which extends RichFunction, overrides the 
>>> “setRuntimeContext” method [1] and wraps the RuntimeContext instance passed 
>>> as the method argument in a RichAsyncFunctionRuntimeContext instance [2]. 
>>> This RichAsyncFunction-specific RuntimeContext implementation is private 
>>> [2], so there is no way to reach the wrapped original RuntimeContext, which 
>>> makes it impossible to use the timer service in RichAsyncFunction 
>>> implementations. Just curious: is there any reason for that? Can we make 
>>> this implementation public or somehow share the wrapped instance?
>>> 
>>> Many thanks in advance!
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L76
>>> [2] 
>>> https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L100
>>> 
>>> 
>>> 
>>> Kind Regards,
>>> Mike Pryakhin
>>> 
> 
