This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 261fed2076efe385ede148152c946eb7c5f1f48d Author: Usamah Jassat <us...@amazon.com> AuthorDate: Mon Jun 13 15:00:12 2022 +0100 [FLINK-27444] Add KubernetesStandaloneClusterDescriptor and FlinkStandaloneKubeClient --- flink-kubernetes-standalone/pom.xml | 1 - .../Fabric8FlinkStandaloneKubeClient.java | 71 ++++++ .../kubeclient/FlinkStandaloneKubeClient.java | 27 ++ .../CmdStandaloneJobManagerDecorator.java | 6 + .../decorators/UserLibMountDecorator.java | 13 + .../StandaloneKubernetesJobManagerFactory.java | 124 ++++++++++ .../StandaloneKubernetesTaskManagerFactory.java | 90 +++++++ .../StandaloneKubernetesJobManagerParameters.java | 8 + .../KubernetesStandaloneClusterDescriptor.java | 257 +++++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 5 + .../Fabric8FlinkStandaloneKubeClientTest.java | 104 ++++++++ .../CmdStandaloneJobManagerDecoratorTest.java | 26 +- .../decorators/UserLibMountDecoratorTest.java | 50 +++- .../StandaloneKubernetesJobManagerFactoryTest.java | 273 +++++++++++++++++++++ ...StandaloneKubernetesTaskManagerFactoryTest.java | 150 +++++++++++ .../KubernetesStandaloneClusterDescriptorTest.java | 182 ++++++++++++++ 16 files changed, 1383 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-standalone/pom.xml b/flink-kubernetes-standalone/pom.xml index f2a1c2f6..6125efe3 100644 --- a/flink-kubernetes-standalone/pom.xml +++ b/flink-kubernetes-standalone/pom.xml @@ -29,7 +29,6 @@ under the License. </parent> - <artifactId>flink-kubernetes-standalone</artifactId> <name>Flink Kubernetes Standalone</name> <packaging>jar</packaging> diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java new file mode 100644 index 00000000..45454dd7 --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; + +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; + +import java.util.concurrent.ExecutorService; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The Implementation of {@link FlinkStandaloneKubeClient}. */ +public class Fabric8FlinkStandaloneKubeClient extends Fabric8FlinkKubeClient + implements FlinkStandaloneKubeClient { + + private final NamespacedKubernetesClient internalClient; + + public Fabric8FlinkStandaloneKubeClient( + Configuration flinkConfig, + NamespacedKubernetesClient client, + ExecutorService executorService) { + super(flinkConfig, client, executorService); + internalClient = checkNotNull(client); + } + + @Override + public void createTaskManagerDeployment(Deployment tmDeployment) { + this.internalClient.apps().deployments().create(tmDeployment); + } + + @Override + public void stopAndCleanupCluster(String clusterId) { + this.internalClient + .apps() + .deployments() + .withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId)) + .cascading(true) + .delete(); + + this.internalClient + .apps() + .deployments() + .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId)) + .cascading(true) + .delete(); + } + + public static NamespacedKubernetesClient createNamespacedKubeClient(String namespace) { + return new DefaultKubernetesClient().inNamespace(namespace); + } +} diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java new file mode 100644 index 00000000..9a7f8207 --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient; + +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; + +import io.fabric8.kubernetes.api.model.apps.Deployment; + +/** Extension of the FlinkKubeClient that is used for Flink standalone deployments. */ +public interface FlinkStandaloneKubeClient extends FlinkKubeClient { + void createTaskManagerDeployment(Deployment tmDeployment); +} diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java index 430abe2e..9bfcc866 100644 --- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java @@ -79,6 +79,12 @@ public class CmdStandaloneJobManagerDecorator extends AbstractKubernetesStepDeco args.add(mainClass); } + Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); + if (allowNonRestoredState != null) { + args.add("--allowNonRestoredState"); + args.add(allowNonRestoredState.toString()); + } + return args; } } diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java index a66026b2..207637ac 100644 --- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java @@ -27,6 +27,9 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; + +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -51,6 +54,10 @@ public class UserLibMountDecorator extends AbstractKubernetesStepDecorator { return flinkPod; } + if (mainContainerHasUserLibPath(flinkPod)) { + return flinkPod; + } + final Volume userLibVolume = new VolumeBuilder() .withName(USER_LIB_VOLUME) @@ -78,4 +85,10 @@ public class UserLibMountDecorator extends AbstractKubernetesStepDecorator { .withMainContainer(mountedMainContainer) .build(); } + + private boolean mainContainerHasUserLibPath(FlinkPod flinkPod) { + List<VolumeMount> volumeMounts = flinkPod.getMainContainer().getVolumeMounts(); + return volumeMounts.stream() + .anyMatch(volumeMount -> volumeMount.getMountPath().startsWith(USER_LIB_PATH)); + } } diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java new file mode 100644 index 00000000..f789db93 --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.factory; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesOwnerReference; +import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneJobManagerDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.decorators.UserLibMountDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.util.Preconditions; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utility class for constructing all the Kubernetes for the JobManager deploying in standalone + * mode. This can include the Deployment, the ConfigMap(s), and the Service(s). + */ +public class StandaloneKubernetesJobManagerFactory { + public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecification( + FlinkPod podTemplate, + StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters) + throws IOException { + FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy(); + List<HasMetadata> accompanyingResources = new ArrayList<>(); + + final KubernetesStepDecorator[] stepDecorators = + new KubernetesStepDecorator[] { + new InitJobManagerDecorator(kubernetesJobManagerParameters), + new EnvSecretsDecorator(kubernetesJobManagerParameters), + new MountSecretsDecorator(kubernetesJobManagerParameters), + new CmdStandaloneJobManagerDecorator(kubernetesJobManagerParameters), + new InternalServiceDecorator(kubernetesJobManagerParameters), + new ExternalServiceDecorator(kubernetesJobManagerParameters), + new HadoopConfMountDecorator(kubernetesJobManagerParameters), + new KerberosMountDecorator(kubernetesJobManagerParameters), + new FlinkConfMountDecorator(kubernetesJobManagerParameters), + new UserLibMountDecorator(kubernetesJobManagerParameters), + }; + + for (KubernetesStepDecorator stepDecorator : stepDecorators) { + flinkPod = stepDecorator.decorateFlinkPod(flinkPod); + accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources()); + } + + final Deployment deployment = + createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters); + + return new KubernetesJobManagerSpecification(deployment, accompanyingResources); + } + + private static Deployment createJobManagerDeployment( + FlinkPod flinkPod, KubernetesJobManagerParameters kubernetesJobManagerParameters) { + final Container resolvedMainContainer = flinkPod.getMainContainer(); + + final Pod resolvedPod = + new PodBuilder(flinkPod.getPodWithoutMainContainer()) + .editOrNewSpec() + .addToContainers(resolvedMainContainer) + .endSpec() + .build(); + return new DeploymentBuilder() + .withApiVersion(Constants.APPS_API_VERSION) + .editOrNewMetadata() + .withName( + StandaloneKubernetesUtils.getJobManagerDeploymentName( + kubernetesJobManagerParameters.getClusterId())) + .withAnnotations(kubernetesJobManagerParameters.getAnnotations()) + .withLabels(kubernetesJobManagerParameters.getLabels()) + .withOwnerReferences( + kubernetesJobManagerParameters.getOwnerReference().stream() + .map(e -> KubernetesOwnerReference.fromMap(e).getInternalResource()) + .collect(Collectors.toList())) + .endMetadata() + .editOrNewSpec() + .withReplicas(kubernetesJobManagerParameters.getReplicas()) + .editOrNewTemplate() + .withMetadata(resolvedPod.getMetadata()) + .withSpec(resolvedPod.getSpec()) + .endTemplate() + .editOrNewSelector() + .addToMatchLabels(kubernetesJobManagerParameters.getSelectors()) + .endSelector() + .endSpec() + .build(); + } +} diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java new file mode 100644 index 00000000..8bcd9990 --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.factory; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.util.Preconditions; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; + +/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */ +public class StandaloneKubernetesTaskManagerFactory { + + public static Deployment buildKubernetesTaskManagerDeployment( + FlinkPod podTemplate, + StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) { + FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy(); + + final KubernetesStepDecorator[] stepDecorators = + new KubernetesStepDecorator[] { + new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters), + new EnvSecretsDecorator(kubernetesTaskManagerParameters), + new MountSecretsDecorator(kubernetesTaskManagerParameters), + new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters), + new HadoopConfMountDecorator(kubernetesTaskManagerParameters), + new KerberosMountDecorator(kubernetesTaskManagerParameters), + new FlinkConfMountDecorator(kubernetesTaskManagerParameters) + }; + + for (KubernetesStepDecorator stepDecorator : stepDecorators) { + flinkPod = stepDecorator.decorateFlinkPod(flinkPod); + } + + final Pod resolvedPod = + new PodBuilder(flinkPod.getPodWithoutMainContainer()) + .editOrNewSpec() + .addToContainers(flinkPod.getMainContainer()) + .endSpec() + .build(); + + return new DeploymentBuilder() + .withApiVersion(Constants.APPS_API_VERSION) + .editOrNewMetadata() + .withName( + StandaloneKubernetesUtils.getTaskManagerDeploymentName( + kubernetesTaskManagerParameters.getClusterId())) + .withAnnotations(kubernetesTaskManagerParameters.getAnnotations()) + .withLabels(kubernetesTaskManagerParameters.getLabels()) + .endMetadata() + .editOrNewSpec() + .withReplicas(kubernetesTaskManagerParameters.getReplicas()) + .editOrNewTemplate() + .withMetadata(resolvedPod.getMetadata()) + .withSpec(resolvedPod.getSpec()) + .endTemplate() + .editOrNewSelector() + .addToMatchLabels(kubernetesTaskManagerParameters.getSelectors()) + .endSelector() + .endSpec() + .build(); + } +} diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java index bdc28d48..fbfcbdeb 100644 --- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import java.util.Collections; import java.util.HashMap; @@ -80,4 +81,11 @@ public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManag } return flinkConfig.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS); } + + public Boolean getAllowNonRestoredState() { + if (flinkConfig.contains(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)) { + return flinkConfig.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE); + } + return null; + } } diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java new file mode 100644 index 00000000..cfed561a --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.standalone; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.KubernetesClusterDescriptor; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.Endpoint; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient; +import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory; +import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.rpc.AddressResolution; + +import io.fabric8.kubernetes.api.model.apps.Deployment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Standalone Kubernetes specific {@link ClusterDescriptor} implementation. */ +public class KubernetesStandaloneClusterDescriptor extends KubernetesClusterDescriptor { + + private static final Logger LOG = + LoggerFactory.getLogger(KubernetesStandaloneClusterDescriptor.class); + + private static final String CLUSTER_DESCRIPTION = "Standalone Kubernetes cluster"; + + private final Configuration flinkConfig; + + private final FlinkStandaloneKubeClient client; + + private final String clusterId; + + public KubernetesStandaloneClusterDescriptor( + Configuration flinkConfig, FlinkStandaloneKubeClient client) { + super(flinkConfig, client); + this.flinkConfig = checkNotNull(flinkConfig); + this.client = checkNotNull(client); + this.clusterId = + checkNotNull( + flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID), + "ClusterId must be specified!"); + } + + @Override + public String getClusterDescription() { + return CLUSTER_DESCRIPTION; + } + + @Override + public ClusterClientProvider<String> deploySessionCluster( + ClusterSpecification clusterSpecification) throws ClusterDeploymentException { + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION); + + final ClusterClientProvider clusterClientProvider = + deployClusterInternal(clusterSpecification); + + try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Created flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider<String> deployApplicationCluster( + ClusterSpecification clusterSpecification, + ApplicationConfiguration applicationConfiguration) + throws ClusterDeploymentException { + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + applicationConfiguration.applyToConfiguration(flinkConfig); + final ClusterClientProvider clusterClientProvider = + deployClusterInternal(clusterSpecification); + + try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Created flink application cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + private ClusterClientProvider<String> deployClusterInternal( + ClusterSpecification clusterSpecification) throws ClusterDeploymentException { + + // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); + + if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { + flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, + HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, + flinkConfig.get(JobManagerOptions.PORT)); + } + + // Deploy JM + resources + try { + KubernetesJobManagerSpecification jmSpec = getJobManagerSpec(clusterSpecification); + Deployment tmDeployment = getTaskManagerDeployment(clusterSpecification); + + client.createJobManagerComponent(jmSpec); + client.createTaskManagerDeployment(tmDeployment); + + return createClusterClientProvider(clusterId); + } catch (Exception e) { + try { + LOG.warn( + "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", + clusterId); + client.stopAndCleanupCluster(clusterId); + } catch (Exception e1) { + LOG.info( + "Failed to stop and clean up the Kubernetes cluster \"{}\".", + clusterId, + e1); + } + throw new ClusterDeploymentException( + "Could not create Kubernetes cluster \"" + clusterId + "\".", e); + } + } + + private KubernetesJobManagerSpecification getJobManagerSpec( + ClusterSpecification clusterSpecification) throws IOException { + final StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters = + new StandaloneKubernetesJobManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = + kubernetesJobManagerParameters + .getPodTemplateFilePath() + .map( + file -> + KubernetesUtils.loadPodFromTemplateFile( + client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + + return StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, kubernetesJobManagerParameters); + } + + private Deployment getTaskManagerDeployment(ClusterSpecification clusterSpecification) { + final StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters = + new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = + kubernetesTaskManagerParameters + .getPodTemplateFilePath() + .map( + file -> + KubernetesUtils.loadPodFromTemplateFile( + client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + + return StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment( + podTemplate, kubernetesTaskManagerParameters); + } + + private ClusterClientProvider<String> createClusterClientProvider(String clusterId) { + return () -> { + final Configuration configuration = new Configuration(flinkConfig); + + final Optional<Endpoint> restEndpoint = client.getRestEndpoint(clusterId); + + if (restEndpoint.isPresent()) { + configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress()); + configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort()); + } else { + throw new RuntimeException( + new ClusterRetrieveException( + "Could not get the rest endpoint of " + clusterId)); + } + + try { + // Flink client will always use Kubernetes service to contact with jobmanager. So we + // have a pre-configured web monitor address. Using StandaloneClientHAServices to + // create RestClusterClient is reasonable. + return new RestClusterClient<>( + configuration, + clusterId, + (effectiveConfiguration, fatalErrorHandler) -> + new StandaloneClientHAServices( + getWebMonitorAddress(effectiveConfiguration))); + } catch (Exception e) { + throw new RuntimeException( + new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + }; + } + + private String getWebMonitorAddress(Configuration configuration) throws Exception { + AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION; + final KubernetesConfigOptions.ServiceExposedType serviceType = + configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); + if (serviceType.isClusterIP()) { + resolution = AddressResolution.NO_ADDRESS_RESOLUTION; + LOG.warn( + "Please note that Flink client operations(e.g. cancel, list, stop," + + " savepoint, etc.) won't work from outside the Kubernetes cluster" + + " since '{}' has been set to {}.", + KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), + serviceType); + } + return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution); + } +} diff --git a/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE b/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE new file mode 100644 index 00000000..993ca343 --- /dev/null +++ b/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +flink-kubernetes-operator-standalone +Copyright 2014-2022 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java new file mode 100644 index 00000000..9c52eb32 --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory; +import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.util.concurrent.Executors; + +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** @link Fabric8FlinkStandaloneKubeClient unit tests */ +@EnableKubernetesMockClient(crud = true) +public class Fabric8FlinkStandaloneKubeClientTest { + private static final String NAMESPACE = "test"; + + KubernetesMockServer mockServer; + protected NamespacedKubernetesClient kubernetesClient; + private FlinkStandaloneKubeClient flinkKubeClient; + private StandaloneKubernetesTaskManagerParameters taskManagerParameters; + private Deployment tmDeployment; + private ClusterSpecification clusterSpecification; + private Configuration flinkConfig = new Configuration(); + + @BeforeEach + public final void setup() { + flinkConfig = TestUtils.createTestFlinkConfig(); + kubernetesClient = mockServer.createClient(); + + flinkKubeClient = + new Fabric8FlinkStandaloneKubeClient( + flinkConfig, kubernetesClient, Executors.newDirectExecutorService()); + clusterSpecification = TestUtils.createClusterSpecification(); + + taskManagerParameters = + new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification); + + tmDeployment = + StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment( + new FlinkPod.Builder().build(), taskManagerParameters); + } + + @Test + public void testCreateTaskManagerDeployment() { + flinkKubeClient.createTaskManagerDeployment(tmDeployment); + + final List<Deployment> resultedDeployments = + kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems(); + assertEquals(1, resultedDeployments.size()); + } + + @Test + public void testStopAndCleanupCluster() throws Exception { + ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification(); + StandaloneKubernetesJobManagerParameters jmParameters = + new StandaloneKubernetesJobManagerParameters(flinkConfig, clusterSpecification); + KubernetesJobManagerSpecification jmSpec = + StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + new FlinkPod.Builder().build(), jmParameters); + + flinkKubeClient.createJobManagerComponent(jmSpec); + flinkKubeClient.createTaskManagerDeployment(tmDeployment); + + List<Deployment> resultedDeployments = + kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems(); + assertEquals(2, resultedDeployments.size()); + + flinkKubeClient.stopAndCleanupCluster(taskManagerParameters.getClusterId()); + + resultedDeployments = + kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems(); + assertEquals(0, resultedDeployments.size()); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java index 49355574..8bdb0f33 100644 --- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java @@ -18,11 +18,13 @@ package org.apache.flink.kubernetes.operator.kubeclient.decorators; import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,7 +69,29 @@ public class CmdStandaloneJobManagerDecoratorTest { } @Test - public void testApplicationCommandAdded() { + public void testApplicationCommandAndArgsAdded() { + final String testMainClass = "org.main.class"; + configuration.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + configuration.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, testMainClass); + configuration.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false); + + FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build()); + assertThat( + decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH)); + assertThat( + decoratedPod.getMainContainer().getArgs(), + containsInAnyOrder( + CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG, + "--allowNonRestoredState", + "false", + "--job-classname", + testMainClass)); + } + + @Test + public void testApplicationOptionalArgsNotAdded() { configuration.set( StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java index 07c8af20..1a867521 100644 --- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java @@ -23,6 +23,10 @@ import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.VolumeBuilder; import io.fabric8.kubernetes.api.model.VolumeMount; import org.junit.jupiter.api.Test; @@ -32,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class UserLibMountDecoratorTest { @Test - public void testVolumeAdded() { + public void testVolumeAddedApplicationMode() { StandaloneKubernetesJobManagerParameters jmParameters = createJmParamsWithClusterMode( StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); @@ -54,7 +58,7 @@ public class UserLibMountDecoratorTest { } @Test - public void testVolumeNotAdded() { + public void testVolumeNotAddedSessionMode() { StandaloneKubernetesJobManagerParameters jmParameters = createJmParamsWithClusterMode( StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION); @@ -69,6 +73,48 @@ public class UserLibMountDecoratorTest { assertEquals(0, decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size()); } + @Test + public void testVolumeNotAddedExistingVolumeMount() { + StandaloneKubernetesJobManagerParameters jmParameters = + createJmParamsWithClusterMode( + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + UserLibMountDecorator decorator = new UserLibMountDecorator(jmParameters); + + final String volName = "flink-artifact"; + final String userLibPath = "/opt/flink/usrlib"; + + FlinkPod baseFlinkPod = + new FlinkPod.Builder() + .withMainContainer( + new ContainerBuilder() + .addNewVolumeMount() + .withName(volName) + .withMountPath(userLibPath) + .endVolumeMount() + .build()) + .withPod( + new PodBuilder() + .withSpec( + new PodSpecBuilder() + .addNewVolumeLike( + new VolumeBuilder() + .withName(volName) + .withNewEmptyDir() + .endEmptyDir() + .build()) + .endVolume() + .build()) + .build()) + .build(); + + assertEquals(1, baseFlinkPod.getMainContainer().getVolumeMounts().size()); + assertEquals(1, baseFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().size()); + + FlinkPod decoratedPod = decorator.decorateFlinkPod(baseFlinkPod); + assertEquals(1, decoratedPod.getMainContainer().getVolumeMounts().size()); + assertEquals(1, decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size()); + } + private StandaloneKubernetesJobManagerParameters createJmParamsWithClusterMode( StandaloneKubernetesConfigOptionsInternal.ClusterMode clusterMode) { return new StandaloneKubernetesJobManagerParameters( diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java new file mode 100644 index 00000000..832ce97a --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.factory; + +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.kubernetes.utils.Constants; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.ContainerPortBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE; +import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.CLUSTER_ID; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** @link StandaloneKubernetesJobManagerFactory unit tests */ +public class StandaloneKubernetesJobManagerFactoryTest extends ParametersTestBase { + + KubernetesJobManagerSpecification jmSpec; + + @BeforeEach + public void setup() throws Exception { + setupFlinkConfig(); + flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, "/missing/dir"); + FlinkPod podTemplate = createPodTemplate(); + StandaloneKubernetesJobManagerParameters tmParameters = + new StandaloneKubernetesJobManagerParameters( + flinkConfig, TestUtils.createClusterSpecification()); + jmSpec = + StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, tmParameters); + } + + @Test + public void testDeploymentMetadata() { + ObjectMeta deploymentMetadata = jmSpec.getDeployment().getMetadata(); + assertEquals( + StandaloneKubernetesUtils.getJobManagerDeploymentName(CLUSTER_ID), + deploymentMetadata.getName()); + + final Map<String, String> expectedLabels = + new HashMap<>(StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID)); + expectedLabels.putAll(userLabels); + assertEquals(expectedLabels, deploymentMetadata.getLabels()); + + assertEquals(userAnnotations, deploymentMetadata.getAnnotations()); + } + + @Test + public void testDeploymentSpec() { + DeploymentSpec deploymentSpec = jmSpec.getDeployment().getSpec(); + + assertEquals(1, deploymentSpec.getReplicas()); + assertEquals( + StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID), + deploymentSpec.getSelector().getMatchLabels()); + } + + @Test + public void testTemplateMetadata() { + final ObjectMeta podMetadata = jmSpec.getDeployment().getSpec().getTemplate().getMetadata(); + + final Map<String, String> expectedLabels = + new HashMap<>(StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID)); + expectedLabels.putAll(userLabels); + expectedLabels.putAll(templateLabels); + assertEquals(expectedLabels, podMetadata.getLabels()); + + final Map<String, String> expectedAnnotations = new HashMap<>(userAnnotations); + expectedAnnotations.putAll(templateAnnotations); + assertEquals(expectedAnnotations, podMetadata.getAnnotations()); + } + + @Test + public void testTemplateSpec() { + final PodSpec podSpec = jmSpec.getDeployment().getSpec().getTemplate().getSpec(); + + assertEquals(1, podSpec.getContainers().size()); + assertEquals(TestUtils.SERVICE_ACCOUNT, podSpec.getServiceAccountName()); + // Config and secret volumes + assertEquals(3, podSpec.getVolumes().size()); + + final Container mainContainer = podSpec.getContainers().get(0); + assertEquals(Constants.MAIN_CONTAINER_NAME, mainContainer.getName()); + assertEquals(TestUtils.IMAGE, mainContainer.getImage()); + assertEquals(TestUtils.IMAGE_POLICY, mainContainer.getImagePullPolicy()); + + final Map<String, String> envs = new HashMap<>(); + mainContainer.getEnv().forEach(env -> envs.put(env.getName(), env.getValue())); + + Map<String, String> expectedEnvs = new HashMap<>(templateEnvs); + expectedEnvs.put(Constants.ENV_FLINK_POD_IP_ADDRESS, null); + assertEquals(expectedEnvs, envs); + + final List<ContainerPort> expectedContainerPorts = + Arrays.asList( + new ContainerPortBuilder() + .withName(Constants.JOB_MANAGER_RPC_PORT_NAME) + .withContainerPort(6123) + .build(), + new ContainerPortBuilder() + .withName(Constants.BLOB_SERVER_PORT_NAME) + .withContainerPort(Constants.BLOB_SERVER_PORT) + .build(), + new ContainerPortBuilder() + .withName(Constants.REST_PORT_NAME) + .withContainerPort(Constants.REST_PORT) + .build(), + new ContainerPortBuilder() + .withName(TEMPLATE_PORT_NAME) + .withContainerPort(TEMPLATE_PORT) + .build()); + + assertThat(mainContainer.getPorts(), containsInAnyOrder(expectedContainerPorts.toArray())); + + final ResourceRequirements resourceRequirements = mainContainer.getResources(); + + final Map<String, Quantity> requests = resourceRequirements.getRequests(); + assertEquals(Double.toString(TestUtils.JOB_MANAGER_CPU), requests.get("cpu").getAmount()); + assertEquals( + String.valueOf(TestUtils.JOB_MANAGER_MEMORY_MB), + requests.get("memory").getAmount()); + + final Map<String, Quantity> limits = resourceRequirements.getLimits(); + assertEquals(Double.toString(TestUtils.JOB_MANAGER_CPU), limits.get("cpu").getAmount()); + assertEquals( + String.valueOf(TestUtils.JOB_MANAGER_MEMORY_MB), limits.get("memory").getAmount()); + + assertEquals(3, mainContainer.getVolumeMounts().size()); + } + + @Test + public void testAdditionalResourcesSize() throws IOException { + final List<HasMetadata> resultAdditionalResources = this.jmSpec.getAccompanyingResources(); + assertEquals(3, resultAdditionalResources.size()); + + final List<HasMetadata> resultServices = + resultAdditionalResources.stream() + .filter(x -> x instanceof Service) + .collect(Collectors.toList()); + assertEquals(2, resultServices.size()); + + final List<HasMetadata> resultConfigMaps = + resultAdditionalResources.stream() + .filter(x -> x instanceof ConfigMap) + .collect(Collectors.toList()); + assertEquals(1, resultConfigMaps.size()); + } + + @Test + public void testServices() throws IOException { + final List<Service> resultServices = + this.jmSpec.getAccompanyingResources().stream() + .filter(x -> x instanceof Service) + .map(x -> (Service) x) + .collect(Collectors.toList()); + + assertEquals(2, resultServices.size()); + + final List<Service> internalServiceCandidates = + resultServices.stream() + .filter( + x -> + x.getMetadata() + .getName() + .equals( + InternalServiceDecorator + .getInternalServiceName( + CLUSTER_ID))) + .collect(Collectors.toList()); + assertEquals(1, internalServiceCandidates.size()); + + final List<Service> restServiceCandidates = + resultServices.stream() + .filter( + x -> + x.getMetadata() + .getName() + .equals( + ExternalServiceDecorator + .getExternalServiceName( + CLUSTER_ID))) + .collect(Collectors.toList()); + assertEquals(1, restServiceCandidates.size()); + + final Service resultInternalService = internalServiceCandidates.get(0); + assertEquals(2, resultInternalService.getMetadata().getLabels().size()); + + assertNull(resultInternalService.getSpec().getType()); + assertEquals( + HeadlessClusterIPService.HEADLESS_CLUSTER_IP, + resultInternalService.getSpec().getClusterIP()); + assertEquals(2, resultInternalService.getSpec().getPorts().size()); + assertEquals(3, resultInternalService.getSpec().getSelector().size()); + + final Service resultRestService = restServiceCandidates.get(0); + assertEquals(2, resultRestService.getMetadata().getLabels().size()); + + assertEquals( + REST_SERVICE_EXPOSED_TYPE.defaultValue().toString(), + resultRestService.getSpec().getType()); + assertEquals(1, resultRestService.getSpec().getPorts().size()); + assertEquals(3, resultRestService.getSpec().getSelector().size()); + } + + @Test + public void testFlinkConfConfigMap() throws IOException { + final ConfigMap resultConfigMap = + (ConfigMap) + jmSpec.getAccompanyingResources().stream() + .filter( + x -> + x instanceof ConfigMap + && x.getMetadata() + .getName() + .equals( + FlinkConfMountDecorator + .getFlinkConfConfigMapName( + CLUSTER_ID))) + .collect(Collectors.toList()) + .get(0); + + assertEquals(2, resultConfigMap.getMetadata().getLabels().size()); + + final Map<String, String> resultData = resultConfigMap.getData(); + assertEquals(1, resultData.size()); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java new file mode 100644 index 00000000..8c8d4e62 --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.factory; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.kubernetes.utils.Constants; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.ContainerPortBuilder; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** @link StandaloneKubernetesJobManagerFactory unit tests */ +public class StandaloneKubernetesTaskManagerFactoryTest extends ParametersTestBase { + + private Deployment deployment; + + @BeforeEach + public void setup() { + setupFlinkConfig(); + FlinkPod podTemplate = createPodTemplate(); + StandaloneKubernetesTaskManagerParameters tmParameters = + new StandaloneKubernetesTaskManagerParameters( + flinkConfig, TestUtils.createClusterSpecification()); + deployment = + StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment( + podTemplate, tmParameters); + } + + @Test + public void testDeploymentMetadata() { + assertEquals( + StandaloneKubernetesUtils.getTaskManagerDeploymentName(TestUtils.CLUSTER_ID), + deployment.getMetadata().getName()); + + final Map<String, String> expectedLabels = + new HashMap<>( + StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID)); + expectedLabels.putAll(userLabels); + assertEquals(expectedLabels, deployment.getMetadata().getLabels()); + + assertEquals(userAnnotations, deployment.getMetadata().getAnnotations()); + } + + @Test + public void testDeploymentSpec() { + assertEquals(1, deployment.getSpec().getReplicas()); + assertEquals( + StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID), + deployment.getSpec().getSelector().getMatchLabels()); + } + + @Test + public void testTemplateMetadata() { + final ObjectMeta podMetadata = deployment.getSpec().getTemplate().getMetadata(); + + final Map<String, String> expectedLabels = + new HashMap<>( + StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID)); + expectedLabels.putAll(userLabels); + expectedLabels.putAll(templateLabels); + assertEquals(expectedLabels, podMetadata.getLabels()); + + final Map<String, String> expectedAnnotations = new HashMap<>(userAnnotations); + expectedAnnotations.putAll(templateAnnotations); + assertEquals(expectedAnnotations, podMetadata.getAnnotations()); + } + + @Test + public void testTemplateSpec() { + final PodSpec podSpec = deployment.getSpec().getTemplate().getSpec(); + + assertEquals(1, podSpec.getContainers().size()); + assertEquals(TestUtils.SERVICE_ACCOUNT, podSpec.getServiceAccountName()); + // Config and secret volumes + assertEquals(2, podSpec.getVolumes().size()); + + final Container mainContainer = podSpec.getContainers().get(0); + assertEquals(Constants.MAIN_CONTAINER_NAME, mainContainer.getName()); + assertEquals(TestUtils.IMAGE, mainContainer.getImage()); + assertEquals(TestUtils.IMAGE_POLICY, mainContainer.getImagePullPolicy()); + + final Map<String, String> envs = new HashMap<>(); + mainContainer.getEnv().forEach(env -> envs.put(env.getName(), env.getValue())); + + assertEquals(templateEnvs, envs); + + final List<ContainerPort> expectedContainerPorts = + Arrays.asList( + new ContainerPortBuilder() + .withName(Constants.TASK_MANAGER_RPC_PORT_NAME) + .withContainerPort(Constants.TASK_MANAGER_RPC_PORT) + .build(), + new ContainerPortBuilder() + .withName(TEMPLATE_PORT_NAME) + .withContainerPort(TEMPLATE_PORT) + .build()); + + assertThat(mainContainer.getPorts(), containsInAnyOrder(expectedContainerPorts.toArray())); + + final ResourceRequirements resourceRequirements = mainContainer.getResources(); + + final Map<String, Quantity> requests = resourceRequirements.getRequests(); + assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), requests.get("cpu").getAmount()); + assertEquals( + String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB), + requests.get("memory").getAmount()); + + final Map<String, Quantity> limits = resourceRequirements.getLimits(); + assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), limits.get("cpu").getAmount()); + assertEquals( + String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB), limits.get("memory").getAmount()); + + assertEquals(2, mainContainer.getVolumeMounts().size()); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java new file mode 100644 index 00000000..8aae921e --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.standalone; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient; +import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.util.concurrent.Executors; + +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** @link KubernetesStandaloneClusterDescriptor unit tests */ +@EnableKubernetesMockClient(crud = true) +public class KubernetesStandaloneClusterDescriptorTest { + + private KubernetesStandaloneClusterDescriptor clusterDescriptor; + KubernetesMockServer mockServer; + private NamespacedKubernetesClient kubernetesClient; + private FlinkStandaloneKubeClient flinkKubeClient; + private Configuration flinkConfig = new Configuration(); + + @BeforeEach + public void setup() { + flinkConfig = TestUtils.createTestFlinkConfig(); + kubernetesClient = mockServer.createClient().inNamespace(TestUtils.TEST_NAMESPACE); + flinkKubeClient = + new Fabric8FlinkStandaloneKubeClient( + flinkConfig, kubernetesClient, Executors.newDirectExecutorService()); + + clusterDescriptor = new KubernetesStandaloneClusterDescriptor(flinkConfig, flinkKubeClient); + } + + @Test + public void testDeploySessionCluster() throws Exception { + ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification(); + + flinkConfig.setString(BlobServerOptions.PORT, String.valueOf(0)); + flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(0)); + flinkConfig.setString(RestOptions.BIND_PORT, String.valueOf(0)); + + ClusterClientProvider clusterClientProvider = + clusterDescriptor.deploySessionCluster(clusterSpecification); + + List<Deployment> deployments = + kubernetesClient + .apps() + .deployments() + .inNamespace(TestUtils.TEST_NAMESPACE) + .list() + .getItems(); + String expectedJMDeploymentName = TestUtils.CLUSTER_ID; + String expectedTMDeploymentName = TestUtils.CLUSTER_ID + "-taskmanager"; + + assertEquals(2, deployments.size()); + assertThat( + deployments.stream() + .map(d -> d.getMetadata().getName()) + .collect(Collectors.toList()), + containsInAnyOrder(expectedJMDeploymentName, expectedTMDeploymentName)); + assertEquals( + flinkConfig.get(BlobServerOptions.PORT), + String.valueOf(Constants.BLOB_SERVER_PORT)); + assertEquals( + flinkConfig.get(TaskManagerOptions.RPC_PORT), + String.valueOf(Constants.TASK_MANAGER_RPC_PORT)); + assertEquals(flinkConfig.get(RestOptions.BIND_PORT), String.valueOf(Constants.REST_PORT)); + + Deployment jmDeployment = + deployments.stream() + .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName)) + .findFirst() + .orElse(null); + assertTrue( + jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream() + .anyMatch(c -> c.getArgs().contains("jobmanager"))); + + ClusterClient clusterClient = clusterClientProvider.getClusterClient(); + + String expectedWebUrl = + String.format( + "http://%s:%d", + ExternalServiceDecorator.getNamespacedExternalServiceName( + TestUtils.CLUSTER_ID, TestUtils.TEST_NAMESPACE), + Constants.REST_PORT); + assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL()); + } + + @Test + public void testDeployApplicationCluster() throws Exception { + ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification(); + + flinkConfig.setString(BlobServerOptions.PORT, String.valueOf(0)); + flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(0)); + flinkConfig.setString(RestOptions.BIND_PORT, String.valueOf(0)); + + ClusterClientProvider clusterClientProvider = + clusterDescriptor.deployApplicationCluster( + clusterSpecification, + ApplicationConfiguration.fromConfiguration(flinkConfig)); + + List<Deployment> deployments = + kubernetesClient + .apps() + .deployments() + .inNamespace(TestUtils.TEST_NAMESPACE) + .list() + .getItems(); + String expectedJMDeploymentName = TestUtils.CLUSTER_ID; + String expectedTMDeploymentName = TestUtils.CLUSTER_ID + "-taskmanager"; + + assertEquals(2, deployments.size()); + assertThat( + deployments.stream() + .map(d -> d.getMetadata().getName()) + .collect(Collectors.toList()), + containsInAnyOrder(expectedJMDeploymentName, expectedTMDeploymentName)); + assertEquals( + flinkConfig.get(BlobServerOptions.PORT), + String.valueOf(Constants.BLOB_SERVER_PORT)); + assertEquals( + flinkConfig.get(TaskManagerOptions.RPC_PORT), + String.valueOf(Constants.TASK_MANAGER_RPC_PORT)); + assertEquals(flinkConfig.get(RestOptions.BIND_PORT), String.valueOf(Constants.REST_PORT)); + + Deployment jmDeployment = + deployments.stream() + .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName)) + .findFirst() + .orElse(null); + assertTrue( + jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream() + .anyMatch(c -> c.getArgs().contains("standalone-job"))); + + ClusterClient clusterClient = clusterClientProvider.getClusterClient(); + + String expectedWebUrl = + String.format( + "http://%s:%d", + ExternalServiceDecorator.getNamespacedExternalServiceName( + TestUtils.CLUSTER_ID, TestUtils.TEST_NAMESPACE), + Constants.REST_PORT); + assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL()); + } +}