Repository: spark Updated Branches: refs/heads/master b29663eee -> 77620be76
[SPARK-6207] [YARN] [SQL] Adds delegation tokens for metastore to conf. Adds hive2-metastore delegation token to conf when running in secure mode. Without this change, running on YARN in cluster mode fails with a GSS exception. This is a rough patch that adds a dependency to spark/yarn on hive-exec. I'm looking for suggestions on how to make this patch better. This contribution is my original work and that I licenses the work to the Apache Spark project under the project's open source licenses. Author: Doug Balog <doug.balogtarget.com> Author: Doug Balog <doug.ba...@target.com> Closes #5031 from dougb/SPARK-6207 and squashes the following commits: 3e9ac16 [Doug Balog] [SPARK-6207] Fixes minor code spacing issues. e260765 [Doug Balog] [SPARK-6207] Second pass at adding Hive delegation token to conf. - Use reflection instead of adding dependency on hive. - Tested on Hive 0.13 and Hadoop 2.4.1 1ab1729 [Doug Balog] Merge branch 'master' of git://github.com/apache/spark into SPARK-6207 bf356d2 [Doug Balog] [SPARK-6207] [YARN] [SQL] Adds delegation tokens for metastore to conf. Adds hive2-metastore delagations token to conf when running in securemode. Without this change, runing on YARN in cluster mode fails with a GSS exception. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77620be7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77620be7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77620be7 Branch: refs/heads/master Commit: 77620be76e82b6cdaae406cd752d3272656f5fe0 Parents: b29663e Author: Doug Balog <doug.ba...@target.com> Authored: Mon Apr 13 09:49:58 2015 -0500 Committer: Thomas Graves <tgra...@apache.org> Committed: Mon Apr 13 09:49:58 2015 -0500 ---------------------------------------------------------------------- .../org/apache/spark/deploy/yarn/Client.scala | 63 ++++++++++++++++++++ 1 file changed, 63 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/77620be7/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c1effd3..1091ff5 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -22,17 +22,21 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map} +import scala.reflect.runtime.universe import scala.util.{Try, Success, Failure} import com.google.common.base.Objects import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.Master import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -220,6 +224,7 @@ private[spark] class Client( val dst = new Path(fs.getHomeDirectory(), appStagingDir) val nns = getNameNodesToAccess(sparkConf) + dst obtainTokensForNamenodes(nns, hadoopConf, credentials) + obtainTokenForHiveMetastore(hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -937,6 +942,64 @@ object Client extends Logging { } /** + * Obtains token for the Hive metastore and adds them to the credentials. + */ + private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) { + if (UserGroupInformation.isSecurityEnabled) { + val mirror = universe.runtimeMirror(getClass.getClassLoader) + + try { + val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive") + val hive = hiveClass.getMethod("get").invoke(null) + + val hiveConf = hiveClass.getMethod("getConf").invoke(hive) + val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") + + val hiveConfGet = (param:String) => Option(hiveConfClass + .getMethod("get", classOf[java.lang.String]) + .invoke(hiveConf, param)) + + val metastore_uri = hiveConfGet("hive.metastore.uris") + + // Check for local metastore + if (metastore_uri != None && metastore_uri.get.toString.size > 0) { + val metastore_kerberos_principal_conf_var = mirror.classLoader + .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars") + .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString + + val principal = hiveConfGet(metastore_kerberos_principal_conf_var) + + val username = Option(UserGroupInformation.getCurrentUser().getUserName) + if (principal != None && username != None) { + val tokenStr = hiveClass.getMethod("getDelegationToken", + classOf[java.lang.String], classOf[java.lang.String]) + .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String] + + val hive2Token = new Token[DelegationTokenIdentifier]() + hive2Token.decodeFromUrlString(tokenStr) + credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token) + logDebug("Added hive.Server2.delegation.token to conf.") + hiveClass.getMethod("closeCurrent").invoke(null) + } else { + logError("Username or principal == NULL") + logError(s"""username=${username.getOrElse("(NULL)")}""") + logError(s"""principal=${principal.getOrElse("(NULL)")}""") + throw new IllegalArgumentException("username and/or principal is equal to null!") + } + } else { + logDebug("HiveMetaStore configured in localmode") + } + } catch { + case e:java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return } + case e:java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return } + case e:Exception => { logError("Unexpected Exception " + e) + throw new RuntimeException("Unexpected exception", e) + } + } + } + } + + /** * Return whether the two file systems are the same. */ private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org