[ https://issues.apache.org/jira/browse/FLINK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498941#comment-17498941 ]
Sandor Kelemen edited comment on FLINK-26370 at 2/28/22, 9:04 PM:
------------------------------------------------------------------
I'm quite new to this project, and I'm wondering what problems these sync calls cause exactly. The only thing I see is that they block the operator sdk threads that trigger and run the Reconciler. If the operator sdk uses a fixed-size thread pool (like [here|https://github.com/java-operator-sdk/java-operator-sdk/blob/a7eba4303f2962f79a137c8c35fc77d8075b1e5f/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java#L96]), a few slow flink cluster calls could slow down the reconciliation of other (independent) resources, because all the threads in the fixed pool would be blocked on the sync calls.

Some potential solutions I see now:

1. Use non-blocking async APIs
An elegant solution would be to avoid thread-blocking entirely by using non-blocking async APIs (i.e. passing and triggering callbacks). As far as I can tell, none of the libraries we use (java-operator-sdk, fabric8 kubernetes client, flink client) support this API style, so we cannot avoid thread-blocking completely unless we switch to other libraries.

2. Configure java-operator-sdk to use an unbounded thread pool
I'm not sure the operator sdk supports this, but it might be a quick win if it does.

3. Execute reconciliation on a new thread pool within the flink operator
This could involve:
- a new unbounded thread pool in the operator
- a flag in the FlinkDeploymentStatus indicating that the reconciliation process is in progress for that resource (isReconciliationInProgress)
- when the operator sdk calls reconcile, submit a task to the thread pool that performs the actual reconciliation, and immediately return an UpdateControl with isReconciliationInProgress=true
- make sure that at most one such uncompleted task is active in the thread pool's queue per resource, and that cleanup is implemented properly
- when the asynchronous reconciliation finishes and the UpdateControl is computed, save the UpdateControl object in the operator's memory and use an EventSource to trigger a reconciliation for that resource; the triggered reconciliation can recognise the already computed UpdateControl and return it immediately with isReconciliationInProgress=false
- if the reconciliation triggered via the EventSource is not guaranteed to run on the same operator instance, we need to find a suitable place to persist the UpdateControl
- one risk with this solution is that the semantics of the reconcile method in the Reconciler change somewhat: previously a returned empty UpdateControl indicated that the reconciliation had finished and no changes were necessary, but with the above in place, the reconciliation might still be in progress despite the empty UpdateControl. I'm not sure whether this causes any problems.

3/B. A variation of option 3 could be to use the kubernetes api directly to perform the updates instead of using an EventSource. This might be attractive if we cannot store the UpdateControl object in memory: instead of persisting it to an intermediate place, we would persist it straight to the Kubernetes API.
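The per-resource offloading described in option 3 can be sketched with the JDK standard library alone. This is only a rough illustration of the pattern, not the real implementation: AsyncReconciler and ReconcileResult are hypothetical names, and a real version would return the SDK's UpdateControl and fire an EventSource event instead of relying on the next reconcile call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncReconciler {

    /** Hypothetical stand-in for the SDK's UpdateControl. */
    public record ReconcileResult(boolean reconciliationInProgress) {}

    // Daemon threads so an idle pool does not keep the JVM alive.
    private final ExecutorService pool = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Finished-but-not-yet-reported results, keyed by resource name.
    private final Map<String, ReconcileResult> completed = new ConcurrentHashMap<>();
    // Presence of a key means a reconciliation is already in flight,
    // guaranteeing at most one uncompleted task per resource.
    private final Map<String, Boolean> inFlight = new ConcurrentHashMap<>();

    /** Called from the operator-sdk thread; must return immediately. */
    public ReconcileResult reconcile(String resourceName, Runnable slowClusterCall) {
        // If a previous async run already finished, report its result now.
        ReconcileResult done = completed.remove(resourceName);
        if (done != null) {
            return done; // reconciliationInProgress == false
        }
        // Submit only if nothing is in flight for this resource yet.
        if (inFlight.putIfAbsent(resourceName, Boolean.TRUE) == null) {
            pool.submit(() -> {
                try {
                    slowClusterCall.run(); // the blocking Flink REST call
                    completed.put(resourceName, new ReconcileResult(false));
                    // A real implementation would now fire an EventSource
                    // event to re-trigger reconcile() for this resource.
                } finally {
                    inFlight.remove(resourceName);
                }
            });
        }
        return new ReconcileResult(true); // still working on it
    }
}
```

The putIfAbsent check is what enforces the "maximum of 1 uncompleted task per resource" bullet above, and the completed map plays the role of the in-memory UpdateControl store.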
In this case, there might be interference with the operator sdk, as both the sdk and the operator might want to update the resource on the Kubernetes API. It should be okay, though, as the operator sdk uses optimistic version control.

As mentioned, I'm quite new here, so I'm happy to hear what other problems you see with these blocking calls, and any comments/suggestions on the above solutions.

> Make Flink cluster communication asynchronous
> ---------------------------------------------
>
>                 Key: FLINK-26370
>                 URL: https://issues.apache.org/jira/browse/FLINK-26370
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Kubernetes Operator
>            Reporter: Gyula Fora
>            Assignee: Sandor Kelemen
>            Priority: Major
>
> In the current architecture, calls to the flink clusters (through the rest
> client) are made synchronously from the reconcile loop.
> These calls often take a long time for various (completely normal) reasons:
> - Cluster is not ready -> long call + TimeoutException
> - Operation takes a long time -> cancel/savepoint operations are often
> expected to take seconds/minutes
> Both the observer and reconciler components make these calls.
> We should come up with a way to avoid making these sync calls from the main
> loop while still preserving the logic of the operator.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
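The fully non-blocking direction the issue asks for (option 1 above) could look roughly like the following sketch. Everything here is hypothetical: triggerSavepoint simulates a slow blocking Flink REST call, and the thenApply callback stands in for whatever status update or event the real operator would perform on completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NonBlockingCall {
    // Dedicated I/O pool; daemon threads so it does not keep the JVM alive.
    private static final ExecutorService IO_POOL =
        Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });

    /** Simulates a slow, blocking Flink cluster call. */
    static String triggerSavepoint(String jobId) {
        try { Thread.sleep(100); } catch (InterruptedException ignored) {}
        return "savepoint-for-" + jobId;
    }

    /** The reconcile loop calls this and returns immediately. */
    public static CompletableFuture<String> triggerSavepointAsync(String jobId) {
        return CompletableFuture
            .supplyAsync(() -> triggerSavepoint(jobId), IO_POOL)
            // Callback runs off the reconcile thread; a real implementation
            // would update FlinkDeploymentStatus or fire an event here.
            .thenApply(path -> "status-updated:" + path);
    }
}
```

Note this only moves the blocking call off the reconcile thread onto a dedicated pool; truly callback-based clients (as the comment points out) would need support in the underlying libraries.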