Repository: spark
Updated Branches:
  refs/heads/branch-1.6 0c23dd52d -> fbe6888cc


[SPARK-11887] Close PersistenceEngine at the end of PersistenceEngineSuite tests

In PersistenceEngineSuite, we do not call `close()` on the PersistenceEngine at 
the end of the test. For the ZooKeeperPersistenceEngine, this causes us to leak 
a ZooKeeper client, causing the logs of unrelated tests to be periodically 
spammed with connection error messages from that client:

```
15/11/20 05:13:35.789 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:15741. Will not attempt to authenticate using SASL (unknown error)
15/11/20 05:13:35.790 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) WARN ClientCnxn: Session 0x15124ff48dd0000 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068)
```

This patch fixes the leak by calling `close()` on the engine in a `finally` block.
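
Condensed, the resulting structure looks like this (a sketch of the pattern only, not the verbatim suite code; `persistenceEngineCreator` and `serializer` are the names from the diff below):

```
val persistenceEngine = persistenceEngineCreator(serializer)
try {
  // persist/read/unpersist assertions, plus the nested RpcEnv-based
  // WorkerInfo round-trip (which keeps its own finally block for
  // shutting down the RpcEnv)
} finally {
  // always runs, even if an assertion fails; for
  // ZooKeeperPersistenceEngine this closes the ZooKeeper client
  persistenceEngine.close()
}
```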

Author: Josh Rosen <joshro...@databricks.com>

Closes #9864 from JoshRosen/close-zookeeper-client-in-tests.

(cherry picked from commit 89fd9bd06160fa89dedbf685bfe159ffe4a06ec6)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe6888c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe6888c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe6888c

Branch: refs/heads/branch-1.6
Commit: fbe6888cc0c8a16531a4ba7ce5235b84474f1a7b
Parents: 0c23dd5
Author: Josh Rosen <joshro...@databricks.com>
Authored: Fri Nov 20 14:31:26 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Fri Nov 20 14:31:38 2015 -0800

----------------------------------------------------------------------
 .../deploy/master/PersistenceEngineSuite.scala  | 100 ++++++++++---------
 1 file changed, 52 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fbe6888c/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 3477557..7a44728 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -63,56 +63,60 @@ class PersistenceEngineSuite extends SparkFunSuite {
      conf: SparkConf, persistenceEngineCreator: Serializer => PersistenceEngine): Unit = {
     val serializer = new JavaSerializer(conf)
     val persistenceEngine = persistenceEngineCreator(serializer)
-    persistenceEngine.persist("test_1", "test_1_value")
-    assert(Seq("test_1_value") === persistenceEngine.read[String]("test_"))
-    persistenceEngine.persist("test_2", "test_2_value")
-    assert(Set("test_1_value", "test_2_value") === persistenceEngine.read[String]("test_").toSet)
-    persistenceEngine.unpersist("test_1")
-    assert(Seq("test_2_value") === persistenceEngine.read[String]("test_"))
-    persistenceEngine.unpersist("test_2")
-    assert(persistenceEngine.read[String]("test_").isEmpty)
-
-    // Test deserializing objects that contain RpcEndpointRef
-    val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
     try {
-      // Create a real endpoint so that we can test RpcEndpointRef deserialization
-      val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint {
-        override val rpcEnv: RpcEnv = testRpcEnv
-      })
-
-      val workerToPersist = new WorkerInfo(
-        id = "test_worker",
-        host = "127.0.0.1",
-        port = 10000,
-        cores = 0,
-        memory = 0,
-        endpoint = workerEndpoint,
-        webUiPort = 0,
-        publicAddress = ""
-      )
-
-      persistenceEngine.addWorker(workerToPersist)
-
-      val (storedApps, storedDrivers, storedWorkers) =
-        persistenceEngine.readPersistedData(testRpcEnv)
-
-      assert(storedApps.isEmpty)
-      assert(storedDrivers.isEmpty)
-
-      // Check deserializing WorkerInfo
-      assert(storedWorkers.size == 1)
-      val recoveryWorkerInfo = storedWorkers.head
-      assert(workerToPersist.id === recoveryWorkerInfo.id)
-      assert(workerToPersist.host === recoveryWorkerInfo.host)
-      assert(workerToPersist.port === recoveryWorkerInfo.port)
-      assert(workerToPersist.cores === recoveryWorkerInfo.cores)
-      assert(workerToPersist.memory === recoveryWorkerInfo.memory)
-      assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint)
-      assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
-      assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
+      persistenceEngine.persist("test_1", "test_1_value")
+      assert(Seq("test_1_value") === persistenceEngine.read[String]("test_"))
+      persistenceEngine.persist("test_2", "test_2_value")
+      assert(Set("test_1_value", "test_2_value") === persistenceEngine.read[String]("test_").toSet)
+      persistenceEngine.unpersist("test_1")
+      assert(Seq("test_2_value") === persistenceEngine.read[String]("test_"))
+      persistenceEngine.unpersist("test_2")
+      assert(persistenceEngine.read[String]("test_").isEmpty)
+
+      // Test deserializing objects that contain RpcEndpointRef
+      val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+      try {
+        // Create a real endpoint so that we can test RpcEndpointRef deserialization
+        val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint {
+          override val rpcEnv: RpcEnv = testRpcEnv
+        })
+
+        val workerToPersist = new WorkerInfo(
+          id = "test_worker",
+          host = "127.0.0.1",
+          port = 10000,
+          cores = 0,
+          memory = 0,
+          endpoint = workerEndpoint,
+          webUiPort = 0,
+          publicAddress = ""
+        )
+
+        persistenceEngine.addWorker(workerToPersist)
+
+        val (storedApps, storedDrivers, storedWorkers) =
+          persistenceEngine.readPersistedData(testRpcEnv)
+
+        assert(storedApps.isEmpty)
+        assert(storedDrivers.isEmpty)
+
+        // Check deserializing WorkerInfo
+        assert(storedWorkers.size == 1)
+        val recoveryWorkerInfo = storedWorkers.head
+        assert(workerToPersist.id === recoveryWorkerInfo.id)
+        assert(workerToPersist.host === recoveryWorkerInfo.host)
+        assert(workerToPersist.port === recoveryWorkerInfo.port)
+        assert(workerToPersist.cores === recoveryWorkerInfo.cores)
+        assert(workerToPersist.memory === recoveryWorkerInfo.memory)
+        assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint)
+        assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
+        assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
+      } finally {
+        testRpcEnv.shutdown()
+        testRpcEnv.awaitTermination()
+      }
     } finally {
-      testRpcEnv.shutdown()
-      testRpcEnv.awaitTermination()
+      persistenceEngine.close()
     }
   }
 

