Repository: spark
Updated Branches:
  refs/heads/master a458efc66 -> 50c3a86f4


[SPARK-6749] [SQL] Make metastore client robust to underlying socket connection loss

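This works around a bug in the underlying RetryingMetaStoreClient (HIVE-10384)
by refreshing the metastore client on Thrift exceptions. We attempt to emulate
the proper Hive behavior by retrying only as configured in HiveConf: the retry
count comes from METASTORETHRIFTFAILURERETRIES and the delay between attempts
from METASTORE_CLIENT_CONNECT_RETRY_DELAY.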

Author: Eric Liang <e...@databricks.com>

Closes #6912 from ericl/spark-6749 and squashes the following commits:

2d54b55 [Eric Liang] use conf from state
0e3a74e [Eric Liang] use shim properly
980b3e5 [Eric Liang] Fix conf parsing hive 0.14 conf.
92459b6 [Eric Liang] Work around RetryingMetaStoreClient bug


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50c3a86f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50c3a86f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50c3a86f

Branch: refs/heads/master
Commit: 50c3a86f42d7dfd1acbda65c1e5afbd3db1406df
Parents: a458efc
Author: Eric Liang <e...@databricks.com>
Authored: Tue Jun 23 22:27:17 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jun 23 22:27:17 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/client/ClientWrapper.scala   | 55 +++++++++++++++++++-
 .../apache/spark/sql/hive/client/HiveShim.scala | 19 +++++++
 2 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50c3a86f/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 42c2d4c..2f771d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
 import java.net.URI
 import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => 
JSet}
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
@@ -136,12 +137,62 @@ private[hive] class ClientWrapper(
 
   // TODO: should be a def?s
   // When we create this val client, the HiveConf of it (conf) is the one 
associated with state.
-  private val client = Hive.get(conf)
+  @GuardedBy("this")
+  private var client = Hive.get(conf)
+
+  // We use hive's conf for compatibility.
+  private val retryLimit = 
conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
+  private val retryDelayMillis = 
shim.getMetastoreClientConnectRetryDelayMillis(conf)
+
+  /**
+   * Runs `f` with multiple retries in case the hive metastore is temporarily 
unreachable.
+   */
+  private def retryLocked[A](f: => A): A = synchronized {
+    // Hive sometimes retries internally, so set a deadline to avoid 
compounding delays.
+    val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 
1e6).toLong
+    var numTries = 0
+    var caughtException: Exception = null
+    do {
+      numTries += 1
+      try {
+        return f
+      } catch {
+        case e: Exception if causedByThrift(e) =>
+          caughtException = e
+          logWarning(
+            "HiveClientWrapper got thrift exception, destroying client and 
retrying " +
+              s"(${retryLimit - numTries} tries remaining)", e)
+          Thread.sleep(retryDelayMillis)
+          try {
+            client = Hive.get(state.getConf, true)
+          } catch {
+            case e: Exception if causedByThrift(e) =>
+              logWarning("Failed to refresh hive client, will retry.", e)
+          }
+      }
+    } while (numTries <= retryLimit && System.nanoTime < deadline)
+    if (System.nanoTime > deadline) {
+      logWarning("Deadline exceeded")
+    }
+    throw caughtException
+  }
+
+  private def causedByThrift(e: Throwable): Boolean = {
+    var target = e
+    while (target != null) {
+      val msg = target.getMessage()
+      if (msg != null && 
msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
+        return true
+      }
+      target = target.getCause()
+    }
+    false
+  }
 
   /**
    * Runs `f` with ThreadLocal session state and classloaders configured for 
this version of hive.
    */
-  private def withHiveState[A](f: => A): A = synchronized {
+  private def withHiveState[A](f: => A): A = retryLocked {
     val original = Thread.currentThread().getContextClassLoader
     // Set the thread local metastore client to the client associated with 
this ClientWrapper.
     Hive.set(client)

http://git-wip-us.apache.org/repos/asf/spark/blob/50c3a86f/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 5ae2dbb..e7c1779 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean, Integer => JInteger}
 import java.lang.reflect.{Method, Modifier}
 import java.net.URI
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => 
JSet}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
 
@@ -64,6 +65,8 @@ private[client] sealed abstract class Shim {
 
   def getDriverResults(driver: Driver): Seq[String]
 
+  def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
+
   def loadPartition(
       hive: Hive,
       loadPath: Path,
@@ -192,6 +195,10 @@ private[client] class Shim_v0_12 extends Shim {
     res.toSeq
   }
 
+  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long 
= {
+    conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 
1000
+  }
+
   override def loadPartition(
       hive: Hive,
       loadPath: Path,
@@ -321,6 +328,12 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       JBoolean.TYPE,
       JBoolean.TYPE,
       JBoolean.TYPE)
+  private lazy val getTimeVarMethod =
+    findMethod(
+      classOf[HiveConf],
+      "getTimeVar",
+      classOf[HiveConf.ConfVars],
+      classOf[TimeUnit])
 
   override def loadPartition(
       hive: Hive,
@@ -359,4 +372,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, 
JBoolean.FALSE)
   }
 
+  override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long 
= {
+    getTimeVarMethod.invoke(
+      conf,
+      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
+      TimeUnit.MILLISECONDS).asInstanceOf[Long]
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to