This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit becf19c82616e8f3a148c2ce27141b742f019ab3 Author: Thomas Weise <t...@apache.org> AuthorDate: Thu Feb 24 14:26:30 2022 -0800 [FLINK-26261] Wait for JobManager deployment ready before accessing REST API --- .../controller/FlinkDeploymentController.java | 76 +++++++++++++++++++--- .../operator/observer/JobStatusObserver.java | 7 ++ .../kubernetes/operator/service/FlinkService.java | 25 +++++++ .../kubernetes/operator/TestingFlinkService.java | 5 ++ .../controller/FlinkDeploymentControllerTest.java | 58 +++++++++++++---- 5 files changed, 151 insertions(+), 20 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index f97d5fd..a6c6072 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -30,7 +30,11 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; @@ -41,10 +45,12 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -58,6 +64,7 @@ public class FlinkDeploymentController private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class); public static final int REFRESH_SECONDS = 60; + public static final int PORT_READY_DELAY_SECONDS = 10; private final KubernetesClient kubernetesClient; @@ -68,6 +75,7 @@ public class FlinkDeploymentController private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; private final DefaultConfig defaultConfig; + private final HashSet<String> jobManagerDeployments = new HashSet<>(); public FlinkDeploymentController( DefaultConfig defaultConfig, @@ -96,6 +104,7 @@ public class FlinkDeploymentController operatorNamespace, kubernetesClient, true); + jobManagerDeployments.remove(flinkApp.getMetadata().getSelfLink()); return DeleteControl.defaultDelete(); } @@ -113,8 +122,11 @@ public class FlinkDeploymentController Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); try { - boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig); - if (successfulObserve) { + // only check job status when the JM deployment is ready + boolean shouldReconcile = + !jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink()) + || observer.observeFlinkJobStatus(flinkApp, effectiveConfig); + if (shouldReconcile) { reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig); updateForReconciliationSuccess(flinkApp); } @@ -125,6 +137,49 @@ public class FlinkDeploymentController } catch (Exception e) { throw new ReconciliationException(e); } + + return checkJobManagerDeployment(flinkApp, context, effectiveConfig); + } + + private UpdateControl<FlinkDeployment> checkJobManagerDeployment( + FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { + if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) { + Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); + if (deployment.isPresent()) { + DeploymentStatus status = deployment.get().getStatus(); + DeploymentSpec spec = deployment.get().getSpec(); + if (status != null + && status.getAvailableReplicas() != null + && spec.getReplicas().intValue() == status.getReplicas() + && spec.getReplicas().intValue() == status.getAvailableReplicas()) { + // typically it takes a few seconds for the REST server to be ready + if (observer.isJobManagerReady(effectiveConfig)) { + LOG.info( + "JobManager deployment {} in namespace {} is ready", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace()); + jobManagerDeployments.add(flinkApp.getMetadata().getSelfLink()); + if (flinkApp.getStatus().getJobStatus() != null) { + // short circuit, if the job was already running + // reschedule for immediate job status check + return UpdateControl.updateStatus(flinkApp).rescheduleAfter(0); + } + } + LOG.info( + "JobManager deployment {} in namespace {} port not ready", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace()); + return UpdateControl.updateStatus(flinkApp) + .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); + } else { + LOG.info( + "JobManager deployment {} in namespace {} not yet ready, status {}", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace(), + status); + } + } + } return UpdateControl.updateStatus(flinkApp) .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); } @@ -156,11 +211,16 @@ public class FlinkDeploymentController @Override public List<EventSource> prepareEventSources( EventSourceContext<FlinkDeployment> eventSourceContext) { - // TODO: start status updated - // return List.of(new PerResourcePollingEventSource<>( - // new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD, - // FlinkApplication.class)); - return Collections.emptyList(); + // reconcile when job manager deployment is ready + SharedIndexInformer<Deployment> deploymentInformer = + kubernetesClient + .apps() + .deployments() + .inAnyNamespace() + .withLabel("type", "flink-native-kubernetes") + .withLabel("component", "jobmanager") + .runnableInformer(0); + return List.of(new InformerEventSource<>(deploymentInformer, Mappers.fromLabel("app"))); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index 0740dea..cb56dc3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -83,6 +83,10 @@ public class JobStatusObserver { } } + public boolean isJobManagerReady(Configuration config) { + return flinkService.isJobManagerReady(config); + } + /** Merge previous job status with the new one from the flink job cluster. */ private JobStatus mergeJobStatus( JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) { @@ -98,6 +102,8 @@ public class JobStatusObserver { newStatus.setState(newJob.getJobState().name()); newStatus.setJobName(newJob.getJobName()); newStatus.setJobId(newJob.getJobId().toHexString()); + // track the start time, changing timestamp would cause busy reconciliation + newStatus.setUpdateTime(String.valueOf(newJob.getStartTime())); } return newStatus; } @@ -107,6 +113,7 @@ public class JobStatusObserver { jobStatus.setJobId(message.getJobId().toHexString()); jobStatus.setJobName(message.getJobName()); jobStatus.setState(message.getJobState().name()); + jobStatus.setUpdateTime(String.valueOf(message.getStartTime())); return jobStatus; } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index 8b9af3b..a2e731c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -45,6 +45,12 @@ import io.fabric8.kubernetes.client.NamespacedKubernetesClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.net.URI; import java.util.Collection; import java.util.Optional; import java.util.concurrent.Executors; @@ -94,6 +100,25 @@ public class FlinkService { LOG.info("Session cluster {} deployed", deployment.getMetadata().getName()); } + public boolean isJobManagerReady(Configuration config) { + final URI uri; + try (ClusterClient<String> clusterClient = getClusterClient(config)) { + uri = URI.create(clusterClient.getWebInterfaceURL()); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort()); + Socket socket = new Socket(); + try { + socket.connect(socketAddress, 1000); + socket.close(); + return true; + } catch (SocketTimeoutException ste) { + } catch (IOException e) { + } + return false; + } + public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception { try (ClusterClient<String> clusterClient = getClusterClient(conf)) { return clusterClient.listJobs().get(10, TimeUnit.SECONDS); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 3ebbdac..398ef84 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -100,4 +100,9 @@ public class TestingFlinkService extends FlinkService { public void stopSessionCluster(FlinkDeployment deployment, Configuration conf) { sessions.remove(deployment.getMetadata().getName()); } + + @Override + public boolean isJobManagerReady(Configuration config) { + return true; + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 88fcfe2..24db802 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -32,11 +32,17 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator; import org.apache.flink.runtime.client.JobStatusMessage; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -47,6 +53,28 @@ import static org.junit.Assert.assertTrue; /** @link JobStatusObserver unit tests */ public class FlinkDeploymentControllerTest { + private final Context context = + new Context() { + @Override + public Optional<RetryInfo> getRetryInfo() { + return Optional.empty(); + } + + @Override + public <T> Optional<T> getSecondaryResource( + Class<T> expectedType, String eventSourceName) { + DeploymentStatus status = new DeploymentStatus(); + status.setAvailableReplicas(1); + status.setReplicas(1); + DeploymentSpec spec = new DeploymentSpec(); + spec.setReplicas(1); + Deployment deployment = new Deployment(); + deployment.setSpec(spec); + deployment.setStatus(status); + return Optional.of((T) deployment); + } + }; + @Test public void verifyBasicReconcileLoop() { TestingFlinkService flinkService = new TestingFlinkService(); @@ -55,7 +83,13 @@ public class FlinkDeploymentControllerTest { UpdateControl<FlinkDeployment> updateControl; - updateControl = testController.reconcile(appCluster, null); + updateControl = testController.reconcile(appCluster, context); + assertTrue(updateControl.isUpdateStatus()); + assertEquals( + FlinkDeploymentController.PORT_READY_DELAY_SECONDS * 1000, + (long) updateControl.getScheduleDelay().get()); + + updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertEquals( FlinkDeploymentController.REFRESH_SECONDS * 1000, @@ -68,7 +102,7 @@ public class FlinkDeploymentControllerTest { assertNull(reconciliationStatus.getError()); assertEquals(appCluster.getSpec(), reconciliationStatus.getLastReconciledSpec()); - updateControl = testController.reconcile(appCluster, null); + updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertEquals( FlinkDeploymentController.REFRESH_SECONDS * 1000, @@ -84,7 +118,7 @@ public class FlinkDeploymentControllerTest { // Send in invalid update appCluster = TestUtils.clone(appCluster); appCluster.getSpec().setJob(null); - updateControl = testController.reconcile(appCluster, null); + updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertFalse(updateControl.getScheduleDelay().isPresent()); @@ -109,7 +143,7 @@ public class FlinkDeploymentControllerTest { appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); appCluster.getSpec().getJob().setInitialSavepointPath("s0"); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("s0", jobs.get(0).f0); @@ -119,14 +153,14 @@ public class FlinkDeploymentControllerTest { appCluster.getSpec().getJob().setInitialSavepointPath("s1"); // Send in a no-op change - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs())); // Upgrade job appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setParallelism(100); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("savepoint_0", jobs.get(0).f0); @@ -134,12 +168,12 @@ public class FlinkDeploymentControllerTest { // Suspend job appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setState(JobState.SUSPENDED); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); // Resume from last savepoint appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setState(JobState.RUNNING); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("savepoint_1", jobs.get(0).f0); @@ -153,7 +187,7 @@ public class FlinkDeploymentControllerTest { appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); appCluster.getSpec().getJob().setInitialSavepointPath("s0"); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("s0", jobs.get(0).f0); @@ -162,7 +196,7 @@ public class FlinkDeploymentControllerTest { appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setParallelism(100); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals(null, jobs.get(0).f0); @@ -170,12 +204,12 @@ public class FlinkDeploymentControllerTest { // Suspend job appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setState(JobState.SUSPENDED); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); // Resume from empty state appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setState(JobState.RUNNING); - testController.reconcile(appCluster, null); + testController.reconcile(appCluster, context); jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals(null, jobs.get(0).f0);