This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 242581f  [SPARK-33440][CORE] Use current timestamp with warning log in HadoopFSDelegationTokenProvider when the issue date for token is not set up properly
242581f is described below

commit 242581f4926c994bfc5af388cae31645112b2798
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Tue Dec 1 06:44:15 2020 +0900

    [SPARK-33440][CORE] Use current timestamp with warning log in HadoopFSDelegationTokenProvider when the issue date for token is not set up properly
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to use the current timestamp, together with a warning log, when the issue date for a token is not set up properly. The next section explains the rationale in detail.
    
    ### Why are the changes needed?
    
    Unfortunately, not every implementation respects the `issue date` in `AbstractDelegationTokenIdentifier`, which Spark relies on when calculating renewal. The default value of the issue date is 0L, which is far from the actual issue date. This breaks the logic for calculating the next renewal date under some circumstances, leading to a 0 interval (immediate) when rescheduling token renewal.
    
    In HadoopFSDelegationTokenProvider, Spark calculates the token renewal interval as follows:
    
    
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L123-L134
    
    The interval is calculated as `token.renew() - identifier.getIssueDate`, which gives the correct interval assuming both `token.renew()` and `identifier.getIssueDate` produce correct values, but it goes wrong when `identifier.getIssueDate` returns 0L (the default value), as below:
    
    ```
    20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal interval is 1603175657000 for token S3ADelegationToken/IDBroker
    20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal interval is 86400048 for token HDFS_DELEGATION_TOKEN
    ```
    
    Fortunately, we pick the minimum value as a safety guard (so in this case `86400048` is picked up), but that safety guard has an unintended bad impact in this case.
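    The interval computation can be sketched as below; the timestamp values are hypothetical, chosen only to mirror the log excerpt above, not taken from a real run:

    ```scala
    // Simplified sketch of the renewal-interval computation; values are
    // hypothetical but chosen to reproduce the two log lines above.
    val newExpiration = 1603175657000L             // token.renew() returns an absolute timestamp
    val hdfsIssueDate = newExpiration - 86400048L  // sane issue date, ~1 day before expiration
    val s3aIssueDate  = 0L                         // default value: the implementation never set it

    def interval(expiration: Long, issueDate: Long): Long = expiration - issueDate

    val intervals = Seq(
      interval(newExpiration, hdfsIssueDate),      // 86400048 (~1 day), correct
      interval(newExpiration, s3aIssueDate))       // 1603175657000, an absolute timestamp in disguise
    val renewalInterval = intervals.min            // 86400048: the safety guard picks the minimum
    ```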
    
    
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L58-L71
    
    Spark takes the interval calculated above, the "minimum" of the intervals, and blindly adds it to each token's issue date to calculate the next renewal date for the token, then picks the "minimum" value again. In the problematic case, the value would be `86400048` (86400048 + 0), which is far smaller than the current timestamp.
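    Continuing the sketch (again with hypothetical timestamps), the next renewal date computed for the broken token lands in January 1970:

    ```scala
    // The (minimum) interval is added back to each token's issue date.
    val renewalInterval = 86400048L    // the minimum interval picked above
    val s3aIssueDate = 0L              // issue date never set by the token implementation
    val now = 1602567259000L           // hypothetical current timestamp (Oct 2020)

    val nextRenewal = s3aIssueDate + renewalInterval  // 86400048, i.e. ~Jan 2, 1970
    val inThePast = nextRenewal < now                 // true: the renewal date is decades ago
    ```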
    
    
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L228-L234
    
    The current timestamp is then subtracted from the next renewal date to get the interval, which is multiplied by the configured ratio to produce the final schedule interval. In the problematic case, this value becomes negative.
    
    
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L180-L188
    
    There is a safety guard that disallows negative values, but it simply clamps to 0, meaning "schedule immediately". The renewal then triggers the same calculation of the next renewal date and schedule interval, leading to the same behavior, hence the delegation token is updated immediately and continuously.
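    The final delay computation can be sketched as below; `0.75` is assumed here as the renewal ratio (`spark.security.credentials.renewalRatio`), and the timestamps remain hypothetical:

    ```scala
    val nextRenewal = 86400048L                       // broken next renewal date from above
    val now = 1602567259000L                          // hypothetical current timestamp
    val ratio = 0.75                                  // assumed renewal-interval ratio

    val delay = (ratio * (nextRenewal - now)).toLong  // large negative value
    val scheduled = math.max(0, delay)                // clamped to 0: renew immediately, in a tight loop
    ```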
    
    Since we fetch the token just before the calculation happens, the actual issue date is likely only slightly earlier, so it is not dangerous to use the current timestamp as the issue date for a token whose issue date has not been set up properly. Still, it is better not to leave the token implementation as it is, so we log a warning message advising end users to consult the token implementer.
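    The fix applied in the diff below boils down to the following fallback, shown here as a simplified sketch (the warning-log calls are omitted):

    ```scala
    // Simplified sketch of the issue-date fallback introduced by this patch.
    def issueDateOrNow(issueDate: Long, now: Long): Long =
      if (issueDate > now) issueDate      // clock mismatch: warn, but trust the token
      else if (issueDate > 0L) issueDate  // sane issue date: use it as-is
      else now                            // never set (0L default): warn and fall back to now
    ```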
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. End users will no longer encounter a tight loop of token renewal scheduling after this PR. From the end user's perspective, there is nothing they need to change.
    
    ### How was this patch tested?
    
    Manually tested with problematic environment.
    
    Closes #30366 from HeartSaVioR/SPARK-33440.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    (cherry picked from commit f5d2165c95fe83f24be9841807613950c1d5d6d0)
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
---
 .../security/HadoopDelegationTokenManager.scala    |  4 +++-
 .../security/HadoopFSDelegationTokenProvider.scala | 27 +++++++++++++++++++---
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 3168c76..6ce195b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -178,7 +178,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   private def scheduleRenewal(delay: Long): Unit = {
     val _delay = math.max(0, delay)
-    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
+    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(_delay)}.")
 
     val renewalTask = new Runnable() {
       override def run(): Unit = {
@@ -230,6 +230,8 @@ private[spark] class HadoopDelegationTokenManager(
         val now = System.currentTimeMillis
         val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
         val delay = (ratio * (nextRenewal - now)).toLong
+        logInfo(s"Calculated delay on renewal is $delay, based on next renewal $nextRenewal " +
+          s"and the ratio $ratio, and current time $now")
         scheduleRenewal(delay)
         creds
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 4e91e72..cd9516b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -63,7 +63,8 @@ private[deploy] class HadoopFSDelegationTokenProvider
             val identifier = token
               .decodeIdentifier()
               .asInstanceOf[AbstractDelegationTokenIdentifier]
-            identifier.getIssueDate + interval
+            val tokenKind = token.getKind.toString
+            getIssueDate(tokenKind, identifier) + interval
           }
         if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
       }
@@ -126,13 +127,33 @@ private[deploy] class HadoopFSDelegationTokenProvider
       Try {
         val newExpiration = token.renew(hadoopConf)
         val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-        val interval = newExpiration - identifier.getIssueDate
+        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+        val tokenKind = token.getKind.toString
+        val interval = newExpiration - getIssueDate(tokenKind, identifier)
+        logInfo(s"Renewal interval is $interval for token $tokenKind")
         interval
       }.toOption
     }
     if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
+
+  private def getIssueDate(kind: String, identifier: AbstractDelegationTokenIdentifier): Long = {
+    val now = System.currentTimeMillis()
+    val issueDate = identifier.getIssueDate
+    if (issueDate > now) {
+      logWarning(s"Token $kind has set up issue date later than current time. (provided: " +
+        s"$issueDate / current timestamp: $now) Please make sure clocks are in sync between " +
+        "machines. If the issue is not a clock mismatch, consult token implementor to check " +
+        "whether issue date is valid.")
+      issueDate
+    } else if (issueDate > 0L) {
+      issueDate
+    } else {
+      logWarning(s"Token $kind has not set up issue date properly. (provided: $issueDate) " +
+        s"Using current timestamp ($now) as issue date instead. Consult token implementor to fix " +
+        "the behavior.")
+      now
+    }
+  }
 }
 
 private[deploy] object HadoopFSDelegationTokenProvider {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
