Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6205#discussion_r33873268
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
    @@ -182,3 +184,109 @@ private[spark] object RpcAddress {
         RpcAddress(host, port)
       }
     }
    +
    +
    +/**
    + * An exception thrown if RpcTimeout modifies a [[TimeoutException]].
    + */
    +private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
    +  extends TimeoutException(message) { initCause(cause) }
    +
    +
    +/**
    + * Associates a timeout with a description so that when a TimeoutException occurs, additional
    + * context about the timeout can be amended to the exception message.
    + * @param timeout timeout duration in seconds
    + * @param conf the configuration parameter that controls this timeout
    + */
    +private[spark] class RpcTimeout(timeout: FiniteDuration, val conf: String) {
    +
    +  /** Get the timeout duration */
    +  def duration: FiniteDuration = timeout
    +
    +  /** Amends the standard message of TimeoutException to include the description */
    +  private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
    +    new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + conf, te)
    +  }
    +
    +  /**
    +   * PartialFunction to match a TimeoutException and add the timeout description to the message
    +   *
    +   * @note This can be used in the recover callback of a Future to add to a TimeoutException
    +   * Example:
    +   *    val timeout = new RpcTimeout(5 millis, "short timeout")
    +   *    Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
    +   */
    +  def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
    +    // The exception has already been converted to a RpcTimeoutException so just raise it
    +    case rte: RpcTimeoutException => throw rte
    +    // Any other TimeoutException gets converted to a RpcTimeoutException with a modified message
    +    case te: TimeoutException => throw createRpcTimeoutException(te)
    +  }
    +
    +  /**
    +   * Wait for the completed result and return it. If the result is not available within this
    +   * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
    +   * @param  awaitable  the `Awaitable` to be awaited
    +   * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
    +   *         is still not ready
    +   */
    +  def awaitResult[T](awaitable: Awaitable[T]): T = {
    +    try {
    +      Await.result(awaitable, duration)
    +    } catch addMessageIfTimeout
    +  }
    +}
    +
    +
    +private[spark] object RpcTimeout {
    +
    +  /**
    +   * Lookup the timeout property in the configuration and create
    +   * a RpcTimeout with the property key in the description.
    +   * @param conf configuration properties containing the timeout
    +   * @param timeoutProp property key for the timeout in seconds
    +   * @throws NoSuchElementException if property is not set
    +   */
    +  def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
    +    val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
    +    new RpcTimeout(timeout, timeoutProp)
    +  }
    +
    +  /**
    +   * Lookup the timeout property in the configuration and create
    +   * a RpcTimeout with the property key in the description.
    +   * Uses the given default value if property is not set
    +   * @param conf configuration properties containing the timeout
    +   * @param timeoutProp property key for the timeout in seconds
    +   * @param defaultValue default timeout value in seconds if property not found
    +   */
    +  def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
    +    val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
    +    new RpcTimeout(timeout, timeoutProp)
    +  }
    +
    +  /**
    +   * Lookup prioritized list of timeout properties in the configuration
    +   * and create a RpcTimeout with the first set property key in the
    +   * description.
    +   * Uses the given default value if property is not set
    +   * @param conf configuration properties containing the timeout
    +   * @param timeoutPropList prioritized list of property keys for the timeout in seconds
    +   * @param defaultValue default timeout value in seconds if no properties found
    +   */
    +  def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
    +    require(timeoutPropList.nonEmpty)
    +
    +    // Find the first set property or use the default value with the first property
    +    val itr = timeoutPropList.iterator
    +    var foundProp: Option[(String, String)] = None
    +    while (itr.hasNext && foundProp.isEmpty) {
    +      val propKey = itr.next()
    +      conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
    +    }
    +    val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
    --- End diff --
    
    If timeoutPropList.head doesn't appear in conf and no other listed property is set, defaultValue would be returned, paired with that first key.
    Is this intended?
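
    To make the question concrete, here is a minimal sketch of that fallback (the property keys and default value below are hypothetical, not taken from the pull request): when none of the listed properties are set, foundProp stays None and getOrElse supplies the pair (timeoutPropList.head, defaultValue), so the default duration ends up attributed to a key that is not actually present in conf.

        import org.apache.spark.SparkConf

        val conf = new SparkConf(false)   // no timeout properties set at all
        val timeoutPropList = Seq("spark.rpc.askTimeout", "spark.network.timeout")  // assumed keys
        val defaultValue = "120s"

        // Equivalent to the search loop in the diff: find the first key that is set
        val foundProp: Option[(String, String)] =
          timeoutPropList.collectFirst { case key if conf.contains(key) => key -> conf.get(key) }

        // The fallback being asked about: pair the *first* key with the default value
        val finalProp = foundProp.getOrElse(timeoutPropList.head -> defaultValue)
        // finalProp == ("spark.rpc.askTimeout", "120s")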

