This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new e5744b99598 [SPARK-40902][MESOS][TESTS] Fix issue with mesos tests 
failing due to quick submission of drivers
e5744b99598 is described below

commit e5744b9959806b7dd9573c1ad5280023c8ca2efd
Author: Mridul <mridulatgmail.com>
AuthorDate: Mon Oct 24 10:51:45 2022 -0700

    [SPARK-40902][MESOS][TESTS] Fix issue with mesos tests failing due to quick 
submission of drivers
    
    ### What changes were proposed in this pull request?
    
    ##### Quick submission of drivers in tests to mesos scheduler results in 
dropping drivers
    
    Queued drivers in `MesosClusterScheduler` are ordered based on 
`MesosDriverDescription` - and the ordering used checks for priority (if 
different), followed by comparison of submission time.
    For two driver submissions with same priority, if made in quick succession 
(such that submission time is same due to millisecond granularity of Date), 
this results in dropping the second `MesosDriverDescription` from 
`queuedDrivers` (since `driverOrdering` returns `0` when comparing the 
descriptions).
    
    This PR fixes the more immediate issue with tests.
    
    ### Why are the changes needed?
    
    Flaky tests, [see 
here](https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg) for an 
example.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    Fixing only tests for now - as mesos support is deprecated, not changing 
scheduler itself to address this.
    
    ### How was this patch tested?
    
    Fixes unit tests
    
    Closes #38378 from mridulm/fix_MesosClusterSchedulerSuite.
    
    Authored-by: Mridul <mridulatgmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 60b1056307b3ee9d880a936f3a97c5fb16a2b698)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../cluster/mesos/MesosClusterSchedulerSuite.scala | 66 +++++++++++++---------
 1 file changed, 40 insertions(+), 26 deletions(-)

diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 9a1862d32dc..102dd4b76d2 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.util.{Collection, Collections, Date}
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
@@ -40,6 +41,19 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
   private var driver: SchedulerDriver = _
   private var scheduler: MesosClusterScheduler = _
 
+  private val submissionTime = new AtomicLong(System.currentTimeMillis())
+
+  // Queued drivers in MesosClusterScheduler are ordered based on 
MesosDriverDescription
+  // The default ordering checks for priority, followed by submission time. 
For two driver
+  // submissions with same priority and if made in quick succession (such that 
submission
+  // time is same due to millisecond granularity), this results in dropping the
+  // second MesosDriverDescription from the queuedDrivers - as driverOrdering
+  // returns 0 when comparing the descriptions. Ensure two seperate submissions
+  // have differnt dates
+  private def getDate: Date = {
+    new Date(submissionTime.incrementAndGet())
+  }
+
   private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")
@@ -68,7 +82,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
       command,
       Map[String, String](),
       submissionId,
-      new Date())
+      getDate)
   }
 
   test("can queue drivers") {
@@ -108,7 +122,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
         Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test"),
           (config.DRIVER_MEMORY_OVERHEAD.key, "0")),
         "s1",
-        new Date()))
+        getDate))
     assert(response.success)
     val offer = Offer.newBuilder()
       .addResources(
@@ -213,7 +227,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
         Map("spark.mesos.executor.home" -> "test",
           "spark.app.name" -> "test"),
         "s1",
-        new Date()))
+        getDate))
     assert(response.success)
 
     val offer = Utils.createOffer("o1", "s1", mem*2, cpu)
@@ -240,7 +254,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
         Map("spark.mesos.executor.home" -> "test",
           "spark.app.name" -> "test"),
         "s1",
-        new Date()))
+        getDate))
     assert(response.success)
 
     val offer = Utils.createOffer("o1", "s1", mem*2, cpu)
@@ -270,7 +284,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
           config.DRIVER_MEMORY_OVERHEAD.key -> "0"
         ),
         "s1",
-        new Date()))
+        getDate))
     assert(response.success)
 
     val offer = Utils.createOffer("o1", "s1", mem, cpu)
@@ -296,7 +310,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
           config.NETWORK_LABELS.key -> "key1:val1,key2:val2",
           config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
         "s1",
-        new Date()))
+        getDate))
 
     assert(response.success)
 
@@ -327,7 +341,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
           "spark.app.name" -> "test",
           config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
         "s1",
