This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4080c4b  [SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakiness
4080c4b is described below

commit 4080c4beeb9cb27027145a37799ee8599ee51aab
Author: Holden Karau <hka...@apple.com>
AuthorDate: Fri Sep 20 10:08:16 2019 -0700

    [SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakiness
    
    ### What changes were proposed in this pull request?
    
    Instead of using a Thread.sleep to wait for commands to finish, wait for the command to finish with a watcher, and improve the error messages in the SecretsTestsSuite.
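    
    In essence, the exec helper now blocks on latches released by an `ExecListener` instead of sleeping. A simplified sketch of the pattern (the full change is in the diff below; `KubernetesTestComponents` is the integration-test suite's own type):
    
    ```scala
    import java.util.concurrent.CountDownLatch
    
    import io.fabric8.kubernetes.client.dsl.ExecListener
    import okhttp3.Response
    import org.apache.commons.io.output.ByteArrayOutputStream
    
    // Latches released by the fabric8 exec callbacks replace the old Thread.sleep(1000).
    class ReadyListener extends ExecListener {
      val openLatch = new CountDownLatch(1)
      val closeLatch = new CountDownLatch(1)
      override def onOpen(response: Response): Unit = openLatch.countDown()
      override def onClose(code: Int, reason: String): Unit = closeLatch.countDown()
      override def onFailure(e: Throwable, r: Response): Unit = {}
    }
    
    def executeCommand(cmd: String*)(
        implicit podName: String,
        kubernetesTestComponents: KubernetesTestComponents): String = {
      val out = new ByteArrayOutputStream()
      val listener = new ReadyListener()
      val watch = kubernetesTestComponents.kubernetesClient
        .pods()
        .withName(podName)
        .readingInput(System.in)
        .writingOutput(out)
        .writingError(System.err)
        .withTTY()
        .usingListener(listener)
        .exec(cmd.toArray: _*)
      listener.openLatch.await()   // stdout is connected; previously a Thread.sleep(1000)
      listener.closeLatch.await()  // the command has finished
      watch.close()
      out.flush()
      out.toString()
    }
    ```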
    
    ### Why are the changes needed?
    Currently some of the Spark Kubernetes tests have race conditions with command execution, and the frequent use of `eventually` makes debugging test failures difficult.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests pass after the removal of Thread.sleep.
    
    Closes #25765 from holdenk/SPARK-28937SPARK-28936-improve-kubernetes-integration-tests.
    
    Authored-by: Holden Karau <hka...@apple.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../k8s/integrationtest/SecretsTestsSuite.scala    | 51 +++++++++++++++-------
 .../spark/deploy/k8s/integrationtest/Utils.scala   | 40 +++++++++++++++--
 2 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index cd61ea1..54a9dbf 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
+import java.util.Locale
+
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
@@ -57,11 +59,17 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
     createTestSecret()
     sparkAppConf
       .set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME", 
SECRET_MOUNT_PATH)
-      .set(s"spark.kubernetes.driver.secretKeyRef.USERNAME", 
s"$ENV_SECRET_NAME:username")
-      .set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD", 
s"$ENV_SECRET_NAME:password")
+      .set(
+        s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
+        s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_1}")
+      .set(
+        s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
+        s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_2}")
       .set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME", 
SECRET_MOUNT_PATH)
-      .set(s"spark.kubernetes.executor.secretKeyRef.USERNAME", 
s"$ENV_SECRET_NAME:username")
-      .set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD", 
s"$ENV_SECRET_NAME:password")
+      .set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
+        s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_1")
+      .set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
+        s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_2")
     try {
       runSparkPiAndVerifyCompletion(
         driverPodChecker = (driverPod: Pod) => {
@@ -81,19 +89,30 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
   }
 
   private def checkSecrets(pod: Pod): Unit = {
-    Eventually.eventually(TIMEOUT, INTERVAL) {
-      implicit val podName: String = pod.getMetadata.getName
-      implicit val components: KubernetesTestComponents = kubernetesTestComponents
+    logDebug(s"Checking secrets for ${pod}")
+    // Wait for the pod to become ready & have secrets provisioned
+    implicit val podName: String = pod.getMetadata.getName
+    implicit val components: KubernetesTestComponents = kubernetesTestComponents
+    val env = Eventually.eventually(TIMEOUT, INTERVAL) {
+      logDebug(s"Checking env of ${pod.getMetadata().getName()} ....")
       val env = Utils.executeCommand("env")
-      assert(env.toString.contains(ENV_SECRET_VALUE_1))
-      assert(env.toString.contains(ENV_SECRET_VALUE_2))
-      val fileUsernameContents = Utils
-        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
-      val filePasswordContents = Utils
-        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
-      assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
-      assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
+      assert(!env.isEmpty)
+      env
     }
+    env.toString should include (s"${ENV_SECRET_KEY_1_CAP}=$ENV_SECRET_VALUE_1")
+    env.toString should include (s"${ENV_SECRET_KEY_2_CAP}=$ENV_SECRET_VALUE_2")
+
+    // Make sure our secret files are mounted correctly
+    val files = Utils.executeCommand("ls", s"$SECRET_MOUNT_PATH")
+    files should include (ENV_SECRET_KEY_1)
+    files should include (ENV_SECRET_KEY_2)
+    // Validate the contents
+    val fileUsernameContents = Utils
+      .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
+    fileUsernameContents.toString.trim should equal(ENV_SECRET_VALUE_1)
+    val filePasswordContents = Utils
+      .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
+    filePasswordContents.toString.trim should equal(ENV_SECRET_VALUE_2)
   }
 }
 
@@ -102,6 +121,8 @@ private[spark] object SecretsTestsSuite {
   val SECRET_MOUNT_PATH = "/etc/secret"
   val ENV_SECRET_KEY_1 = "username"
   val ENV_SECRET_KEY_2 = "password"
+  val ENV_SECRET_KEY_1_CAP = ENV_SECRET_KEY_1.toUpperCase(Locale.ROOT)
+  val ENV_SECRET_KEY_2_CAP = ENV_SECRET_KEY_2.toUpperCase(Locale.ROOT)
   val ENV_SECRET_VALUE_1 = "secretusername"
   val ENV_SECRET_VALUE_2 = "secretpassword"
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index a687a1b..9f85805 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -18,9 +18,12 @@ package org.apache.spark.deploy.k8s.integrationtest
 
 import java.io.{Closeable, File, PrintWriter}
 import java.nio.file.{Files, Path}
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConverters._
 
+import io.fabric8.kubernetes.client.dsl.ExecListener
+import okhttp3.Response
 import org.apache.commons.io.output.ByteArrayOutputStream
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
@@ -45,20 +48,49 @@ object Utils extends Logging {
       implicit podName: String,
       kubernetesTestComponents: KubernetesTestComponents): String = {
     val out = new ByteArrayOutputStream()
-    val watch = kubernetesTestComponents
+    val pod = kubernetesTestComponents
       .kubernetesClient
       .pods()
       .withName(podName)
+    // Avoid timing issues by looking for open/close
+    class ReadyListener extends ExecListener {
+      val openLatch: CountDownLatch = new CountDownLatch(1)
+      val closeLatch: CountDownLatch = new CountDownLatch(1)
+
+      override def onOpen(response: Response) {
+        openLatch.countDown()
+      }
+
+      override def onClose(a: Int, b: String) {
+        closeLatch.countDown()
+      }
+
+      override def onFailure(e: Throwable, r: Response) {
+      }
+
+      def waitForInputStreamToConnect(): Unit = {
+        openLatch.await()
+      }
+
+      def waitForClose(): Unit = {
+        closeLatch.await()
+      }
+    }
+    val listener = new ReadyListener()
+    val watch = pod
       .readingInput(System.in)
       .writingOutput(out)
       .writingError(System.err)
       .withTTY()
+      .usingListener(listener)
       .exec(cmd.toArray: _*)
-    // wait to get some result back
-    Thread.sleep(1000)
+    // under load sometimes the stdout isn't connected by the time we try to read from it.
+    listener.waitForInputStreamToConnect()
+    listener.waitForClose()
     watch.close()
     out.flush()
-    out.toString()
+    val result = out.toString()
+    result
   }
 
   def createTempFile(contents: String, hostPath: String): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
