This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 819e5ea  [SPARK-26615][CORE] Fixing transport server/client resource 
leaks in the core unittests
819e5ea is described below

commit 819e5ea7c290f842c51ead8b4a6593678aeef6bf
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Wed Jan 16 09:00:21 2019 -0600

    [SPARK-26615][CORE] Fixing transport server/client resource leaks in the 
core unittests
    
    ## What changes were proposed in this pull request?
    
    Fixing resource leaks where TransportClient/TransportServer instances are 
not closed properly.
    
    In StandaloneSchedulerBackend the null check is added because during the 
SparkContextSchedulerCreationSuite #"local-cluster" test it turned out that 
client is not initialised as 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend#start isn't 
called. It throw an NPE and some resource remained in open.
    
    ## How was this patch tested?
    
    By executing the unittests and using some extra temporary logging for 
counting created and closed TransportClient/TransportServer instances.
    
    Closes #23540 from attilapiros/leaks.
    
    Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../cluster/StandaloneSchedulerBackend.scala       |   5 +-
 .../spark/SparkContextSchedulerCreationSuite.scala | 103 ++++++++-------
 .../spark/deploy/client/AppClientSuite.scala       |  75 ++++++-----
 .../apache/spark/deploy/master/MasterSuite.scala   | 111 +++++++++--------
 .../apache/spark/storage/BlockManagerSuite.scala   | 138 +++++++++------------
 5 files changed, 228 insertions(+), 204 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 66080b6..e0605fe 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -224,8 +224,9 @@ private[spark] class StandaloneSchedulerBackend(
     if (stopping.compareAndSet(false, true)) {
       try {
         super.stop()
-        client.stop()
-
+        if (client != null) {
+          client.stop()
+        }
         val callback = shutdownCallback
         if (callback != null) {
           callback(this)
diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index f8938df..811b975 100644
--- 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -23,110 +23,129 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, 
TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.util.Utils
 
 
 class SparkContextSchedulerCreationSuite
   extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with 
Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl =
-    createTaskScheduler(master, "client")
+  def noOp(taskSchedulerImpl: TaskSchedulerImpl): Unit = {}
 
-  def createTaskScheduler(master: String, deployMode: String): 
TaskSchedulerImpl =
-    createTaskScheduler(master, deployMode, new SparkConf())
+  def createTaskScheduler(master: String)(body: TaskSchedulerImpl => Unit = 
noOp): Unit =
+    createTaskScheduler(master, "client")(body)
+
+  def createTaskScheduler(master: String, deployMode: String)(
+      body: TaskSchedulerImpl => Unit): Unit =
+    createTaskScheduler(master, deployMode, new SparkConf())(body)
 
   def createTaskScheduler(
       master: String,
       deployMode: String,
-      conf: SparkConf): TaskSchedulerImpl = {
+      conf: SparkConf)(body: TaskSchedulerImpl => Unit): Unit = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want 
to start() the
     // real schedulers, so we don't want to create a full SparkContext with 
the desired scheduler.
     sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, 
TaskScheduler]]('createTaskScheduler)
-    val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, 
master, deployMode)
-    sched.asInstanceOf[TaskSchedulerImpl]
+    val (_, sched) =
+      SparkContext invokePrivate createTaskSchedulerMethod(sc, master, 
deployMode)
+    try {
+      body(sched.asInstanceOf[TaskSchedulerImpl])
+    } finally {
+      Utils.tryLogNonFatalError {
+        sched.stop()
+      }
+    }
   }
 
   test("bad-master") {
     val e = intercept[SparkException] {
-      createTaskScheduler("localhost:1234")
+      createTaskScheduler("localhost:1234")()
     }
     assert(e.getMessage.contains("Could not parse Master URL"))
   }
 
   test("local") {
-    val sched = createTaskScheduler("local")
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.totalCores === 1)
-      case _ => fail()
+    val sched = createTaskScheduler("local") { sched =>
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.totalCores === 1)
+        case _ => fail()
+      }
     }
   }
 
   test("local-*") {
-    val sched = createTaskScheduler("local[*]")
-    sched.backend match {
-      case s: LocalSchedulerBackend =>
-        assert(s.totalCores === Runtime.getRuntime.availableProcessors())
-      case _ => fail()
+    val sched = createTaskScheduler("local[*]") { sched =>
+      sched.backend match {
+        case s: LocalSchedulerBackend =>
+          assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+        case _ => fail()
+      }
     }
   }
 
   test("local-n") {
-    val sched = createTaskScheduler("local[5]")
-    assert(sched.maxTaskFailures === 1)
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.totalCores === 5)
-      case _ => fail()
+    val sched = createTaskScheduler("local[5]") { sched =>
+      assert(sched.maxTaskFailures === 1)
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.totalCores === 5)
+        case _ => fail()
+      }
     }
   }
 
   test("local-*-n-failures") {
-    val sched = createTaskScheduler("local[* ,2]")
-    assert(sched.maxTaskFailures === 2)
-    sched.backend match {
-      case s: LocalSchedulerBackend =>
-        assert(s.totalCores === Runtime.getRuntime.availableProcessors())
-      case _ => fail()
+    val sched = createTaskScheduler("local[* ,2]") { sched =>
+      assert(sched.maxTaskFailures === 2)
+      sched.backend match {
+        case s: LocalSchedulerBackend =>
+          assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+        case _ => fail()
+      }
     }
   }
 
   test("local-n-failures") {
-    val sched = createTaskScheduler("local[4, 2]")
-    assert(sched.maxTaskFailures === 2)
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.totalCores === 4)
-      case _ => fail()
+    val sched = createTaskScheduler("local[4, 2]") { sched =>
+      assert(sched.maxTaskFailures === 2)
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.totalCores === 4)
+        case _ => fail()
+      }
     }
   }
 
   test("bad-local-n") {
     val e = intercept[SparkException] {
-      createTaskScheduler("local[2*]")
+      createTaskScheduler("local[2*]")()
     }
     assert(e.getMessage.contains("Could not parse Master URL"))
   }
 
   test("bad-local-n-failures") {
     val e = intercept[SparkException] {
-      createTaskScheduler("local[2*,4]")
+      createTaskScheduler("local[2*,4]")()
     }
     assert(e.getMessage.contains("Could not parse Master URL"))
   }
 
   test("local-default-parallelism") {
     val conf = new SparkConf().set("spark.default.parallelism", "16")
-    val sched = createTaskScheduler("local", "client", conf)
 
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16)
-      case _ => fail()
+    val sched = createTaskScheduler("local", "client", conf) { sched =>
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16)
+        case _ => fail()
+      }
     }
   }
 
   test("local-cluster") {
-    createTaskScheduler("local-cluster[3, 14, 1024]").backend match {
-      case s: StandaloneSchedulerBackend => // OK
-      case _ => fail()
+    createTaskScheduler("local-cluster[3, 14, 1024]") { sched =>
+      sched.backend match {
+        case s: StandaloneSchedulerBackend => // OK
+        case _ => fail()
+      }
     }
   }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index a1707e6..baeefea 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.client
 
+import java.io.Closeable
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.concurrent.duration._
@@ -85,57 +86,59 @@ class AppClientSuite
   }
 
   test("interface methods of AppClient using local Master") {
-    val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+    Utils.tryWithResource(new AppClientInst(masterRpcEnv.address.toSparkURL)) 
{ ci =>
 
-    ci.client.start()
+      ci.client.start()
 
-    // Client should connect with one Master which registers the application
-    eventually(timeout(10.seconds), interval(10.millis)) {
-      val apps = getApplications()
-      assert(ci.listener.connectedIdList.size === 1, "client listener should 
have one connection")
-      assert(apps.size === 1, "master should have 1 registered app")
-    }
+      // Client should connect with one Master which registers the application
+      eventually(timeout(10.seconds), interval(10.millis)) {
+        val apps = getApplications()
+        assert(ci.listener.connectedIdList.size === 1, "client listener should 
have one connection")
+        assert(apps.size === 1, "master should have 1 registered app")
+      }
 
-    // Send message to Master to request Executors, verify request by change 
in executor limit
-    val numExecutorsRequested = 1
-    whenReady(
+      // Send message to Master to request Executors, verify request by change 
in executor limit
+      val numExecutorsRequested = 1
+      whenReady(
         ci.client.requestTotalExecutors(numExecutorsRequested),
         timeout(10.seconds),
         interval(10.millis)) { acknowledged =>
-      assert(acknowledged)
-    }
+        assert(acknowledged)
+      }
 
-    eventually(timeout(10.seconds), interval(10.millis)) {
-      val apps = getApplications()
-      assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor 
request failed")
-    }
+      eventually(timeout(10.seconds), interval(10.millis)) {
+        val apps = getApplications()
+        assert(apps.head.getExecutorLimit === numExecutorsRequested, 
s"executor request failed")
+      }
 
-    // Send request to kill executor, verify request was made
-    val executorId: String = getApplications().head.executors.head._2.fullId
-    whenReady(
+      // Send request to kill executor, verify request was made
+      val executorId: String = getApplications().head.executors.head._2.fullId
+      whenReady(
         ci.client.killExecutors(Seq(executorId)),
         timeout(10.seconds),
         interval(10.millis)) { acknowledged =>
-      assert(acknowledged)
-    }
+        assert(acknowledged)
+      }
 
-    // Issue stop command for Client to disconnect from Master
-    ci.client.stop()
+      // Issue stop command for Client to disconnect from Master
+      ci.client.stop()
 
-    // Verify Client is marked dead and unregistered from Master
-    eventually(timeout(10.seconds), interval(10.millis)) {
-      val apps = getApplications()
-      assert(ci.listener.deadReasonList.size === 1, "client should have been 
marked dead")
-      assert(apps.isEmpty, "master should have 0 registered apps")
+      // Verify Client is marked dead and unregistered from Master
+      eventually(timeout(10.seconds), interval(10.millis)) {
+        val apps = getApplications()
+        assert(ci.listener.deadReasonList.size === 1, "client should have been 
marked dead")
+        assert(apps.isEmpty, "master should have 0 registered apps")
+      }
     }
   }
 
   test("request from AppClient before initialized with master") {
-    val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+    Utils.tryWithResource(new AppClientInst(masterRpcEnv.address.toSparkURL)) 
{ ci =>
 
-    // requests to master should fail immediately
-    whenReady(ci.client.requestTotalExecutors(3), timeout(1.seconds)) { 
success =>
-      assert(success === false)
+      // requests to master should fail immediately
+      whenReady(ci.client.requestTotalExecutors(3), timeout(1.seconds)) { 
success =>
+        assert(success === false)
+      }
     }
   }
 
@@ -219,13 +222,17 @@ class AppClientSuite
   }
 
   /** Create AppClient and supporting objects */
-  private class AppClientInst(masterUrl: String) {
+  private class AppClientInst(masterUrl: String) extends Closeable {
     val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, 
securityManager)
     private val cmd = new 
Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"),
       List(), Map(), Seq(), Seq(), Seq())
     private val desc = new ApplicationDescription("AppClientSuite", Some(1), 
512, cmd, "ignored")
     val listener = new AppClientCollector
     val client = new StandaloneAppClient(rpcEnv, Array(masterUrl), desc, 
listener, new SparkConf)
+
+    override def close(): Unit = {
+      rpcEnv.shutdown()
+    }
   }
 
 }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index f788db7..5904d03 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -644,59 +644,70 @@ class MasterSuite extends SparkFunSuite
       val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
       assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
     }
-    val worker1 = new MockWorker(master.self)
-    worker1.rpcEnv.setupEndpoint("worker", worker1)
-    val worker1Reg = RegisterWorker(
-      worker1.id,
-      "localhost",
-      9998,
-      worker1.self,
-      10,
-      1024,
-      "http://localhost:8080";,
-      RpcAddress("localhost2", 10000))
-    master.self.send(worker1Reg)
-    val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
-    master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-
-    eventually(timeout(10.seconds)) {
-      assert(worker1.apps.nonEmpty)
-    }
-
-    eventually(timeout(10.seconds)) {
-      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.workers(0).state == WorkerState.DEAD)
-    }
+    var worker1: MockWorker = null
+    var worker2: MockWorker = null
+    try {
+      worker1 = new MockWorker(master.self)
+      worker1.rpcEnv.setupEndpoint("worker", worker1)
+      val worker1Reg = RegisterWorker(
+        worker1.id,
+        "localhost",
+        9998,
+        worker1.self,
+        10,
+        1024,
+        "http://localhost:8080";,
+        RpcAddress("localhost2", 10000))
+      master.self.send(worker1Reg)
+      val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
+      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+      eventually(timeout(10.seconds)) {
+        assert(worker1.apps.nonEmpty)
+      }
 
-    val worker2 = new MockWorker(master.self)
-    worker2.rpcEnv.setupEndpoint("worker", worker2)
-    master.self.send(RegisterWorker(
-      worker2.id,
-      "localhost",
-      9999,
-      worker2.self,
-      10,
-      1024,
-      "http://localhost:8081";,
-      RpcAddress("localhost", 10001)))
-    eventually(timeout(10.seconds)) {
-      assert(worker2.apps.nonEmpty)
-    }
+      eventually(timeout(10.seconds)) {
+        val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
+        assert(masterState.workers(0).state == WorkerState.DEAD)
+      }
 
-    master.self.send(worker1Reg)
-    eventually(timeout(10.seconds)) {
-      val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
+      worker2 = new MockWorker(master.self)
+      worker2.rpcEnv.setupEndpoint("worker", worker2)
+      master.self.send(RegisterWorker(
+        worker2.id,
+        "localhost",
+        9999,
+        worker2.self,
+        10,
+        1024,
+        "http://localhost:8081";,
+        RpcAddress("localhost", 10001)))
+      eventually(timeout(10.seconds)) {
+        assert(worker2.apps.nonEmpty)
+      }
 
-      val worker = masterState.workers.filter(w => w.id == worker1.id)
-      assert(worker.length == 1)
-      // make sure the `DriverStateChanged` arrives at Master.
-      assert(worker(0).drivers.isEmpty)
-      assert(worker1.apps.isEmpty)
-      assert(worker1.drivers.isEmpty)
-      assert(worker2.apps.size == 1)
-      assert(worker2.drivers.size == 1)
-      assert(masterState.activeDrivers.length == 1)
-      assert(masterState.activeApps.length == 1)
+      master.self.send(worker1Reg)
+      eventually(timeout(10.seconds)) {
+        val masterState = 
master.self.askSync[MasterStateResponse](RequestMasterState)
+
+        val worker = masterState.workers.filter(w => w.id == worker1.id)
+        assert(worker.length == 1)
+        // make sure the `DriverStateChanged` arrives at Master.
+        assert(worker(0).drivers.isEmpty)
+        assert(worker1.apps.isEmpty)
+        assert(worker1.drivers.isEmpty)
+        assert(worker2.apps.size == 1)
+        assert(worker2.drivers.size == 1)
+        assert(masterState.activeDrivers.length == 1)
+        assert(masterState.activeApps.length == 1)
+      }
+    } finally {
+      if (worker1 != null) {
+        worker1.rpcEnv.shutdown()
+      }
+      if (worker2 != null) {
+        worker2.rpcEnv.shutdown()
+      }
     }
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cafd980..233a84e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -46,7 +46,6 @@ import 
org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransport
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, 
TransportServerBootstrap}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, 
DownloadFileManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, 
RegisterExecutor}
-import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
@@ -66,9 +65,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   implicit val defaultSignaler: Signaler = ThreadSignaler
 
   var conf: SparkConf = null
