This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b17754a  [SPARK-32617][K8S][TESTS] Configure kubernetes client based on kubeconfig settings in kubernetes integration tests
b17754a is described below

commit b17754a8cbd2593eb2b1952e95a7eeb0f8e09cdb
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Wed Feb 24 11:46:27 2021 -0800

[SPARK-32617][K8S][TESTS] Configure kubernetes client based on kubeconfig settings in kubernetes integration tests

### What changes were proposed in this pull request?

Starting with [minikube version v1.1.0](https://github.com/kubernetes/minikube/blob/v1.1.0/CHANGELOG.md), kubectl is available as a minikube command. So the kubeconfig settings can be accessed like this:

```
$ minikube kubectl config view
apiVersion: v1
clusters:
- cluster:
    certificate-authority: /Users/attilazsoltpiros/.minikube/ca.crt
    server: https://127.0.0.1:32788
  name: minikube
contexts:
- context:
    cluster: minikube
    namespace: default
    user: minikube
  name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
  user:
    client-certificate: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.crt
    client-key: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.key
```

Here the vm-driver was docker, and the server port (32788 in https://127.0.0.1:32788) differs from the hardcoded 8443.

So the main part of this PR introduces kubernetes client configuration based on the kubeconfig (the output of `minikube kubectl config view`) for minikube versions v1.1.0 and later. The legacy way of configuration is kept as well, since minikube versions back to v0.34.1 should be supported.

Moreover, the old config parsing pattern wasn't sufficient in my case: when `minikube kubectl config view` is called, a kubectl downloading message might be included before the first key. So I changed the parsing even for the existing keys, to have a consistent pattern in this file.

The old parsing, as an example:

```
private val HOST_PREFIX = "host:"

val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))

val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
```

The new parsing:

```
private val HOST_PREFIX = "host: "

val hostString = statusString.find(_.contains(HOST_PREFIX))

hostString.get.split(HOST_PREFIX)(1)
```

So the PREFIX is extended with the extra space at its declaration (this way the two separate string operations are safer and consistent with each other), and the replace is changed to a split that takes the 2nd string from the result (which is guaranteed to contain only the text after the PREFIX when the PREFIX is a contained substring).

Finally, there is a tiny change in `dev-run-integration-tests.sh` introducing `--skip-building-dependencies`, which switches off building the maven dependencies of `kubernetes-integration-tests` from the Spark project. This can be used when only `kubernetes-integration-tests` should be rebuilt because only the tests were modified.

### Why are the changes needed?

Kubernetes client configuration based on kubeconfig settings is more reliable and provides a solution that is independent of the minikube version.

### Does this PR introduce _any_ user-facing change?

No. This is only test code.

### How was this patch tested?

Tested manually on two minikube versions.

Minikube v0.34.1:

```
$ minikube version
minikube version: v0.34.1

$ grep "version\|building" resource-managers/kubernetes/integration-tests/target/integration-tests.log
20/12/12 12:52:25.135 ScalaTest-main-running-DiscoverySuite INFO Minikube: minikube version: v0.34.1
20/12/12 12:52:25.761 ScalaTest-main-running-DiscoverySuite INFO Minikube: building kubernetes config with apiVersion: v1, masterUrl: https://192.168.99.103:8443, caCertFile: /Users/attilazsoltpiros/.minikube/ca.crt, clientCertFile: /Users/attilazsoltpiros/.minikube/apiserver.crt, clientKeyFile: /Users/attilazsoltpiros/.minikube/apiserver.key
```

Minikube v1.15.1:

```
$ minikube version
minikube version: v1.15.1
commit: 23f40a012abb52eff365ff99a709501a61ac5876

$ grep "version\|building" resource-managers/kubernetes/integration-tests/target/integration-tests.log
20/12/13 06:25:55.086 ScalaTest-main-running-DiscoverySuite INFO Minikube: minikube version: v1.15.1
20/12/13 06:25:55.597 ScalaTest-main-running-DiscoverySuite INFO Minikube: building kubernetes config with apiVersion: v1, masterUrl: https://192.168.64.4:8443, caCertFile: /Users/attilazsoltpiros/.minikube/ca.crt, clientCertFile: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.crt, clientKeyFile: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.key

$ minikube kubectl config view
apiVersion: v1
clusters:
- cluster:
    certificate-authority: /Users/attilazsoltpiros/.minikube/ca.crt
    server: https://192.168.64.4:8443
  name: minikube
contexts:
- context:
    cluster: minikube
    namespace: default
    user: minikube
  name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
  user:
    client-certificate: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.crt
    client-key: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.key
```

Closes #30751 from attilapiros/SPARK-32617.

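The difference between removing a prefix and taking the text after it can be shown with a minimal standalone sketch. The sample lines (including the download notice) are invented for illustration and are not real minikube output, and `valueAfter` is a hypothetical helper that does not exist in `Minikube.scala`.

```scala
import java.util.regex.Pattern

object PrefixParsingSketch {
  // Prefixes declared with the trailing space, as in the patch.
  val APIVERSION_PREFIX = "apiVersion: "
  val SERVER_PREFIX = "server: "

  // Hypothetical helper (not in Minikube.scala): take the first line that contains
  // the prefix and return whatever follows it. String.split takes a regex, so the
  // prefix is quoted here; the patch passes the prefix to split directly.
  def valueAfter(lines: Seq[String], prefix: String): Option[String] =
    lines.find(_.contains(prefix))
      .flatMap(_.split(Pattern.quote(prefix)).drop(1).headOption)

  // Invented sample: some notice text shares a line with the first key,
  // and the nested keys are indented as in the kubeconfig YAML.
  val sampleOutput: Seq[String] = Seq(
    "(download notice) apiVersion: v1",
    "clusters:",
    "- cluster:",
    "    server: https://127.0.0.1:32788")

  def main(args: Array[String]): Unit = {
    // Removing only the prefix keeps the text in front of it.
    println(sampleOutput.head.replaceFirst(APIVERSION_PREFIX, ""))
    // Taking the part after the prefix drops the leading text and indentation.
    println(valueAfter(sampleOutput, APIVERSION_PREFIX))
    println(valueAfter(sampleOutput, SERVER_PREFIX))
  }
}
```

Unlike the `split(PREFIX)(1)` calls in the patch, this sketch returns `None` instead of throwing when nothing follows the prefix; that is a choice made for the example only.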
Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../dev/dev-run-integration-tests.sh               |   6 +-
 .../backend/minikube/Minikube.scala                | 138 ++++++++++++++-------
 2 files changed, 101 insertions(+), 43 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
index b72a4f7..c87437e 100755
--- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
+++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
@@ -35,6 +35,7 @@ CONTEXT=
 INCLUDE_TAGS="k8s"
 EXCLUDE_TAGS=
 JAVA_VERSION="8"
+BUILD_DEPENDENCIES_MVN_FLAG="-am"
 HADOOP_PROFILE="hadoop-3.2"
 MVN="$TEST_ROOT_DIR/build/mvn"

@@ -117,6 +118,9 @@ while (( "$#" )); do
       HADOOP_PROFILE="$2"
       shift
       ;;
+    --skip-building-dependencies)
+      BUILD_DEPENDENCIES_MVN_FLAG=""
+      ;;
     *)
       echo "Unexpected command line flag $2 $1."
       exit 1
@@ -176,4 +180,4 @@ properties+=(
   -Dlog4j.logger.org.apache.spark=DEBUG
 )

-$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -P$HADOOP_PROFILE -Pkubernetes -Pkubernetes-integration-tests ${properties[@]}
+$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests $BUILD_DEPENDENCIES_MVN_FLAG -Pscala-$SCALA_VERSION -P$HADOOP_PROFILE -Pkubernetes -Pkubernetes-integration-tests ${properties[@]}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index c338752..5cb0685 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,9 +16,9 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube

-import java.nio.file.{Files, Paths}
+import java.nio.file.Paths

-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient}

 import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
 import org.apache.spark.internal.Logging
@@ -26,18 +26,26 @@ import org.apache.spark.internal.Logging
 // TODO support windows
 private[spark] object Minikube extends Logging {
   private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
-  private val HOST_PREFIX = "host:"
-  private val KUBELET_PREFIX = "kubelet:"
-  private val APISERVER_PREFIX = "apiserver:"
-  private val KUBECTL_PREFIX = "kubectl:"
-  private val KUBECONFIG_PREFIX = "kubeconfig:"
+  private val VERSION_PREFIX = "minikube version: "
+  private val HOST_PREFIX = "host: "
+  private val KUBELET_PREFIX = "kubelet: "
+  private val APISERVER_PREFIX = "apiserver: "
+  private val KUBECTL_PREFIX = "kubectl: "
+  private val KUBECONFIG_PREFIX = "kubeconfig: "
   private val MINIKUBE_VM_PREFIX = "minikubeVM: "
   private val MINIKUBE_PREFIX = "minikube: "
   private val MINIKUBE_PATH = ".minikube"
+  private val APIVERSION_PREFIX = "apiVersion: "
+  private val SERVER_PREFIX = "server: "
+  private val CA_PREFIX = "certificate-authority: "
+  private val CLIENTCERT_PREFIX = "client-certificate: "
+  private val CLIENTKEY_PREFIX = "client-key: "

-  def logVersion(): Unit = {
-    logInfo(executeMinikube("version").mkString("\n"))
-  }
+  lazy val minikubeVersionString =
+    executeMinikube("version").find(_.contains(VERSION_PREFIX)).get
+
+  def logVersion(): Unit =
+    logInfo(minikubeVersionString)

   def getMinikubeIp: String = {
     val outputs = executeMinikube("ip")
@@ -56,60 +64,106 @@ private[spark] object Minikube extends Logging {
     if (oldMinikube.isEmpty) {
       getIfNewMinikubeStatus(statusString)
     } else {
-      val finalStatusString = oldMinikube
-        .head
-        .replaceFirst(MINIKUBE_VM_PREFIX, "")
-        .replaceFirst(MINIKUBE_PREFIX, "")
+      val statusLine = oldMinikube.head
+      val finalStatusString = if (statusLine.contains(MINIKUBE_VM_PREFIX)) {
+        statusLine.split(MINIKUBE_VM_PREFIX)(1)
+      } else {
+        statusLine.split(MINIKUBE_PREFIX)(1)
+      }
       MinikubeStatus.unapply(finalStatusString)
         .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
     }
   }

   def getKubernetesClient: DefaultKubernetesClient = {
+    // only the three-part version number is matched (the optional suffix like "-beta.0" is dropped)
+    val versionArrayOpt = "\\d+\\.\\d+\\.\\d+".r
+      .findFirstIn(minikubeVersionString.split(VERSION_PREFIX)(1))
+      .map(_.split('.').map(_.toInt))
+
+    assert(versionArrayOpt.isDefined && versionArrayOpt.get.size == 3,
+      s"Unexpected version format detected in `$minikubeVersionString`." +
+      "For minikube version a three-part version number is expected (the optional non-numeric " +
+      "suffix is intentionally dropped)")
+
+    val kubernetesConf = versionArrayOpt.get match {
+      case Array(x, y, z) =>
+        // comparing the versions as the kubectl command is only introduced in version v1.1.0:
+        // https://github.com/kubernetes/minikube/blob/v1.1.0/CHANGELOG.md
+        if (Ordering.Tuple3[Int, Int, Int].gteq((x, y, z), (1, 1, 0))) {
+          kubectlBasedKubernetesClientConf
+        } else {
+          legacyKubernetesClientConf
+        }
+    }
+    new DefaultKubernetesClient(kubernetesConf)
+  }
+
+  private def legacyKubernetesClientConf: Config = {
     val kubernetesMaster = s"https://${getMinikubeIp}:8443"
     val userHome = System.getProperty("user.home")
-    val minikubeBasePath = Paths.get(userHome, MINIKUBE_PATH).toString
-    val profileDir = if (Files.exists(Paths.get(minikubeBasePath, "apiserver.crt"))) {
-      // For Minikube <1.9
-      ""
-    } else {
-      // For Minikube >=1.9
-      Paths.get("profiles", executeMinikube("profile")(0)).toString
-    }
-    val apiServerCertPath = Paths.get(minikubeBasePath, profileDir, "apiserver.crt")
-    val apiServerKeyPath = Paths.get(minikubeBasePath, profileDir, "apiserver.key")
-    val kubernetesConf = new ConfigBuilder()
-      .withApiVersion("v1")
-      .withMasterUrl(kubernetesMaster)
-      .withCaCertFile(
-        Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath)
-      .withClientCertFile(apiServerCertPath.toFile.getAbsolutePath)
-      .withClientKeyFile(apiServerKeyPath.toFile.getAbsolutePath)
+    buildKubernetesClientConf(
+      "v1",
+      kubernetesMaster,
+      Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath,
+      Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath,
+      Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath)
+  }
+
+  private def kubectlBasedKubernetesClientConf: Config = {
+    val outputs = executeMinikube("kubectl config view")
+    val apiVersionString = outputs.find(_.contains(APIVERSION_PREFIX))
+    val serverString = outputs.find(_.contains(SERVER_PREFIX))
+    val caString = outputs.find(_.contains(CA_PREFIX))
+    val clientCertString = outputs.find(_.contains(CLIENTCERT_PREFIX))
+    val clientKeyString = outputs.find(_.contains(CLIENTKEY_PREFIX))
+
+    assert(!apiVersionString.isEmpty && !serverString.isEmpty && !caString.isEmpty &&
+      !clientKeyString.isEmpty && !clientKeyString.isEmpty,
+      "The output of 'minikube kubectl config view' does not contain all the neccesary attributes")
+
+    buildKubernetesClientConf(
+      apiVersionString.get.split(APIVERSION_PREFIX)(1),
+      serverString.get.split(SERVER_PREFIX)(1),
+      caString.get.split(CA_PREFIX)(1),
+      clientCertString.get.split(CLIENTCERT_PREFIX)(1),
+      clientKeyString.get.split(CLIENTKEY_PREFIX)(1))
+  }
+
+  private def buildKubernetesClientConf(apiVersion: String, masterUrl: String, caCertFile: String,
+      clientCertFile: String, clientKeyFile: String): Config = {
+    logInfo(s"building kubernetes config with apiVersion: $apiVersion, masterUrl: $masterUrl, " +
+      s"caCertFile: $caCertFile, clientCertFile: $clientCertFile, clientKeyFile: $clientKeyFile")
+    new ConfigBuilder()
+      .withApiVersion(apiVersion)
+      .withMasterUrl(masterUrl)
+      .withCaCertFile(caCertFile)
+      .withClientCertFile(clientCertFile)
+      .withClientKeyFile(clientKeyFile)
       .build()
-    new DefaultKubernetesClient(kubernetesConf)
   }

   // Covers minikube status output after Minikube V0.30.
   private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = {
-    val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))
-    val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
-    val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
-    val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))
-    val kubeconfigString = statusString.find(_.contains(s"$KUBECONFIG_PREFIX "))
+    val hostString = statusString.find(_.contains(HOST_PREFIX))
+    val kubeletString = statusString.find(_.contains(KUBELET_PREFIX))
+    val apiserverString = statusString.find(_.contains(APISERVER_PREFIX))
+    val kubectlString = statusString.find(_.contains(KUBECTL_PREFIX))
+    val kubeconfigString = statusString.find(_.contains(KUBECONFIG_PREFIX))
     val hasConfigStatus = kubectlString.isDefined || kubeconfigString.isDefined

     if (hostString.isEmpty || kubeletString.isEmpty || apiserverString.isEmpty ||
         !hasConfigStatus) {
       MinikubeStatus.NONE
     } else {
-      val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
-      val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
-      val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
+      val status1 = hostString.get.split(HOST_PREFIX)(1)
+      val status2 = kubeletString.get.split(KUBELET_PREFIX)(1)
+      val status3 = apiserverString.get.split(APISERVER_PREFIX)(1)
       val isConfigured = if (kubectlString.isDefined) {
-        val cfgStatus = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
+        val cfgStatus = kubectlString.get.split(KUBECTL_PREFIX)(1)
         cfgStatus.contains("Correctly Configured:")
       } else {
-        kubeconfigString.get.replaceFirst(s"$KUBECONFIG_PREFIX ", "") == "Configured"
+        kubeconfigString.get.split(KUBECONFIG_PREFIX)(1) == "Configured"
       }
       if (isConfigured) {
         val stats = List(status1, status2, status3)
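
For readers who want to try the version check from `getKubernetesClient` in isolation, here is a minimal standalone sketch. The names `VersionGateSketch`, `parseVersion` and `supportsKubectl` are hypothetical and not part of the patch; only the regex, the `VERSION_PREFIX` value and the `Ordering.Tuple3` comparison against `(1, 1, 0)` are taken from the diff.

```scala
object VersionGateSketch {
  private val VERSION_PREFIX = "minikube version: "
  // Matches only the three-part number, so a suffix like "-beta.0" is ignored.
  private val ThreePart = "\\d+\\.\\d+\\.\\d+".r

  // Hypothetical helper: extracts (major, minor, patch) from a line such as
  // "minikube version: v1.15.1", or None when the line has no such version.
  def parseVersion(versionLine: String): Option[(Int, Int, Int)] =
    versionLine.split(VERSION_PREFIX).drop(1).headOption
      .flatMap(rest => ThreePart.findFirstIn(rest))
      .map(_.split('.').map(_.toInt))
      .collect { case Array(x, y, z) => (x, y, z) }

  // Hypothetical helper: true when the version is at least v1.1.0, the release
  // from which `minikube kubectl` is available according to the commit message.
  def supportsKubectl(versionLine: String): Boolean =
    parseVersion(versionLine)
      .exists(v => Ordering.Tuple3[Int, Int, Int].gteq(v, (1, 1, 0)))

  def main(args: Array[String]): Unit = {
    Seq("minikube version: v0.34.1", "minikube version: v1.15.1").foreach { line =>
      println(s"$line -> kubectl based config: ${supportsKubectl(line)}")
    }
  }
}
```

The tuple ordering compares the major number first, then minor, then patch, which avoids the pitfalls of comparing version strings lexicographically. Returning `None` for an unparseable line is a choice made for this sketch; the patch asserts instead.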