-        new Date()))
+        getDate))
 
     assert(response.success)
 
@@ -352,7 +366,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
           "spark.app.name" -> "test",
           config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
         "s1",
-        new Date()))
+        getDate))
 
     assert(response.success)
 
@@ -378,7 +392,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
           "spark.app.name" -> "test",
           config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
         "s1",
-        new Date()))
+        getDate))
 
     assert(response.success)
 
@@ -413,7 +427,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
             config.DRIVER_CONSTRAINTS.key -> driverConstraints,
             config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
           "s1",
-          new Date()))
+          getDate))
       assert(response.success)
     }
 
@@ -452,7 +466,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
           config.DRIVER_LABELS.key -> "key:value",
           config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
         "s1",
-        new Date()))
+        getDate))
 
     assert(response.success)
 
@@ -474,7 +488,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
 
     val response = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), 
"s1", new Date()))
+        Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), 
"s1", getDate))
     assert(response.success)
     val agentId = SlaveID.newBuilder().setValue("s1").build()
     val offer = Offer.newBuilder()
@@ -533,7 +547,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
 
     val response = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", 
"test")), "sub1", new Date()))
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", 
"test")), "sub1", getDate))
     assert(response.success)
 
     // Offer a resource to launch the submitted driver
@@ -651,7 +665,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
         config.EXECUTOR_URI.key -> "s3a://bucket/spark-version.tgz",
         "another.conf" -> "\\value"),
       "s1",
-      new Date())
+      getDate)
 
     val expectedCmd = "cd spark-version*;  " +
         "bin/spark-submit --name \"app name\" --master 
mesos://mesos://localhost:5050 " +
@@ -691,7 +705,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
       command,
       Map("spark.mesos.dispatcher.queue" -> "dummy"),
       "s1",
-      new Date())
+      getDate)
 
     assertThrows[NoSuchElementException] {
       scheduler.getDriverPriority(desc)
@@ -702,7 +716,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
       command,
       Map[String, String](),
       "s2",
-      new Date())
+      getDate)
 
     assert(scheduler.getDriverPriority(desc) == 0.0f)
 
@@ -711,7 +725,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
       command,
       Map("spark.mesos.dispatcher.queue" -> "default"),
       "s3",
-      new Date())
+      getDate)
 
     assert(scheduler.getDriverPriority(desc) == 0.0f)
 
@@ -720,7 +734,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
       command,
       Map("spark.mesos.dispatcher.queue" -> "ROUTINE"),
       "s4",
-      new Date())
+      getDate)
 
     assert(scheduler.getDriverPriority(desc) == 1.0f)
 
@@ -729,7 +743,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
       command,
       Map("spark.mesos.dispatcher.queue" -> "URGENT"),
       "s5",
-      new Date())
+      getDate)
 
     assert(scheduler.getDriverPriority(desc) == 2.0f)
   }
@@ -746,22 +760,22 @@ class MesosClusterSchedulerSuite extends SparkFunSuite 
with LocalSparkContext wi
 
     val response0 = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", new Date()))
+        Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", getDate))
     assert(response0.success)
 
     val response1 = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map[String, String](), "s1", new Date()))
+        Map[String, String](), "s1", getDate))
     assert(response1.success)
 
     val response2 = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", new 
Date()))
+        Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", getDate))
     assert(response2.success)
 
     val response3 = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", new Date()))
+        Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", getDate))
     assert(response3.success)
 
     val state = scheduler.getSchedulerState()
@@ -782,12 +796,12 @@ class MesosClusterSchedulerSuite extends SparkFunSuite 
with LocalSparkContext wi
 
     val response0 = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0", new Date()))
+        Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0", getDate))
     assert(response0.success)
 
     val response1 = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
-        Map[String, String](), "s1", new Date()))
+        Map[String, String](), "s1", getDate))
     assert(response1.success)
 
     val state = scheduler.getSchedulerState()
@@ -812,7 +826,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with 
LocalSparkContext wi
         config.DRIVER_MEMORY_OVERHEAD.key -> "0") ++
         addlSparkConfVars,
       "s1",
-      new Date())
+      getDate)
     val response = scheduler.submitDriver(driverDesc)
     assert(response.success)
     val offer = Utils.createOffer("o1", "s1", mem, cpu)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to