[ https://issues.apache.org/jira/browse/FLINK-39953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Borges updated FLINK-39953:
---------------------------------
Description:
When a {{FlinkDeployment}} upgrade is triggered, the operator calls
{{deleteClusterDeployment()}} before deploying the new cluster.
Internally, {{deleteClusterInternal()}} calls {{deleteDeploymentBlocking()}},
which in turn calls {{deleteBlocking()}}. That method issues the Kubernetes
DELETE and then calls {{waitUntilCondition(Objects::isNull, timeout, MILLISECONDS)}}
on the fabric8 watch.
If the old cluster does not disappear within the configured
{{kubernetes.operator.cluster.shutdown-timeout}}, the fabric8 client throws
{{KubernetesClientTimeoutException}}. Because that exception is a subclass of
{{KubernetesClientException}}, it is caught by the generic handler, which ignores
404 errors and only logs everything else. Only a WARN is emitted and the
exception is *silently discarded*: {{updateStatusAfterClusterDeletion()}} runs
and marks the cluster as deleted in the status. The operator then immediately
calls {{submitApplicationCluster()}} and deploys the new version on top of a
still-running cluster.
h2. Code Path
{code:java}
ApplicationReconciler.deploy()
└─ deleteClusterDeployment()    ← calls deleteClusterInternal() + updateStatusAfterClusterDeletion()
   └─ deleteClusterInternal()
      └─ deleteDeploymentBlocking()
         └─ deleteBlocking()
            └─ waitUntilCondition(Objects::isNull, timeout)
               → throws KubernetesClientTimeoutException
               → caught by KubernetesClientException handler   ← BUG: exception swallowed
└─ submitApplicationCluster()   ← called unconditionally, runs against still-alive cluster
{code}
h2. Reproducing the Issue
Deploy any {{FlinkDeployment}} (here {{flink-test-job}}). A low value for
{{kubernetes.operator.resource.cleanup.timeout}} is preferable.
Then add a blocking finalizer to the underlying {{Deployment}}:
{code:bash}
kubectl patch deployment flink-test-job --type=json \
  -p='[{"op":"add","path":"/metadata/finalizers","value":["test/block-deletion"]}]'
{code}
Finally, increment the {{restartNonce}} counter:
{code:bash}
kubectl patch flinkdeployment flink-test-job --type=merge \
  -p '{"spec":{"job":{"restartNonce": 123}}}'
{code}
The upgrade then gets stuck. The operator emits the following event, which
misleadingly reports that the Flink job already exists:
{code:java}
Event[Job] | Warning | ERROR | Could not create Kubernetes cluster "flink-marwan-onboarding". -> Failure executing: POST at: https://172.17.0.1:443/apis/apps/v1/namespaces/flink-data-eng-infra/deployments. Message: object is being deleted: deployments.apps "flink-test-job" already exists
{code}
h2. Proposed Fix
Add an explicit {{catch (KubernetesClientTimeoutException)}} block *before*
the generic {{KubernetesClientException}} handler in
{{AbstractFlinkService.deleteBlocking()}} and re-throw it (see PR for the fix).
If {{deleteBlocking()}} times out waiting for the old cluster to disappear, the
exception propagates out of {{deploy()}}. The reconciler catches it as a
{{ReconciliationException}} and JOSDK schedules an exponential-backoff retry.
On retry the operator attempts deletion again. The new cluster is not submitted
until the old one is confirmed gone.
was:
When a `FlinkDeployment` upgrade is triggered, the operator calls
`deleteClusterDeployment()` before deploying the new cluster. Internally,
`deleteClusterInternal()` calls `deleteDeploymentBlocking()` →
`deleteBlocking()`, which issues the Kubernetes DELETE and then calls
`waitUntilCondition(Objects::isNull, timeout, MILLISECONDS)` on the fabric8
watch.
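Conceptually, the wait step re-reads the resource until it is gone and gives up once the timeout expires. The sketch below models that contract with a plain polling loop. It is a minimal illustration under stated assumptions. `WaitSketch`, `waitUntil` and `StandInTimeoutException` are hypothetical names. fabric8's real `waitUntilCondition` is watch/informer based, not a polling loop.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for fabric8's KubernetesClientTimeoutException.
class StandInTimeoutException extends RuntimeException {
    StandInTimeoutException(String message) {
        super(message);
    }
}

final class WaitSketch {
    private WaitSketch() {}

    // Re-reads the resource until the condition holds (e.g. "object is gone").
    // If the deadline passes first, it throws instead of returning.
    static <T> T waitUntil(Supplier<T> current, Predicate<T> condition,
                           long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            T value = current.get();
            if (condition.test(value)) {
                return value;
            }
            // Overflow-safe deadline check.
            if (System.nanoTime() - deadline >= 0) {
                throw new StandInTimeoutException(
                        "Timed out after " + timeoutMillis + " ms; last value: " + value);
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

Suppose the supplier keeps returning the Deployment, for example because a finalizer blocks deletion. With `Objects::isNull` as the condition, the call then ends in a timeout exception. In the real code, that exception is what the generic handler goes on to swallow.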
If the old cluster does not disappear within the configured
`kubernetes.operator.cluster.shutdown-timeout`, the fabric8 client throws
`KubernetesClientTimeoutException`. Because `KubernetesClientTimeoutException
extends KubernetesClientException` and overrides `getCode()` to return `0` (no
HTTP status code), it falls into the existing generic catch block:
```
} catch (KubernetesClientException kce) {
// We completely ignore not found errors and simply log others
if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
LOG.warn("Error while " + operation, kce);
}
}
```
The condition `0 != 404` is true, so only a WARN is emitted and the exception
is **silently discarded**. `updateStatusAfterClusterDeletion()` runs, marking
the cluster as deleted in the status, and the operator immediately proceeds to
call `submitApplicationCluster()`, deploying the new version on top of a
still-running cluster.
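The swallowing can be reproduced without a cluster. The following is a minimal, hypothetical model of the path above, not the operator's code. `StandInClientException` and `StandInClientTimeoutException` mimic the hierarchy described above, where the timeout subclass reports code `0`. `BuggyReconcilerSketch` collapses `deleteBlocking()` and `deploy()` into two methods. Its wait step always times out, as it would with a blocking finalizer on the Deployment.

```java
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KubernetesClientException (carries an HTTP code).
class StandInClientException extends RuntimeException {
    private final int code;

    StandInClientException(String message, int code) {
        super(message);
        this.code = code;
    }

    int getCode() {
        return code;
    }
}

// Hypothetical stand-in for KubernetesClientTimeoutException: a subclass with code 0.
class StandInClientTimeoutException extends StandInClientException {
    StandInClientTimeoutException(String message) {
        super(message, 0);
    }
}

final class BuggyReconcilerSketch {
    final List<String> events = new ArrayList<>();

    // Models deleteBlocking(): the wait always times out in this scenario.
    void deleteBlocking(String operation) {
        try {
            throw new StandInClientTimeoutException("Timed out waiting for deletion");
        } catch (StandInClientException kce) {
            // Same shape as the original handler: ignore 404, log everything else.
            if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                events.add("WARN Error while " + operation);
            }
        }
    }

    // Models deploy(): delete, mark as deleted, then submit unconditionally.
    void deploy() {
        deleteBlocking("deleting JobManager Deployment");
        events.add("status: cluster deleted");
        events.add("submitApplicationCluster");
    }
}
```

Calling `deploy()` records three events in order: the WARN, the status update and the submission. This matches the log evidence, where the deploy line follows the timeout.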
### Code Path
```
ApplicationReconciler.deploy()
└─ deleteClusterDeployment()    ← calls deleteClusterInternal() + updateStatusAfterClusterDeletion()
   └─ deleteClusterInternal()
      └─ deleteDeploymentBlocking()
         └─ deleteBlocking()
            └─ waitUntilCondition(Objects::isNull, timeout)
               → throws KubernetesClientTimeoutException
               → caught by KubernetesClientException handler   ← BUG: exception swallowed
└─ submitApplicationCluster()   ← called unconditionally, runs against still-alive cluster
```
### Operator Log Evidence
```
WARN AbstractFlinkService - Error while deleting JobManager Deployment
io.fabric8.kubernetes.client.KubernetesClientTimeoutException: Timed out waiting for ...
INFO NativeFlinkService - Deploying application cluster   ← should NOT appear after timeout
```
### Proposed Fix
Add an explicit `catch (KubernetesClientTimeoutException)` block **before** the
generic `KubernetesClientException`
handler in `AbstractFlinkService.deleteBlocking()` and re-throw it:
```java
} catch (KubernetesClientTimeoutException e) {
throw e;
} catch (KubernetesClientException kce) {
// We completely ignore not found errors and simply log others
if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
LOG.warn("Error while " + operation, kce);
}
}
```
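The effect of the reordered handler can be checked in a small, hypothetical model. This is not the operator's code. The exception classes are stand-ins for the fabric8 types, with the timeout subclass reporting code `0`. `FixedReconcilerSketch` takes the failure that deletion should produce as a constructor argument. A timeout now escapes `deploy()` before the status update and the submission, while a 404 is still ignored.

```java
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KubernetesClientException (carries an HTTP code).
class StandInClientException extends RuntimeException {
    private final int code;

    StandInClientException(String message, int code) {
        super(message);
        this.code = code;
    }

    int getCode() {
        return code;
    }
}

// Hypothetical stand-in for KubernetesClientTimeoutException: a subclass with code 0.
class StandInClientTimeoutException extends StandInClientException {
    StandInClientTimeoutException(String message) {
        super(message, 0);
    }
}

final class FixedReconcilerSketch {
    final List<String> events = new ArrayList<>();
    private final StandInClientException deletionFailure; // null: old cluster went away in time

    FixedReconcilerSketch(StandInClientException deletionFailure) {
        this.deletionFailure = deletionFailure;
    }

    void deleteBlocking(String operation) {
        try {
            if (deletionFailure != null) {
                throw deletionFailure;
            }
        } catch (StandInClientTimeoutException e) {
            // Must precede the generic handler: Java rejects a subclass catch placed after its superclass.
            throw e;
        } catch (StandInClientException kce) {
            if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                events.add("WARN Error while " + operation);
            }
        }
    }

    void deploy() {
        deleteBlocking("deleting JobManager Deployment");
        events.add("status: cluster deleted");
        events.add("submitApplicationCluster");
    }
}
```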
If `deleteBlocking()` times out waiting for the old cluster to disappear, the
exception propagates out of `deploy()` and the reconciler catches it as a
`ReconciliationException`. The status stays `UPGRADING`, since it was already
persisted before `deploy()` was entered. JOSDK then schedules an
exponential-backoff retry. On retry the operator attempts deletion again. The
new cluster is not submitted until the old one is confirmed gone.
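The retry behaviour can be modelled as a loop in which each iteration stands in for one reconciliation. A timeout aborts that attempt before submission and schedules the next attempt with a longer delay. This is a hypothetical sketch, not JOSDK's retry implementation. `RetrySketch` and `DeletionTimeoutException` are illustrative names, and the initial delay and multiplier are arbitrary. Delays are recorded rather than slept so the sketch runs instantly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the timeout that now escapes deleteBlocking().
class DeletionTimeoutException extends RuntimeException {
    DeletionTimeoutException(String message) {
        super(message);
    }
}

final class RetrySketch {
    final List<Long> scheduledDelaysMillis = new ArrayList<>();
    boolean submitted;

    void reconcileUntilDeployed(Runnable deleteBlocking, long initialDelayMillis,
                                double multiplier, int maxAttempts) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                deleteBlocking.run(); // throws while the old cluster is still present
            } catch (DeletionTimeoutException e) {
                scheduledDelaysMillis.add(delay); // recorded, not slept
                delay = (long) (delay * multiplier);
                continue;
            }
            submitted = true; // reached only after the old cluster is confirmed gone
            return;
        }
    }
}
```

Suppose the finalizer is removed before the fourth attempt. With an initial delay of 2000 ms and a multiplier of 2.0, the sketch schedules 2000, 4000 and 8000 ms of back-off and submits only on the fourth attempt.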
The one callsite where a timeout should be absorbed rather than propagated is
`shutdownJobManagersBlocking()` in `NativeFlinkService`, which performs an
optional scale-to-zero step before the real deletion. That site catches
`KubernetesClientTimeoutException` locally, logs a warning, and proceeds.
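The split between the two call sites can be sketched as follows. The optional scale-to-zero step absorbs its own timeout and carries on. The mandatory deletion lets its timeout escape. `ShutdownSketch`, its methods and the boolean switches are hypothetical simplifications of `shutdownJobManagersBlocking()` and the deletion that follows it, not the operator's actual signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KubernetesClientTimeoutException.
class ShutdownTimeoutException extends RuntimeException {
    ShutdownTimeoutException(String message) {
        super(message);
    }
}

final class ShutdownSketch {
    final List<String> log = new ArrayList<>();

    // Optional best-effort step: a timeout here is logged and deliberately absorbed.
    void shutdownJobManagersBlocking(boolean scaleDownTimesOut) {
        try {
            if (scaleDownTimesOut) {
                throw new ShutdownTimeoutException("JobManagers did not scale to zero in time");
            }
            log.add("scaled JobManagers to zero");
        } catch (ShutdownTimeoutException e) {
            log.add("WARN " + e.getMessage() + "; continuing with deletion");
        }
    }

    // Mandatory step: a timeout propagates, so nothing is submitted on top of the old cluster.
    void deleteDeploymentBlocking(boolean deletionTimesOut) {
        if (deletionTimesOut) {
            throw new ShutdownTimeoutException("Deployment still present after timeout");
        }
        log.add("deployment deleted");
    }

    void deleteCluster(boolean scaleDownTimesOut, boolean deletionTimesOut) {
        shutdownJobManagersBlocking(scaleDownTimesOut);
        deleteDeploymentBlocking(deletionTimesOut);
    }
}
```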
> AbstractFlinkService.deleteBlocking() swallows
> KubernetesClientTimeoutException allowing cluster upgrade to proceed on top
> of still-running cluster
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39953
> URL: https://issues.apache.org/jira/browse/FLINK-39953
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.15.0
> Environment: Operator 1.12.1
> Flink 1.20.3
> Reporter: Lucas Borges
> Priority: Major
> Labels: pull-request-available
--
This message was sent by Atlassian Jira
(v8.20.10#820010)