Repository: spark Updated Branches: refs/heads/master 8b0e94d89 -> da2dc6929
[SPARK-25116][TESTS] Fix the Kafka cluster leak and clean up cached producers ## What changes were proposed in this pull request? KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster but `afterAll` only shuts down one cluster. This leaks a Kafka cluster and causes some Kafka threads to crash and kill the JVM when SBT is trying to clean up tests. This PR fixes the leak and also adds a shutdown hook to detect Kafka cluster leaks. In addition, it fixes the `AdminClient` leak and cleans up cached producers (when a record is written using a producer, the producer will keep refreshing the topic, and I couldn't find an API to clear it except closing the producer) to eliminate the following annoying logs: ``` 18/08/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could not be established. Broker may not be available. 18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could not be established. Broker may not be available. 18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could not be established. Broker may not be available. 18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not be established. Broker may not be available. 18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not be established. Broker may not be available. ``` I also reverted https://github.com/apache/spark/pull/22097/commits/b5eb54244ed573c8046f5abf7bf087f5f08dba58 introduced by #22097 since it doesn't help. ## How was this patch tested? Jenkins Closes #22106 from zsxwing/SPARK-25116. 
Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da2dc692 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da2dc692 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da2dc692 Branch: refs/heads/master Commit: da2dc69291cda8c8e7bb6b4a15001f768a97f65e Parents: 8b0e94d Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Fri Aug 17 14:21:08 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Aug 17 14:21:08 2018 -0700 ---------------------------------------------------------------------- .../sql/kafka010/CachedKafkaProducer.scala | 8 +- .../sql/kafka010/KafkaContinuousReader.scala | 2 +- .../sql/kafka010/CachedKafkaProducerSuite.scala | 5 +- .../sql/kafka010/KafkaContinuousSinkSuite.scala | 7 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 2 +- .../spark/sql/kafka010/KafkaRelationSuite.scala | 3 +- .../spark/sql/kafka010/KafkaSinkSuite.scala | 2 +- .../apache/spark/sql/kafka010/KafkaTest.scala | 32 +++++++ .../spark/sql/kafka010/KafkaTestUtils.scala | 91 ++++++++++---------- .../streaming/kafka010/KafkaTestUtils.scala | 89 +++++++++---------- 10 files changed, 132 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 571140b..cd680ad 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -33,8 +33,12 @@ private[kafka010] object CachedKafkaProducer extends Logging { private type Producer = KafkaProducer[Array[Byte], Array[Byte]] + private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) + private lazy val cacheExpireTimeout: Long = - SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m") + Option(SparkEnv.get).map(_.conf.getTimeAsMs( + "spark.kafka.producer.cache.timeout", + s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout) private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { override def load(config: Seq[(String, Object)]): Producer = { @@ -102,7 +106,7 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } - private def clear(): Unit = { + private[kafka010] def clear(): Unit = { logInfo("Cleaning up guava cache.") guavaCache.invalidateAll() } http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala index 48b91df..be7ce3b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -216,7 +216,7 @@ class KafkaContinuousInputPartitionReader( } catch { // We didn't read within the timeout. We're supposed to block indefinitely for new data, so // swallow and ignore this. - case _: TimeoutException => + case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException => // This is a failOnDataLoss exception. 
Retry if nextKafkaOffset is within the data range, // or if it's the endpoint of the data range (i.e. the "true" next offset). http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 789bffa..0b33554 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -26,14 +26,13 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.sql.test.SharedSQLContext -class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester { +class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest { type KP = KafkaProducer[Array[Byte], Array[Byte]] protected override def beforeEach(): Unit = { super.beforeEach() - val clear = PrivateMethod[Unit]('clear) - CachedKafkaProducer.invokePrivate(clear()) + CachedKafkaProducer.clear() } test("Should return the cached instance on calling getOrCreate with same params.") { http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala index 0e1492a..3f6fcf6 100644 --- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala @@ -40,12 +40,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { override val streamingTimeout = 30.seconds - override def beforeAll(): Unit = { - super.beforeAll() - testUtils = new KafkaTestUtils( - withBrokerProps = Map("auto.create.topics.enable" -> "false")) - testUtils.setup() - } + override val brokerProps = Map("auto.create.topics.enable" -> "false") override def afterAll(): Unit = { if (testUtils != null) { http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index aa89868..172c0ef 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} import org.apache.spark.sql.types.StructType -abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { +abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest { protected var testUtils: KafkaTestUtils = _ http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 91893df..688e9c4 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -21,13 +21,12 @@ import java.util.Locale import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.common.TopicPartition -import org.scalatest.BeforeAndAfter import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils -class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext { +class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest { import testImplicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 70ffd7d..a2213e0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, DataType} -class KafkaSinkSuite extends StreamTest with SharedSQLContext { +class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest { import testImplicits._ protected var testUtils: KafkaTestUtils = _ 
http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala new file mode 100644 index 0000000..19acda9 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.SparkFunSuite + +/** A trait to clean cached Kafka producers in `afterAll` */ +trait KafkaTest extends BeforeAndAfterAll { + self: SparkFunSuite => + + override def afterAll(): Unit = { + super.afterAll() + CachedKafkaProducer.clear() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index e58d183..55d61ef 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -39,14 +39,13 @@ import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} -import org.apache.kafka.common.utils.Exit import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.util.{ShutdownHookManager, Utils} /** * This is a helper class for Kafka test suites. 
This has the functionality to set up @@ -60,7 +59,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private val zkHost = "127.0.0.1" private var zkPort: Int = 0 private val zkConnectionTimeout = 60000 - private val zkSessionTimeout = 6000 + private val zkSessionTimeout = 10000 private var zookeeper: EmbeddedZookeeper = _ @@ -81,6 +80,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L // Flag to test whether the system is correctly started private var zkReady = false private var brokerReady = false + private var leakDetector: AnyRef = null def zkAddress: String = { assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") @@ -130,6 +130,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ def setup(): Unit = { + // Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is + // created. + val exception = new SparkException("It was created at: ") + leakDetector = ShutdownHookManager.addShutdownHook { () => + logError("Found a leak KafkaTestUtils.", exception) + } + setupEmbeddedZookeeper() setupEmbeddedKafkaServer() eventually(timeout(60.seconds)) { @@ -139,55 +146,47 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L /** Teardown the whole servers, including Kafka broker and Zookeeper */ def teardown(): Unit = { - // There is a race condition that may kill JVM when terminating the Kafka cluster. We set - // a custom Procedure here during the termination in order to keep JVM running and not fail the - // tests. 
- val logExitEvent = new Exit.Procedure { - override def execute(statusCode: Int, message: String): Unit = { - logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)") - } + if (leakDetector != null) { + ShutdownHookManager.removeShutdownHook(leakDetector) } - Exit.setExitProcedure(logExitEvent) - Exit.setHaltProcedure(logExitEvent) - try { - brokerReady = false - zkReady = false + brokerReady = false + zkReady = false - if (producer != null) { - producer.close() - producer = null - } + if (producer != null) { + producer.close() + producer = null + } - if (server != null) { - server.shutdown() - server.awaitShutdown() - server = null - } + if (adminClient != null) { + adminClient.close() + } - // On Windows, `logDirs` is left open even after Kafka server above is completely shut down - // in some cases. It leads to test failures on Windows if the directory deletion failure - // throws an exception. - brokerConf.logDirs.foreach { f => - try { - Utils.deleteRecursively(new File(f)) - } catch { - case e: IOException if Utils.isWindows => - logWarning(e.getMessage) - } - } + if (server != null) { + server.shutdown() + server.awaitShutdown() + server = null + } - if (zkUtils != null) { - zkUtils.close() - zkUtils = null + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. 
+ brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) } + } - if (zookeeper != null) { - zookeeper.shutdown() - zookeeper = null - } - } finally { - Exit.resetExitProcedure() - Exit.resetHaltProcedure() + if (zkUtils != null) { + zkUtils.close() + zkUtils = null + } + + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null } } http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index bd3cf9a..efcd5d6 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -34,13 +34,12 @@ import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.StringSerializer -import org.apache.kafka.common.utils.Exit import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.streaming.Time -import org.apache.spark.util.Utils +import org.apache.spark.util.{ShutdownHookManager, Utils} /** * This is a helper class for Kafka test suites. 
This has the functionality to set up @@ -54,7 +53,7 @@ private[kafka010] class KafkaTestUtils extends Logging { private val zkHost = "127.0.0.1" private var zkPort: Int = 0 private val zkConnectionTimeout = 60000 - private val zkSessionTimeout = 6000 + private val zkSessionTimeout = 10000 private var zookeeper: EmbeddedZookeeper = _ @@ -74,6 +73,7 @@ private[kafka010] class KafkaTestUtils extends Logging { // Flag to test whether the system is correctly started private var zkReady = false private var brokerReady = false + private var leakDetector: AnyRef = null def zkAddress: String = { assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") @@ -120,61 +120,56 @@ private[kafka010] class KafkaTestUtils extends Logging { /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ def setup(): Unit = { + // Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is + // created. + val exception = new SparkException("It was created at: ") + leakDetector = ShutdownHookManager.addShutdownHook { () => + logError("Found a leak KafkaTestUtils.", exception) + } + setupEmbeddedZookeeper() setupEmbeddedKafkaServer() } /** Teardown the whole servers, including Kafka broker and Zookeeper */ def teardown(): Unit = { - // There is a race condition that may kill JVM when terminating the Kafka cluster. We set - // a custom Procedure here during the termination in order to keep JVM running and not fail the - // tests. 
- val logExitEvent = new Exit.Procedure { - override def execute(statusCode: Int, message: String): Unit = { - logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)") - } + if (leakDetector != null) { + ShutdownHookManager.removeShutdownHook(leakDetector) } - Exit.setExitProcedure(logExitEvent) - Exit.setHaltProcedure(logExitEvent) - try { - brokerReady = false - zkReady = false - - if (producer != null) { - producer.close() - producer = null - } + brokerReady = false + zkReady = false - if (server != null) { - server.shutdown() - server.awaitShutdown() - server = null - } + if (producer != null) { + producer.close() + producer = null + } - // On Windows, `logDirs` is left open even after Kafka server above is completely shut down - // in some cases. It leads to test failures on Windows if the directory deletion failure - // throws an exception. - brokerConf.logDirs.foreach { f => - try { - Utils.deleteRecursively(new File(f)) - } catch { - case e: IOException if Utils.isWindows => - logWarning(e.getMessage) - } - } + if (server != null) { + server.shutdown() + server.awaitShutdown() + server = null + } - if (zkUtils != null) { - zkUtils.close() - zkUtils = null + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. 
+ brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) } + } - if (zookeeper != null) { - zookeeper.shutdown() - zookeeper = null - } - } finally { - Exit.resetExitProcedure() - Exit.resetHaltProcedure() + if (zkUtils != null) { + zkUtils.close() + zkUtils = null + } + + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org