Repository: spark
Updated Branches:
  refs/heads/master 42263fd0c -> 80784a1de


[SPARK-18057][FOLLOW-UP] Use 127.0.0.1 to avoid zookeeper picking up an ipv6 address

## What changes were proposed in this pull request?

I'm still seeing the Kafka tests fail randomly with
`kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for
connection while in state: CONNECTING`. The test output shows that ZooKeeper
picked up an IPv6 address. More details can be found in
https://issues.apache.org/jira/browse/KAFKA-7193
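
For context, here is a minimal, illustrative sketch of the resolution
behavior behind this flakiness (the `LocalhostResolution` object is
hypothetical and not part of this patch; output depends on the machine):

```scala
import java.net.InetAddress

object LocalhostResolution {
  def main(args: Array[String]): Unit = {
    // Depending on the OS hosts file and JVM flags such as
    // -Djava.net.preferIPv6Addresses=true, "localhost" may resolve to the
    // IPv6 loopback ::1 before (or instead of) 127.0.0.1.
    InetAddress.getAllByName("localhost").foreach { addr =>
      println(s"${addr.getHostAddress} (${addr.getClass.getSimpleName})")
    }
    // A literal IPv4 address involves no hostname lookup, so it can never
    // yield an IPv6 address.
    println(InetAddress.getByName("127.0.0.1").getHostAddress)
  }
}
```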

This PR uses `127.0.0.1` rather than `localhost` to make sure ZooKeeper never
resolves to an IPv6 address.
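
To see why the bind host matters, here is a hedged sketch of how an embedded
ZooKeeper like the one in `KafkaTestUtils` is started (the
`startEmbeddedZookeeper` helper, tick time, and backlog values are
illustrative assumptions, not the exact test-utils code):

```scala
import java.net.InetSocketAddress

import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

import org.apache.spark.util.Utils

object EmbeddedZkSketch {
  // Start an in-process ZooKeeper bound to the given host string. With
  // "localhost" the InetSocketAddress may resolve to ::1 on some machines;
  // with "127.0.0.1" it is always the IPv4 loopback.
  def startEmbeddedZookeeper(zkHost: String, zkPort: Int): NIOServerCnxnFactory = {
    val snapshotDir = Utils.createTempDir()
    val logDir = Utils.createTempDir()
    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
    val factory = new NIOServerCnxnFactory()
    factory.configure(new InetSocketAddress(zkHost, zkPort), 16)
    factory.startup(zookeeper)
    factory
  }
}
```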

## How was this patch tested?

Jenkins

Closes #22097 from zsxwing/fix-zookeeper-connect.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80784a1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80784a1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80784a1d

Branch: refs/heads/master
Commit: 80784a1de8d02536a94f3fd08ef632777478ab14
Parents: 42263fd
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Tue Aug 14 09:57:01 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue Aug 14 09:57:01 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 80 ++++++++++++--------
 .../streaming/kafka010/KafkaTestUtils.scala     | 79 +++++++++++--------
 2 files changed, 96 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/80784a1d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index d89cccd..e58d183 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.common.utils.Exit
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
@@ -56,7 +57,7 @@ import org.apache.spark.util.Utils
 class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
 
   // Zookeeper related configurations
-  private val zkHost = "localhost"
+  private val zkHost = "127.0.0.1"
   private var zkPort: Int = 0
   private val zkConnectionTimeout = 60000
   private val zkSessionTimeout = 6000
@@ -67,7 +68,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   private var adminClient: AdminClient = null
 
   // Kafka broker related configurations
-  private val brokerHost = "localhost"
+  private val brokerHost = "127.0.0.1"
   private var brokerPort = 0
   private var brokerConf: KafkaConfig = _
 
@@ -138,40 +139,55 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
 
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
   def teardown(): Unit = {
-    brokerReady = false
-    zkReady = false
-
-    if (producer != null) {
-      producer.close()
-      producer = null
+    // There is a race condition that may kill the JVM when terminating the Kafka cluster. We
+    // set a custom Procedure here during termination to keep the JVM running and not fail the
+    // tests.
+    val logExitEvent = new Exit.Procedure {
+      override def execute(statusCode: Int, message: String): Unit = {
+        logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode 
message: $message)")
+      }
     }
+    Exit.setExitProcedure(logExitEvent)
+    Exit.setHaltProcedure(logExitEvent)
+    try {
+      brokerReady = false
+      zkReady = false
 
-    if (server != null) {
-      server.shutdown()
-      server.awaitShutdown()
-      server = null
-    }
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
 
-    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
-    // in some cases. It leads to test failures on Windows if the directory deletion failure
-    // throws an exception.
-    brokerConf.logDirs.foreach { f =>
-      try {
-        Utils.deleteRecursively(new File(f))
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
+      if (server != null) {
+        server.shutdown()
+        server.awaitShutdown()
+        server = null
       }
-    }
 
-    if (zkUtils != null) {
-      zkUtils.close()
-      zkUtils = null
-    }
+      // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+      // in some cases. It leads to test failures on Windows if the directory deletion failure
+      // throws an exception.
+      brokerConf.logDirs.foreach { f =>
+        try {
+          Utils.deleteRecursively(new File(f))
+        } catch {
+          case e: IOException if Utils.isWindows =>
+            logWarning(e.getMessage)
+        }
+      }
 
-    if (zookeeper != null) {
-      zookeeper.shutdown()
-      zookeeper = null
+      if (zkUtils != null) {
+        zkUtils.close()
+        zkUtils = null
+      }
+
+      if (zookeeper != null) {
+        zookeeper.shutdown()
+        zookeeper = null
+      }
+    } finally {
+      Exit.resetExitProcedure()
+      Exit.resetHaltProcedure()
     }
   }
 
@@ -299,8 +315,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   protected def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
-    props.put("host.name", "localhost")
-    props.put("advertised.host.name", "localhost")
+    props.put("host.name", "127.0.0.1")
+    props.put("advertised.host.name", "127.0.0.1")
     props.put("port", brokerPort.toString)
     props.put("log.dir", Utils.createTempDir().getAbsolutePath)
     props.put("zookeeper.connect", zkAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/80784a1d/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index eef4c55..bd3cf9a 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -34,6 +34,7 @@ import kafka.utils.ZkUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.kafka.common.utils.Exit
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import org.apache.spark.SparkConf
@@ -50,7 +51,7 @@ import org.apache.spark.util.Utils
 private[kafka010] class KafkaTestUtils extends Logging {
 
   // Zookeeper related configurations
-  private val zkHost = "localhost"
+  private val zkHost = "127.0.0.1"
   private var zkPort: Int = 0
   private val zkConnectionTimeout = 60000
   private val zkSessionTimeout = 6000
@@ -60,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   private var zkUtils: ZkUtils = _
 
   // Kafka broker related configurations
-  private val brokerHost = "localhost"
+  private val brokerHost = "127.0.0.1"
   private var brokerPort = 0
   private var brokerConf: KafkaConfig = _
 
@@ -125,40 +126,55 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
   def teardown(): Unit = {
-    brokerReady = false
-    zkReady = false
-
-    if (producer != null) {
-      producer.close()
-      producer = null
+    // There is a race condition that may kill the JVM when terminating the Kafka cluster. We
+    // set a custom Procedure here during termination to keep the JVM running and not fail the
+    // tests.
+    val logExitEvent = new Exit.Procedure {
+      override def execute(statusCode: Int, message: String): Unit = {
+        logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode 
message: $message)")
+      }
     }
+    Exit.setExitProcedure(logExitEvent)
+    Exit.setHaltProcedure(logExitEvent)
+    try {
+      brokerReady = false
+      zkReady = false
+
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
 
-    if (server != null) {
-      server.shutdown()
-      server.awaitShutdown()
-      server = null
-    }
+      if (server != null) {
+        server.shutdown()
+        server.awaitShutdown()
+        server = null
+      }
 
-    // On Windows, `logDirs` is left open even after Kafka server above is 
completely shut down
-    // in some cases. It leads to test failures on Windows if the directory 
deletion failure
-    // throws an exception.
-    brokerConf.logDirs.foreach { f =>
-      try {
-        Utils.deleteRecursively(new File(f))
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
-    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
-    // in some cases. It leads to test failures on Windows if the directory deletion failure
-    // throws an exception.
+      brokerConf.logDirs.foreach { f =>
+        try {
+          Utils.deleteRecursively(new File(f))
+        } catch {
+          case e: IOException if Utils.isWindows =>
+            logWarning(e.getMessage)
+        }
       }
-    }
 
-    if (zkUtils != null) {
-      zkUtils.close()
-      zkUtils = null
-    }
+      if (zkUtils != null) {
+        zkUtils.close()
+        zkUtils = null
+      }
 
-    if (zookeeper != null) {
-      zookeeper.shutdown()
-      zookeeper = null
+      if (zookeeper != null) {
+        zookeeper.shutdown()
+        zookeeper = null
+      }
+    } finally {
+      Exit.resetExitProcedure()
+      Exit.resetHaltProcedure()
     }
   }
 
@@ -217,7 +233,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
   private def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
-    props.put("host.name", "localhost")
+    props.put("host.name", "127.0.0.1")
+    props.put("advertised.host.name", "127.0.0.1")
     props.put("port", brokerPort.toString)
     props.put("log.dir", brokerLogDir)
     props.put("zookeeper.connect", zkAddress)

