Github user ArtRand commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19272#discussion_r140625136
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala ---
    @@ -0,0 +1,150 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster.mesos
    +
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{Executors, TimeUnit}
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.util.ThreadUtils
    +
    +
    +class MesosCredentialRenewer(
    +    conf: SparkConf,
    +    tokenManager: HadoopDelegationTokenManager,
    +    nextRenewal: Long,
    +    de: RpcEndpointRef) extends Logging {
    +  // Single-threaded scheduled executor on which scheduleTokenRenewal runs deferred
    +  // renewal attempts, including the one-hour retry after a failed broadcast.
    +  private val credentialRenewerThread =
    +    Executors.newSingleThreadScheduledExecutor(
    +      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
    +
    +  // Deadline for the next renewal. It is compared against System.currentTimeMillis(),
    +  // so it is in epoch milliseconds; seeded from the `nextRenewal` constructor argument.
    +  // @volatile because it is read on the renewer thread when the next run is scheduled.
    +  // NOTE(review): no writer is visible in this excerpt; presumably it is advanced after
    +  // each successful renewal -- confirm.
    +  @volatile private var timeOfNextRenewal = nextRenewal
    +
    +  // Kerberos principal used for login (also included in getSecretFile's log line).
    +  // NOTE(review): read without a default, so construction presumably fails if
    +  // `spark.yarn.principal` is unset -- confirm this fail-fast is intended.
    +  private val principal = conf.get("spark.yarn.principal")
    +
    +  // Credential value and its mode ("keytab" or "tgt"). Keep this declared after
    +  // `principal`: getSecretFile interpolates `principal`, and vals initialize in order.
    +  private val (secretFile, mode) = getSecretFile(conf)
    +
    +  /**
    +   * Chooses the Kerberos credential source used to obtain delegation tokens.
    +   *
    +   * Exactly one of two sources must be provided: the `spark.yarn.keytab` setting
    +   * (mode "keytab") or the `KRB5CCNAME` environment variable (mode "tgt").
    +   *
    +   * NOTE(review): the value is passed through unchanged; whether it is a path or encoded
    +   * contents (earlier local names ended in "64") is not visible here -- confirm.
    +   *
    +   * @param conf configuration read for `spark.yarn.keytab`
    +   * @return the credential value paired with its mode, "keytab" or "tgt"
    +   * @throws IllegalArgumentException if neither source, or both sources, are set
    +   */
    +  private def getSecretFile(conf: SparkConf): (String, String) = {
    +    val keytab = conf.getOption("spark.yarn.keytab")
    +    val tgt = Option(System.getenv("KRB5CCNAME"))
    +    // Exhaustive over the four combinations; the error cases raise the same exception
    +    // type and message text that `require` produces.
    +    val (secretFile, mode) = (keytab, tgt) match {
    +      case (Some(k), None) => (k, "keytab")
    +      case (None, Some(t)) => (t, "tgt")
    +      case (None, None) =>
    +        throw new IllegalArgumentException("requirement failed: keytab or tgt required")
    +      case (Some(_), Some(_)) =>
    +        throw new IllegalArgumentException(
    +          "requirement failed: keytab and tgt cannot be used at the same time")
    +    }
    +    logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens")
    +    logDebug(s"secretFile is $secretFile")
    +    (secretFile, mode)
    +  }
    +
    +  /**
    +   * Starts the delegation-token renewal loop.
    +   *
    +   * If `timeOfNextRenewal` has already passed, a renewal runs synchronously on the calling
    +   * thread. Otherwise it is deferred on `credentialRenewerThread` until that deadline.
    +   * After a successful renewal and broadcast, the next run is chained off
    +   * `timeOfNextRenewal` again. On failure, a single retry is scheduled one hour later.
    +   *
    +   * NOTE(review): assumes getRenewedDelegationTokens advances `timeOfNextRenewal` (not
    +   * visible here). Otherwise the synchronous "expired" path would re-enter itself.
    +   */
    +  def scheduleTokenRenewal(): Unit = {
    +    // Runs `task` immediately if the deadline has passed, else defers it until the deadline.
    +    def runOrDefer(task: Runnable): Unit = {
    +      val delayMs = timeOfNextRenewal - System.currentTimeMillis()
    +      if (delayMs > 0) {
    +        logInfo(s"Scheduling login from keytab in $delayMs millis.")
    +        credentialRenewerThread.schedule(task, delayMs, TimeUnit.MILLISECONDS)
    +      } else {
    +        logInfo("Credentials have expired, creating new ones now.")
    +        task.run()
    +      }
    +    }
    +
    +    val renewer: Runnable = new Runnable {
    +      override def run(): Unit = {
    +        val broadcastOk =
    +          try {
    +            broadcastDelegationTokens(getRenewedDelegationTokens(conf))
    +            true
    +          } catch {
    +            case e: Exception =>
    +              // Log the error and try to write new tokens back in an hour
    +              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
    +              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    +              false
    +          }
    +        // The failure path has already queued its own retry, so only chain the next
    +        // regular renewal after a successful broadcast.
    +        if (broadcastOk) {
    +          runOrDefer(this)
    +        }
    +      }
    +    }
    +    runOrDefer(renewer)
    +  }
    +
    +  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
    +    logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
    +    // Get new delegation tokens by logging in with a new UGI
    +    // inspired by AMCredentialRenewer.scala:L174
    +    val ugi = if (mode == "keytab") {
    --- End diff --
    
    Hello @kalvinnchau, you are correct: all this does is keep track of when the
tokens will expire and renew them at that time. Part of my motivation for doing
this is to avoid writing any files to disk (such as new TGTs, if that is what
you're suggesting). We can simply mount the keytab via the Mesos secrets
primitive and then renew the tokens periodically. For consistency, I tried to
keep this solution as close to the YARN implementation as possible.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to