I'd like to toss out another idea that doesn't involve a complete end-to-end 
Kerberos implementation.  Essentially, have the driver authenticate to  
Kerberos, instantiate a Hadoop file system, and serialize/cache it for the 
executors to use instead of them having to instantiate their own.

- Driver authenticates to Kerberos via 
UserGroupInformation.loginUserFromKeytab(principal, keytab)
- Driver instantiates a Hadoop configuration via hdfs-site.xml and core-site.xml
- Driver instantiates the Hadoop file system from a path based on the Hadoop 
root URI (hdfs://hadoop-cluster.site.org/) and hadoop config
- Driver makes this file system available to all future executors
- Executors first check for an existing/cached file system object before 
instantiating their own
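
A minimal sketch of that last step only, under a few assumptions: FsHandle is a hypothetical stand-in for org.apache.hadoop.fs.FileSystem so the example compiles without Hadoop on the classpath, FsHandleCache is a made-up name, and the cache is shared only between threads of one JVM. In a real version the factory would presumably wrap a call such as path.getFileSystem(conf).

```java
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.hadoop.fs.FileSystem.
final class FsHandle {
    final URI root;

    FsHandle(URI root) {
        this.root = root;
    }
}

// Per-JVM cache: return an existing handle if present, create one only if missing.
final class FsHandleCache {
    private static final ConcurrentMap<URI, FsHandle> CACHE = new ConcurrentHashMap<>();

    private FsHandleCache() {}

    // 'factory' is where the expensive, authenticated instantiation would go.
    // computeIfAbsent runs it at most once per root URI, even under contention.
    static FsHandle getOrCreate(URI root, Function<URI, FsHandle> factory) {
        return CACHE.computeIfAbsent(root, factory);
    }
}
```

Every caller asking for the same root URI gets the same object back, so only the first one pays for instantiation.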

I've used this technique within (non-distributed) multi-threaded Java 
applications (https://github.com/blackberry/KaBoom) and it works quite well. 
I'm not sure whether it's possible to implement in a distributed application 
like Spark, but it might be worth investigating.

Here are the references to file system instantiation that I found in 1.4:

dariens@dariens-laptop1 (branch-1.4) ~/OtherProjects/spark $ grep -ir getFileSystem *
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala: val fs = path.getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: val fs = outputDir.getFileSystem(broadcastedConf.value.value)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: val fs = path.getFileSystem(broadcastedConf.value.value)
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala: val fs = path.getFileSystem(conf)
core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala: val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala: val fs = path.getFileSystem(sc.hadoopConfiguration)
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val threadStats = getFileSystemThreadStatistics()
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val threadStats = getFileSystemThreadStatistics()
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala: val fs = pattern.getFileSystem(conf)
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala: path.getFileSystem(conf.value)
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala: val fs = outputPath.getFileSystem(conf)
core/src/main/scala/org/apache/spark/SparkContext.scala: val fs = hadoopPath.getFileSystem(hadoopConfiguration)
core/src/main/scala/org/apache/spark/SparkContext.scala: val fs = path.getFileSystem(hadoopConfiguration)
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala: val fs = file.getFileSystem(job)
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala: val fs = pathp.getFileSystem(conf)
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala: private[this] val fs = path.getFileSystem(
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala: val fs = path.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala: val fs = outputPath.getFileSystem(hadoopConf)
sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala: val fs = hdfsPath.getFileSystem(hadoopConf)
sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala: val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala: val fileSystem = outputPath.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala: val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala: val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala: val fs: FileSystem = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: path.getFileSystem(conf).makeQualified(path)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = fspath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = footer.getFile.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = file.getFileSystem(configuration)
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala: val fs = origPath.getFileSystem(conf)
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala: val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala: val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java: fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala: val fs = summaryPath.getFileSystem(configuration)
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala: val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala: val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala: val fs = fspath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala: val fs = origPath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala: new Path(path, filename).getFileSystem(conf),
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: val fs = pathPattern.getFileSystem(sc.hiveconf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: val fs = path.getFileSystem(sc.hiveconf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala: val fs = outputPath.getFileSystem(conf)
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala: val fs = path.getFileSystem(conf)
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala: val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala: val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala: val fs = rootDir.getFileSystem(new Configuration())
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala: var fs = testDir.getFileSystem(new Configuration())
streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala: fs = testDir.getFileSystem(new Configuration())
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val dfs = getFileSystemForPath(dfsPath, conf)
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala: val fs = path.getFileSystem(conf)
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala: val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala: val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala: val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala: if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala: def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala: if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala: fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala: val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala: val destFs = destDir.getFileSystem(hadoopConf)
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala: val srcFs = srcPath.getFileSystem(hadoopConf)
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala: val dstFs = dst.getFileSystem(conf)



From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Sunday, June 28, 2015 10:34 AM
To: Tim Chen
Cc: Marcelo Vanzin; Dave Ariens; Olivier Girardot; user@spark.apache.org
Subject: Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos


On 27 Jun 2015, at 07:56, Tim Chen <t...@mesosphere.io> wrote:

Does YARN provide the token through that env variable you mentioned? Or how 
does YARN do this?



Roughly:

1. The client-side launcher creates the delegation tokens and adds them as 
byte[] data to the request.
2. The YARN RM uses the HDFS token for the localisation, so the node managers 
can access the content the user has the rights to.
3. There's some other stuff related to token refresh of restarted app masters, 
essentially guaranteeing that even an AM restarted 3 days after the first 
launch will still have current credentials.
4. It's the duty of the launched app master to download those delegated tokens 
and make use of them, partly through the UGI stuff and partly through other 
mechanisms (for example, a subset of the tokens is usually passed to the 
launched containers).
5. It's also the duty of the launched AM to deal with token renewal and expiry. 
Short-lived (< 72h) apps don't have to worry about this; making the jump to 
long-lived services adds a lot of extra work (which is in Spark 1.4).



Tim

On Fri, Jun 26, 2015 at 3:51 PM, Marcelo Vanzin <van...@cloudera.com> wrote:
On Fri, Jun 26, 2015 at 3:44 PM, Dave Ariens <dari...@blackberry.com> wrote:
Fair. I will look into an alternative with a generated delegation token. 
However, the same issue exists: how can I have the executor run some arbitrary 
code when it gets a task assignment, before it proceeds to process its 
resources?

Hmm, good question. If it doesn't already, Mesos could have its own 
implementation of CoarseGrainedExecutorBackend that provides that 
functionality. The only difference is that you'd run something before the 
executor starts up, not before each task.

YARN actually doesn't do it that way; YARN provides the tokens to the executor 
before the process starts, so that when you call 
"UserGroupInformation.getCurrentUser()" the tokens are already there.

One way of doing that is by writing the tokens to a file and setting the 
KRB5CCNAME env variable when starting the process. You can check the Hadoop 
sources for details. Not sure if there's another way.
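
A rough sketch of the launch-side half of that idea, using only the JDK: write the token bytes to a file and point the child process at it through an environment variable before it starts. TokenFileLauncher and prepare are made-up names, and the token bytes are treated as opaque; producing or reading real Hadoop tokens is not shown. The variable name is a parameter: KRB5CCNAME is the one named above, while the constant Hadoop defines for a delegation-token file is UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class TokenFileLauncher {
    private TokenFileLauncher() {}

    // Writes the (opaque) token bytes to a temp file and returns a ProcessBuilder
    // whose environment points the child process at that file.
    // The caller decides when to call start() on the result.
    static ProcessBuilder prepare(List<String> command, String envVar, byte[] tokenBytes) {
        try {
            Path tokenFile = Files.createTempFile("tokens-", ".bin");
            Files.write(tokenFile, tokenBytes);

            ProcessBuilder builder = new ProcessBuilder(command);
            // Only the child's environment is changed, not the launcher's own.
            builder.environment().put(envVar, tokenFile.toAbsolutePath().toString());
            return builder;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of doing it this way is ordering: the file and the variable exist before the executor JVM starts, so nothing has to run before each task.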



From: Marcelo Vanzin
Sent: Friday, June 26, 2015 6:20 PM
To: Dave Ariens
Cc: Tim Chen; Olivier Girardot; user@spark.apache.org
Subject: Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos


On Fri, Jun 26, 2015 at 3:09 PM, Dave Ariens <dari...@blackberry.com> wrote:
Would there be any way to have the task instances in the slaves call the UGI 
login with a principal/keytab provided to the driver?

That would only work with a very small number of executors. If you have many 
login requests in a short period of time with the same principal, the KDC will 
start to deny logins. That's why delegation tokens are used instead of explicit 
logins.

--
Marcelo



--
Marcelo

