[ 
https://issues.apache.org/jira/browse/FLINK-23890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550280#comment-17550280
 ] 

Dian Fu commented on FLINK-23890:
---------------------------------

[~mayuehappy] Generally I think this is a valid optimization and so +1 from my 
side. 

Regarding the optimization described in this ticket, it seems to contain two 
parts: 
* Changing the timer registered in `processElement` from `watermark + 1` to 
`event timestamp`
* Changing the timer registered in `onEventTime` from `watermark + 1` to `event 
timestamp + window time`
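
To make the two parts concrete, here is a minimal sketch in plain Java with no 
Flink dependency. The class and method names are hypothetical and only label 
the timestamps mentioned above; this is not the CepOperator code.

```java
// Hypothetical helper, not part of Flink: it only names the timer
// timestamps discussed above so the two parts can be compared.
class CepTimerTimestamps {

    /** Current behavior at both call sites: fire on the next watermark advance. */
    static long currentTimer(long currentWatermark) {
        return currentWatermark + 1;
    }

    /** Part 1: timer registered in processElement for a buffered event. */
    static long proposedProcessElementTimer(long eventTimestamp) {
        return eventTimestamp;
    }

    /** Part 2: timer registered in onEventTime while partial matches remain. */
    static long proposedOnEventTimeTimer(long eventTimestamp, long windowTimeMs) {
        return eventTimestamp + windowTimeMs;
    }

    public static void main(String[] args) {
        long watermark = 1_000L;       // assumed current watermark
        long eventTimestamp = 1_250L;  // assumed timestamp of the incoming event
        long windowTimeMs = 60_000L;   // assumed window length of the pattern

        System.out.println("current:                 " + currentTimer(watermark));
        System.out.println("proposed processElement: "
                + proposedProcessElementTimer(eventTimestamp));
        System.out.println("proposed onEventTime:    "
                + proposedOnEventTimeTimer(eventTimestamp, windowTimeMs));
    }
}
```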

Optimization 1) means that a timer will be created for each input element. As 
for the performance gain, I guess that it mainly comes from optimization 2). 
Could you verify that?
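
One rough way to reason about this before measuring is a toy model of a single 
key with one partial match. The sketch below is plain Java under simplifying 
assumptions (one event per key, the partial match expires once the watermark 
reaches `event timestamp + window time`); all names are hypothetical and it is 
no substitute for a real benchmark.

```java
// Toy model, not Flink code: counts timer firings for ONE key that holds
// one partial match, under the current and the proposed strategy.
class CepTimerCountSketch {

    /** Current strategy: every firing re-registers a timer at watermark + 1. */
    static int firingsCurrent(long[] watermarks, long watermarkAtArrival,
                              long eventTs, long windowMs) {
        long timer = watermarkAtArrival + 1; // registered in processElement
        boolean hasTimer = true;
        int firings = 0;
        for (long w : watermarks) {
            if (hasTimer && timer <= w) {
                firings++;
                // Assumption: the partial match lives until the window expires.
                boolean partialMatchAlive = w < eventTs + windowMs;
                if (partialMatchAlive) {
                    timer = w + 1; // re-registered in onEventTime
                } else {
                    hasTimer = false;
                }
            }
        }
        return firings;
    }

    /** Proposed strategy: one timer at eventTs, then one at eventTs + windowMs. */
    static int firingsProposed(long[] watermarks, long eventTs, long windowMs) {
        long timer = eventTs; // part 1: registered in processElement
        boolean hasTimer = true;
        boolean windowTimerRegistered = false;
        int firings = 0;
        for (long w : watermarks) {
            // A large watermark jump may fire both timers in one advance.
            while (hasTimer && timer <= w) {
                firings++;
                if (!windowTimerRegistered) {
                    timer = eventTs + windowMs; // part 2: registered in onEventTime
                    windowTimerRegistered = true;
                } else {
                    hasTimer = false;
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] watermarks = {100, 200, 300, 400, 500, 600, 700, 800, 900, 1000};
        System.out.println("current:  " + firingsCurrent(watermarks, 0L, 50L, 600L));
        System.out.println("proposed: " + firingsProposed(watermarks, 50L, 600L));
    }
}
```

In this model the current strategy fires once per watermark advance for as long 
as the partial match is alive, while the proposed strategy fires a fixed number 
of times per event regardless of how often the watermark advances.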

> CepOperator may create a large number of timers and cause performance problems
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23890
>                 URL: https://issues.apache.org/jira/browse/FLINK-23890
>             Project: Flink
>          Issue Type: Improvement
>          Components: Library / CEP
>    Affects Versions: 1.12.1
>            Reporter: Yue Ma
>            Assignee: Nicholas Jiang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-08-20-13-59-05-977.png
>
>
> There are two situations in which the CepOperator may register a timer when 
> dealing with event time. 
> The first is in processElement, which buffers the event and then registers a 
> timer with a timestamp of watermark + 1.
> {code:java}
> if (timestamp > timerService.currentWatermark()) {
>  // we have an event with a valid timestamp, so
>  // we buffer it until we receive the proper watermark.
>  saveRegisterWatermarkTimer();
>  bufferEvent(value, timestamp);
> }{code}
> The other is when the event-time timer fires: if sortedTimestamps or 
> partialMatches is not empty, another timer is registered.
> {code:java}
> if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
>  saveRegisterWatermarkTimer();
> }{code}
>  
> The problem arises when the partialMatches of every key are non-empty. 
> Every time the watermark advances, the timers of all keys fire, and a new 
> event-time timer is then re-registered for each key. 
> When a task has a very large number of keys, this greatly affects 
> performance.
> !https://code.byted.org/inf/flink/uploads/91aee639553df07fa376cf2865e91fd2/image.png!
> I think it is unnecessary to register event-time timers this frequently. 
> Could we make the following changes?
> When an event arrives, the timer we register uses the event time of that 
> event as its timestamp instead of watermark + 1.
> When a new ComputationState with a window is created (as with a *within* 
> pattern), we use the timeout of that window to create the timer (EventTime + 
> WindowTime). 
> After trying this in our test environment, the number of registered timers 
> was greatly reduced and performance improved greatly.
> !https://code.byted.org/inf/flink/uploads/24b85492c6a34a35c4445a4fd46c8363/image.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
