This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 890547f [FLINK-26546] Extract Observer Interface 890547f is described below commit 890547f063e75f5929d2809e1cfe8895d0d2ac9f Author: Aitozi <yuli....@alibaba-inc.com> AuthorDate: Sun Mar 13 20:27:07 2022 +0800 [FLINK-26546] Extract Observer Interface --- .../flink/kubernetes/operator/FlinkOperator.java | 12 +- .../flink/kubernetes/operator/config/Mode.java | 31 ++++ .../controller/FlinkDeploymentController.java | 14 +- .../kubernetes/operator/observer/BaseObserver.java | 117 +++++++++++++ .../kubernetes/operator/observer/JobObserver.java | 126 ++++++++++++++ .../kubernetes/operator/observer/Observer.java | 191 ++------------------- .../ObserverFactory.java} | 42 ++--- .../operator/observer/SessionObserver.java | 40 +++++ .../operator/reconciler/JobReconciler.java | 2 +- .../operator/reconciler/ReconcilerFactory.java | 12 +- .../controller/FlinkDeploymentControllerTest.java | 7 +- .../{ObserverTest.java => JobObserverTest.java} | 48 +----- .../operator/observer/SessionObserverTest.java | 69 ++++++++ 13 files changed, 433 insertions(+), 278 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 7e46b0c..1125b65 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; -import org.apache.flink.kubernetes.operator.observer.Observer; +import org.apache.flink.kubernetes.operator.observer.ObserverFactory; import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; @@ -57,12 +57,10 @@ public class FlinkOperator { FlinkService flinkService = new FlinkService(client); FlinkOperatorConfiguration operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig()); - - Observer observer = new Observer(flinkService, operatorConfiguration); - FlinkDeploymentValidator validator = new DefaultDeploymentValidator(); - ReconcilerFactory factory = + ReconcilerFactory reconcilerFactory = new ReconcilerFactory(client, flinkService, operatorConfiguration); + ObserverFactory observerFactory = new ObserverFactory(flinkService, operatorConfiguration); FlinkDeploymentController controller = new FlinkDeploymentController( @@ -71,8 +69,8 @@ public class FlinkOperator { client, namespace, validator, - observer, - factory); + reconcilerFactory, + observerFactory); FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller); controller.setControllerConfig(controllerConfig); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java new file mode 100644 index 0000000..e616432 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.config; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +/** The mode of {@link FlinkDeployment}. */ +public enum Mode { + APPLICATION, + SESSION; + + public static Mode getMode(FlinkDeployment flinkApp) { + return flinkApp.getSpec().getJob() != null ? APPLICATION : SESSION; + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 195defd..4ef0959 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -24,7 +24,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; -import org.apache.flink.kubernetes.operator.observer.Observer; +import org.apache.flink.kubernetes.operator.observer.ObserverFactory; import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; @@ -65,8 +65,8 @@ public class FlinkDeploymentController private final String operatorNamespace; private final FlinkDeploymentValidator validator; - private final Observer observer; private final ReconcilerFactory reconcilerFactory; + private final ObserverFactory observerFactory; private final DefaultConfig defaultConfig; private final FlinkOperatorConfiguration operatorConfiguration; @@ -78,15 +78,15 @@ public class FlinkDeploymentController KubernetesClient kubernetesClient, String operatorNamespace, FlinkDeploymentValidator validator, - Observer observer, - ReconcilerFactory reconcilerFactory) { + ReconcilerFactory reconcilerFactory, + ObserverFactory observerFactory) { this.defaultConfig = defaultConfig; this.operatorConfiguration = operatorConfiguration; this.kubernetesClient = kubernetesClient; this.operatorNamespace = operatorNamespace; this.validator = validator; - this.observer = observer; this.reconcilerFactory = reconcilerFactory; + this.observerFactory = observerFactory; } @Override @@ -95,7 +95,7 @@ public class FlinkDeploymentController Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); try { - observer.observe(flinkApp, context, effectiveConfig); + observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig); } catch (DeploymentFailedException dfe) { // ignore during cleanup } @@ -119,7 +119,7 @@ public class FlinkDeploymentController FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); try { - observer.observe(flinkApp, context, effectiveConfig); + observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig); reconcilerFactory .getOrCreate(flinkApp) .reconcile(operatorNamespace, flinkApp, context, effectiveConfig); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java new file mode 100644 index 0000000..2dee79e --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; +import org.apache.flink.kubernetes.operator.service.FlinkService; + +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** The base observer. */ +public abstract class BaseObserver implements Observer { + + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + + public static final String JOB_STATE_UNKNOWN = "UNKNOWN"; + protected final FlinkService flinkService; + protected final FlinkOperatorConfiguration operatorConfiguration; + + public BaseObserver( + FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) { + this.flinkService = flinkService; + this.operatorConfiguration = operatorConfiguration; + } + + protected void observeJmDeployment( + FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { + FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus(); + JobManagerDeploymentStatus previousJmStatus = + deploymentStatus.getJobManagerDeploymentStatus(); + + if (JobManagerDeploymentStatus.READY == previousJmStatus) { + return; + } + + if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) { + deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + return; + } + + Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); + if (deployment.isPresent()) { + DeploymentStatus status = deployment.get().getStatus(); + DeploymentSpec spec = deployment.get().getSpec(); + if (status != null + && status.getAvailableReplicas() != null + && spec.getReplicas().intValue() == status.getReplicas() + && spec.getReplicas().intValue() == status.getAvailableReplicas() + && flinkService.isJobManagerPortReady(effectiveConfig)) { + + // typically it takes a few seconds for the REST server to be ready + logger.info( + "JobManager deployment {} in namespace {} port ready, waiting for the REST API...", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace()); + deploymentStatus.setJobManagerDeploymentStatus( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY); + return; + } + logger.info( + "JobManager deployment {} in namespace {} exists but not ready yet, status {}", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace(), + status); + + List<DeploymentCondition> conditions = status.getConditions(); + for (DeploymentCondition dc : conditions) { + if ("FailedCreate".equals(dc.getReason()) + && "ReplicaFailure".equals(dc.getType())) { + // throw only when not already in error status to allow for spec update + if (!JobManagerDeploymentStatus.ERROR.equals( + deploymentStatus.getJobManagerDeploymentStatus())) { + throw new DeploymentFailedException( + DeploymentFailedException.COMPONENT_JOBMANAGER, dc); + } + return; + } + } + deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + return; + } + + deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + } + + protected boolean isClusterReady(FlinkDeployment dep) { + return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY; + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java new file mode 100644 index 0000000..ea5a58f --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.SavepointUtils; +import org.apache.flink.runtime.client.JobStatusMessage; + +import io.javaoperatorsdk.operator.api.reconciler.Context; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */ +public class JobObserver extends BaseObserver { + + public JobObserver( + FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) { + super(flinkService, operatorConfiguration); + } + + @Override + public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { + observeJmDeployment(flinkApp, context, effectiveConfig); + if (isClusterReady(flinkApp)) { + boolean jobFound = observeFlinkJobStatus(flinkApp, effectiveConfig); + if (jobFound) { + observeSavepointStatus(flinkApp, effectiveConfig); + } + } + } + + private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) { + logger.info("Getting job statuses for {}", flinkApp.getMetadata().getName()); + FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus(); + + Collection<JobStatusMessage> clusterJobStatuses; + try { + clusterJobStatuses = flinkService.listJobs(effectiveConfig); + } catch (Exception e) { + logger.error("Exception while listing jobs", e); + flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN); + return false; + } + if (clusterJobStatuses.isEmpty()) { + logger.info("No jobs found on {} yet", flinkApp.getMetadata().getName()); + flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN); + return false; + } + + updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses)); + logger.info("Job statuses updated for {}", flinkApp.getMetadata().getName()); + return true; + } + + /** Update previous job status based on the job list from the cluster. */ + private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) { + Collections.sort( + clusterJobStatuses, (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime())); + JobStatusMessage newJob = clusterJobStatuses.get(0); + + status.setState(newJob.getJobState().name()); + status.setJobName(newJob.getJobName()); + status.setJobId(newJob.getJobId().toHexString()); + // track the start time, changing timestamp would cause busy reconciliation + status.setUpdateTime(String.valueOf(newJob.getStartTime())); + } + + private void observeSavepointStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) { + SavepointInfo savepointInfo = flinkApp.getStatus().getJobStatus().getSavepointInfo(); + if (!SavepointUtils.savepointInProgress(flinkApp)) { + logger.debug("Checkpointing not in progress"); + return; + } + SavepointFetchResult savepointFetchResult; + try { + savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, effectiveConfig); + } catch (Exception e) { + logger.error("Exception while fetching savepoint info", e); + return; + } + + if (!savepointFetchResult.isTriggered()) { + String error = savepointFetchResult.getError(); + if (error != null + || SavepointUtils.gracePeriodEnded(operatorConfiguration, savepointInfo)) { + String errorMsg = error != null ? error : "Savepoint status unknown"; + logger.error(errorMsg); + savepointInfo.resetTrigger(); + ReconciliationUtils.updateForReconciliationError(flinkApp, errorMsg); + return; + } + logger.info("Savepoint operation not running, waiting within grace period"); + } + if (savepointFetchResult.getSavepoint() == null) { + logger.info("Savepoint not completed yet"); + return; + } + savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint()); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java index 51ccf75..e354689 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java @@ -18,188 +18,19 @@ package org.apache.flink.kubernetes.operator.observer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.crd.status.JobStatus; -import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo; -import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; -import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; -import org.apache.flink.kubernetes.operator.service.FlinkService; -import org.apache.flink.kubernetes.operator.utils.SavepointUtils; -import org.apache.flink.runtime.client.JobStatusMessage; -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; -import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; -import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.javaoperatorsdk.operator.api.reconciler.Context; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Optional; - -/** Observes the actual state of the running jobs on the Flink cluster. */ -public class Observer { - - private static final Logger LOG = LoggerFactory.getLogger(Observer.class); - - public static final String JOB_STATE_UNKNOWN = "UNKNOWN"; - - private final FlinkService flinkService; - private final FlinkOperatorConfiguration operatorConfiguration; - - public Observer(FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) { - this.flinkService = flinkService; - this.operatorConfiguration = operatorConfiguration; - } - - public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { - observeJmDeployment(flinkApp, context, effectiveConfig); - if (isApplicationClusterReady(flinkApp)) { - boolean jobFound = observeFlinkJobStatus(flinkApp, effectiveConfig); - if (jobFound) { - observeSavepointStatus(flinkApp, effectiveConfig); - } - } - } - - private boolean isApplicationClusterReady(FlinkDeployment dep) { - return dep.getSpec().getJob() != null - && dep.getStatus().getJobManagerDeploymentStatus() - == JobManagerDeploymentStatus.READY; - } - - private void observeJmDeployment( - FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { - FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus(); - JobManagerDeploymentStatus previousJmStatus = - deploymentStatus.getJobManagerDeploymentStatus(); - - if (JobManagerDeploymentStatus.READY == previousJmStatus) { - return; - } - - if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) { - deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); - return; - } - - Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); - if (deployment.isPresent()) { - DeploymentStatus status = deployment.get().getStatus(); - DeploymentSpec spec = deployment.get().getSpec(); - if (status != null - && status.getAvailableReplicas() != null - && spec.getReplicas().intValue() == status.getReplicas() - && spec.getReplicas().intValue() == status.getAvailableReplicas() - && flinkService.isJobManagerPortReady(effectiveConfig)) { - - // typically it takes a few seconds for the REST server to be ready - LOG.info( - "JobManager deployment {} in namespace {} port ready, waiting for the REST API...", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace()); - deploymentStatus.setJobManagerDeploymentStatus( - JobManagerDeploymentStatus.DEPLOYED_NOT_READY); - return; - } - LOG.info( - "JobManager deployment {} in namespace {} exists but not ready yet, status {}", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace(), - status); - - List<DeploymentCondition> conditions = status.getConditions(); - for (DeploymentCondition dc : conditions) { - if ("FailedCreate".equals(dc.getReason()) - && "ReplicaFailure".equals(dc.getType())) { - // throw only when not already in error status to allow for spec update - if (!JobManagerDeploymentStatus.ERROR.equals( - deploymentStatus.getJobManagerDeploymentStatus())) { - throw new DeploymentFailedException( - DeploymentFailedException.COMPONENT_JOBMANAGER, dc); - } - return; - } - } - deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); - return; - } - - deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); - } - - private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) { - LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName()); - FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus(); - - Collection<JobStatusMessage> clusterJobStatuses; - try { - clusterJobStatuses = flinkService.listJobs(effectiveConfig); - } catch (Exception e) { - LOG.error("Exception while listing jobs", e); - flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN); - return false; - } - if (clusterJobStatuses.isEmpty()) { - LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName()); - flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN); - return false; - } - - updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses)); - LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName()); - return true; - } - - private void observeSavepointStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) { - SavepointInfo savepointInfo = flinkApp.getStatus().getJobStatus().getSavepointInfo(); - if (!SavepointUtils.savepointInProgress(flinkApp)) { - LOG.debug("Checkpointing not in progress"); - return; - } - SavepointFetchResult savepointFetchResult; - try { - savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, effectiveConfig); - } catch (Exception e) { - LOG.error("Exception while fetching savepoint info", e); - return; - } - - if (!savepointFetchResult.isTriggered()) { - String error = savepointFetchResult.getError(); - if (error != null - || SavepointUtils.gracePeriodEnded(operatorConfiguration, savepointInfo)) { - String errorMsg = error != null ? error : "Savepoint status unknown"; - LOG.error(errorMsg); - savepointInfo.resetTrigger(); - ReconciliationUtils.updateForReconciliationError(flinkApp, errorMsg); - return; - } - LOG.info("Savepoint operation not running, waiting within grace period"); - } - if (savepointFetchResult.getSavepoint() == null) { - LOG.info("Savepoint not completed yet"); - return; - } - savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint()); - } - - /** Update previous job status based on the job list from the cluster. */ - private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) { - Collections.sort( - clusterJobStatuses, (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime())); - JobStatusMessage newJob = clusterJobStatuses.get(0); - - status.setState(newJob.getJobState().name()); - status.setJobName(newJob.getJobName()); - status.setJobId(newJob.getJobId().toHexString()); - // track the start time, changing timestamp would cause busy reconciliation - status.setUpdateTime(String.valueOf(newJob.getStartTime())); - } +/** The Observer of {@link FlinkDeployment}. */ +public interface Observer { + + /** + * Observe the flinkApp status, It will reflect the changed status on the flinkApp resource. + * + * @param flinkApp the target flinkDeployment resource + * @param context the context with which the operation is executed + * @param effectiveConfig the effective config of the flinkApp + */ + void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java similarity index 57% copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java index 94050ce..d3ec8b9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java @@ -16,59 +16,43 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.reconciler; +package org.apache.flink.kubernetes.operator.observer; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.service.FlinkService; -import io.fabric8.kubernetes.client.KubernetesClient; - import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -/** The factory to create reconciler based on app mode. */ -public class ReconcilerFactory { +/** The factory to create the observer based ob the {@link FlinkDeployment} mode. */ +public class ObserverFactory { - private final KubernetesClient kubernetesClient; private final FlinkService flinkService; private final FlinkOperatorConfiguration operatorConfiguration; - private final Map<Mode, Reconciler> reconcilerMap; + private final Map<Mode, Observer> observerMap; - public ReconcilerFactory( - KubernetesClient kubernetesClient, - FlinkService flinkService, - FlinkOperatorConfiguration operatorConfiguration) { - this.kubernetesClient = kubernetesClient; + public ObserverFactory( + FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) { this.flinkService = flinkService; this.operatorConfiguration = operatorConfiguration; - this.reconcilerMap = new ConcurrentHashMap<>(); + this.observerMap = new ConcurrentHashMap<>(); } - public Reconciler getOrCreate(FlinkDeployment flinkApp) { - return reconcilerMap.computeIfAbsent( - getMode(flinkApp), + public Observer getOrCreate(FlinkDeployment flinkApp) { + return observerMap.computeIfAbsent( + Mode.getMode(flinkApp), mode -> { switch (mode) { case SESSION: - return new SessionReconciler( - kubernetesClient, flinkService, operatorConfiguration); + return new SessionObserver(flinkService, operatorConfiguration); case APPLICATION: - return new JobReconciler( - kubernetesClient, flinkService, operatorConfiguration); + return new JobObserver(flinkService, operatorConfiguration); default: throw new UnsupportedOperationException( String.format("Unsupported running mode: %s", mode)); } }); } - - private Mode getMode(FlinkDeployment flinkApp) { - return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION; - } - - enum Mode { - APPLICATION, - SESSION - } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java new file mode 100644 index 0000000..5cb594c --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.service.FlinkService; + +import io.javaoperatorsdk.operator.api.reconciler.Context; + +/** The observer of the {@link org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */ +public class SessionObserver extends BaseObserver { + + public SessionObserver( + FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) { + super(flinkService, operatorConfiguration); + } + + @Override + public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { + observeJmDeployment(flinkApp, context, effectiveConfig); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index a43febd..2d71459 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; import java.util.Optional; -import static org.apache.flink.kubernetes.operator.observer.Observer.JOB_STATE_UNKNOWN; +import static org.apache.flink.kubernetes.operator.observer.BaseObserver.JOB_STATE_UNKNOWN; /** * Reconciler responsible for handling the job lifecycle according to the desired and current diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java index 94050ce..bdcd8e4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -47,7 +48,7 @@ public class ReconcilerFactory { public Reconciler getOrCreate(FlinkDeployment flinkApp) { return reconcilerMap.computeIfAbsent( - getMode(flinkApp), + Mode.getMode(flinkApp), mode -> { switch (mode) { case SESSION: @@ -62,13 +63,4 @@ public class ReconcilerFactory { } }); } - - private Mode getMode(FlinkDeployment flinkApp) { - return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION; - } - - enum Mode { - APPLICATION, - SESSION - } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 658aa18..226186a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; -import org.apache.flink.kubernetes.operator.observer.Observer; +import org.apache.flink.kubernetes.operator.observer.ObserverFactory; import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; @@ -446,7 +446,6 @@ public class FlinkDeploymentControllerTest { private FlinkDeploymentController createTestController( KubernetesClient kubernetesClient, TestingFlinkService flinkService) { - Observer observer = new Observer(flinkService, operatorConfiguration); FlinkDeploymentController controller = new FlinkDeploymentController( @@ -455,9 +454,9 @@ public class FlinkDeploymentControllerTest { kubernetesClient, "test", new DefaultDeploymentValidator(), - observer, new ReconcilerFactory( - kubernetesClient, flinkService, operatorConfiguration)); + kubernetesClient, flinkService, operatorConfiguration), + new ObserverFactory(flinkService, operatorConfiguration)); controller.setControllerConfig(new FlinkControllerConfig(controller)); return controller; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java similarity index 83% rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java index ea15298..c3869a4 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java @@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; -import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -35,48 +34,16 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -/** @link Observer unit tests */ -public class ObserverTest { +/** {@link JobObserver} unit tests. */ +public class JobObserverTest { private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment(); @Test - public void observeSessionCluster() { - FlinkService flinkService = new TestingFlinkService(); - Observer observer = - new Observer( - flinkService, - FlinkOperatorConfiguration.fromConfiguration(new Configuration())); - FlinkDeployment deployment = TestUtils.buildSessionCluster(); - deployment - .getStatus() - .getReconciliationStatus() - .setLastReconciledSpec(deployment.getSpec()); - - observer.observe( - deployment, - readyContext, - FlinkUtils.getEffectiveConfig(deployment, new Configuration())); - - assertEquals( - JobManagerDeploymentStatus.DEPLOYED_NOT_READY, - deployment.getStatus().getJobManagerDeploymentStatus()); - - observer.observe( - deployment, - readyContext, - FlinkUtils.getEffectiveConfig(deployment, new Configuration())); - - assertEquals( - JobManagerDeploymentStatus.READY, - deployment.getStatus().getJobManagerDeploymentStatus()); - } - - @Test public void observeApplicationCluster() { TestingFlinkService flinkService = new TestingFlinkService(); - Observer observer = - new Observer( + JobObserver observer = + new JobObserver( flinkService, FlinkOperatorConfiguration.fromConfiguration(new Configuration())); FlinkDeployment deployment = TestUtils.buildApplicationCluster(); @@ -136,14 +103,15 @@ public class ObserverTest { assertEquals( JobManagerDeploymentStatus.READY, deployment.getStatus().getJobManagerDeploymentStatus()); - assertEquals(Observer.JOB_STATE_UNKNOWN, deployment.getStatus().getJobStatus().getState()); + assertEquals( + BaseObserver.JOB_STATE_UNKNOWN, deployment.getStatus().getJobStatus().getState()); } @Test public void observeSavepoint() throws Exception { TestingFlinkService flinkService = new TestingFlinkService(); - Observer observer = - new Observer( + JobObserver observer = + new JobObserver( flinkService, FlinkOperatorConfiguration.fromConfiguration(new Configuration())); FlinkDeployment deployment = TestUtils.buildApplicationCluster(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java new file mode 100644 index 0000000..6e7cb48 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** {@link SessionObserver} unit tests. */ +public class SessionObserverTest { + private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment(); + + @Test + public void observeSessionCluster() { + FlinkService flinkService = new TestingFlinkService(); + SessionObserver observer = + new SessionObserver( + flinkService, + FlinkOperatorConfiguration.fromConfiguration(new Configuration())); + FlinkDeployment deployment = TestUtils.buildSessionCluster(); + deployment + .getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(deployment.getSpec()); + + observer.observe( + deployment, + readyContext, + FlinkUtils.getEffectiveConfig(deployment, new Configuration())); + + assertEquals( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + + observer.observe( + deployment, + readyContext, + FlinkUtils.getEffectiveConfig(deployment, new Configuration())); + + assertEquals( + JobManagerDeploymentStatus.READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + } +}