This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit becf19c82616e8f3a148c2ce27141b742f019ab3
Author: Thomas Weise <t...@apache.org>
AuthorDate: Thu Feb 24 14:26:30 2022 -0800

    [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
---
 .../controller/FlinkDeploymentController.java      | 76 +++++++++++++++++++---
 .../operator/observer/JobStatusObserver.java       |  7 ++
 .../kubernetes/operator/service/FlinkService.java  | 25 +++++++
 .../kubernetes/operator/TestingFlinkService.java   |  5 ++
 .../controller/FlinkDeploymentControllerTest.java  | 58 +++++++++++++----
 5 files changed, 151 insertions(+), 20 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index f97d5fd..a6c6072 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -30,7 +30,11 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -41,10 +45,12 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -58,6 +64,7 @@ public class FlinkDeploymentController
     private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
 
     public static final int REFRESH_SECONDS = 60;
+    public static final int PORT_READY_DELAY_SECONDS = 10;
 
     private final KubernetesClient kubernetesClient;
 
@@ -68,6 +75,7 @@ public class FlinkDeploymentController
     private final JobReconciler jobReconciler;
     private final SessionReconciler sessionReconciler;
     private final DefaultConfig defaultConfig;
+    private final HashSet<String> jobManagerDeployments = new HashSet<>();
 
     public FlinkDeploymentController(
             DefaultConfig defaultConfig,
@@ -96,6 +104,7 @@ public class FlinkDeploymentController
                 operatorNamespace,
                 kubernetesClient,
                 true);
+        jobManagerDeployments.remove(flinkApp.getMetadata().getSelfLink());
         return DeleteControl.defaultDelete();
     }
 
@@ -113,8 +122,11 @@ public class FlinkDeploymentController
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-            if (successfulObserve) {
+            // only check job status when the JM deployment is ready
+            boolean shouldReconcile =
+                    !jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())
+                            || observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
+            if (shouldReconcile) {
                 reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
                 updateForReconciliationSuccess(flinkApp);
             }
@@ -125,6 +137,49 @@ public class FlinkDeploymentController
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
+
+        return checkJobManagerDeployment(flinkApp, context, effectiveConfig);
+    }
+
+    private UpdateControl<FlinkDeployment> checkJobManagerDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) {
+            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+            if (deployment.isPresent()) {
+                DeploymentStatus status = deployment.get().getStatus();
+                DeploymentSpec spec = deployment.get().getSpec();
+                if (status != null
+                        && status.getAvailableReplicas() != null
+                        && spec.getReplicas().intValue() == status.getReplicas()
+                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    if (observer.isJobManagerReady(effectiveConfig)) {
+                        LOG.info(
+                                "JobManager deployment {} in namespace {} is ready",
+                                flinkApp.getMetadata().getName(),
+                                flinkApp.getMetadata().getNamespace());
+                        jobManagerDeployments.add(flinkApp.getMetadata().getSelfLink());
+                        if (flinkApp.getStatus().getJobStatus() != null) {
+                            // short circuit, if the job was already running
+                            // reschedule for immediate job status check
+                            return UpdateControl.updateStatus(flinkApp).rescheduleAfter(0);
+                        }
+                    }
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port not ready",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    return UpdateControl.updateStatus(flinkApp)
+                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                } else {
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} not yet ready, status {}",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace(),
+                            status);
+                }
+            }
+        }
         return UpdateControl.updateStatus(flinkApp)
                 .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
     }
@@ -156,11 +211,16 @@ public class FlinkDeploymentController
     @Override
     public List<EventSource> prepareEventSources(
             EventSourceContext<FlinkDeployment> eventSourceContext) {
-        // TODO: start status updated
-        //        return List.of(new PerResourcePollingEventSource<>(
-        //                new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD,
-        //                FlinkApplication.class));
-        return Collections.emptyList();
+        // reconcile when job manager deployment is ready
+        SharedIndexInformer<Deployment> deploymentInformer =
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inAnyNamespace()
+                        .withLabel("type", "flink-native-kubernetes")
+                        .withLabel("component", "jobmanager")
+                        .runnableInformer(0);
+        return List.of(new InformerEventSource<>(deploymentInformer, Mappers.fromLabel("app")));
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 0740dea..cb56dc3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -83,6 +83,10 @@ public class JobStatusObserver {
         }
     }
 
+    public boolean isJobManagerReady(Configuration config) {
+        return flinkService.isJobManagerReady(config);
+    }
+
     /** Merge previous job status with the new one from the flink job cluster. */
     private JobStatus mergeJobStatus(
             JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) {
@@ -98,6 +102,8 @@ public class JobStatusObserver {
             newStatus.setState(newJob.getJobState().name());
             newStatus.setJobName(newJob.getJobName());
             newStatus.setJobId(newJob.getJobId().toHexString());
+            // track the start time, changing timestamp would cause busy reconciliation
+            newStatus.setUpdateTime(String.valueOf(newJob.getStartTime()));
         }
         return newStatus;
     }