-  var store: BlockManager = null
-  var store2: BlockManager = null
-  var store3: BlockManager = null
+  val allStores = ArrayBuffer[BlockManager]()
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
   val securityMgr = new SecurityManager(new SparkConf(false))
@@ -106,6 +103,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     val blockManager = new BlockManager(name, rpcEnv, master, 
serializerManager, bmConf,
       memManager, mapOutputTracker, shuffleManager, transfer, bmSecurityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)
+    allStores += blockManager
     blockManager.initialize("app-id")
     blockManager
   }
@@ -140,18 +138,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   override def afterEach(): Unit = {
     try {
       conf = null
-      if (store != null) {
-        store.stop()
-        store = null
-      }
-      if (store2 != null) {
-        store2.stop()
-        store2 = null
-      }
-      if (store3 != null) {
-        store3.stop()
-        store3 = null
-      }
+      allStores.foreach(_.stop())
+      allStores.clear()
       rpcEnv.shutdown()
       rpcEnv.awaitTermination()
       rpcEnv = null
@@ -161,6 +149,11 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     }
   }
 
+  private def stopBlockManager(blockManager: BlockManager): Unit = {
+    allStores -= blockManager
+    blockManager.stop()
+  }
+
   test("StorageLevel object caching") {
     val level1 = StorageLevel(false, false, false, 3)
     // this should return the same object as level1
@@ -204,7 +197,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("master + 1 manager interaction") {
-    store = makeBlockManager(20000)
+    val store = makeBlockManager(20000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -234,8 +227,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("master + 2 managers interaction") {
-    store = makeBlockManager(2000, "exec1")
-    store2 = makeBlockManager(2000, "exec2")
+    val store = makeBlockManager(2000, "exec1")
+    val store2 = makeBlockManager(2000, "exec2")
 
     val peers = master.getPeers(store.blockManagerId)
     assert(peers.size === 1, "master did not return the other manager as a 
peer")
@@ -250,7 +243,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("removing block") {
-    store = makeBlockManager(20000)
+    val store = makeBlockManager(20000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -298,7 +291,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("removing rdd") {
-    store = makeBlockManager(20000)
+    val store = makeBlockManager(20000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -331,7 +324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("removing broadcast") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val driverStore = store
     val executorStore = makeBlockManager(2000, "executor")
     val a1 = new Array[Byte](400)
@@ -397,11 +390,10 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     }
     executorStore.stop()
     driverStore.stop()
-    store = null
   }
 
   test("reregistration on heart beat") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -418,7 +410,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("reregistration on block update") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
@@ -436,7 +428,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("reregistration doesn't dead lock") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))
 
@@ -474,7 +466,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("correct BlockResult returned from get() calls") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new 
Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
@@ -545,27 +537,25 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 
   test("SPARK-9591: getRemoteBytes from another location when Exception 
throw") {
     conf.set("spark.shuffle.io.maxRetries", "0")
-    store = makeBlockManager(8000, "executor1")
-    store2 = makeBlockManager(8000, "executor2")
-    store3 = makeBlockManager(8000, "executor3")
+    val store = makeBlockManager(8000, "executor1")
+    val store2 = makeBlockManager(8000, "executor2")
+    val store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
     store2.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     store3.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be 
fetched")
-    store2.stop()
-    store2 = null
+    stopBlockManager(store2)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be 
fetched")
-    store3.stop()
-    store3 = null
+    stopBlockManager(store3)
     // Should return None instead of throwing an exception:
     assert(store.getRemoteBytes("list1").isEmpty)
   }
 
   test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
-    store = makeBlockManager(8000, "executor1")
-    store2 = makeBlockManager(8000, "executor2")
+    val store = makeBlockManager(8000, "executor1")
+    val store2 = makeBlockManager(8000, "executor2")
     val list1 = List(new Array[Byte](4000))
     store2.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
@@ -593,7 +583,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -612,7 +602,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("in-memory LRU for partitions of same RDD") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -631,7 +621,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
@@ -654,7 +644,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   encryptionTest("on-disk storage") { _conf =>
-    store = makeBlockManager(1200, testConf = Some(_conf))
+    val store = makeBlockManager(1200, testConf = Some(_conf))
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -694,7 +684,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
       storageLevel: StorageLevel,
       getAsBytes: Boolean,
       testConf: SparkConf): Unit = {
-    store = makeBlockManager(12000, testConf = Some(testConf))
+    val store = makeBlockManager(12000, testConf = Some(testConf))
     val accessMethod =
       if (getAsBytes) store.getLocalBytesAndReleaseLock else 
store.getSingleAndReleaseLock
     val a1 = new Array[Byte](4000)
@@ -723,7 +713,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   encryptionTest("LRU with mixed storage levels") { _conf =>
-    store = makeBlockManager(12000, testConf = Some(_conf))
+    val store = makeBlockManager(12000, testConf = Some(_conf))
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -745,7 +735,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   encryptionTest("in-memory LRU with streams") { _conf =>
-    store = makeBlockManager(12000, testConf = Some(_conf))
+    val store = makeBlockManager(12000, testConf = Some(_conf))
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
@@ -773,7 +763,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   encryptionTest("LRU with mixed storage levels and streams") { _conf =>
-    store = makeBlockManager(12000, testConf = Some(_conf))
+    val store = makeBlockManager(12000, testConf = Some(_conf))
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
@@ -826,7 +816,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   }
 
   test("overly large block") {
-    store = makeBlockManager(5000)
+    val store = makeBlockManager(5000)
     store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
@@ -837,13 +827,12 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
-      store = makeBlockManager(20000, "exec1")
+      var store = makeBlockManager(20000, "exec1")
       store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
@@ -851,8 +840,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
         ShuffleBlockId(0, 0, 0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(BROADCAST_COMPRESS, true)
       store = makeBlockManager(20000, "exec3")
@@ -860,37 +848,32 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
         BroadcastBlockId(0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(BROADCAST_COMPRESS, false)
       store = makeBlockManager(20000, "exec4")
       store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, 
"broadcast_0 was compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(RDD_COMPRESS, true)
       store = makeBlockManager(20000, "exec5")
       store.putSingle(rdd(0, 0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not 
compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(RDD_COMPRESS, false)
       store = makeBlockManager(20000, "exec6")
       store.putSingle(rdd(0, 0), new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was 
compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
       store.putSingle("other_block", new Array[Byte](10000), 
StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block 
was compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
     } finally {
       System.clearProperty("spark.shuffle.compress")
       System.clearProperty(BROADCAST_COMPRESS.key)
@@ -904,7 +887,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     val transfer = new NettyBlockTransferService(conf, securityMgr, 
"localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), 
conf)
-    store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+    val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, 
master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, 0)
     memoryManager.setMemoryStore(store.memoryStore)
@@ -926,7 +909,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   test("turn off updated block statuses") {
     val conf = new SparkConf()
     conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)
-    store = makeBlockManager(12000, testConf = Some(conf))
+    val store = makeBlockManager(12000, testConf = Some(conf))
 
     store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
@@ -954,7 +937,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   test("updated block statuses") {
     val conf = new SparkConf()
     conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, true)
-    store = makeBlockManager(12000, testConf = Some(conf))
+    val store = makeBlockManager(12000, testConf = Some(conf))
     store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))
@@ -1052,7 +1035,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   }
 
   test("query block statuses") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
@@ -1097,7 +1080,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   }
 
   test("get matching blocks") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
@@ -1141,7 +1124,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   }
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
@@ -1155,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   }
 
   test("safely unroll blocks through putIterator (disk)") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val diskStore = store.diskStore
     val smallList = List.fill(40)(new Array[Byte](100))
@@ -1194,7 +1177,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   }
 
   test("read-locked blocks cannot be evicted from memory") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val arr = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
     store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
@@ -1220,7 +1203,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   private def testReadWithLossOfOnDiskFiles(
       storageLevel: StorageLevel,
       readMethod: BlockManager => Option[_]): Unit = {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
     assert(store.getStatus("blockId").isDefined)
     // Directly delete all files from the disk store, triggering failures when 
reading blocks:
@@ -1260,7 +1243,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   test("SPARK-13328: refresh block locations (fetch should fail after hitting 
a threshold)") {
     val mockBlockTransferService =
       new 
MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh",
 5))
-    store = makeBlockManager(8000, "executor1", transferService = 
Option(mockBlockTransferService))
+    val store =
+      makeBlockManager(8000, "executor1", transferService = 
Option(mockBlockTransferService))
     store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("item").isEmpty)
   }
@@ -1280,7 +1264,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(
       blockManagerIds)
 
-    store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
+    val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
       transferService = Option(mockBlockTransferService))
     val block = store.getRemoteBytes("item")
       .asInstanceOf[Option[ByteBuffer]]
@@ -1301,8 +1285,10 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
         throw new InterruptedException("Intentional interrupt")
       }
     }
-    store = makeBlockManager(8000, "executor1", transferService = 
Option(mockBlockTransferService))
-    store2 = makeBlockManager(8000, "executor2", transferService = 
Option(mockBlockTransferService))
+    val store =
+      makeBlockManager(8000, "executor1", transferService = 
Option(mockBlockTransferService))
+    val store2 =
+      makeBlockManager(8000, "executor2", transferService = 
Option(mockBlockTransferService))
     intercept[InterruptedException] {
       store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster 
= true)
     }
@@ -1312,8 +1298,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   }
 
   test("SPARK-17484: master block locations are updated following an invalid 
remote block fetch") {
-    store = makeBlockManager(8000, "executor1")
-    store2 = makeBlockManager(8000, "executor2")
+    val store = makeBlockManager(8000, "executor1")
+    val store2 = makeBlockManager(8000, "executor2")
     store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = 
true)
     assert(master.getLocations("item").nonEmpty)
     store.removeBlock("item", tellMaster = false)
@@ -1410,7 +1396,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
       Option(BlockLocationsAndStatus(blockLocations, blockStatus)))
     
when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations)
 
-    store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
+    val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
       transferService = Option(mockBlockTransferService))
     val block = store.getRemoteBytes("item")
       .asInstanceOf[Option[ByteBuffer]]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to