This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4080c4b [SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakyness 4080c4b is described below commit 4080c4beeb9cb27027145a37799ee8599ee51aab Author: Holden Karau <hka...@apple.com> AuthorDate: Fri Sep 20 10:08:16 2019 -0700 [SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakyness ### What changes were proposed in this pull request? Switch from using a fixed `Thread.sleep` to wait for commands to finish to waiting for the command itself to finish with a watcher, and improve the error messages in the SecretsTestsSuite. ### Why are the changes needed? Currently some of the Spark Kubernetes tests have race conditions with command execution, and the frequent use of `eventually` makes debugging test failures difficult. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing tests pass after removal of `Thread.sleep`. Closes #25765 from holdenk/SPARK-28937SPARK-28936-improve-kubernetes-integration-tests. 
Authored-by: Holden Karau <hka...@apple.com> Signed-off-by: Holden Karau <hka...@apple.com> --- .../k8s/integrationtest/SecretsTestsSuite.scala | 51 +++++++++++++++------- .../spark/deploy/k8s/integrationtest/Utils.scala | 40 +++++++++++++++-- 2 files changed, 72 insertions(+), 19 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala index cd61ea1..54a9dbf 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s.integrationtest +import java.util.Locale + import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder} @@ -57,11 +59,17 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite => createTestSecret() sparkAppConf .set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH) - .set(s"spark.kubernetes.driver.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username") - .set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password") + .set( + s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_1_CAP}", + s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_1}") + .set( + s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_2_CAP}", + s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_2}") .set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH) - .set(s"spark.kubernetes.executor.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username") - .set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password") + 
.set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_1_CAP}", + s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_1") + .set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_2_CAP}", + s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_2") try { runSparkPiAndVerifyCompletion( driverPodChecker = (driverPod: Pod) => { @@ -81,19 +89,30 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite => } private def checkSecrets(pod: Pod): Unit = { - Eventually.eventually(TIMEOUT, INTERVAL) { - implicit val podName: String = pod.getMetadata.getName - implicit val components: KubernetesTestComponents = kubernetesTestComponents + logDebug(s"Checking secrets for ${pod}") + // Wait for the pod to become ready & have secrets provisioned + implicit val podName: String = pod.getMetadata.getName + implicit val components: KubernetesTestComponents = kubernetesTestComponents + val env = Eventually.eventually(TIMEOUT, INTERVAL) { + logDebug(s"Checking env of ${pod.getMetadata().getName()} ....") val env = Utils.executeCommand("env") - assert(env.toString.contains(ENV_SECRET_VALUE_1)) - assert(env.toString.contains(ENV_SECRET_VALUE_2)) - val fileUsernameContents = Utils - .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1") - val filePasswordContents = Utils - .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2") - assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1)) - assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2)) + assert(!env.isEmpty) + env } + env.toString should include (s"${ENV_SECRET_KEY_1_CAP}=$ENV_SECRET_VALUE_1") + env.toString should include (s"${ENV_SECRET_KEY_2_CAP}=$ENV_SECRET_VALUE_2") + + // Make sure our secret files are mounted correctly + val files = Utils.executeCommand("ls", s"$SECRET_MOUNT_PATH") + files should include (ENV_SECRET_KEY_1) + files should include (ENV_SECRET_KEY_2) + // Validate the contents + val fileUsernameContents = Utils + .executeCommand("cat", 
s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1") + fileUsernameContents.toString.trim should equal(ENV_SECRET_VALUE_1) + val filePasswordContents = Utils + .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2") + filePasswordContents.toString.trim should equal(ENV_SECRET_VALUE_2) } } @@ -102,6 +121,8 @@ private[spark] object SecretsTestsSuite { val SECRET_MOUNT_PATH = "/etc/secret" val ENV_SECRET_KEY_1 = "username" val ENV_SECRET_KEY_2 = "password" + val ENV_SECRET_KEY_1_CAP = ENV_SECRET_KEY_1.toUpperCase(Locale.ROOT) + val ENV_SECRET_KEY_2_CAP = ENV_SECRET_KEY_2.toUpperCase(Locale.ROOT) val ENV_SECRET_VALUE_1 = "secretusername" val ENV_SECRET_VALUE_2 = "secretpassword" } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index a687a1b..9f85805 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -18,9 +18,12 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.{Closeable, File, PrintWriter} import java.nio.file.{Files, Path} +import java.util.concurrent.CountDownLatch import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.client.dsl.ExecListener +import okhttp3.Response import org.apache.commons.io.output.ByteArrayOutputStream import org.apache.spark.{SPARK_VERSION, SparkException} @@ -45,20 +48,49 @@ object Utils extends Logging { implicit podName: String, kubernetesTestComponents: KubernetesTestComponents): String = { val out = new ByteArrayOutputStream() - val watch = kubernetesTestComponents + val pod = kubernetesTestComponents .kubernetesClient .pods() .withName(podName) + // Avoid timing issues by looking for open/close + class 
ReadyListener extends ExecListener { + val openLatch: CountDownLatch = new CountDownLatch(1) + val closeLatch: CountDownLatch = new CountDownLatch(1) + + override def onOpen(response: Response) { + openLatch.countDown() + } + + override def onClose(a: Int, b: String) { + closeLatch.countDown() + } + + override def onFailure(e: Throwable, r: Response) { + } + + def waitForInputStreamToConnect(): Unit = { + openLatch.await() + } + + def waitForClose(): Unit = { + closeLatch.await() + } + } + val listener = new ReadyListener() + val watch = pod .readingInput(System.in) .writingOutput(out) .writingError(System.err) .withTTY() + .usingListener(listener) .exec(cmd.toArray: _*) - // wait to get some result back - Thread.sleep(1000) + // under load sometimes the stdout isn't connected by the time we try to read from it. + listener.waitForInputStreamToConnect() + listener.waitForClose() watch.close() out.flush() - out.toString() + val result = out.toString() + result } def createTempFile(contents: String, hostPath: String): String = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org