[ 
https://issues.apache.org/jira/browse/FLINK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498941#comment-17498941
 ] 

Sandor Kelemen edited comment on FLINK-26370 at 2/28/22, 9:04 PM:
------------------------------------------------------------------

I'm quite new to this project, and wondering what problems these sync calls 
cause exactly?

The only thing I see is they block the operator sdk threads that are triggering 
and running the Reconciler. If the operator sdk uses a fix-sized thread pool 
(like 
[here|https://github.com/java-operator-sdk/java-operator-sdk/blob/a7eba4303f2962f79a137c8c35fc77d8075b1e5f/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java#L96]),
 some slow flink cluster calls could slow down the reconciliation for other 
(independent) resources, as all the threads in the fixed pool would be blocked 
on the sync calls.

 

Some potential solutions I see now:

 

1. Use non-blocking async APIs

An elegant solution would be to completely avoid thread-blocking by using 
non-blocking async APIs (i.e. passing and triggering call-backs). If I saw 
properly, none of the libraries we use (java-operator-sdk, fabric8 kubernetes 
client, flink client) support this API style, so we can not avoid 
thread-blocking completely unless we switch to other libraries.

 

2. Configure java-operator-sdk to use an unbounded thread pool

I'm not sure the operator sdk supports it, but it might be a quick win if it 
does.

 

3. Execute reconciliation on a new thread pool within the flink operator

This could involve:
 - a new unbounded thread pool in the operator
 - introduce some flag in the FlinkDeploymentStatus to indicate that the 
reconciliation process is in progress for that resource 
(isReconsiliationInProgress)
 - when the operator sdk calls reconcile, submit a task to the thread pool, 
that will actually perform the reconciliation and immediately return a 
UpdateControl with isReconsiliationInProgress=true
 - make sure a maximum of 1 such uncompleted task is active in the thread 
pool's queue per resource and also cleanup is implemented properly
 - when the actual asynchronous reconciliation process finishes and the 
UpdateControl is computed, we could save the UpdateControl object in the memory 
of the operator and use an EventSource to trigger a reconciliation for that 
resource. The triggered reconciliation could recognise the already computed 
UpdateControl and return it immediately with isReconsiliationInProgress=false
 - If the reconciliation triggered via the EventSource is not guaranteed to run 
on the same operator instance, we need to find a suitable place to persist 
[EDIT:] -it- {_}the UpdateControl{_}.
 - there might be a risk with this solution, that the semantics of the 
reconcile method in the Reconciler changes somewhat: previously a returned 
empty UpdateControl indicated that the reconciliation is finished and no 
changes are necessary, but with the above solution in place, the reconciliation 
might be still in progress despite the empty UpdateControl. I'm not sure this 
causes any problems.

3/B. A variation of option 3. could be to use the kubernetes api directly to 
perform the updates instead of using an EventSource. This might be attractive 
if we can not store the UpdateControl object in memory, so instead of 
persisting it to an intermediate place, we can persist it straight to the 
Kubernetes API. In this case, there might be interference with the operator 
sdk, as both the sdk and the operator might want to update the resource on the 
Kubernetes API. It should be okay, as the operator sdk uses optimistic version 
control.

As mentioned, I'm quite new here, so happy to hear what other problems you see 
with these blocking calls and also any comments/suggestions on the above 
solutions.


was (Author: JIRAUSER285809):
I'm quite new to this project, and wondering what problems these sync calls 
cause exactly?


The only thing I see is they block the operator sdk threads that are triggering 
and running the Reconciler. If the operator sdk uses a fix-sized thread pool 
(like 
[here|https://github.com/java-operator-sdk/java-operator-sdk/blob/a7eba4303f2962f79a137c8c35fc77d8075b1e5f/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java#L96]),
 some slow flink cluster calls could slow down the reconciliation for other 
(independent) resources, as all the threads in the fixed pool would be blocked 
on the sync calls.

 

Some potential solutions I see now:

 

1. Use non-blocking async APIs

An elegant solution would be to completely avoid thread-blocking by using 
non-blocking async APIs (i.e. passing and triggering call-backs). If I saw 
properly, none of the libraries we use (java-operator-sdk, fabric8 kubernetes 
client, flink client) support this API style, so we can not avoid 
thread-blocking completely unless we switch to other libraries.

 

2. Configure java-operator-sdk to use an unbounded thread pool

I'm not sure the operator sdk supports it, but it might be a quick win if it 
does.

 

3. Execute reconciliation on a new thread pool within the flink operator

This could involve:
- a new unbounded thread pool in the operator
- introduce some flag in the FlinkDeploymentStatus to indicate that the 
reconciliation process is in progress for that resource 
(isReconsiliationInProgress)
- when the operator sdk calls reconcile, submit a task to the thread pool, that 
will actually perform the reconciliation and immediately return a UpdateControl 
with isReconsiliationInProgress=true
- make sure a maximum of 1 such uncompleted task is active in the thread pool's 
queue per resource and also cleanup is implemented properly
- when the actual asynchronous reconciliation process finishes and the 
UpdateControl is computed, we could save the UpdateControl object in the memory 
of the operator and use an EventSource to trigger a reconciliation for that 
resource. The triggered reconciliation could recognise the already computed 
UpdateControl and return it immediately with isReconsiliationInProgress=false
- If the reconciliation triggered via the EventSource is not guaranteed to run 
on the same operator instance, we need to find a suitable place to persist it
- there might be a risk with this solution, that the semantics of the reconcile 
method in the Reconciler changes somewhat: previously a returned empty 
UpdateControl indicated that the reconciliation is finished and no changes are 
necessary, but with the above solution in place, the reconciliation might be 
still in progress despite the empty UpdateControl. I'm not sure this causes any 
problems.


3/B. A variation of option 3. could be to use the kubernetes api directly to 
perform the updates instead of using an EventSource. This might be attractive 
if we can not store the UpdateControl object in memory, so instead of 
persisting it to an intermediate place, we can persist it straight to the 
Kubernetes API. In this case, there might be interference with the operator 
sdk, as both the sdk and the operator might want to update the resource on the 
Kubernetes API. It should be okay, as the operator sdk uses optimistic version 
control.


As mentioned, I'm quite new here, so happy to hear what other problems you see 
with these blocking calls and also any comments/suggestions on the above 
solutions.

> Make Flink cluster communication asynchronous
> ---------------------------------------------
>
>                 Key: FLINK-26370
>                 URL: https://issues.apache.org/jira/browse/FLINK-26370
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kubernetes Operator
>            Reporter: Gyula Fora
>            Assignee: Sandor Kelemen
>            Priority: Major
>
> In the current architecture calls to the flink clusters (through the rest 
> client) are made synchronously from the reconcile loop. 
> These calls often take a long time due to various (compeltely normal) reasons:
>  - Cluster is not ready -> long call + timeoutexception
>  - Operation takes a long time -> cancel/savepoint operations are often 
> expected to take seconds/minutes
> Both the observer and reconciler components make these calls.
> We should come up with a way to avoid making these sync calls from the main 
> loop while still preserving the logic of the operator.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to