[ https://issues.apache.org/jira/browse/FLINK-23890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550280#comment-17550280 ]
Dian Fu commented on FLINK-23890:
---------------------------------

[~mayuehappy] Generally I think this is a valid optimization, so +1 from my side. The optimization described in this ticket seems to contain two parts:
* Updating the timer registered in `processElement` from `watermark + 1` to `event timestamp`
* Updating the timer registered in `onEventTime` from `watermark + 1` to `event timestamp + window time`

Optimization 1) means that a timer will be created for each input element. As for the performance gain, I guess it mainly comes from optimization 2). Could you verify that?

> CepOperator may create a large number of timers and cause performance problems
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23890
>                 URL: https://issues.apache.org/jira/browse/FLINK-23890
>             Project: Flink
>          Issue Type: Improvement
>          Components: Library / CEP
>    Affects Versions: 1.12.1
>            Reporter: Yue Ma
>            Assignee: Nicholas Jiang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-08-20-13-59-05-977.png
>
>
> There are two situations in which the CepOperator may register a timer when
> dealing with event time.
> The first is in processElement, which buffers the data and then registers a timer
> with a timestamp of watermark + 1.
> {code:java}
> if (timestamp > timerService.currentWatermark()) {
>     // we have an event with a valid timestamp, so
>     // we buffer it until we receive the proper watermark.
>     saveRegisterWatermarkTimer();
>     bufferEvent(value, timestamp);
> }{code}
> The other is when the event-time timer fires: if sortedTimestamps or
> partialMatches are not empty, a timer is registered again.
> {code:java}
> if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
>     saveRegisterWatermarkTimer();
> }{code}
>
> The problem arises when the partialMatches of every key are
> not empty.
> Then every time the watermark advances, the timers of all keys
> fire, and a new event-time timer is re-registered under each key.
> When the number of keys is very large, this greatly affects
> performance.
> !https://code.byted.org/inf/flink/uploads/91aee639553df07fa376cf2865e91fd2/image.png!
> I think it is unnecessary to register event-time timers this frequently. Could
> we make the following changes?
> When an event arrives, register an event-time timer whose timestamp equals
> the event time of this event instead of watermark + 1.
> When a new ComputationState with a window is created (e.g. with the *within* pattern),
> register an event-time timer at the window timeout (event time + window time).
> After trying this in our test environment, the number of
> registered timers was greatly reduced, and performance improved significantly.
> !https://code.byted.org/inf/flink/uploads/24b85492c6a34a35c4445a4fd46c8363/image.png!
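To make the question about where the gain comes from more concrete, here is a toy Java model that counts timer firings under both registration policies. It is a hypothetical sketch, not the Flink `CepOperator` or `TimerService` API: the class `CepTimerSketch`, the `Policy` enum and `countFirings` are invented for illustration. It assumes each key receives a single event at `eventTime`, keeps one partial match open until the watermark reaches `eventTime + windowTime`, and that the watermark advances by 1 per step.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Toy model (hypothetical names, not the Flink API) of how many event-time
// timers fire per key under two registration policies.
class CepTimerSketch {

    /** One registered event-time timer for one key. */
    record Timer(long timestamp, int key) {}

    /** Timer registration policy of the simulated operator (hypothetical). */
    enum Policy { WATERMARK_PLUS_ONE, EVENT_TIME_PLUS_WINDOW }

    /**
     * Each of `keys` keys receives one event at `eventTime` and keeps a partial
     * match open until the watermark reaches eventTime + windowTime. The
     * watermark advances by 1 from eventTime to maxWatermark. Returns the
     * total number of timer firings.
     */
    static long countFirings(Policy policy, int keys, long eventTime,
                             long windowTime, long maxWatermark) {
        PriorityQueue<Timer> timers =
                new PriorityQueue<>(Comparator.comparingLong(Timer::timestamp));
        long timeout = eventTime + windowTime;

        // processElement: the event is buffered. Assuming the watermark was
        // eventTime - 1, watermark + 1 equals eventTime, so both policies
        // register the same first timer.
        for (int k = 0; k < keys; k++) {
            timers.add(new Timer(eventTime, k));
        }

        long firings = 0;
        for (long wm = eventTime; wm <= maxWatermark; wm++) {
            // onEventTime: fire every timer at or below the current watermark.
            while (!timers.isEmpty() && timers.peek().timestamp() <= wm) {
                Timer fired = timers.poll();
                firings++;
                boolean partialMatchPending = wm < timeout;
                if (!partialMatchPending) {
                    continue; // the partial match timed out and was pruned
                }
                if (policy == Policy.WATERMARK_PLUS_ONE) {
                    // Current behaviour: re-register at watermark + 1 while
                    // partial matches remain.
                    timers.add(new Timer(wm + 1, fired.key()));
                } else if (fired.timestamp() == eventTime) {
                    // Proposed behaviour: one timer at the window timeout.
                    timers.add(new Timer(timeout, fired.key()));
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        long current = countFirings(Policy.WATERMARK_PLUS_ONE, 1000, 0, 10, 20);
        long proposed = countFirings(Policy.EVENT_TIME_PLUS_WINDOW, 1000, 0, 10, 20);
        System.out.println("watermark + 1:       " + current + " firings");
        System.out.println("event time + window: " + proposed + " firings");
    }
}
```

With 1000 keys and a window of 10, `main` reports 11000 firings for the watermark + 1 policy and 2000 for the event time + window policy. Because each key has only one event in this model, both policies fire the same initial timer, so the whole difference here comes from the re-registration in `onEventTime` (optimization 2). That is consistent with the guess in the comment, but it does not measure the real operator, where per-element timers from optimization 1) could add firings when many events arrive per key.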