This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new c20228a [FLINK-26605] Check if JM can serve rest api calls every time before reconcile c20228a is described below commit c20228a96700a5efe41cd31cb23db1eca20ccd86 Author: Biao Geng <80749729+bgeng...@users.noreply.github.com> AuthorDate: Fri Mar 18 21:32:45 2022 +0800 [FLINK-26605] Check if JM can serve rest api calls every time before reconcile --- .github/workflows/ci.yml | 4 +-- Dockerfile | 2 +- .../config/FlinkOperatorConfiguration.java | 5 +++ .../operator/config/OperatorConfigOptions.java | 7 ++++ .../operator/observer/SessionObserver.java | 17 ++++++++- .../kubernetes/operator/service/FlinkService.java | 6 +++- .../kubernetes/operator/TestingFlinkService.java | 6 +++- .../controller/FlinkDeploymentControllerTest.java | 1 + .../operator/observer/JobObserverTest.java | 16 +++++++++ .../operator/observer/SessionObserverTest.java | 40 +++++++++++++++++----- 10 files changed, 89 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 269010b..d1a86a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,7 +52,7 @@ jobs: export SHELL=/bin/bash export DOCKER_BUILDKIT=1 eval $(minikube -p minikube docker-env) - docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest . + docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest . docker images - name: Start the operator run: | @@ -107,7 +107,7 @@ jobs: export SHELL=/bin/bash export DOCKER_BUILDKIT=1 eval $(minikube -p minikube docker-env) - docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest . + docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest --progress plain . 
docker images - name: Start the operator run: | diff --git a/Dockerfile b/Dockerfile index 2b6655f..adeaaf3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,7 +35,7 @@ COPY $WEBHOOK_DIR/src ./$WEBHOOK_DIR/src COPY tools ./tools -RUN --mount=type=cache,target=/root/.m2 mvn clean install +RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install # stage FROM openjdk:11-jre diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index 016e494..4b273f1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -35,6 +35,7 @@ public class FlinkOperatorConfiguration { Duration progressCheckInterval; Duration restApiReadyDelay; Duration savepointTriggerGracePeriod; + Duration flinkClientTimeout; String flinkServiceHostOverride; Set<String> watchedNamespaces; @@ -52,6 +53,9 @@ public class FlinkOperatorConfiguration { operatorConfig.get( OperatorConfigOptions.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD); + Duration flinkClientTimeout = + operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT); + String flinkServiceHostOverride = null; if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) { // not running in k8s, simplify local development @@ -65,6 +69,7 @@ public class FlinkOperatorConfiguration { progressCheckInterval, restApiReadyDelay, savepointTriggerGracePeriod, + flinkClientTimeout, flinkServiceHostOverride, watchedNamespaces); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java index 67f47e7..9fbaca3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java @@ -53,4 +53,11 @@ public class OperatorConfigOptions { .defaultValue(Duration.ofSeconds(10)) .withDescription( "The interval before a savepoint trigger attempt is marked as unsuccessful"); + + public static final ConfigOption<Duration> OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT = + ConfigOptions.key("operator.observer.flink.client.timeout.sec") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "The timeout for the observer to wait the flink rest client to return."); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java index 5cb594c..e98e7e2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java @@ -25,6 +25,8 @@ import org.apache.flink.kubernetes.operator.service.FlinkService; import io.javaoperatorsdk.operator.api.reconciler.Context; +import java.util.concurrent.TimeoutException; + /** The observer of the {@link org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. 
*/ public class SessionObserver extends BaseObserver { @@ -35,6 +37,19 @@ public class SessionObserver extends BaseObserver { @Override public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { - observeJmDeployment(flinkApp, context, effectiveConfig); + if (isClusterReady(flinkApp)) { + // Check if session cluster can serve rest calls following our practice in JobObserver + try { + flinkService.listJobs(effectiveConfig); + } catch (Exception e) { + logger.error("REST service in session cluster is bad now", e); + if (e instanceof TimeoutException) { + // check for problems with the underlying deployment + observeJmDeployment(flinkApp, context, effectiveConfig); + } + } + } else { + observeJmDeployment(flinkApp, context, effectiveConfig); + } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index c3faf97..568367f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -138,7 +138,11 @@ public class FlinkService { public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception { try (ClusterClient<String> clusterClient = getClusterClient(conf)) { - return clusterClient.listJobs().get(10, TimeUnit.SECONDS); + return clusterClient + .listJobs() + .get( + operatorConfiguration.getFlinkClientTimeout().getSeconds(), + TimeUnit.SECONDS); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 5f034d2..d6d6d6d 100644 --- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; /** Flink service mock for tests. */ @@ -79,9 +80,12 @@ public class TestingFlinkService extends FlinkService { @Override public List<JobStatusMessage> listJobs(Configuration conf) throws Exception { - if (jobs.isEmpty()) { + if (jobs.isEmpty() && !sessions.isEmpty()) { throw new Exception("Trying to list a job without submitting it"); } + if (!isPortReady) { + throw new TimeoutException("JM port is unavailable"); + } return jobs.stream().map(t -> t.f1).collect(Collectors.toList()); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 6093a8e..24d8f3f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -78,6 +78,7 @@ public class FlinkDeploymentControllerTest { Duration.ofSeconds(2), Duration.ofSeconds(3), Duration.ofSeconds(4), + Duration.ofSeconds(5), null, Collections.emptySet()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java index 8a90cf2..2db9425 100644 --- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java @@ -106,6 +106,22 @@ public class JobObserverTest { .getJobStatus() .getStartTime())) >= 0); + // Test job manager is unavailable suddenly + flinkService.setPortReady(false); + observer.observe(deployment, readyContext, conf); + assertEquals( + JobManagerDeploymentStatus.DEPLOYING, + deployment.getStatus().getJobManagerDeploymentStatus()); + // Job manager recovers + flinkService.setPortReady(true); + observer.observe(deployment, readyContext, conf); + assertEquals( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + observer.observe(deployment, readyContext, conf); + assertEquals( + JobManagerDeploymentStatus.READY, + deployment.getStatus().getJobManagerDeploymentStatus()); // Test listing failure flinkService.clear(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java index 1b79649..2719438 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java @@ -48,34 +48,53 @@ public class SessionObserverTest { @Test public void observeSessionCluster() { - FlinkService flinkService = new TestingFlinkService(); + TestingFlinkService flinkService = new TestingFlinkService(); SessionObserver observer = new SessionObserver( flinkService, FlinkOperatorConfiguration.fromConfiguration(new Configuration())); FlinkDeployment deployment = TestUtils.buildSessionCluster(); + Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration()); deployment .getStatus() .getReconciliationStatus() .setLastReconciledSpec(deployment.getSpec()); - observer.observe( - deployment, - readyContext, - FlinkUtils.getEffectiveConfig(deployment, new Configuration())); + observer.observe(deployment, readyContext, conf); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, deployment.getStatus().getJobManagerDeploymentStatus()); - observer.observe( - deployment, - readyContext, - FlinkUtils.getEffectiveConfig(deployment, new Configuration())); + observer.observe(deployment, readyContext, conf); assertEquals( JobManagerDeploymentStatus.READY, deployment.getStatus().getJobManagerDeploymentStatus()); + // Observe again, the JM should be READY + observer.observe(deployment, readyContext, conf); + + assertEquals( + JobManagerDeploymentStatus.READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + + // Test job manager is unavailable suddenly + flinkService.setPortReady(false); + observer.observe(deployment, readyContext, conf); + + assertEquals( + JobManagerDeploymentStatus.DEPLOYING, + deployment.getStatus().getJobManagerDeploymentStatus()); + // Job manager recovers + flinkService.setPortReady(true); + observer.observe(deployment, readyContext, conf); + assertEquals( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + observer.observe(deployment, readyContext, conf); + assertEquals( + JobManagerDeploymentStatus.READY, + deployment.getStatus().getJobManagerDeploymentStatus()); } @Test @@ -93,6 +112,7 @@ public class SessionObserverTest { Duration.ofSeconds(2), Duration.ofSeconds(3), Duration.ofSeconds(4), + Duration.ofSeconds(5), null, Collections.emptySet()); FlinkOperatorConfiguration specificNsConfig = @@ -101,6 +121,7 @@ public class SessionObserverTest { Duration.ofSeconds(2), Duration.ofSeconds(3), Duration.ofSeconds(4), + Duration.ofSeconds(5), null, Set.of(deployment.getMetadata().getNamespace())); 
FlinkOperatorConfiguration multipleNsConfig = @@ -109,6 +130,7 @@ public class SessionObserverTest { Duration.ofSeconds(2), Duration.ofSeconds(3), Duration.ofSeconds(4), + Duration.ofSeconds(5), null, Set.of(deployment.getMetadata().getNamespace(), "ns"));