Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3765#discussion_r22984854
  
    --- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -153,498 +154,241 @@ private[yarn] class YarnAllocator(
       }
     
       /**
    -   * Allocate missing containers based on the number of executors 
currently pending and running.
    +   * Request resources such that, if YARN gives us all we ask for, we'll 
have a number of containers
    +   * equal to maxExecutors.
    +   *
    +   * Deal with any containers YARN has granted to us by possibly launching 
executors in them.
        *
    -   * This method prioritizes the allocated container responses from the RM 
based on node and
    -   * rack locality. Additionally, it releases any extra containers 
allocated for this application
    -   * but are not needed. This must be synchronized because variables read 
in this block are
    -   * mutated by other methods.
    +   * This must be synchronized because variables read in this method are 
mutated by other methods.
        */
       def allocateResources(): Unit = synchronized {
    -    val missing = maxExecutors - numPendingAllocate.get() - 
numExecutorsRunning.get()
    +    val numPendingAllocate = getNumPendingAllocate
    +    val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
     
         if (missing > 0) {
    -      val totalExecutorMemory = executorMemory + memoryOverhead
    -      numPendingAllocate.addAndGet(missing)
    -      logInfo(s"Will allocate $missing executor containers, each with 
$totalExecutorMemory MB " +
    +      logInfo(s"Will request $missing executor containers, each with 
${resource.getMemory} MB " +
             s"memory including $memoryOverhead MB overhead")
    -    } else {
    -      logDebug("Empty allocation request ...")
         }
     
    -    val allocateResponse = allocateContainers(missing)
    +    addResourceRequests(missing)
    +    val progressIndicator = 0.1f
    +    // Poll the ResourceManager. This doubles as a heartbeat if there are 
no pending container
    +    // requests.
    +    val allocateResponse = amClient.allocate(progressIndicator)
    +
         val allocatedContainers = allocateResponse.getAllocatedContainers()
     
         if (allocatedContainers.size > 0) {
    -      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
allocatedContainers.size)
    -
    -      if (numPendingAllocateNow < 0) {
    -        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * 
numPendingAllocateNow)
    -      }
    -
    -      logDebug("""
    -        Allocated containers: %d
    -        Current executor count: %d
    -        Containers released: %s
    -        Cluster resources: %s
    -        """.format(
    +      logDebug("Allocated containers: %d. Current executor count: %d. 
Cluster resources: %s."
    +        .format(
               allocatedContainers.size,
    -          numExecutorsRunning.get(),
    -          releasedContainers,
    +          numExecutorsRunning,
               allocateResponse.getAvailableResources))
     
    -      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (container <- allocatedContainers) {
    -        if (isResourceConstraintSatisfied(container)) {
    -          // Add the accepted `container` to the host's list of already 
accepted,
    -          // allocated containers
    -          val host = container.getNodeId.getHost
    -          val containersForHost = hostToContainers.getOrElseUpdate(host,
    -            new ArrayBuffer[Container]())
    -          containersForHost += container
    -        } else {
    -          // Release container, since it doesn't satisfy resource 
constraints.
    -          internalReleaseContainer(container)
    -        }
    -      }
    -
    -       // Find the appropriate containers to use.
    -      // TODO: Cleanup this group-by...
    -      val dataLocalContainers = new HashMap[String, 
ArrayBuffer[Container]]()
    -      val rackLocalContainers = new HashMap[String, 
ArrayBuffer[Container]]()
    -      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (candidateHost <- hostToContainers.keySet) {
    -        val maxExpectedHostCount = 
preferredHostToCount.getOrElse(candidateHost, 0)
    -        val requiredHostCount = maxExpectedHostCount - 
allocatedContainersOnHost(candidateHost)
    -
    -        val remainingContainersOpt = hostToContainers.get(candidateHost)
    -        assert(remainingContainersOpt.isDefined)
    -        var remainingContainers = remainingContainersOpt.get
    -
    -        if (requiredHostCount >= remainingContainers.size) {
    -          // Since we have <= required containers, add all remaining 
containers to
    -          // `dataLocalContainers`.
    -          dataLocalContainers.put(candidateHost, remainingContainers)
    -          // There are no more free containers remaining.
    -          remainingContainers = null
    -        } else if (requiredHostCount > 0) {
    -          // Container list has more containers than we need for data 
locality.
    -          // Split the list into two: one based on the data local 
container count,
    -          // (`remainingContainers.size` - `requiredHostCount`), and the 
other to hold remaining
    -          // containers.
    -          val (dataLocal, remaining) = remainingContainers.splitAt(
    -            remainingContainers.size - requiredHostCount)
    -          dataLocalContainers.put(candidateHost, dataLocal)
    -
    -          // Invariant: remainingContainers == remaining
    -
    -          // YARN has a nasty habit of allocating a ton of containers on a 
host - discourage this.
    -          // Add each container in `remaining` to list of containers to 
release. If we have an
    -          // insufficient number of containers, then the next allocation 
cycle will reallocate
    -          // (but won't treat it as data local).
    -          // TODO(harvey): Rephrase this comment some more.
    -          for (container <- remaining) internalReleaseContainer(container)
    -          remainingContainers = null
    -        }
    -
    -        // For rack local containers
    -        if (remainingContainers != null) {
    -          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
    -          if (rack != null) {
    -            val maxExpectedRackCount = 
preferredRackToCount.getOrElse(rack, 0)
    -            val requiredRackCount = maxExpectedRackCount - 
allocatedContainersOnRack(rack) -
    -              rackLocalContainers.getOrElse(rack, List()).size
    -
    -            if (requiredRackCount >= remainingContainers.size) {
    -              // Add all remaining containers to to `dataLocalContainers`.
    -              dataLocalContainers.put(rack, remainingContainers)
    -              remainingContainers = null
    -            } else if (requiredRackCount > 0) {
    -              // Container list has more containers that we need for data 
locality.
    -              // Split the list into two: one based on the data local 
container count,
    -              // (`remainingContainers.size` - `requiredHostCount`), and 
the other to hold remaining
    -              // containers.
    -              val (rackLocal, remaining) = remainingContainers.splitAt(
    -                remainingContainers.size - requiredRackCount)
    -              val existingRackLocal = 
rackLocalContainers.getOrElseUpdate(rack,
    -                new ArrayBuffer[Container]())
    -
    -              existingRackLocal ++= rackLocal
    -
    -              remainingContainers = remaining
    -            }
    -          }
    -        }
    -
    -        if (remainingContainers != null) {
    -          // Not all containers have been consumed - add them to the list 
of off-rack containers.
    -          offRackContainers.put(candidateHost, remainingContainers)
    -        }
    -      }
    -
    -      // Now that we have split the containers into various groups, go 
through them in order:
    -      // first host-local, then rack-local, and finally off-rack.
    -      // Note that the list we create below tries to ensure that not all 
containers end up within
    -      // a host if there is a sufficiently large number of 
hosts/containers.
    -      val allocatedContainersToProcess = new 
ArrayBuffer[Container](allocatedContainers.size)
    -      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
    -      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
    -      allocatedContainersToProcess ++= 
TaskSchedulerImpl.prioritizeContainers(offRackContainers)
    -
    -      // Run each of the allocated containers.
    -      for (container <- allocatedContainersToProcess) {
    -        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
    -        val executorHostname = container.getNodeId.getHost
    -        val containerId = container.getId
    -
    -        val executorMemoryOverhead = (executorMemory + memoryOverhead)
    -        assert(container.getResource.getMemory >= executorMemoryOverhead)
    -
    -        if (numExecutorsRunningNow > maxExecutors) {
    -          logInfo("""Ignoring container %s at host %s, since we already 
have the required number of
    -            containers for it.""".format(containerId, executorHostname))
    -          internalReleaseContainer(container)
    -          numExecutorsRunning.decrementAndGet()
    -        } else {
    -          val executorId = executorIdCounter.incrementAndGet().toString
    -          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
    -            SparkEnv.driverActorSystemName,
    -            sparkConf.get("spark.driver.host"),
    -            sparkConf.get("spark.driver.port"),
    -            CoarseGrainedSchedulerBackend.ACTOR_NAME)
    -
    -          logInfo("Launching container %s for on host 
%s".format(containerId, executorHostname))
    -          executorIdToContainer(executorId) = container
    -
    -          // To be safe, remove the container from `releasedContainers`.
    -          releasedContainers.remove(containerId)
    -
    -          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
    -          allocatedHostToContainersMap.synchronized {
    -            val containerSet = 
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
    -              new HashSet[ContainerId]())
    -
    -            containerSet += containerId
    -            allocatedContainerToHostMap.put(containerId, executorHostname)
    -
    -            if (rack != null) {
    -              allocatedRackCount.put(rack, 
allocatedRackCount.getOrElse(rack, 0) + 1)
    -            }
    -          }
    -          logInfo("Launching ExecutorRunnable. driverUrl: %s,  
executorHostname: %s".format(
    -            driverUrl, executorHostname))
    -          val executorRunnable = new ExecutorRunnable(
    -            container,
    -            conf,
    -            sparkConf,
    -            driverUrl,
    -            executorId,
    -            executorHostname,
    -            executorMemory,
    -            executorCores,
    -            appAttemptId.getApplicationId.toString,
    -            securityMgr)
    -          launcherPool.execute(executorRunnable)
    -        }
    -      }
    -      logDebug("""
    -        Finished allocating %s containers (from %s originally).
    -        Current number of executors running: %d,
    -        Released containers: %s
    -        """.format(
    -          allocatedContainersToProcess,
    -          allocatedContainers,
    -          numExecutorsRunning.get(),
    -          releasedContainers))
    +      handleAllocatedContainers(allocatedContainers)
         }
     
         val completedContainers = 
