I'd like to toss out another idea that doesn't involve a complete end-to-end Kerberos implementation. Essentially, have the driver authenticate to Kerberos, instantiate a Hadoop file system, and serialize/cache it for the executors to use instead of them having to instantiate their own.
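To make the "check for a cached file system before instantiating your own" part concrete, here is a minimal sketch of how that lookup can work among threads inside a single JVM. It is an illustration under assumptions, not code from Spark or KaBoom: the class name `FsHandleCache`, its type parameter `H` (a stand-in for `org.apache.hadoop.fs.FileSystem`, so the sketch compiles without Hadoop on the classpath) and the `factory` function are all hypothetical. In a real application the factory would presumably wrap `new Path(rootUri).getFileSystem(conf)`, called after `UserGroupInformation.loginUserFromKeytab(principal, keytab)`. The sketch says nothing about making the object available to executors running in other processes.

```java
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical per-JVM cache of file system handles, keyed by root URI.
 * H is a stand-in for org.apache.hadoop.fs.FileSystem (an assumption made
 * so that this sketch has no Hadoop dependency).
 */
final class FsHandleCache<H> {
    private final ConcurrentMap<URI, H> handles = new ConcurrentHashMap<>();
    private final Function<URI, H> factory;

    /** factory is only invoked when no handle is cached for a URI yet. */
    FsHandleCache(Function<URI, H> factory) {
        this.factory = factory;
    }

    /** Returns the cached handle for rootUri, creating it at most once. */
    H getOrCreate(URI rootUri) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent
        // callers asking for the same URI share a single handle.
        return handles.computeIfAbsent(rootUri, factory);
    }

    /** Number of distinct root URIs that currently have a cached handle. */
    int size() {
        return handles.size();
    }
}
```

Every thread calls `getOrCreate(URI.create("hdfs://hadoop-cluster.site.org/"))` and only the first call pays for authentication and instantiation; later calls get the same object back.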
- Driver authenticates to Kerberos via UserGroupInformation.loginUserFromKeytab(principal, keytab)
- Driver instantiates a Hadoop configuration via hdfs-site.xml and core-site.xml
- Driver instantiates the Hadoop file system from a path based on the Hadoop root URI (hdfs://hadoop-cluster.site.org/) and hadoop config
- Driver makes this file system available to all future executors
- Executors first check for an existing/cached file system object before instantiating their own

I've used this technique within (non-distributed) multi-threaded Java applications (https://github.com/blackberry/KaBoom) and it works quite well. I'm not sure if it's possible to implement on a distributed application like Spark, but it might be worth investigation...

Here are the references to file system instantiation that I found in 1.4:

dariens@dariens-laptop1 (branch-1.4) ~/OtherProjects/spark $ grep -ir getFileSystem *
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala: val fs = path.getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: val fs = outputDir.getFileSystem(broadcastedConf.value.value)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: val fs = path.getFileSystem(broadcastedConf.value.value)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: val fs = path.getFileSystem(conf)
core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala: val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala: val fs = path.getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val threadStats = getFileSystemThreadStatistics()
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val threadStats = getFileSystemThreadStatistics()
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val fs = pattern.getFileSystem(conf)
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala: path.getFileSystem(conf.value)
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala: val fs = outputPath.getFileSystem(conf)
core/src/main/scala/org/apache/spark/SparkContext.scala: val fs = hadoopPath.getFileSystem(hadoopConfiguration)
core/src/main/scala/org/apache/spark/SparkContext.scala: val fs = path.getFileSystem(hadoopConfiguration)
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala: val fs = file.getFileSystem(job)
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala: val fs = pathp.getFileSystem(conf)
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala: private[this] val fs = path.getFileSystem(
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala: val fs = path.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala: val fs = outputPath.getFileSystem(hadoopConf)
sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala: val fs = hdfsPath.getFileSystem(hadoopConf)
sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala: val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala: val fileSystem = outputPath.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala: val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala: val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala: val fs: FileSystem = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: path.getFileSystem(conf).makeQualified(path)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = fspath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = footer.getFile.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = file.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala: val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala: val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java: fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = summaryPath.getFileSystem(configuration)
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala: val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala: val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala: val fs = fspath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala: val fs = origPath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala: new Path(path, filename).getFileSystem(conf),
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: val fs = pathPattern.getFileSystem(sc.hiveconf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: val fs = path.getFileSystem(sc.hiveconf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala: val fs = outputPath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala: val fs = path.getFileSystem(conf)
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala: val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala: val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala: val fs = rootDir.getFileSystem(new Configuration())
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala: var fs = testDir.getFileSystem(new Configuration())
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala: fs = testDir.getFileSystem(new Configuration())
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val fs = path.getFileSystem(conf)
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala: val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala: val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala: val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala: if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala: def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala: if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala: fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala: val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala: val destFs = destDir.getFileSystem(hadoopConf)
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala: val srcFs = srcPath.getFileSystem(hadoopConf)
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala: val dstFs = dst.getFileSystem(conf)

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Sunday, June 28, 2015 10:34 AM
To: Tim Chen
Cc: Marcelo Vanzin; Dave Ariens; Olivier Girardot; user@spark.apache.org
Subject: Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos

On 27 Jun 2015, at 07:56, Tim Chen <t...@mesosphere.io> wrote:

Does YARN provide the token through that env variable you mentioned? Or how does YARN do this?

Roughly:

1. The client-side launcher creates the delegation tokens and adds them as byte[] data to the request.
2. The YARN RM uses the HDFS token for the localisation, so the node managers can access the content the user has the rights to.
3. There's some other stuff related to token refresh of restarted app masters, essentially guaranteeing that even an AM restarted 3 days after the first launch will still have current credentials.
4. It's the duty of the launched App master to download those delegated tokens and make use of them. This happens partly through the UGI stuff and partly through other mechanisms (for example, a subset of the tokens are usually passed to the launched containers).
5. It's also the duty of the launched AM to deal with token renewal and expiry.
Short-lived (< 72h) apps don't have to worry about this; making the jump to long-lived services adds a lot of extra work (which is in Spark 1.4).

Tim

On Fri, Jun 26, 2015 at 3:51 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

On Fri, Jun 26, 2015 at 3:44 PM, Dave Ariens <dari...@blackberry.com> wrote:

Fair. I will look into an alternative with a generated delegation token. However, the same issue exists. How can I have the executor run some arbitrary code when it gets a task assignment and before it proceeds to process its resources?

Hmm, good question. If it doesn't already, Mesos could have its own implementation of CoarseGrainedExecutorBackend that provides that functionality. The only difference is that you'd run something before the executor starts up, not before each task.

YARN actually doesn't do it that way; YARN provides the tokens to the executor before the process starts, so that when you call "UserGroupInformation.getCurrentUser()" the tokens are already there.

One way of doing that is by writing the tokens to a file and setting the KRB5CCNAME env variable when starting the process. You can check the Hadoop sources for details. Not sure if there's another way.

From: Marcelo Vanzin
Sent: Friday, June 26, 2015 6:20 PM
To: Dave Ariens
Cc: Tim Chen; Olivier Girardot; user@spark.apache.org
Subject: Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos

On Fri, Jun 26, 2015 at 3:09 PM, Dave Ariens <dari...@blackberry.com> wrote:

Would there be any way to have the task instances in the slaves call the UGI login with a principal/keytab provided to the driver?

That would only work with a very small number of executors. If you have many login requests in a short period of time with the same principal, the KDC will start to deny logins. That's why delegation tokens are used instead of explicit logins.

--
Marcelo

--
Marcelo