@@ -107,6 +113,7 @@ public class JobStatusObserver {
         jobStatus.setJobId(message.getJobId().toHexString());
         jobStatus.setJobName(message.getJobName());
         jobStatus.setState(message.getJobState().name());
+        jobStatus.setUpdateTime(String.valueOf(message.getStartTime()));
         return jobStatus;
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 8b9af3b..a2e731c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -45,6 +45,12 @@ import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.Executors;
@@ -94,6 +100,25 @@ public class FlinkService {
         LOG.info("Session cluster {} deployed", deployment.getMetadata().getName());
     }
 
+    public boolean isJobManagerReady(Configuration config) {
+        final URI uri;
+        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
+            uri = URI.create(clusterClient.getWebInterfaceURL());
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
+        Socket socket = new Socket();
+        try {
+            socket.connect(socketAddress, 1000);
+            socket.close();
+            return true;
+        } catch (SocketTimeoutException ste) {
+        } catch (IOException e) {
+        }
+        return false;
+    }
+
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
             return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 3ebbdac..398ef84 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -100,4 +100,9 @@ public class TestingFlinkService extends FlinkService {
     public void stopSessionCluster(FlinkDeployment deployment, Configuration conf) {
         sessions.remove(deployment.getMetadata().getName());
     }
+
+    @Override
+    public boolean isJobManagerReady(Configuration config) {
+        return true;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 88fcfe2..24db802 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -32,11 +32,17 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -47,6 +53,28 @@ import static org.junit.Assert.assertTrue;
 /** @link JobStatusObserver unit tests */
 public class FlinkDeploymentControllerTest {
 
+    private final Context context =
+            new Context() {
+                @Override
+                public Optional<RetryInfo> getRetryInfo() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public <T> Optional<T> getSecondaryResource(
+                        Class<T> expectedType, String eventSourceName) {
+                    DeploymentStatus status = new DeploymentStatus();
+                    status.setAvailableReplicas(1);
+                    status.setReplicas(1);
+                    DeploymentSpec spec = new DeploymentSpec();
+                    spec.setReplicas(1);
+                    Deployment deployment = new Deployment();
+                    deployment.setSpec(spec);
+                    deployment.setStatus(status);
+                    return Optional.of((T) deployment);
+                }
+            };
+
     @Test
     public void verifyBasicReconcileLoop() {
         TestingFlinkService flinkService = new TestingFlinkService();
@@ -55,7 +83,13 @@ public class FlinkDeploymentControllerTest {
 
         UpdateControl<FlinkDeployment> updateControl;
 
-        updateControl = testController.reconcile(appCluster, null);
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                FlinkDeploymentController.PORT_READY_DELAY_SECONDS * 1000,
+                (long) updateControl.getScheduleDelay().get());
+
+        updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
                 FlinkDeploymentController.REFRESH_SECONDS * 1000,
@@ -68,7 +102,7 @@ public class FlinkDeploymentControllerTest {
         assertNull(reconciliationStatus.getError());
         assertEquals(appCluster.getSpec(), reconciliationStatus.getLastReconciledSpec());
 
-        updateControl = testController.reconcile(appCluster, null);
+        updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
                 FlinkDeploymentController.REFRESH_SECONDS * 1000,
@@ -84,7 +118,7 @@ public class FlinkDeploymentControllerTest {
         // Send in invalid update
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().setJob(null);
-        updateControl = testController.reconcile(appCluster, null);
+        updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertFalse(updateControl.getScheduleDelay().isPresent());
 
@@ -109,7 +143,7 @@ public class FlinkDeploymentControllerTest {
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
 
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
@@ -119,14 +153,14 @@ public class FlinkDeploymentControllerTest {
         appCluster.getSpec().getJob().setInitialSavepointPath("s1");
 
         // Send in a no-op change
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
         assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
 
         // Upgrade job
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setParallelism(100);
 
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("savepoint_0", jobs.get(0).f0);
@@ -134,12 +168,12 @@ public class FlinkDeploymentControllerTest {
         // Suspend job
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
 
         // Resume from last savepoint
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("savepoint_1", jobs.get(0).f0);
@@ -153,7 +187,7 @@ public class FlinkDeploymentControllerTest {
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
 
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
@@ -162,7 +196,7 @@ public class FlinkDeploymentControllerTest {
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setParallelism(100);
 
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals(null, jobs.get(0).f0);
@@ -170,12 +204,12 @@ public class FlinkDeploymentControllerTest {
         // Suspend job
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
 
         // Resume from empty state
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
-        testController.reconcile(appCluster, null);
+        testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals(null, jobs.get(0).f0);

Reply via email to