allocateResponse.getCompletedContainersStatuses()
         if (completedContainers.size > 0) {
           logDebug("Completed %d containers".format(completedContainers.size))
     
    -      for (completedContainer <- completedContainers) {
    -        val containerId = completedContainer.getContainerId
    -
    -        if (releasedContainers.containsKey(containerId)) {
    -          // Already marked the container for release, so remove it from
    -          // `releasedContainers`.
    -          releasedContainers.remove(containerId)
    -        } else {
    -          // Decrement the number of executors running. The next iteration 
of
    -          // the ApplicationMaster's reporting thread will take care of 
allocating.
    -          numExecutorsRunning.decrementAndGet()
    -          logInfo("Completed container %s (state: %s, exit status: 
%s)".format(
    -            containerId,
    -            completedContainer.getState,
    -            completedContainer.getExitStatus))
    -          // Hadoop 2.2.X added a ContainerExitStatus we should switch to 
use
    -          // there are some exit status' we shouldn't necessarily count 
against us, but for
    -          // now I think its ok as none of the containers are expected to 
exit
    -          if (completedContainer.getExitStatus == -103) { // vmem limit 
exceeded
    -            logWarning(memLimitExceededLogMessage(
    -              completedContainer.getDiagnostics,
    -              VMEM_EXCEEDED_PATTERN))
    -          } else if (completedContainer.getExitStatus == -104) { // pmem 
limit exceeded
    -            logWarning(memLimitExceededLogMessage(
    -              completedContainer.getDiagnostics,
    -              PMEM_EXCEEDED_PATTERN))
    -          } else if (completedContainer.getExitStatus != 0) {
    -            logInfo("Container marked as failed: " + containerId +
    -              ". Exit status: " + completedContainer.getExitStatus +
    -              ". Diagnostics: " + completedContainer.getDiagnostics)
    -            numExecutorsFailed.incrementAndGet()
    -          }
    -        }
    +      processCompletedContainers(completedContainers)
     
    -        allocatedHostToContainersMap.synchronized {
    -          if (allocatedContainerToHostMap.containsKey(containerId)) {
    -            val hostOpt = allocatedContainerToHostMap.get(containerId)
    -            assert(hostOpt.isDefined)
    -            val host = hostOpt.get
    -
    -            val containerSetOpt = allocatedHostToContainersMap.get(host)
    -            assert(containerSetOpt.isDefined)
    -            val containerSet = containerSetOpt.get
    -
    -            containerSet.remove(containerId)
    -            if (containerSet.isEmpty) {
    -              allocatedHostToContainersMap.remove(host)
    -            } else {
    -              allocatedHostToContainersMap.update(host, containerSet)
    -            }
    -
    -            allocatedContainerToHostMap.remove(containerId)
    -
    -            // TODO: Move this part outside the synchronized block?
    -            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    -            if (rack != null) {
    -              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
    -              if (rackCount > 0) {
    -                allocatedRackCount.put(rack, rackCount)
    -              } else {
    -                allocatedRackCount.remove(rack)
    -              }
    -            }
    -          }
    -        }
    -      }
    -      logDebug("""
    -        Finished processing %d completed containers.
    -        Current number of executors running: %d,
    -        Released containers: %s
    -        """.format(
    -          completedContainers.size,
    -          numExecutorsRunning.get(),
    -          releasedContainers))
    +      logDebug("Finished processing %d completed containers. Current 
running executor count: %d."
    +        .format(completedContainers.size, numExecutorsRunning))
         }
       }
     
    -  private def allocatedContainersOnHost(host: String): Int = {
    -    allocatedHostToContainersMap.synchronized {
    -     allocatedHostToContainersMap.getOrElse(host, Set()).size
    +  /**
    +   * Request numExecutors additional containers from YARN. Visible for 
testing.
    +   */
    +  def addResourceRequests(numExecutors: Int): Unit = {
    +    val containerRequests = new ArrayBuffer[ContainerRequest]
    +    for (i <- 0 until numExecutors) {
    +      containerRequests += new ContainerRequest(resource, null, null, 
RM_REQUEST_PRIORITY)
         }
    -  }
     
    -  private def allocatedContainersOnRack(rack: String): Int = {
    -    allocatedHostToContainersMap.synchronized {
    -      allocatedRackCount.getOrElse(rack, 0)
    +    for (request <- containerRequests) {
    +      amClient.addContainerRequest(request)
         }
    -  }
     
    -  private def isResourceConstraintSatisfied(container: Container): Boolean 
= {
    -    container.getResource.getMemory >= (executorMemory + memoryOverhead)
    +    for (request <- containerRequests) {
    +      val nodes = request.getNodes
    +      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else 
nodes.last
    +      logInfo("Container request (host: %s, capability: 
%s".format(hostStr, resource))
    +    }
       }
     
    -  // A simple method to copy the split info map.
    -  private def generateNodeToWeight(
    -      conf: Configuration,
    -      input: collection.Map[String, collection.Set[SplitInfo]])
    -    : (Map[String, Int], Map[String, Int]) = {
    -    if (input == null) {
    -      return (Map[String, Int](), Map[String, Int]())
    +  /**
    +   * Handle containers granted by the RM by launching executors on them.
    +   *
    +   * Due to the way the YARN allocation protocol works, certain healthy 
race conditions can result
    +   * in YARN granting containers that we no longer need. In this case, we 
release them.
    +   *
    +   * Visible for testing.
    +   */
    +  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit 
= {
    +    val containersToUse = new 
ArrayBuffer[Container](allocatedContainers.size)
    +
    +    // Match incoming requests by host
    +    val remainingAfterHostMatches = new ArrayBuffer[Container]
    +    for (allocatedContainer <- allocatedContainers) {
    +      matchContainerToRequest(allocatedContainer, 
allocatedContainer.getNodeId.getHost,
    +        containersToUse, remainingAfterHostMatches)
         }
     
    -    val hostToCount = new HashMap[String, Int]
    -    val rackToCount = new HashMap[String, Int]
    +    // Match remaining by rack
    +    val remainingAfterRackMatches = new ArrayBuffer[Container]
    +    for (allocatedContainer <- remainingAfterHostMatches) {
    +      val rack = RackResolver.resolve(conf, 
allocatedContainer.getNodeId.getHost).getNetworkLocation
    +      matchContainerToRequest(allocatedContainer, rack, containersToUse,
    +        remainingAfterRackMatches)
    +    }
     
    -    for ((host, splits) <- input) {
    -      val hostCount = hostToCount.getOrElse(host, 0)
    -      hostToCount.put(host, hostCount + splits.size)
    +    // Assign remaining that are neither node-local nor rack-local
    +    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
    +    for (allocatedContainer <- remainingAfterRackMatches) {
    +      matchContainerToRequest(allocatedContainer, ANY_HOST, 
containersToUse,
    +        remainingAfterOffRackMatches)
    +    }
     
    -      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    -      if (rack != null) {
    -        val rackCount = rackToCount.getOrElse(host, 0)
    -        rackToCount.put(host, rackCount + splits.size)
    +    if (!remainingAfterOffRackMatches.isEmpty) {
    +      for (container <- remainingAfterOffRackMatches) {
    +        internalReleaseContainer(container)
           }
         }
     
    -    (hostToCount.toMap, rackToCount.toMap)
    -  }
    +    runAllocatedContainers(containersToUse)
     
    -  private def internalReleaseContainer(container: Container): Unit = {
    -    releasedContainers.put(container.getId(), true)
    -    amClient.releaseAssignedContainer(container.getId())
    +    logInfo("Received %d containers from YARN, launching executors on %d."
    +      .format(allocatedContainers.size, containersToUse.size))
       }
     
       /**
    -   * Called to allocate containers in the cluster.
    +   * Looks for requests for the given location that match the given 
container allocation. If it
    +   * finds one, removes the request so that it won't be submitted again. 
Places the container into
    +   * containersToUse or remaining.
        *
    -   * @param count Number of containers to allocate.
    -   *              If zero, should still contact RM (as a heartbeat).
    -   * @return Response to the allocation request.
    +   * @param containersToUse list of containers that will be used
    +   * @param remaining list of containers that will not be used
        */
    -  private def allocateContainers(count: Int): AllocateResponse = {
    -    addResourceRequests(count)
    -
    -    // We have already set the container request. Poll the ResourceManager 
for a response.
    -    // This doubles as a heartbeat if there are no pending container 
requests.
    -    val progressIndicator = 0.1f
    -    amClient.allocate(progressIndicator)
    +  private def matchContainerToRequest(
    +      allocatedContainer: Container,
    +      location: String,
    +      containersToUse: ArrayBuffer[Container],
    +      remaining: ArrayBuffer[Container]): Unit = {
    +    val matchingRequests = 
amClient.getMatchingRequests(allocatedContainer.getPriority, location,
    +      allocatedContainer.getResource)
    +
    +    // Match the allocation to a request
    +    if (!matchingRequests.isEmpty) {
    +      val containerRequest = matchingRequests.get(0).iterator.next
    +      amClient.removeContainerRequest(containerRequest)
    +      containersToUse += allocatedContainer
    +    } else {
    +      remaining += allocatedContainer
    +    }
       }
     
    -  private def createRackResourceRequests(hostContainers: 
ArrayBuffer[ContainerRequest])
    -    : ArrayBuffer[ContainerRequest] = {
    -    // Generate modified racks and new set of hosts under it before 
issuing requests.
    -    val rackToCounts = new HashMap[String, Int]()
    +  /**
    +   * Launches executors in the allocated containers.
    +   */
    +  private def runAllocatedContainers(containersToUse: 
ArrayBuffer[Container]): Unit = {
    +    for (container <- containersToUse) {
    +      numExecutorsRunning += 1
    +      assert(numExecutorsRunning <= maxExecutors)
    +      val executorHostname = container.getNodeId.getHost
    +      val containerId = container.getId
    +      executorIdCounter += 1
    +      val executorId = executorIdCounter.toString
    +
    +      assert(container.getResource.getMemory >= resource.getMemory)
    +
    +      logInfo("Launching container %s for on host %s".format(containerId, 
executorHostname))
     
    -    for (container <- hostContainers) {
    -      val candidateHost = container.getNodes.last
    -      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
    +      allocatedHostToContainersMap.synchronized {
    +        val containerSet = 
allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
    +          new HashSet[ContainerId])
     
    -      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
    -      if (rack != null) {
    -        var count = rackToCounts.getOrElse(rack, 0)
    -        count += 1
    -        rackToCounts.put(rack, count)
    +        containerSet += containerId
    +        allocatedContainerToHostMap.put(containerId, executorHostname)
           }
    -    }
     
    -    val requestedContainers = new 
ArrayBuffer[ContainerRequest](rackToCounts.size)
    -    for ((rack, count) <- rackToCounts) {
    -      requestedContainers ++= createResourceRequests(
    -        AllocationType.RACK,
    -        rack,
    -        count,
    -        RM_REQUEST_PRIORITY)
    +      val executorRunnable = new ExecutorRunnable(
    +        container,
    +        conf,
    +        sparkConf,
    +        driverUrl,
    +        executorId,
    +        executorHostname,
    +        executorMemory,
    +        executorCores,
    +        appAttemptId.getApplicationId.toString,
    +        securityMgr)
    +      if (launchContainers) {
    +        logInfo("Launching ExecutorRunnable. driverUrl: %s,  
executorHostname: %s".format(
    +          driverUrl, executorHostname))
    +        launcherPool.execute(executorRunnable)
    +      }
         }
    -
    -    requestedContainers
       }
     
    -  private def addResourceRequests(numExecutors: Int): Unit = {
    -    val containerRequests: List[ContainerRequest] =
    -      if (numExecutors <= 0) {
    -        logDebug("numExecutors: " + numExecutors)
    -        List()
    -      } else if (preferredHostToCount.isEmpty) {
    -        logDebug("host preferences is empty")
    -        createResourceRequests(
    -          AllocationType.ANY,
    -          resource = null,
    -          numExecutors,
    -          RM_REQUEST_PRIORITY).toList
    +  private def processCompletedContainers(completedContainers: 
Seq[ContainerStatus]): Unit = {
    +    for (completedContainer <- completedContainers) {
    +      val containerId = completedContainer.getContainerId
    +
    +      if (releasedContainers.contains(containerId)) {
    +        // Already marked the container for release, so remove it from
    +        // `releasedContainers`.
    +        releasedContainers.remove(containerId)
           } else {
    -        // Request for all hosts in preferred nodes and for numExecutors -
    -        // candidates.size, request by default allocation policy.
    -        val hostContainerRequests = new 
ArrayBuffer[ContainerRequest](preferredHostToCount.size)
    -        for ((candidateHost, candidateCount) <- preferredHostToCount) {
    -          val requiredCount = candidateCount - 
allocatedContainersOnHost(candidateHost)
    -
    -          if (requiredCount > 0) {
    -            hostContainerRequests ++= createResourceRequests(
    -              AllocationType.HOST,
    -              candidateHost,
    -              requiredCount,
    -              RM_REQUEST_PRIORITY)
    -          }
    +        // Decrement the number of executors running. The next iteration of
    +        // the ApplicationMaster's reporting thread will take care of 
allocating.
    +        numExecutorsRunning -= 1
    --- End diff --
    
    So I think we should rename maxExecutors to maxRequestExecutors, which would 
better represent its meaning.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to