Dennis-Mircea commented on code in PR #1065:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1065#discussion_r3181634044
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java:
##########
@@ -178,20 +181,59 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx)
}
public static boolean sessionClusterReady(Optional<FlinkDeployment> flinkDeploymentOpt) {
- if (flinkDeploymentOpt.isPresent()) {
- var flinkdep = flinkDeploymentOpt.get();
- var jobmanagerDeploymentStatus = flinkdep.getStatus().getJobManagerDeploymentStatus();
- if (jobmanagerDeploymentStatus != JobManagerDeploymentStatus.READY) {
- LOG.info(
- "Session cluster deployment is in {} status, not ready for serve",
- jobmanagerDeploymentStatus);
- return false;
- } else {
- return true;
- }
- } else {
+ if (flinkDeploymentOpt.isEmpty()) {
LOG.warn("Session cluster deployment is not found");
return false;
}
+ var flinkdep = flinkDeploymentOpt.get();
+ var jobmanagerDeploymentStatus = flinkdep.getStatus().getJobManagerDeploymentStatus();
+ if (jobmanagerDeploymentStatus != JobManagerDeploymentStatus.READY) {
+ LOG.info(
+ "Session cluster deployment is in {} status, not ready for serve",
+ jobmanagerDeploymentStatus);
+ return false;
+ }
+
+ // Block while FlinkDeployment is in a transitional reconciliation state
+ var reconciliationState = flinkdep.getStatus().getReconciliationStatus().getState();
+ if (reconciliationState != ReconciliationState.DEPLOYED
+ && reconciliationState != ReconciliationState.ROLLED_BACK) {
+ LOG.info(
+ "Session cluster deployment reconciliation state is {}, not ready for serve",
+ reconciliationState);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks that the FlinkDeployment has no pending spec changes
+ *
+ * <p>When they differ, the deployment spec was changed but the operator hasn't acted yet. The
+ * cluster may still appear healthy (JM READY, state DEPLOYED), but it is about to be updated
+ * (e.g. deleted and recreated). Allowing a session job to start upgrading in this window would
+ * risk the savepoint being destroyed during the cluster rebuild.
+ *
+ * <p>This check is only used in {@link #readyToReconcile}, not in {@link #sessionClusterReady},
+ * so it does not block cleanup or FlinkService creation.
+ */
Review Comment:
I would suggest mentioning here that this check is intended to cover the very early, incipient stage of a
session-cluster upgrade:
```suggestion
/**
* Checks that the FlinkDeployment has no pending spec changes, i.e. that {@code
* metadata.generation} equals {@code status.observedGeneration}.
*
* <p>This guard targets the earliest, most incipient stage of a session-cluster upgrade: the
* brief window between the user submitting a new FlinkDeployment spec and the
* FlinkDeploymentController observing it and starting to act on it. During that window the
* cluster still appears healthy from every other angle (JM READY, reconciliation state
* DEPLOYED), so {@link #sessionClusterReady} alone would let a session-job upgrade through.
* Allowing a savepoint upgrade to start here would risk the savepoint being destroyed during
* the cluster rebuild.
*
* <p>This check is only used in {@link #readyToReconcile}, not in {@link #sessionClusterReady},
* so it does not block cleanup or {@link
* org.apache.flink.kubernetes.operator.service.FlinkService} creation.
*/
```
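To make the intended scoping concrete, here is a minimal, self-contained model of the two checks. It is only a sketch: `Deployment`, `JmStatus` and `ReconState` are hypothetical stand-ins for `FlinkDeployment`, `JobManagerDeploymentStatus` and `ReconciliationState` (with only a subset of their values), and the composed `readyToReconcile` is an assumption about how the real method combines the two checks.

```java
import java.util.Objects;

/** Hypothetical, simplified model of the session-job gating; not the operator's real types. */
final class SessionJobGateSketch {
    /** Stand-in for a subset of JobManagerDeploymentStatus. */
    enum JmStatus { READY, DEPLOYING }

    /** Stand-in for ReconciliationState. */
    enum ReconState { DEPLOYED, UPGRADING, ROLLING_BACK, ROLLED_BACK }

    /** Stand-in for the FlinkDeployment fields the checks read. */
    record Deployment(JmStatus jmStatus, ReconState state, Long generation, Long observedGeneration) {}

    private SessionJobGateSketch() {}

    /** Mirrors sessionClusterReady: deployment present, JM READY, non-transitional state. */
    static boolean sessionClusterReady(Deployment d) {
        if (d == null || d.jmStatus() != JmStatus.READY) {
            return false;
        }
        return d.state() == ReconState.DEPLOYED || d.state() == ReconState.ROLLED_BACK;
    }

    /** Mirrors the pending-spec guard: the controller has observed the latest spec. */
    static boolean noDeploymentChangesPending(Deployment d) {
        return d != null && Objects.equals(d.generation(), d.observedGeneration());
    }

    /** Upgrades need both checks; cleanup and FlinkService creation use sessionClusterReady only. */
    static boolean readyToReconcile(Deployment d) {
        return sessionClusterReady(d) && noDeploymentChangesPending(d);
    }
}
```

With `generation = 2` and `observedGeneration = 1` (a spec change not yet observed by the controller), `sessionClusterReady` still returns `true` while `readyToReconcile` returns `false`, which is exactly the window the Javadoc describes.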
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java:
##########
@@ -64,9 +66,38 @@ public SessionReconciler(
@Override
protected boolean readyToReconcile(FlinkResourceContext<FlinkDeployment> ctx) {
+ var deployment = ctx.getResource();
+ // Only check when there is a pending spec change on the deployment itself
+ if (!Objects.equals(
+ deployment.getMetadata().getGeneration(),
+ deployment.getStatus().getObservedGeneration())) {
+ var deploymentName = deployment.getMetadata().getName();
+ boolean hasTransitioningSessionJob =
+ ctx.getJosdkContext().getSecondaryResources(FlinkSessionJob.class).stream()
+ .filter(job -> deploymentName.equals(job.getSpec().getDeploymentName()))
+ .anyMatch(SessionReconciler::isSessionJobTransitioning);
+ if (hasTransitioningSessionJob) {
+ LOG.info(
+ "Deferring session cluster upgrade: associated session jobs are being upgraded");
+ return false;
+ }
+ }
return true;
}
+ private static boolean isSessionJobTransitioning(FlinkSessionJob job) {
+ var reconStatus = job.getStatus().getReconciliationStatus();
+ // New session jobs state is UPGRADING by default before first deployment. Treating them as
+ // transitioning would deadlock: the deployment can't
+ // start because it waits for the job, and the job can't start because it waits for the
+ // cluster.
+ if (reconStatus.isBeforeFirstDeployment()) {
+ return false;
+ }
+ var state = reconStatus.getState();
+ return state == ReconciliationState.UPGRADING || state == ReconciliationState.ROLLING_BACK;
Review Comment:
I tested this end-to-end with the change scoped to `SessionJobReconciler` only, with both `upgradeMode: stateless` and
`upgradeMode: savepoint` (HA + persistent checkpoint storage), and it behaves correctly: no `Connection refused` or
`JobNotFoundException` errors. The savepoint upgrade falls back cleanly to last-state when needed, so keeping the fix in
`SessionJobReconciler` only should be fine.
The asymmetry is mostly fine in practice: stateful jobs running with HA +
checkpointing recover transparently via the savepoint -> last-state fallback.
One case worth flagging though:
- **Stateful jobs without HA**: In the rare ordering where a session-job upgrade is already in flight when the
FlinkDeployment starts upgrading, `last-state` has nothing to fall back to, and the job can end up parked waiting for an
upgradeable state. This is probably best treated as a documented prerequisite (HA + checkpointing for stateful session
jobs) rather than something to gate the deployment side on. I can also address and document this case as part of
[FLINK-39571](https://issues.apache.org/jira/browse/FLINK-39571).
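A rough model of the fallback described above, purely to illustrate the reasoning. The `Action` values and the two boolean inputs are hypothetical simplifications, not the operator's actual upgrade logic.

```java
/** Hypothetical, heavily simplified model of the savepoint -> last-state fallback. */
final class UpgradeFallbackSketch {
    enum Action { SAVEPOINT, LAST_STATE, WAIT_FOR_UPGRADEABLE_STATE }

    private UpgradeFallbackSketch() {}

    /**
     * @param jobReachable hypothetical flag: the job is still running and can take a savepoint
     * @param haMetadataAvailable hypothetical flag: HA metadata and a retained checkpoint exist
     */
    static Action chooseUpgradeAction(boolean jobReachable, boolean haMetadataAvailable) {
        if (jobReachable) {
            // The job is still running on the session cluster: take the savepoint as requested.
            return Action.SAVEPOINT;
        }
        if (haMetadataAvailable) {
            // The cluster went away mid-upgrade, but HA metadata points at the latest checkpoint.
            return Action.LAST_STATE;
        }
        // Stateful job without HA: nothing to fall back to, so the job stays parked.
        return Action.WAIT_FOR_UPGRADEABLE_STATE;
    }
}
```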
One small follow-up suggestion: it would help future readers if `SessionReconciler#readyToReconcile` carried a short
Javadoc explaining why the `FlinkDeployment` side intentionally does not gate on `FlinkSessionJob` state. The asymmetry
is deliberate: deployment updates must never be blocked by job upgrades, and the reverse coupling is the only one needed
to close the race.
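For example, something along these lines. This is only a sketch of the proposed Javadoc: the class and the `Object ctx` parameter are hypothetical stand-ins so it compiles on its own (the real method takes a `FlinkResourceContext<FlinkDeployment>`), and it assumes the deployment-side gating is dropped so the method goes back to returning `true`.

```java
/** Hypothetical stand-in for SessionReconciler, showing only the proposed Javadoc. */
final class SessionReconcilerJavadocSketch {
    private SessionReconcilerJavadocSketch() {}

    /**
     * Always allows the session cluster (FlinkDeployment) to reconcile.
     *
     * <p>The coupling between FlinkDeployment and FlinkSessionJob is intentionally
     * one-directional: SessionJobReconciler defers session-job upgrades while the session
     * cluster is transitioning or has pending spec changes, but FlinkDeployment updates are
     * never blocked by in-flight session-job upgrades. The session-job side is the only
     * coupling needed to close the race.
     */
    static boolean readyToReconcile(Object ctx) {
        // Stand-in parameter; the real context type is FlinkResourceContext<FlinkDeployment>.
        return true;
    }
}
```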
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java:
##########
@@ -178,20 +181,59 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx)
+ private static boolean noDeploymentChangesPending(
+ Optional<FlinkDeployment> flinkDeploymentOpt) {
Review Comment:
Minor nit: replace `Optional<FlinkDeployment>` with a nullable
`FlinkDeployment` parameter. `Optional` is meant for return values, not
parameters (per Effective Java Item 55, and IntelliJ flags it).
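A minimal sketch of what that could look like, keeping `Optional` as a return value and unwrapping it once at the call site with `orElse(null)`. `DeploymentMeta`, `findDeployment` and `canUpgrade` are hypothetical stand-ins (the real method would take a `FlinkDeployment`), and returning `false` for a missing deployment is an assumption that mirrors `sessionClusterReady`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical sketch of the nullable-parameter variant; DeploymentMeta stands in for FlinkDeployment. */
final class NullableParamSketch {
    /** Stand-in for metadata.generation and status.observedGeneration. */
    record DeploymentMeta(Long generation, Long observedGeneration) {}

    private NullableParamSketch() {}

    /**
     * @param deployment the session cluster deployment, or {@code null} if it was not found
     * @return true when the controller has already observed the latest spec
     */
    static boolean noDeploymentChangesPending(DeploymentMeta deployment) {
        if (deployment == null) {
            // Assumption: a missing deployment counts as "not ready", like sessionClusterReady.
            return false;
        }
        return Objects.equals(deployment.generation(), deployment.observedGeneration());
    }

    /** Hypothetical lookup: Optional stays a return type, as Item 55 recommends. */
    static Optional<DeploymentMeta> findDeployment(Map<String, DeploymentMeta> cluster, String name) {
        return Optional.ofNullable(cluster.get(name));
    }

    /** Hypothetical caller: unwrap once at the boundary, then pass a nullable reference. */
    static boolean canUpgrade(Map<String, DeploymentMeta> cluster, String name) {
        return noDeploymentChangesPending(findDeployment(cluster, name).orElse(null));
    }
}
```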