Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140625136 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( + conf: SparkConf, + tokenManager: HadoopDelegationTokenManager, + nextRenewal: Long, + de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = + Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { + val keytab64 = conf.get("spark.yarn.keytab", null) + val tgt64 = System.getenv("KRB5CCNAME") + require(keytab64 != null || tgt64 != null, "keytab or tgt required") + require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") + val mode = if (keytab64 != null) "keytab" else "tgt" + val secretFile = if (keytab64 != null) keytab64 else tgt64 + logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") + logDebug(s"secretFile is $secretFile") + (secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { + def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { + logInfo("Credentials have expired, creating new ones 
+ now.") + runnable.run() + } else { + logInfo(s"Scheduling login from keytab in $remainingTime millis.") + credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } + } + + val credentialRenewerRunnable = + new Runnable { + override def run(): Unit = { + try { + val creds = getRenewedDelegationTokens(conf) + broadcastDelegationTokens(creds) + } catch { + case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying again in an hour", e) + credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS) + return + } + scheduleRenewal(this) + } + } + scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { + logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") + // Get new delegation tokens by logging in with a new UGI + // inspired by AMCredentialRenewer.scala:L174 + val ugi = if (mode == "keytab") { --- End diff -- Hello @kalvinnchau, you are correct: all this does is keep track of when the tokens will expire and renew them at that time. Part of my motivation for doing this is to avoid writing any files to disk (like new TGTs, if that's what you're suggesting). We can simply mount the keytab via the Mesos secrets primitive, then renew the tokens every so often. In order to be consistent, I tried to keep this solution as close to YARN as possible.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org