[ https://issues.apache.org/jira/browse/FLINK-39953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Borges updated FLINK-39953:
---------------------------------
    Description: 
When a {{FlinkDeployment}} upgrade is triggered, the operator calls 
{{deleteClusterDeployment()}} before deploying the new cluster.

Internally, {{deleteClusterInternal()}} calls {{deleteDeploymentBlocking()}}, 
which in turn calls {{deleteBlocking()}}. That method issues the Kubernetes 
DELETE and then calls {{waitUntilCondition(Objects::isNull, timeout, MILLISECONDS)}} 
on the fabric8 watch.
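The delete-then-wait contract can be illustrated with the simplified, polling-based Java sketch below. It is not fabric8's implementation, because fabric8 waits on a watch rather than polling. {{WaitTimeoutException}}, {{DeleteThenWait}} and the 10 ms poll interval are hypothetical names and values chosen for illustration. What the sketch does show is the contract: the wait returns once the resource lookup yields {{null}}, and it throws once the timeout elapses.

{code:java}
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for fabric8's KubernetesClientTimeoutException (not the real class).
class WaitTimeoutException extends RuntimeException {
    WaitTimeoutException(String message) {
        super(message);
    }
}

// Illustrative, polling-based model of "DELETE, then wait until the resource is gone".
// fabric8 uses a watch instead of polling; only the contract is modelled here.
final class DeleteThenWait {
    private DeleteThenWait() {}

    // Returns the first value satisfying the condition, or throws once the timeout elapses.
    static <T> T waitUntilCondition(
            Supplier<T> current, Predicate<T> condition, long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            T value = current.get();
            if (condition.test(value)) {
                return value;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new WaitTimeoutException("Timed out waiting for condition");
            }
            try {
                Thread.sleep(10); // hypothetical poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    // Issues the delete, then blocks until the lookup returns null (resource deleted).
    static void deleteBlocking(Runnable delete, Supplier<Object> lookup, long timeoutMillis) {
        delete.run();
        waitUntilCondition(lookup, Objects::isNull, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
{code}

If nothing removes the resource, as with the blocking finalizer in the reproduction steps, the lookup never returns {{null}} and the wait can only end in the timeout exception. Whether the upgrade is safe then depends on what the caller does with that exception.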

If the old cluster does not disappear within the configured 
{{kubernetes.operator.cluster.shutdown-timeout}}, the fabric8 client throws 
{{KubernetesClientTimeoutException}}. That exception extends 
{{KubernetesClientException}} and carries no HTTP status code, so it falls into 
a generic catch block that swallows every non-404 {{KubernetesClientException}}. 
Only a WARN is emitted and the exception is *silently discarded*: 
{{updateStatusAfterClusterDeletion()}} runs and marks the cluster as deleted in 
the status. The operator then immediately calls {{submitApplicationCluster()}}, 
which deploys the new version on top of a still-running cluster.
h2. Code Path


{code:java}
ApplicationReconciler.deploy()
  └─ deleteClusterDeployment()          ← calls deleteClusterInternal() + updateStatusAfterClusterDeletion()
       └─ deleteClusterInternal()
            └─ deleteDeploymentBlocking()
                 └─ deleteBlocking()
                      └─ waitUntilCondition(Objects::isNull, timeout)
                           → throws KubernetesClientTimeoutException
                           → caught by KubernetesClientException handler   ← BUG: exception swallowed
  └─ submitApplicationCluster()         ← called unconditionally, runs against still-alive cluster
{code}
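The following minimal, self-contained Java model makes this control flow concrete. The exception classes are hypothetical stand-ins that mirror the fabric8 hierarchy: the timeout subclass carries no HTTP status, so its code is {{0}}. {{BuggyUpgradeFlow}} is an illustrative name, and only the method names follow the code path above. Because {{0 != 404}}, the model logs the timeout and drops it, and {{submitApplicationCluster()}} is still reached.

{code:java}
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins mirroring the fabric8 hierarchy (not the real classes).
class KubernetesClientException extends RuntimeException {
    private final int code;

    KubernetesClientException(String message, int code) {
        super(message);
        this.code = code;
    }

    public int getCode() {
        return code;
    }
}

class KubernetesClientTimeoutException extends KubernetesClientException {
    KubernetesClientTimeoutException(String message) {
        super(message, 0); // no HTTP status code
    }
}

// Simplified model of the current upgrade path; method names follow the code path above.
final class BuggyUpgradeFlow {
    final List<String> calls = new ArrayList<>();
    boolean oldClusterAlive = true; // e.g. a finalizer keeps the old Deployment around

    void deploy() {
        deleteClusterDeployment();
        submitApplicationCluster(); // reached even though deletion never completed
    }

    private void deleteClusterDeployment() {
        deleteBlocking();
        calls.add("updateStatusAfterClusterDeletion");
    }

    private void deleteBlocking() {
        try {
            calls.add("DELETE");
            if (oldClusterAlive) {
                throw new KubernetesClientTimeoutException("Timed out waiting for deletion");
            }
        } catch (KubernetesClientException kce) {
            // 0 != 404, so the timeout is logged and then dropped
            if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                calls.add("WARN " + kce.getMessage());
            }
        }
    }

    private void submitApplicationCluster() {
        calls.add(oldClusterAlive ? "submit while old cluster alive" : "submit");
    }
}
{code}

After {{new BuggyUpgradeFlow().deploy()}}, the recorded calls are the DELETE, the WARN, the status update, and a submit that happens while the old cluster is still alive.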
h2. Reproducing the Issue

Deploy any {{FlinkDeployment}} (here {{flink-test-job}}), preferably with a low 
value for {{kubernetes.operator.resource.cleanup.timeout}}.

Then, add a blocking finalizer to the underlying {{Deployment}} like so:
{code:java}
kubectl patch deployment flink-test-job --type=json \
  -p='[{"op":"add","path":"/metadata/finalizers","value":["test/block-deletion"]}]'
{code}

Finally, increment the {{restartNonce}} counter to trigger an upgrade:
{code:java}
kubectl patch flinkdeployment flink-test-job --type=merge \
  -p '{"spec":{"job":{"restartNonce": 123}}}'
{code}

The upgrade then gets stuck, and the operator emits the following event claiming 
that the {{flink-test-job}} Deployment already exists. This is misleading, 
because the old Deployment is still being deleted.


{code:java}
>>> Event[Job]       | Warning | ERROR           | Could not create Kubernetes cluster "flink-marwan-onboarding". -> Failure executing: POST at: https://172.17.0.1:443/apis/apps/v1/namespaces/flink-data-eng-infra/deployments. Message: object is being deleted: deployments.apps "flink-test-job" already exists
{code}
h2. Proposed Fix

Add an explicit {{catch (KubernetesClientTimeoutException)}} block *before* the 
generic {{KubernetesClientException}} handler in 
{{AbstractFlinkService.deleteBlocking()}} and re-throw it (see PR for the fix).
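The Java sketch below shows the proposed ordering. It uses hypothetical stand-in exception classes rather than fabric8's and prints to {{System.err}} instead of the operator's logger. The generic handler body is copied from the existing code, {{FixedDeleteBlocking}} is an illustrative name, and the actual change lives in the PR.

{code:java}
import java.net.HttpURLConnection;

// Hypothetical stand-ins for the fabric8 exception types (not the real classes).
class KubernetesClientException extends RuntimeException {
    private final int code;

    KubernetesClientException(String message, int code) {
        super(message);
        this.code = code;
    }

    public int getCode() {
        return code;
    }
}

class KubernetesClientTimeoutException extends KubernetesClientException {
    KubernetesClientTimeoutException(String message) {
        super(message, 0); // no HTTP status code
    }
}

final class FixedDeleteBlocking {
    private FixedDeleteBlocking() {}

    // Runs the delete-and-wait step with the proposed catch ordering.
    static void deleteBlocking(String operation, Runnable deleteAndWait) {
        try {
            deleteAndWait.run();
        } catch (KubernetesClientTimeoutException e) {
            // The old cluster may still be running: let the caller fail and retry.
            throw e;
        } catch (KubernetesClientException kce) {
            // We completely ignore not found errors and simply log others
            if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                System.err.println("WARN Error while " + operation + ": " + kce.getMessage());
            }
        }
    }
}
{code}

The order is not a matter of style. Java rejects a {{catch}} for a subclass placed after a {{catch}} for its superclass as unreachable, so the specific clause has to come first.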

If {{deleteBlocking()}} times out waiting for the old cluster to disappear, the 
exception propagates out of {{deploy()}}. The reconciler catches it as a 
{{ReconciliationException}}, and JOSDK schedules an exponential-backoff retry. 
On retry, the operator attempts the deletion again, so the new cluster is not 
submitted until the old one is confirmed gone.
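The intended retry behaviour can be modelled as follows. This is not how JOSDK implements retries. {{RetryingReconciler}}, the attempt limit, the initial delay and the multiplier are all hypothetical, and the sketch catches the timeout directly instead of going through {{ReconciliationException}}. Rather than sleeping, it records the delays it would have scheduled.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for fabric8's timeout exception (not the real class).
class KubernetesClientTimeoutException extends RuntimeException {
    KubernetesClientTimeoutException(String message) {
        super(message);
    }
}

// Illustrative model: deploy() keeps failing until the old cluster is really gone.
final class RetryingReconciler {
    final List<String> events = new ArrayList<>();
    private int blockedAttempts; // deletion attempts that will still time out

    RetryingReconciler(int blockedAttempts) {
        this.blockedAttempts = blockedAttempts;
    }

    void deploy() {
        events.add("delete");
        if (blockedAttempts > 0) {
            blockedAttempts--;
            throw new KubernetesClientTimeoutException("old cluster still present");
        }
        events.add("submit"); // only reached once deletion is confirmed
    }

    // Retries deploy() with exponentially growing delays; returns the delays it would have waited.
    List<Long> reconcileWithBackoff(long initialDelayMillis, double multiplier, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                deploy();
                return delays;
            } catch (KubernetesClientTimeoutException e) {
                if (attempt >= maxAttempts) {
                    throw new IllegalStateException("Gave up after " + attempt + " attempts", e);
                }
                delays.add(delay); // a real operator would reschedule reconciliation here
                delay = (long) (delay * multiplier);
            }
        }
    }
}
{code}

With two blocked attempts, {{new RetryingReconciler(2).reconcileWithBackoff(2000, 2.0, 5)}} deletes three times, records delays of 2000 ms and 4000 ms, and submits exactly once, after the deletion succeeds.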

  was:
When a `FlinkDeployment` upgrade is triggered, the operator calls 
`deleteClusterDeployment()` before deploying the new cluster. Internally, 
`deleteClusterInternal()` calls `deleteDeploymentBlocking()` → 
`deleteBlocking()`, which issues the Kubernetes DELETE and then calls 
`waitUntilCondition(Objects::isNull, timeout, MILLISECONDS)` on the fabric8 
watch.

If the old cluster does not disappear within the configured 
`kubernetes.operator.cluster.shutdown-timeout`, the fabric8 client throws 
`KubernetesClientTimeoutException`. Because `KubernetesClientTimeoutException 
extends KubernetesClientException` and overrides `getCode()` to return `0` (no 
HTTP status code), it falls into the existing generic catch block:

```
} catch (KubernetesClientException kce) {
    // We completely ignore not found errors and simply log others
    if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
        LOG.warn("Error while " + operation, kce);
    }
}
```

The condition `0 != 404` is true, so only a WARN is emitted and the exception 
is **silently discarded**. `updateStatusAfterClusterDeletion()` runs, marking 
the cluster as deleted in the status, and the operator immediately proceeds to 
call `submitApplicationCluster()`, deploying the new version on top of a 
still-running cluster.

### Code Path

```
ApplicationReconciler.deploy()
  └─ deleteClusterDeployment()          ← calls deleteClusterInternal() + updateStatusAfterClusterDeletion()
       └─ deleteClusterInternal()
            └─ deleteDeploymentBlocking()
                 └─ deleteBlocking()
                      └─ waitUntilCondition(Objects::isNull, timeout)
                           → throws KubernetesClientTimeoutException
                           → caught by KubernetesClientException handler   ← BUG: exception swallowed
  └─ submitApplicationCluster()         ← called unconditionally, runs against still-alive cluster
```

### Operator Log Evidence

```
WARN  AbstractFlinkService - Error while deleting JobManager Deployment
      io.fabric8.kubernetes.client.KubernetesClientTimeoutException: Timed out waiting for ...
INFO  NativeFlinkService - Deploying application cluster    ← should NOT appear after timeout
```

### Proposed Fix

Add an explicit `catch (KubernetesClientTimeoutException)` block **before** the 
generic `KubernetesClientException`
handler in `AbstractFlinkService.deleteBlocking()` and re-throw it:

```java
} catch (KubernetesClientTimeoutException e) {
    throw e;
} catch (KubernetesClientException kce) {
    // We completely ignore not found errors and simply log others
    if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
        LOG.warn("Error while " + operation, kce);
    }
}
```

If `deleteBlocking()` times out waiting for the old cluster to disappear, the 
exception propagates out of `deploy()`, the reconciler catches it as a 
`ReconciliationException`, sets the status to `UPGRADING` (already persisted 
before `deploy()` is entered), and JOSDK schedules an exponential-backoff 
retry. On retry the operator attempts deletion again. The new cluster is not 
submitted until the old one is confirmed gone.

The one callsite where a timeout should be absorbed rather than propagated is 
`shutdownJobManagersBlocking()` in `NativeFlinkService`, which performs an 
optional scale-to-zero step before the real deletion. That site catches 
`KubernetesClientTimeoutException` locally, logs a warning, and proceeds.
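A minimal Java sketch of that split follows. It uses a hypothetical stand-in for `KubernetesClientTimeoutException`, and `ShutdownSequence` with its two `Runnable` parameters is illustrative, not the `NativeFlinkService` code. The optional step absorbs the timeout and logs a warning, while the mandatory deletion lets it propagate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for fabric8's timeout exception (not the real class).
class KubernetesClientTimeoutException extends RuntimeException {
    KubernetesClientTimeoutException(String message) {
        super(message);
    }
}

// Illustrative shutdown: a best-effort scale-to-zero step, then the mandatory deletion.
final class ShutdownSequence {
    final List<String> log = new ArrayList<>();

    void shutdown(Runnable scaleToZeroAndWait, Runnable deleteBlocking) {
        try {
            scaleToZeroAndWait.run();
        } catch (KubernetesClientTimeoutException e) {
            // Optional step: absorb the timeout locally and carry on.
            log.add("WARN scale-to-zero timed out");
        }
        // Mandatory step: a timeout here propagates to the caller (and its retry logic).
        deleteBlocking.run();
        log.add("deleted");
    }
}
```

Absorbing the timeout in the first step is safe only because a mandatory deletion still follows, and that deletion can fail loudly on its own.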


> AbstractFlinkService.deleteBlocking() swallows KubernetesClientTimeoutException allowing cluster upgrade to proceed on top of still-running cluster
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39953
>                 URL: https://issues.apache.org/jira/browse/FLINK-39953
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.15.0
>         Environment: Operator 1.12.1
> Flink 1.20.3
>            Reporter: Lucas Borges
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
