This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 819e5ea  [SPARK-26615][CORE] Fixing transport server/client resource leaks in the core unittests
819e5ea is described below

commit 819e5ea7c290f842c51ead8b4a6593678aeef6bf
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Wed Jan 16 09:00:21 2019 -0600

    [SPARK-26615][CORE] Fixing transport server/client resource leaks in the core unittests

    ## What changes were proposed in this pull request?

    This patch fixes resource leaks where TransportClient/TransportServer instances were not closed properly.

    In StandaloneSchedulerBackend a null check is added around `client.stop()` because the
    "local-cluster" test in SparkContextSchedulerCreationSuite showed that `client` is never
    initialised when org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend#start is not
    called: stopping the backend then threw an NPE and some resources remained open.

    ## How was this patch tested?

    By running the unit tests, with some extra temporary logging that counted created and closed
    TransportClient/TransportServer instances.

    Closes #23540 from attilapiros/leaks.

    Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../cluster/StandaloneSchedulerBackend.scala       |   5 +-
 .../spark/SparkContextSchedulerCreationSuite.scala | 103 ++++++++-------
 .../spark/deploy/client/AppClientSuite.scala       |  75 ++++++-----
 .../apache/spark/deploy/master/MasterSuite.scala   | 111 +++++++++--------
 .../apache/spark/storage/BlockManagerSuite.scala   | 138 +++++++++------------
 5 files changed, 228 insertions(+), 204 deletions(-)
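For readers skimming the patch: the StandaloneSchedulerBackend change below is a plain null guard around a field that is only assigned in start(). A minimal, self-contained sketch of that shape (`Client` and `Backend` here are illustrative stand-ins, not the Spark classes):

```scala
// Sketch of the guarded-stop pattern the patch applies.
class Client {
  def stop(): Unit = println("client stopped")
}

class Backend {
  // Like StandaloneSchedulerBackend.client, this stays null until start().
  private var client: Client = _

  def start(): Unit = { client = new Client }

  def stop(): Unit = {
    // stop() may run on a backend whose start() was never called,
    // so the client reference can still be null.
    if (client != null) {
      client.stop()
    }
  }
}

object GuardedStopDemo extends App {
  new Backend().stop()  // safe even though start() never ran
}
```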
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 66080b6..e0605fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -224,8 +224,9 @@ private[spark] class StandaloneSchedulerBackend(
     if (stopping.compareAndSet(false, true)) {
       try {
         super.stop()
-        client.stop()
-
+        if (client != null) {
+          client.stop()
+        }
         val callback = shutdownCallback
         if (callback != null) {
           callback(this)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index f8938df..811b975 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -23,110 +23,129 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.util.Utils
 
 class SparkContextSchedulerCreationSuite
   extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl =
-    createTaskScheduler(master, "client")
+  def noOp(taskSchedulerImpl: TaskSchedulerImpl): Unit = {}
 
-  def createTaskScheduler(master: String, deployMode: String): TaskSchedulerImpl =
-    createTaskScheduler(master, deployMode, new SparkConf())
+  def createTaskScheduler(master: String)(body: TaskSchedulerImpl => Unit = noOp): Unit =
+    createTaskScheduler(master, "client")(body)
+
+  def createTaskScheduler(master: String, deployMode: String)(
+      body: TaskSchedulerImpl => Unit): Unit =
+    createTaskScheduler(master, deployMode, new SparkConf())(body)
 
   def createTaskScheduler(
       master: String,
       deployMode: String,
-      conf: SparkConf): TaskSchedulerImpl = {
+      conf: SparkConf)(body: TaskSchedulerImpl => Unit): Unit = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
     sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
-    val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, deployMode)
-    sched.asInstanceOf[TaskSchedulerImpl]
+    val (_, sched) =
+      SparkContext invokePrivate createTaskSchedulerMethod(sc, master, deployMode)
+    try {
+      body(sched.asInstanceOf[TaskSchedulerImpl])
+    } finally {
+      Utils.tryLogNonFatalError {
+        sched.stop()
+      }
+    }
   }
 
   test("bad-master") {
     val e = intercept[SparkException] {
-      createTaskScheduler("localhost:1234")
+      createTaskScheduler("localhost:1234")()
     }
     assert(e.getMessage.contains("Could not parse Master URL"))
   }
 
   test("local") {
-    val sched = createTaskScheduler("local")
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.totalCores === 1)
-      case _ => fail()
+    val sched = createTaskScheduler("local") { sched =>
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.totalCores === 1)
+        case _ => fail()
+      }
     }
   }
 
   test("local-*") {
-    val sched = createTaskScheduler("local[*]")
-    sched.backend match {
-      case s: LocalSchedulerBackend =>
-        assert(s.totalCores === Runtime.getRuntime.availableProcessors())
-      case _ => fail()
+    val sched = createTaskScheduler("local[*]") { sched =>
+      sched.backend match {
+        case s: LocalSchedulerBackend =>
+          assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+        case _ => fail()
+      }
     }
   }
 
   test("local-n") {
-    val sched = createTaskScheduler("local[5]")
-    assert(sched.maxTaskFailures === 1)
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.totalCores === 5)
-      case _ => fail()
+    val sched = createTaskScheduler("local[5]") { sched =>
+      assert(sched.maxTaskFailures === 1)
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.totalCores === 5)
+        case _ => fail()
+      }
     }
   }
 
   test("local-*-n-failures") {
-    val sched = createTaskScheduler("local[* ,2]")
-    assert(sched.maxTaskFailures === 2)
-    sched.backend match {
-      case s: LocalSchedulerBackend =>
-        assert(s.totalCores === Runtime.getRuntime.availableProcessors())
-      case _ => fail()
+    val sched = createTaskScheduler("local[* ,2]") { sched =>
+      assert(sched.maxTaskFailures === 2)
+      sched.backend match {
+        case s: LocalSchedulerBackend =>
+          assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+        case _ => fail()
+      }
     }
   }
 
   test("local-n-failures") {
-    val sched = createTaskScheduler("local[4, 2]")
-    assert(sched.maxTaskFailures === 2)
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.totalCores === 4)
-      case _ => fail()
+    val sched = createTaskScheduler("local[4, 2]") { sched =>
+      assert(sched.maxTaskFailures === 2)
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.totalCores === 4)
+        case _ => fail()
+      }
     }
   }
 
   test("bad-local-n") {
     val e = intercept[SparkException] {
-      createTaskScheduler("local[2*]")
+      createTaskScheduler("local[2*]")()
     }
     assert(e.getMessage.contains("Could not parse Master URL"))
   }
 
   test("bad-local-n-failures") {
     val e = intercept[SparkException] {
-      createTaskScheduler("local[2*,4]")
+      createTaskScheduler("local[2*,4]")()
    }
     assert(e.getMessage.contains("Could not parse Master URL"))
   }
 
   test("local-default-parallelism") {
     val conf = new SparkConf().set("spark.default.parallelism", "16")
-    val sched = createTaskScheduler("local", "client", conf)
-    sched.backend match {
-      case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16)
-      case _ => fail()
+    val sched = createTaskScheduler("local", "client", conf) { sched =>
+      sched.backend match {
+        case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16)
+        case _ => fail()
+      }
     }
   }
 
   test("local-cluster") {
-    createTaskScheduler("local-cluster[3, 14, 1024]").backend match {
-      case s: StandaloneSchedulerBackend => // OK
-      case _ => fail()
+    createTaskScheduler("local-cluster[3, 14, 1024]") { sched =>
+      sched.backend match {
+        case s: StandaloneSchedulerBackend => // OK
+        case _ => fail()
+      }
     }
   }
 }
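The suite rewrite above replaces "return the scheduler" helpers with the loan pattern: createTaskScheduler now lends the scheduler to a test body and stops it in `finally`, so every test path cleans up. A dependency-free sketch of that shape (`Resource` is a toy stand-in for TaskSchedulerImpl):

```scala
import scala.util.control.NonFatal

// Loan-pattern sketch: the helper owns the resource, the caller only
// borrows it, and cleanup runs exactly once in `finally`.
class Resource {
  def use(): Unit = println("using resource")
  def stop(): Unit = println("resource stopped")
}

object LoanDemo extends App {
  def withResource(body: Resource => Unit): Unit = {
    val r = new Resource
    try {
      body(r)
    } finally {
      // Swallow non-fatal cleanup errors, mirroring the suite's
      // Utils.tryLogNonFatalError { sched.stop() } wrapper.
      try r.stop() catch { case NonFatal(e) => println(s"ignored: $e") }
    }
  }

  withResource { r => r.use() }
}
```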
test("bad-local-n-failures") { val e = intercept[SparkException] { - createTaskScheduler("local[2*,4]") + createTaskScheduler("local[2*,4]")() } assert(e.getMessage.contains("Could not parse Master URL")) } test("local-default-parallelism") { val conf = new SparkConf().set("spark.default.parallelism", "16") - val sched = createTaskScheduler("local", "client", conf) - sched.backend match { - case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16) - case _ => fail() + val sched = createTaskScheduler("local", "client", conf) { sched => + sched.backend match { + case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16) + case _ => fail() + } } } test("local-cluster") { - createTaskScheduler("local-cluster[3, 14, 1024]").backend match { - case s: StandaloneSchedulerBackend => // OK - case _ => fail() + createTaskScheduler("local-cluster[3, 14, 1024]") { sched => + sched.backend match { + case s: StandaloneSchedulerBackend => // OK + case _ => fail() + } } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index a1707e6..baeefea 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.client +import java.io.Closeable import java.util.concurrent.ConcurrentLinkedQueue import scala.concurrent.duration._ @@ -85,57 +86,59 @@ class AppClientSuite } test("interface methods of AppClient using local Master") { - val ci = new AppClientInst(masterRpcEnv.address.toSparkURL) + Utils.tryWithResource(new AppClientInst(masterRpcEnv.address.toSparkURL)) { ci => - ci.client.start() + ci.client.start() - // Client should connect with one Master which registers the application - eventually(timeout(10.seconds), interval(10.millis)) { - val apps = getApplications() - assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") - assert(apps.size === 1, "master should have 1 registered app") - } + // Client should connect with one Master which registers the application + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") + assert(apps.size === 1, "master should have 1 registered app") + } - // Send message to Master to request Executors, verify request by change in executor limit - val numExecutorsRequested = 1 - whenReady( + // Send message to Master to request Executors, verify request by change in executor limit + val numExecutorsRequested = 1 + whenReady( ci.client.requestTotalExecutors(numExecutorsRequested), timeout(10.seconds), interval(10.millis)) { acknowledged => - assert(acknowledged) - } + assert(acknowledged) + } - eventually(timeout(10.seconds), interval(10.millis)) { - val apps = getApplications() - assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed") - } + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed") + } - // Send request to kill executor, verify request was made - val executorId: String = getApplications().head.executors.head._2.fullId - whenReady( + // Send request to kill executor, verify request was made + val executorId: String = getApplications().head.executors.head._2.fullId + 
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index f788db7..5904d03 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -644,59 +644,70 @@ class MasterSuite extends SparkFunSuite
       val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
       assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
     }
-    val worker1 = new MockWorker(master.self)
-    worker1.rpcEnv.setupEndpoint("worker", worker1)
-    val worker1Reg = RegisterWorker(
-      worker1.id,
-      "localhost",
-      9998,
-      worker1.self,
-      10,
-      1024,
-      "http://localhost:8080",
-      RpcAddress("localhost2", 10000))
-    master.self.send(worker1Reg)
-    val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
-    master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-
-    eventually(timeout(10.seconds)) {
-      assert(worker1.apps.nonEmpty)
-    }
-
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.workers(0).state == WorkerState.DEAD)
-    }
+    var worker1: MockWorker = null
+    var worker2: MockWorker = null
+    try {
+      worker1 = new MockWorker(master.self)
+      worker1.rpcEnv.setupEndpoint("worker", worker1)
+      val worker1Reg = RegisterWorker(
+        worker1.id,
+        "localhost",
+        9998,
+        worker1.self,
+        10,
+        1024,
+        "http://localhost:8080",
+        RpcAddress("localhost2", 10000))
+      master.self.send(worker1Reg)
+      val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
+      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+      eventually(timeout(10.seconds)) {
+        assert(worker1.apps.nonEmpty)
+      }
 
-    val worker2 = new MockWorker(master.self)
-    worker2.rpcEnv.setupEndpoint("worker", worker2)
-    master.self.send(RegisterWorker(
-      worker2.id,
-      "localhost",
-      9999,
-      worker2.self,
-      10,
-      1024,
-      "http://localhost:8081",
-      RpcAddress("localhost", 10001)))
-    eventually(timeout(10.seconds)) {
-      assert(worker2.apps.nonEmpty)
-    }
+      eventually(timeout(10.seconds)) {
+        val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+        assert(masterState.workers(0).state == WorkerState.DEAD)
+      }
 
-    master.self.send(worker1Reg)
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      worker2 = new MockWorker(master.self)
+      worker2.rpcEnv.setupEndpoint("worker", worker2)
+      master.self.send(RegisterWorker(
+        worker2.id,
+        "localhost",
+        9999,
+        worker2.self,
+        10,
+        1024,
+        "http://localhost:8081",
+        RpcAddress("localhost", 10001)))
+      eventually(timeout(10.seconds)) {
+        assert(worker2.apps.nonEmpty)
+      }
 
-      val worker = masterState.workers.filter(w => w.id == worker1.id)
-      assert(worker.length == 1)
-      // make sure the `DriverStateChanged` arrives at Master.
-      assert(worker(0).drivers.isEmpty)
-      assert(worker1.apps.isEmpty)
-      assert(worker1.drivers.isEmpty)
-      assert(worker2.apps.size == 1)
-      assert(worker2.drivers.size == 1)
-      assert(masterState.activeDrivers.length == 1)
-      assert(masterState.activeApps.length == 1)
+      master.self.send(worker1Reg)
+      eventually(timeout(10.seconds)) {
+        val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+
+        val worker = masterState.workers.filter(w => w.id == worker1.id)
+        assert(worker.length == 1)
+        // make sure the `DriverStateChanged` arrives at Master.
+        assert(worker(0).drivers.isEmpty)
+        assert(worker1.apps.isEmpty)
+        assert(worker1.drivers.isEmpty)
+        assert(worker2.apps.size == 1)
+        assert(worker2.drivers.size == 1)
+        assert(masterState.activeDrivers.length == 1)
+        assert(masterState.activeApps.length == 1)
+      }
+    } finally {
+      if (worker1 != null) {
+        worker1.rpcEnv.shutdown()
+      }
+      if (worker2 != null) {
+        worker2.rpcEnv.shutdown()
+      }
     }
   }
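In MasterSuite the workers are created at different points inside the test, so the patch uses the other common cleanup shape: null-initialised vars plus a `finally` block that shuts down only what was actually constructed before a failure. A compact sketch (`Worker` here is illustrative, not the suite's MockWorker):

```scala
// Sketch of the MasterSuite cleanup shape: vars start as null, and the
// finally block shuts down only what was actually created.
class Worker(val name: String) {
  def shutdown(): Unit = println(s"$name shut down")
}

object TryFinallyDemo extends App {
  var worker1: Worker = null
  var worker2: Worker = null
  try {
    worker1 = new Worker("worker1")
    // ... assertions may throw here, before worker2 exists ...
    worker2 = new Worker("worker2")
  } finally {
    if (worker1 != null) worker1.shutdown()
    if (worker2 != null) worker2.shutdown()
  }
}
```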
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cafd980..233a84e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -46,7 +46,6 @@ import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransport
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
-import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
@@ -66,9 +65,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   implicit val defaultSignaler: Signaler = ThreadSignaler
 
   var conf: SparkConf = null
-  var store: BlockManager = null
-  var store2: BlockManager = null
-  var store3: BlockManager = null
+  val allStores = ArrayBuffer[BlockManager]()
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
   val securityMgr = new SecurityManager(new SparkConf(false))
@@ -106,6 +103,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, bmConf,
       memManager, mapOutputTracker, shuffleManager, transfer, bmSecurityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)
+    allStores += blockManager
     blockManager.initialize("app-id")
     blockManager
   }
@@ -140,18 +138,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   override def afterEach(): Unit = {
     try {
       conf = null
-      if (store != null) {
-        store.stop()
-        store = null
-      }
-      if (store2 != null) {
-        store2.stop()
-        store2 = null
-      }
-      if (store3 != null) {
-        store3.stop()
-        store3 = null
-      }
+      allStores.foreach(_.stop())
+      allStores.clear()
       rpcEnv.shutdown()
       rpcEnv.awaitTermination()
       rpcEnv = null
@@ -161,6 +149,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
+  private def stopBlockManager(blockManager: BlockManager): Unit = {
+    allStores -= blockManager
+    blockManager.stop()
+  }
+
   test("StorageLevel object caching") {
     val level1 = StorageLevel(false, false, false, 3)
     // this should return the same object as level1
@@ -204,7 +197,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("master + 1 manager interaction") {
-    store = makeBlockManager(20000)
+    val store = makeBlockManager(20000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -234,8 +227,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("master + 2 managers interaction") {
-    store = makeBlockManager(2000, "exec1")
-    store2 = makeBlockManager(2000, "exec2")
+    val store = makeBlockManager(2000, "exec1")
+    val store2 = makeBlockManager(2000, "exec2")
 
     val peers = master.getPeers(store.blockManagerId)
     assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -250,7 +243,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("removing block") {
-    store = makeBlockManager(20000)
+    val store = makeBlockManager(20000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -298,7 +291,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("removing rdd") {
-    store = makeBlockManager(20000)
+    val store = makeBlockManager(20000)
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
@@ -331,7 +324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("removing broadcast") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val driverStore = store
     val executorStore = makeBlockManager(2000, "executor")
     val a1 = new Array[Byte](400)
@@ -397,11 +390,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
     executorStore.stop()
     driverStore.stop()
-    store = null
   }
 
   test("reregistration on heart beat") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -418,7 +410,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("reregistration on block update") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
@@ -436,7 +428,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("reregistration doesn't dead lock") {
-    store = makeBlockManager(2000)
+    val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))
 
@@ -474,7 +466,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("correct BlockResult returned from get() calls") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
@@ -545,27 +537,25 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
     conf.set("spark.shuffle.io.maxRetries", "0")
-    store = makeBlockManager(8000, "executor1")
-    store2 = makeBlockManager(8000, "executor2")
-    store3 = makeBlockManager(8000, "executor3")
+    val store = makeBlockManager(8000, "executor1")
+    val store2 = makeBlockManager(8000, "executor2")
+    val store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
     store2.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     store3.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
-    store2.stop()
-    store2 = null
+    stopBlockManager(store2)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
-    store3.stop()
-    store3 = null
+    stopBlockManager(store3)
     // Should return None instead of throwing an exception:
     assert(store.getRemoteBytes("list1").isEmpty)
   }
 
   test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
-    store = makeBlockManager(8000, "executor1")
-    store2 = makeBlockManager(8000, "executor2")
+    val store = makeBlockManager(8000, "executor1")
+    val store2 = makeBlockManager(8000, "executor2")
     val list1 = List(new Array[Byte](4000))
     store2.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
test("SPARK-14252: getOrElseUpdate should still read from remote storage") { - store = makeBlockManager(8000, "executor1") - store2 = makeBlockManager(8000, "executor2") + val store = makeBlockManager(8000, "executor1") + val store2 = makeBlockManager(8000, "executor2") val list1 = List(new Array[Byte](4000)) store2.putIterator( "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) @@ -593,7 +583,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = { - store = makeBlockManager(12000) + val store = makeBlockManager(12000) val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) @@ -612,7 +602,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("in-memory LRU for partitions of same RDD") { - store = makeBlockManager(12000) + val store = makeBlockManager(12000) val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) @@ -631,7 +621,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("in-memory LRU for partitions of multiple RDDs") { - store = makeBlockManager(12000) + val store = makeBlockManager(12000) store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) @@ -654,7 +644,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } encryptionTest("on-disk storage") { _conf => - store = makeBlockManager(1200, testConf = Some(_conf)) + val store = makeBlockManager(1200, testConf = Some(_conf)) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -694,7 +684,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE storageLevel: StorageLevel, getAsBytes: Boolean, testConf: SparkConf): Unit = { - store = makeBlockManager(12000, testConf = Some(testConf)) + val store = makeBlockManager(12000, testConf = Some(testConf)) val accessMethod = if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock val a1 = new Array[Byte](4000) @@ -723,7 +713,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } encryptionTest("LRU with mixed storage levels") { _conf => - store = makeBlockManager(12000, testConf = Some(_conf)) + val store = makeBlockManager(12000, testConf = Some(_conf)) val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) @@ -745,7 +735,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } encryptionTest("in-memory LRU with streams") { _conf => - store = makeBlockManager(12000, testConf = Some(_conf)) + val store = makeBlockManager(12000, testConf = Some(_conf)) val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) val list2 = List(new Array[Byte](2000), new Array[Byte](2000)) val list3 = List(new Array[Byte](2000), new Array[Byte](2000)) @@ -773,7 +763,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } encryptionTest("LRU with mixed storage levels and streams") { _conf => - store = makeBlockManager(12000, testConf = Some(_conf)) + val store = makeBlockManager(12000, testConf = Some(_conf)) val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) val list2 = List(new 
@@ -773,7 +763,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   encryptionTest("LRU with mixed storage levels and streams") { _conf =>
-    store = makeBlockManager(12000, testConf = Some(_conf))
+    val store = makeBlockManager(12000, testConf = Some(_conf))
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
@@ -826,7 +816,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("overly large block") {
-    store = makeBlockManager(5000)
+    val store = makeBlockManager(5000)
     store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
@@ -837,13 +827,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
-      store = makeBlockManager(20000, "exec1")
+      var store = makeBlockManager(20000, "exec1")
       store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
@@ -851,8 +840,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(BROADCAST_COMPRESS, true)
       store = makeBlockManager(20000, "exec3")
@@ -860,37 +848,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(BROADCAST_COMPRESS, false)
       store = makeBlockManager(20000, "exec4")
       store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(RDD_COMPRESS, true)
       store = makeBlockManager(20000, "exec5")
      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       conf.set(RDD_COMPRESS, false)
       store = makeBlockManager(20000, "exec6")
       store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
       store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
-      store.stop()
-      store = null
+      stopBlockManager(store)
     } finally {
       System.clearProperty("spark.shuffle.compress")
       System.clearProperty(BROADCAST_COMPRESS.key)
@@ -904,7 +887,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
-    store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+    val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, 0)
     memoryManager.setMemoryStore(store.memoryStore)
@@ -926,7 +909,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   test("turn off updated block statuses") {
     val conf = new SparkConf()
     conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)
-    store = makeBlockManager(12000, testConf = Some(conf))
+    val store = makeBlockManager(12000, testConf = Some(conf))
 
     store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
@@ -954,7 +937,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   test("updated block statuses") {
     val conf = new SparkConf()
     conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, true)
-    store = makeBlockManager(12000, testConf = Some(conf))
+    val store = makeBlockManager(12000, testConf = Some(conf))
     store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))
@@ -1052,7 +1035,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("query block statuses") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
@@ -1097,7 +1080,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("get matching blocks") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
@@ -1141,7 +1124,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
@@ -1155,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("safely unroll blocks through putIterator (disk)") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val diskStore = store.diskStore
     val smallList = List.fill(40)(new Array[Byte](100))
@@ -1194,7 +1177,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("read-locked blocks cannot be evicted from memory") {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     val arr = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
     store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
@@ -1220,7 +1203,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   private def testReadWithLossOfOnDiskFiles(
       storageLevel: StorageLevel,
       readMethod: BlockManager => Option[_]): Unit = {
-    store = makeBlockManager(12000)
+    val store = makeBlockManager(12000)
     assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
     assert(store.getStatus("blockId").isDefined)
     // Directly delete all files from the disk store, triggering failures when reading blocks:
@@ -1260,7 +1243,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
     val mockBlockTransferService =
       new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))
-    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
+    val store =
+      makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
     store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("item").isEmpty)
   }
@@ -1280,7 +1264,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(
       blockManagerIds)
 
-    store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
+    val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
       transferService = Option(mockBlockTransferService))
     val block = store.getRemoteBytes("item")
       .asInstanceOf[Option[ByteBuffer]]
@@ -1301,8 +1285,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         throw new InterruptedException("Intentional interrupt")
       }
     }
-    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
-    store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
+    val store =
+      makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
+    val store2 =
+      makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
     intercept[InterruptedException] {
       store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true)
     }
@@ -1312,8 +1298,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("SPARK-17484: master block locations are updated following an invalid remote block fetch") {
-    store = makeBlockManager(8000, "executor1")
-    store2 = makeBlockManager(8000, "executor2")
+    val store = makeBlockManager(8000, "executor1")
+    val store2 = makeBlockManager(8000, "executor2")
     store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(master.getLocations("item").nonEmpty)
     store.removeBlock("item", tellMaster = false)
@@ -1410,7 +1396,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       Option(BlockLocationsAndStatus(blockLocations, blockStatus)))
     when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations)
 
-    store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
+    val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
       transferService = Option(mockBlockTransferService))
     val block = store.getRemoteBytes("item")
       .asInstanceOf[Option[ByteBuffer]]
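BlockManagerSuite swaps its fixed store/store2/store3 fields for a registry: every manager built by makeBlockManager is recorded in allStores, afterEach sweeps whatever is left, and tests that stop a manager early call stopBlockManager so it is deregistered first and never stopped twice. A minimal sketch of that registry shape (`Store` is a toy stand-in for BlockManager):

```scala
import scala.collection.mutable.ArrayBuffer

// Registry-style cleanup sketch: every created store is tracked, the
// afterEach-style sweep stops whatever is left, and early stops
// deregister first so nothing is stopped twice.
class Store(val name: String) {
  def stop(): Unit = println(s"$name stopped")
}

object StoreRegistryDemo extends App {
  val allStores = ArrayBuffer[Store]()

  def makeStore(name: String): Store = {
    val s = new Store(name)
    allStores += s  // mirrors `allStores += blockManager` in makeBlockManager
    s
  }

  def stopStore(s: Store): Unit = {
    allStores -= s  // deregister so the sweep below skips it
    s.stop()
  }

  val a = makeStore("exec1")
  val b = makeStore("exec2")
  stopStore(b)                // stopped early, as stopBlockManager does
  allStores.foreach(_.stop()) // sweep stops only exec1
  allStores.clear()
}
```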
assert(master.getLocations("item").nonEmpty) store.removeBlock("item", tellMaster = false) @@ -1410,7 +1396,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE Option(BlockLocationsAndStatus(blockLocations, blockStatus))) when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations) - store = makeBlockManager(8000, "executor1", mockBlockManagerMaster, + val store = makeBlockManager(8000, "executor1", mockBlockManagerMaster, transferService = Option(mockBlockTransferService)) val block = store.getRemoteBytes("item") .asInstanceOf[Option[ByteBuffer]] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org