Repository: spark Updated Branches: refs/heads/master 7f1e507bf -> 69c9c1771
http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index ccf753e..5f897cb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -21,9 +21,7 @@ import java.util.Collections import java.util.concurrent._ import java.util.regex.Pattern -import org.apache.spark.util.Utils - -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -39,8 +37,8 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.util.Utils /** * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding @@ -164,7 +162,7 @@ private[yarn] class YarnAllocator( * Number of container requests at the given location that have not yet been fulfilled. */ private def getNumPendingAtLocation(location: String): Int = - amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum + amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala.map(_.size).sum /** * Request as many executors from the ResourceManager as needed to reach the desired total. If @@ -231,14 +229,14 @@ private[yarn] class YarnAllocator( numExecutorsRunning, allocateResponse.getAvailableResources)) - handleAllocatedContainers(allocatedContainers) + handleAllocatedContainers(allocatedContainers.asScala) } val completedContainers = allocateResponse.getCompletedContainersStatuses() if (completedContainers.size > 0) { logDebug("Completed %d containers".format(completedContainers.size)) - processCompletedContainers(completedContainers) + processCompletedContainers(completedContainers.asScala) logDebug("Finished processing %d completed containers. Current running executor count: %d." .format(completedContainers.size, numExecutorsRunning)) @@ -271,7 +269,7 @@ private[yarn] class YarnAllocator( val request = createContainerRequest(resource, locality.nodes, locality.racks) amClient.addContainerRequest(request) val nodes = request.getNodes - val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last + val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last logInfo(s"Container request (host: $hostStr, capability: $resource)") } } else if (missing < 0) { @@ -280,7 +278,8 @@ private[yarn] class YarnAllocator( val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource) if (!matchingRequests.isEmpty) { - matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest) + matchingRequests.iterator().next().asScala + .take(numToCancel).foreach(amClient.removeContainerRequest) } else { logWarning("Expected to find pending requests, but found none.") } @@ -459,7 +458,7 @@ private[yarn] class YarnAllocator( } } - if (allocatedContainerToHostMap.containsKey(containerId)) { + if (allocatedContainerToHostMap.contains(containerId)) { val host = allocatedContainerToHostMap.get(containerId).get val containerSet = allocatedHostToContainersMap.get(host).get http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 4999f9c..df042bf 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -19,17 +19,15 @@ package org.apache.spark.deploy.yarn import java.util.{List => JList} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.{Map, Set} import scala.util.Try import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.webapp.util.WebAppUtils import org.apache.spark.{Logging, SecurityManager, SparkConf} @@ -108,8 +106,8 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", classOf[Configuration]) val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] - val hosts = proxies.map { proxy => proxy.split(":")(0) } - val uriBases = proxies.map { proxy => prefix + proxy + proxyBase } + val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) } + val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) } catch { case e: NoSuchMethodException => http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 128e996..b4f8049 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -21,7 +21,7 @@ import java.io.{File, FileOutputStream, OutputStreamWriter} import java.util.Properties import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files @@ -132,7 +132,7 @@ abstract class BaseYarnClusterSuite props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"") props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"") - yarnCluster.getConfig().foreach { e => + yarnCluster.getConfig.asScala.foreach { e => props.setProperty("spark.hadoop." + e.getKey(), e.getValue()) } @@ -149,7 +149,7 @@ abstract class BaseYarnClusterSuite props.store(writer, "Spark properties.") writer.close() - val extraJarArgs = if (!extraJars.isEmpty()) Seq("--jars", extraJars.mkString(",")) else Nil + val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil val mainArgs = if (klass.endsWith(".py")) { Seq(klass) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 0a5402c..e7f2501 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn import java.io.File import java.net.URI -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ HashMap => MutableHashMap } +import scala.collection.JavaConverters._ +import scala.collection.mutable.{HashMap => MutableHashMap} import scala.reflect.ClassTag import scala.util.Try @@ -38,7 +38,7 @@ import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, Matchers} -import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.util.Utils class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { @@ -201,7 +201,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method => val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]] tags should contain allOf ("tag1", "dup", "tag2", "multi word") - tags.filter(!_.isEmpty).size should be (4) + tags.asScala.filter(_.nonEmpty).size should be (4) } appContext.getMaxAppAttempts should be (42) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 128350b..5a4ea2e 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -21,7 +21,6 @@ import java.io.File import java.net.URL import scala.collection.mutable -import scala.collection.JavaConversions._ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.{ByteStreams, Files} @@ -216,8 +215,8 @@ private object YarnClusterDriver extends Logging with Matchers { assert(listener.driverLogs.nonEmpty) val driverLogs = listener.driverLogs.get assert(driverLogs.size === 2) - assert(driverLogs.containsKey("stderr")) - assert(driverLogs.containsKey("stdout")) + assert(driverLogs.contains("stderr")) + assert(driverLogs.contains("stdout")) val urlStr = driverLogs("stderr") // Ensure that this is a valid URL, else this will throw an exception new URL(urlStr) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org